博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ASP.NET Core2利用MassTransit集成RabbitMQ
阅读量:5079 次
发布时间:2019-06-12

本文共 5959 字,大约阅读时间需要 19 分钟。

在ASP.NET Core上利用MassTransit来集成使用RabbitMQ真的很简单,代码也很简洁。近期因为项目需要,我便在这基础上再次进行了封装,抽成了公共方法,使得使用RabbitMQ的调用变得更方便简洁。那么,就让咱们来瞧瞧其魅力所在吧。

 

MassTransit

先看看MassTransit是个什么宝贝(MassTransit官网的简介):

MassTransit是一个免费的开源轻量级消息总线,用于使用.NET框架创建分布式应用程序。MassTransit在现有的顶级消息传输上提供了一系列广泛的功能,从而以开发人员友好的方式使用基于消息的会话模式异步连接服务。基于消息的通信是实现面向服务的体系结构的可靠且可扩展的方式。

通俗描述:

MassTransit就是一套基于消息服务的高级封装类库,下游可联接RabbitMQ、Redis、MongoDb等服务。

github官网:

 

RabbitMQ

RabbitMQ是成熟的MQ队列服务,是由 Erlang 语言开发的 AMQP 的开源实现。关于介绍RabbitMQ的中文资料也很多,有需要可以自行查找。我这里贴出其官网与下载安装的链接,如下:

官网:

下载与安装:

 

实现代码

通过上面的介绍,咱们已对MassTransit与RabbitMQ有了初步了解,那么现在来看看如何在ASP.NET Core上优雅的使用RabbitMQ吧。

1、创建一个名为“RabbitMQHelp.cs”公共类,用于封装操作RabbitMQ的公共方法,并通过Nuget来管理并引用“MassTransit”与“MassTransit.RabbitMQ”类库。

2、“RabbitMQHelp.cs”公共类主要对外封装两个静态方法,其代码如下:

1 using MassTransit;  2 using MassTransit.RabbitMqTransport;  3 using System;  4 using System.Collections.Generic;  5 using System.Text;  6 using System.Threading.Tasks;  7   8 namespace Lezhima.Comm  9 { 10     ///  11     /// RabbitMQ公共操作类,基于MassTransit库 12     ///  13     public class RabbitMQHelp 14     { 15         #region 交换器 16  17         ///  18         /// 操作日志交换器 19         /// 同时需在RabbitMQ的管理后台创建同名交换器 20         ///  21         public static readonly string actionLogExchange = "Lezhima.ActionLogExchange"; 22  23  24         #endregion 25  26  27         #region 声明变量 28  29         ///  30         /// MQ联接地址,建议放到配置文件 31         ///  32         private static readonly string mqUrl = "rabbitmq://192.168.1.181/"; 33  34         ///  35         /// MQ联接账号,建议放到配置文件 36         ///  37         private static readonly string mqUser = "admin"; 38  39         ///  40         /// MQ联接密码,建议放到配置文件 41         ///  42         private static readonly string mqPwd = "admin"; 43  44         #endregion 45  46         ///  47         /// 创建连接对象 48         /// 不对外公开 49         ///  50         private static IBusControl CreateBus(Action
registrationAction = null) 51 { 52 //通过MassTransit创建MQ联接工厂 53 return Bus.Factory.CreateUsingRabbitMq(cfg => 54 { 55 var host = cfg.Host(new Uri(mqUrl), hst => 56 { 57 hst.Username(mqUser); 58 hst.Password(mqPwd); 59 }); 60 registrationAction?.Invoke(cfg, host); 61 }); 62 } 63 64 65 ///
66 /// MQ生产者 67 /// 这里使用fanout的交换类型 68 /// 69 ///
70 public async static Task PushMessage(string exchange, object obj) 71 { 72 var bus = CreateBus(); 73 var sendToUri = new Uri($"{mqUrl}{exchange}"); 74 var endPoint = await bus.GetSendEndpoint(sendToUri); 75 await endPoint.Send(obj); 76 } 77 78 ///
79 /// MQ消费者 80 /// 这里使用fanout的交换类型 81 /// consumer必需是实现IConsumer接口的类实例 82 /// 83 ///
84 public static void ReceiveMessage(string exchange, object consumer) 85 { 86 var bus = CreateBus((cfg, host) => 87 { 88 //从指定的消息队列获取消息 通过consumer来实现消息接收 89 cfg.ReceiveEndpoint(host, exchange, e => 90 { 91 e.Instance(consumer); 92 }); 93 }); 94 bus.Start(); 95 } 96 } 97 } 98

 

3、“RabbitMQHelp.cs”公共类已经有了MQ“生产者”与“消费者”两个对外的静态公共方法,其中“生产者”方法可以在业务代码中直接调用,可传递JSON、对象等类型的参数向指定的交换器发送数据。而“消费者”方法是从指定交换器中进行接收绑定,但接收到的数据处理功能则交给了“consumer”类(因为在实际项目中,不同的数据有不同的业务处理逻辑,所以这里我们直接就通过IConsumer接口交给具体的实现类去做了)。那么,下面我们再来看看消费者里传递进来的“consumer”类的代码吧:

1 using MassTransit;  2 using System;  3 using System.Collections.Generic;  4 using System.Text;  5 using System.Threading.Tasks;  6   7 namespace Lezhima.Storage.Consumer  8 {  9     ///  10     /// 从MQ接收并处理数据 11     /// 实现MassTransit的IConsumer接口 12     ///  13     public class LogConsumer : IConsumer
14 { 15 ///
16 /// 实现Consume方法 17 /// 接收并处理数据 18 /// 19 ///
20 ///
21 public Task Consume(ConsumeContext
context) 22 { 23 return Task.Run(async () => 24 { 25 //获取接收到的对象 26 var amsg = context.Message; 27 Console.WriteLine($"Recevied By Consumer:{amsg}"); 28 Console.WriteLine($"Recevied By Consumer:{amsg.ActionLogId}"); 29 }); 30 } 31 } 32 } 33

 

调用代码

1、生产者调用代码如下:

1 ///   2 /// 测试MQ生产者  3 ///   4 /// 
5 [HttpGet] 6 public async Task
AddMessageTest() 7 { 8 //声明一个实体对象 9 var model = new ActionLog(); 10 model.ActionLogId = Guid.NewGuid(); 11 model.CreateTime = DateTime.Now; 12 model.UpdateTime = DateTime.Now; 13 //调用MQ 14 await RabbitMQHelp.PushMessage(RabbitMQHelp.actionLogExchange, model); 15 16 return new MobiResult(1000, "操作成功"); 17 }

 

2、消费者调用代码如下:

1 using Lezhima.Storage.Consumer;  2 using Microsoft.Extensions.Configuration;  3 using System;  4 using System.IO;  5   6 namespace Lezhima.Storage  7 {  8     class Program  9     { 10         static void Main(string[] args) 11         { 12             var conf = new ConfigurationBuilder() 13               .SetBasePath(Directory.GetCurrentDirectory()) 14               .AddJsonFile("appsettings.json", true, true) 15               .Build(); 16  17             //调用接收者 18             RabbitMQHelp.ReceiveMessage(RabbitMQHelp.actionLogExchange, 19              new LogConsumer() 20             ); 21  22             Console.ReadLine(); 23         } 24     } 25 } 26

 

总结

1、基于MassTransit库使得我们使用RabbitMQ变得更简洁、方便。而基于再次封装后,生产者与消费者将不需要关注具体的业务,也跟业务代码解耦了,更能适应项目的需要。

2、RabbitMQ的交换器需在其管理后台自行创建,而这里使用的fanout类型是因为其发送速度最快,且能满足我的项目需要,各位可视自身情况选用不同的类型。fanout类型不会存储消息,必需要消费者绑定交换器后才会发送给消费者。

 

声明

本文为作者原创,转载请备注出处与保留原文地址,谢谢。如文章能给您带来帮助,请点下推荐或关注,感谢您的支持!

 

转载于:https://www.cnblogs.com/Miidy/p/9579764.html

你可能感兴趣的文章
网卡流量检测.py
查看>>
poj1981 Circle and Points 单位圆覆盖问题
查看>>
POP的Stroke动画
查看>>
SQL语句在查询分析器中可以执行,代码中不能执行
查看>>
yii 1.x 添加 rules 验证url数组
查看>>
html+css 布局篇
查看>>
SQL优化
查看>>
用C语言操纵Mysql
查看>>
轻松学MVC4.0–6 MVC的执行流程
查看>>
redis集群如何清理前缀相同的key
查看>>
Python 集合(Set)、字典(Dictionary)
查看>>
获取元素
查看>>
proxy写监听方法,实现响应式
查看>>
第一阶段冲刺06
查看>>
十个免费的 Web 压力测试工具
查看>>
EOS生产区块:解析插件producer_plugin
查看>>
mysql重置密码
查看>>
jQuery轮 播的封装
查看>>
一天一道算法题--5.30---递归
查看>>
JS取得绝对路径
查看>>