RabbitMQ帮助类的封装
基本部分
/// <summary>
/// Helper class wrapping RabbitMQ access. It stores the broker host name and
/// credentials supplied at construction time.
/// </summary>
public class RabbitMQInvoker
{
#region Identy
// Static connection field: one value shared by every RabbitMQInvoker instance.
private static IConnection _CurrentConnection = null;
// Broker host name and credentials, assigned once by the constructor.
private readonly string _HostName = null;
private readonly string _UserName = null;
private readonly string _Password = null;
#endregion
/// <summary>
/// Stores the connection settings. The constructor only assigns fields; it does not open a connection.
/// </summary>
/// <param name="hostName">Broker host name; defaults to "localhost".</param>
/// <param name="userName">User name; defaults to "guest".</param>
/// <param name="password">Password; defaults to "guest".</param>
public RabbitMQInvoker(string hostName = "localhost", string userName = "guest", string password = "guest")
{
this._HostName = hostName;
this._UserName = userName;
this._Password = password;
}
// Remaining members are elided at this point.
......
}
初始化链接
#region 初始化链接
// Lock object serializing creation of the shared connection.
private static object RabbitMQInvoker_InitLock = new object();

/// <summary>
/// Makes sure the shared connection exists and is open. When it is missing or
/// closed, a new one is created from this instance's host name and credentials.
/// The check is repeated after taking the lock so that only one caller creates
/// the connection.
/// </summary>
private void InitConnection()
{
    // Fast path: nothing to do while the current connection is usable.
    if (_CurrentConnection != null && _CurrentConnection.IsOpen)
    {
        return;
    }

    lock (RabbitMQInvoker_InitLock)
    {
        // Another caller may have created the connection while we waited for the lock.
        if (_CurrentConnection != null && _CurrentConnection.IsOpen)
        {
            return;
        }

        var connectionFactory = new ConnectionFactory()
        {
            HostName = this._HostName,
            Password = this._Password,
            UserName = this._UserName
        };
        _CurrentConnection = connectionFactory.CreateConnection();
    }
}
#endregion
初始化交换机
#region 初始化交换机
// Process-wide record of the exchange declarations and queue bindings already performed.
// Dictionary<TKey, TValue> does not support a read that runs concurrently with a write,
// so it must only be touched while holding RabbitMQInvoker_BindQueueLock.
private static readonly Dictionary<string, bool> RabbitMQInvoker_ExchangeQueue = new Dictionary<string, bool>();
private static readonly object RabbitMQInvoker_BindQueueLock = new object();

/// <summary>
/// Declares the exchange (durable, fanout, not auto-deleted) the first time it is
/// requested and remembers that it was declared. Later calls for the same name
/// return without contacting the broker; the cache is local and is not verified
/// against the broker.
/// </summary>
/// <param name="exchangeName">Name of the exchange to declare.</param>
private void InitExchange(string exchangeName)
{
    string cacheKey = $"InitExchange_{exchangeName}";

    // The lookup happens inside the lock: checking the dictionary outside of it
    // could race with another thread inserting a key.
    lock (RabbitMQInvoker_BindQueueLock)
    {
        if (RabbitMQInvoker_ExchangeQueue.ContainsKey(cacheKey))
        {
            return;
        }

        this.InitConnection();
        using (IModel channel = _CurrentConnection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
        }

        // Mark as declared only after the broker call succeeded.
        RabbitMQInvoker_ExchangeQueue[cacheKey] = true;
    }
}
/// <summary>
/// Declares the exchange and the queue described by the model and binds the queue
/// to the exchange, the first time this exchange/queue pair is requested. Later
/// calls for the same pair return without contacting the broker.
/// </summary>
/// <param name="rabbitMQConsumerModel">Supplies the exchange name and queue name to declare and bind.</param>
private void InitBindQueue(RabbitMQConsumerModel rabbitMQConsumerModel)
{
    string cacheKey = $"InitBindQueue_{rabbitMQConsumerModel.ExchangeName}_{rabbitMQConsumerModel.QueueName}";

    // The lookup happens inside the lock: Dictionary<TKey, TValue> does not support
    // a read that runs concurrently with another thread's insert.
    lock (RabbitMQInvoker_BindQueueLock)
    {
        if (RabbitMQInvoker_ExchangeQueue.ContainsKey(cacheKey))
        {
            return;
        }

        this.InitConnection();
        using (IModel channel = _CurrentConnection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: rabbitMQConsumerModel.ExchangeName, type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
            channel.QueueDeclare(queue: rabbitMQConsumerModel.QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
            // Empty routing key: the exchange is declared as fanout above.
            channel.QueueBind(queue: rabbitMQConsumerModel.QueueName, exchange: rabbitMQConsumerModel.ExchangeName, routingKey: string.Empty, arguments: null);
        }

        // Mark as bound only after all broker calls succeeded.
        RabbitMQInvoker_ExchangeQueue[cacheKey] = true;
    }
}
#endregion
发送消息
#region 发送消息
/// <summary>
/// Publishes a message to the given exchange inside an AMQP transaction
/// (TxSelect / TxCommit). The exchange is initialized before publishing.
/// Failures while publishing are written to the console and are not rethrown.
/// </summary>
/// <param name="exchangeName">Exchange to publish to; the routing key is always empty.</param>
/// <param name="message">Message text, sent UTF-8 encoded; JSON is recommended.</param>
public void Send(string exchangeName, string message)
{
    this.InitExchange(exchangeName);
    // InitConnection performs its own "missing or closed" check, so no pre-check is needed here.
    this.InitConnection();

    using (var channel = _CurrentConnection.CreateModel()) // a fresh channel per send
    {
        // Tracks whether TxSelect succeeded, so a rollback is only attempted
        // when there is a transaction to roll back.
        bool transactionStarted = false;
        try
        {
            channel.TxSelect(); // start a protocol-level transaction
            transactionStarted = true;

            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: exchangeName,
                routingKey: string.Empty,
                basicProperties: null,
                body: body);
            channel.TxCommit();
            Console.WriteLine($" [x] Sent {body.Length}");
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
            Console.WriteLine($"【{message}】发送到Broker失败!{ex.Message}");

            // Rolling back on a closed channel, or on one where TxSelect never
            // succeeded, would throw out of this handler.
            if (transactionStarted && channel.IsOpen)
            {
                try
                {
                    channel.TxRollback(); // discard everything published in this transaction
                }
                catch (Exception rollbackEx)
                {
                    Console.WriteLine($"TxRollback failed: {rollbackEx.Message}");
                }
            }
        }
    }
}
#endregion
接收消息
#region Receive
/// <summary>
/// Registers a handler for messages arriving on the queue described by the model.
/// The exchange/queue binding is initialized first, then a consumer is started on
/// a background task with manual acknowledgement.
/// </summary>
/// <param name="rabbitMQConsumerMode">Supplies the exchange name and queue name to consume from.</param>
/// <param name="func">
/// Handler invoked with the UTF-8 decoded message body. Returning true acknowledges
/// the message; returning false, or throwing, rejects it with requeue.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="rabbitMQConsumerMode"/> or <paramref name="func"/> is null.
/// </exception>
public void RegistReciveAction(RabbitMQConsumerModel rabbitMQConsumerMode, Func<string, bool> func)
{
    if (rabbitMQConsumerMode == null)
    {
        throw new ArgumentNullException(nameof(rabbitMQConsumerMode));
    }
    if (func == null)
    {
        throw new ArgumentNullException(nameof(func));
    }

    this.InitBindQueue(rabbitMQConsumerMode);
    // InitBindQueue skips the connection check when the binding is already cached,
    // so make sure the shared connection is usable before the task opens a channel.
    this.InitConnection();

    Task.Run(() =>
    {
        using (var channel = _CurrentConnection.CreateModel())
        {
            var consumer = new EventingBasicConsumer(channel);
            channel.BasicQos(0, 0, true);
            consumer.Received += (sender, ea) =>
            {
                string str = Encoding.UTF8.GetString(ea.Body.ToArray());

                bool handled;
                try
                {
                    handled = func(str);
                }
                catch (Exception ex)
                {
                    // A throwing handler would otherwise leave the delivery neither
                    // acked nor rejected; treat it like a "false" result.
                    Console.WriteLine(ex.Message);
                    handled = false;
                }

                if (handled)
                {
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); // confirm consumption
                }
                else
                {
                    channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true); // put back on the queue
                }
            };
            channel.BasicConsume(queue: rabbitMQConsumerMode.QueueName,
                autoAck: false, // manual acknowledgement
                consumer: consumer);
            Console.WriteLine($" Register Consumer To {rabbitMQConsumerMode.ExchangeName}-{rabbitMQConsumerMode.QueueName}");
            // Blocks here so the using block does not dispose the channel.
            // NOTE(review): relies on an interactive console; if ReadLine returns
            // immediately the channel is disposed right away. Confirm the hosting model.
            Console.ReadLine();
            Console.WriteLine($" After Register Consumer To {rabbitMQConsumerMode.ExchangeName}-{rabbitMQConsumerMode.QueueName}");
        }
    });
}
#endregion
文章来源:https://www.toymoban.com/news/detail-600194.html
到了这里,关于RabbitMQ帮助类的封装的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!