一、概念及介绍
通常通过使用事件总线实现来执行此发布/订阅系统。 事件总线可以设计为包含 API 的接口,该 API 是订阅和取消订阅事件和发布事件所需的。 它还可以包含一个或多个基于跨进程或消息通信的实现,例如支持异步通信和发布/订阅模型的消息队列或服务总线。
本问介绍如何使用RabbitMQ通用事件总线接口实现这种与 .NET 的通信,并结合项目代码实践演示。 存在多种可能的实现,每种实现使用不同的技术或基础结构,例如 RabbitMQ、Azure 服务总线或任何其他第三方开源或商用服务总线。
二、什么是观察者模式?
观察者模式又名发布-订阅模式,具体概念就是定义对象间的一种一对多的依赖关系,当一个对象的状态发生变化时,所有依赖的对象都得到通知并被自动更新。
三、创建下发订单的项目(发布服务)
使用Visual Studio2022新建web api 项目, 该项目主要实现简易的订单和工单发布的测试样例
1、新增IntegrationEvent类
public abstract class IntegrationEvent
{
public string? Status { get; set; }
public DateTime OccurredOn { get; set; }
}
2、新建IEventBusServices.cs,定义发布、订阅、取消订阅接口的泛型参数接口
参数说明:
TEvent:表示要订阅的事件类型。它是泛型参数,必须继承自 IntegrationEvent 类
TEventHandler:表示要处理该事件的事件处理程序类型。它是泛型参数,必须实现 IIntegrationEventHandler<TEvent> 接口
public interface IEventBusServices
{
/// <summary>
/// 发布消息
/// </summary>
/// <typeparam name="TEvent">表示要订阅的事件类型。它是泛型参数,必须继承自 IntegrationEvent 类</typeparam>
/// <param name="event">事件</param>
void Publish<TEvent>(TEvent @event) where TEvent : IntegrationEvent;
/// <summary>
/// 订阅消息
/// </summary>
/// <typeparam name="TEvent"></typeparam>
/// <typeparam name="TEventHandler">表示要处理该事件的事件处理程序类型。它是泛型参数,必须实现 IIntegrationEventHandler<TEvent> 接口</typeparam>
void Subscribe<TEvent, TEventHandler>()
where TEvent : IntegrationEvent
where TEventHandler: IIntegrationEventHandler<TEvent>;
/// <summary>
/// 取消订阅
/// </summary>
/// <typeparam name="TEvent"></typeparam>
/// <typeparam name="TEventHandler"></typeparam>
void Unsubscribe<TEvent,TEventHandler>()
where TEvent : IntegrationEvent
where TEventHandler : IIntegrationEventHandler<TEvent>;
}
3、新建IIntegrationEventHandler.cs
public interface IIntegrationEventHandler<in TEvent> where TEvent : IntegrationEvent
{
Task Handle(TEvent @event);
}
4、创建RabbitMQConfigs.cs 抽象基类
通过实例化 ConnectionFactory
对象,并设置其属性,来创建一个 RabbitMQ 连接工厂。然后,将从配置中获取的相关信息分配给连接工厂的属性,例如主机名、端口、用户名和密码。最后,返回创建的连接工厂对象。
public abstract class RabbitMQConfigs
{
protected ConnectionFactory GreateConnectionFactory(IConfiguration configuration)
{
// 在这里设置ConnectionFactory的属性
var factory = new ConnectionFactory
{
// 设置连接属性
HostName = configuration["RabbitMQ:HostName"],
Port = int.Parse(configuration["RabbitMQ:Port"]),
UserName = configuration["RabbitMQ:UserName"],
Password = configuration["RabbitMQ:Password"]
};
return factory;
}
}
5、新建EventBusServices.cs
通过继承 RabbitMQConfigs
类,并调用 GreateConnectionFactory
方法,创建和配置 RabbitMQ 的连接工厂对象,并创建RabbitMQ通道,实现消息发布、订阅、取消订阅接口
public class EventBusServices : RabbitMQConfigs,IEventBusServices
{
private readonly IConnection _connection;
private readonly IModel _channel;
public EventBusServices(IConfiguration configuration) {
var factory = GreateConnectionFactory(configuration);
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
}
public void Publish<TEvent>(TEvent @event) where TEvent : IntegrationEvent
{
var eventName = @event.GetType().Name;
var message = JsonSerializer.Serialize(@event);
var body = Encoding.UTF8.GetBytes(message);
_channel.ExchangeDeclare(exchange: "events", type: ExchangeType.Direct);
_channel.BasicPublish(exchange: "events",
routingKey: eventName,
basicProperties: null,
body: body);
}
public void Subscribe<TEvent, TEventHandler>()
where TEvent : IntegrationEvent
where TEventHandler : IIntegrationEventHandler<TEvent>
{
var eventName = typeof(TEvent).Name;
_channel.ExchangeDeclare(exchange: "events", type: ExchangeType.Direct);
_channel.QueueDeclare(queue: eventName, durable: true, exclusive: false, autoDelete: false, arguments: null);
_channel.QueueBind(queue: eventName, exchange: "events", routingKey: eventName);
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += async (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
var @event = JsonSerializer.Deserialize<TEvent>(message);
var handler = Activator.CreateInstance<TEventHandler>();
if(@event != null)
{
await handler.Handle(@event);
}
};
_channel.BasicConsume(queue:eventName,autoAck:true,consumer:consumer);
}
public void Unsubscribe<TEvent, TEventHandler>()
where TEvent : IntegrationEvent
where TEventHandler : IIntegrationEventHandler<TEvent>
{
var eventName = typeof(TEvent).Name;
_channel.QueueUnbind(queue: eventName, exchange: "events", routingKey: eventName);
}
}
6、创建测试订单服务OrderServices.cs
通过构造函数依赖注入IEventBusServices事件总线服务,使用事件总线实现订单消息的发布
public class OrderServices: IOrderServices
{
private readonly IEventBusServices _eventBusServices;
public OrderServices(IEventBusServices eventBusServices)
{
_eventBusServices = eventBusServices;
}
public void PlaceOrder(OrderCreatedEvent order)
{
//处理订单
order.OccurredOn = DateTime.Now;
//发布订单消息
_eventBusServices.Publish(order);
}
public void PlaceWorkOrder(WorkOrderCreatedEvent workOrder)
{
//处理
workOrder.OccurredOn = DateTime.Now;
//发布消息
_eventBusServices.Publish(workOrder);
}
}
public class OrderCreatedEvent : IntegrationEvent
{
public int OrderId { get; set; }
public string? OrderName { get; set; }
}
public class WorkOrderCreatedEvent : IntegrationEvent
{
public int WorkOrderId { get; set; }
public string? WorkOrderName { get; set; }
}
7、实现模拟订单发送请求的代码
[Route("api/createOrder")]
[HttpPut]
public void PutOrder()
{
//创建一个子线程:发布订单到MQ中(发布服务)
Task.Run(() =>
{
// 调用订单服务的方法
for (int i = 0; i< 8; i++)
{
var order = new OrderCreatedEvent { OrderId = i, OrderName =$"测试订单{i}", Status ="下单中" };
_orderService.PlaceOrder(order);
Console.WriteLine($"订单创建完成,订单ID:{order.OrderId};订单:{order.OrderName}{order.Status}");
Thread.Sleep(1000);
}
});
//创建一个子线程:发布工单到MQ中(发布服务)
Task.Run(() =>
{
// 调用订单服务的方法
for (int i = 0; i< 8; i++)
{
var workorder = new WorkOrderCreatedEvent { WorkOrderId = i, WorkOrderName =$"测试工单{i}", Status ="下单中" };
_orderService.PlaceWorkOrder(workorder);
Console.WriteLine($"工单创建完成,工单ID: {workorder.WorkOrderId} ;工单: {workorder.WorkOrderName}{workorder.Status}");
Thread.Sleep(1000);
}
});
}
四、创建接收订单并处理的项目(订阅服务)
使用Visual Studio2022新建控制台应用项目, 该项目主要实现订阅监听订单服务,接收订单服务发布的订单消息,并实现对订单的处理完成
1、新增EventBusClass类
public class EventBusClass
{
public abstract class IntegrationEvent
{
public string? Status { get; set; }
public DateTime OccurredOn { get; set; }
}
public class OrderCreatedEvent : IntegrationEvent
{
public int OrderId { get; set; }
public string? OrderName { get; set; }
}
public class WorkOrderCreatedEvent : IntegrationEvent
{
public int WorkOrderId { get; set; }
public string? WorkOrderName { get; set; }
}
}
2、新建IEventBusServices.cs,定义发布、订阅、取消订阅接口的泛型参数接口
参数说明:
TEvent:表示要订阅的事件类型。它是泛型参数,必须继承自 IntegrationEvent 类
TEventHandler:表示要处理该事件的事件处理程序类型。它是泛型参数,必须实现 IIntegrationEventHandler<TEvent> 接口
public interface IEventBusServices
{
/// <summary>
/// 发布消息
/// </summary>
/// <typeparam name="TEvent">表示要订阅的事件类型。它是泛型参数,必须继承自 IntegrationEvent 类</typeparam>
/// <param name="event">事件</param>
void Publish<TEvent>(TEvent @event) where TEvent : IntegrationEvent;
/// <summary>
/// 订阅消息
/// </summary>
/// <typeparam name="TEvent"></typeparam>
/// <typeparam name="TEventHandler">表示要处理该事件的事件处理程序类型。它是泛型参数,必须实现 IIntegrationEventHandler<TEvent> 接口</typeparam>
void Subscribe<TEvent, TEventHandler>()
where TEvent : IntegrationEvent
where TEventHandler: IIntegrationEventHandler<TEvent>;
/// <summary>
/// 取消订阅
/// </summary>
/// <typeparam name="TEvent"></typeparam>
/// <typeparam name="TEventHandler"></typeparam>
void Unsubscribe<TEvent,TEventHandler>()
where TEvent : IntegrationEvent
where TEventHandler : IIntegrationEventHandler<TEvent>;
}
3、新建IIntegrationEventHandler.cs
public interface IIntegrationEventHandler<in TEvent> where TEvent : IntegrationEvent
{
Task Handle(TEvent @event);
}
4、新建EventBusServices.cs
public class EventBusServices : IEventBusServices
{
private readonly IConnection _connection;
private readonly IModel _channel;
public EventBusServices() {
var factory = new ConnectionFactory
{
// 设置连接属性
HostName = "localhost",
Port = 5672,
UserName = "my",
Password = "123456"
}; ;
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
}
public void Publish<TEvent>(TEvent @event) where TEvent : IntegrationEvent
{
var eventName = @event.GetType().Name;
var message = JsonSerializer.Serialize(@event);
var body = Encoding.UTF8.GetBytes(message);
_channel.ExchangeDeclare(exchange: "events", type: ExchangeType.Direct);
_channel.BasicPublish(exchange: "events",
routingKey: eventName,
basicProperties: null,
body: body);
}
public void Subscribe<TEvent, TEventHandler>()
where TEvent : IntegrationEvent
where TEventHandler : IIntegrationEventHandler<TEvent>
{
var eventName = typeof(TEvent).Name;
_channel.ExchangeDeclare(exchange: "events", type: ExchangeType.Direct);
_channel.QueueDeclare(queue: eventName, durable: true, exclusive: false, autoDelete: false, arguments: null);
_channel.QueueBind(queue: eventName, exchange: "events", routingKey: eventName);
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += async (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
var @event = JsonSerializer.Deserialize<TEvent>(message);
var handler = Activator.CreateInstance<TEventHandler>();
if(@event != null)
{
await handler.Handle(@event);
}
};
_channel.BasicConsume(queue:eventName,autoAck:true,consumer:consumer);
}
public void Unsubscribe<TEvent, TEventHandler>()
where TEvent : IntegrationEvent
where TEventHandler : IIntegrationEventHandler<TEvent>
{
var eventName = typeof(TEvent).Name;
_channel.QueueUnbind(queue: eventName, exchange: "events", routingKey: eventName);
}
}
4、新建订单处理集成事件OrderHandlerServices.cs
除了事件订阅逻辑外,还需要实现集成事件处理程序的内部代码(例如回调方法)。 在事件处理程序中,可指定接收和处理某种事件消息的位置
public class OrderCreatedEventHandler : IIntegrationEventHandler<OrderCreatedEvent>
{
public async Task Handle(OrderCreatedEvent @event)
{
await Task.Run(() =>
{
// 处理订单创建事件逻辑...
@event.Status = "完成";
Console.WriteLine($"订单创建事件处理完成,订单ID:{@event.OrderId};订单:{@event.OrderName}{@event.Status}");
});
}
}
public class WorkOrderCreatedEventHandler : IIntegrationEventHandler<WorkOrderCreatedEvent>
{
public async Task Handle(WorkOrderCreatedEvent @event)
{
await Task.Run(() =>
{
// 处理订单创建事件逻辑...
@event.Status = "完成";
Console.WriteLine($"工单创建事件处理完成,工单ID:{@event.WorkOrderId};工单:{@event.WorkOrderName}{@event.Status}");
});
}
}
5、修改Program.cs程序主入口,实现订阅
using rabbirmqtestReciver;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using study_project.IServices.EventBus;
using study_project.Services.EventBus;
using System.Text;
EventBusServices eventBusServices = new EventBusServices();
//订阅订单服务
eventBusServices.Subscribe<OrderCreatedEvent, OrderCreatedEventHandler>();
//订阅工单服务
eventBusServices.Subscribe<WorkOrderCreatedEvent, WorkOrderCreatedEventHandler>();
Console.WriteLine("按任意键退出...");
Console.ReadKey();
五、测试运行与结果
分别启动运行创建的api项目和控制台应用项目,请求api的发送订单接口api/createOrder
发布服务:
订阅服务:
结果对比:
文章来源:https://www.toymoban.com/news/detail-861225.html
六、结语
至此,本文已经演示了使用基于RabbitMQ实现的事件总线,展示了2个实例:订单和工单发布的案例,通过使用EventBus实现发布与订阅,指定事件类型和集成事件,便可以实现发布和订阅,减少了使用RabbitMQ实现信息发布订阅的部分代码编程;如果是分布式应用中,可以使用EventBus实现应用的业务逻辑模块与RabbitMQ发布订阅模块的解耦。文章来源地址https://www.toymoban.com/news/detail-861225.html
到了这里,关于.NET6 项目使用RabbitMQ实现基于事件总线EventBus通信的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!