下面是一个简单的 C# RabbitMQ 发送和接收消息的封装工具类的示例代码:文章来源:https://www.toymoban.com/news/detail-512662.html
工具类
通过NuGet安装RabbitMQ.Client
文章来源地址https://www.toymoban.com/news/detail-512662.html
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace WorkerService1
{
public class RabbitMQHelper : IDisposable
{
private readonly ConnectionFactory _factory;
private IConnection _connection;
private IModel _channel;
public RabbitMQHelper()
{
// 设置连接参数
_factory = new ConnectionFactory() { HostName = "localhost", Port = 5672, UserName = "guest", Password = "guest" };
}
/// <summary>
/// 发送消息
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="queueName"></param>
/// <param name="message"></param>
public void SendMessage<T>(string queueName, T message)
{
try
{
InitConnection();
// 声明队列
_channel.QueueDeclare(queue: queueName,
durable: true,// 设置为true表示队列是持久化的
exclusive: false,
autoDelete: false,
arguments: null);
var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
_channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
}
catch (Exception ex)
{
Console.WriteLine("Failed to send message: {0}", ex.Message);
}
}
/// <summary>
/// 接收消息
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="queueName"></param>
/// <param name="messageHandler"></param>
public void ReceiveMessage<T>(string queueName, Action<T> messageHandler)
{
try
{
InitConnection();
// 声明队列(接收需声明队列,否则队列不存在时,无法接收消息)
_channel.QueueDeclare(queue: queueName,
durable: true, // 设置为true表示队列是持久化的
exclusive: false,
autoDelete: false,
arguments: null);
//设置消费者数量(并发度),每个消费者每次只能处理一条消息
_channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
// 创建消费者
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
try
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
var convertedMessage = JsonConvert.DeserializeObject<T>(message);
//委托方法
messageHandler.Invoke(convertedMessage);
// 消息处理成功,确认消息
_channel.BasicAck(ea.DeliveryTag, false);
}
catch (Exception ex)
{
// 消息处理异常,确认消息
_channel.BasicAck(ea.DeliveryTag, false);
}
};
_channel.BasicConsume(queue: queueName,
autoAck: false,// 设置为true表示自动确认消息
consumer: consumer);
}
catch (Exception ex)
{
Console.WriteLine("Failed to receive message: {0}", ex.Message);
}
}
/// <summary>
/// 初始化链接
/// </summary>
private void InitConnection()
{
if (_connection == null || !_connection.IsOpen)
{
_connection = _factory.CreateConnection();
_channel = _connection.CreateModel();
}
}
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
_channel?.Close();
_channel?.Dispose();
_connection?.Close();
_connection?.Dispose();
}
}
}
使用示例
using System;
using System.Text;
using System.Threading.Tasks;
using WorkerService1;
public class Program
{
private static string QueueName = "myqueue_key";
public static void Main()
{
var rabbitMQHelper = new RabbitMQHelper();
for (long i = 0; i < 30; i++)
{
rabbitMQHelper.SendMessage(QueueName, i);
}
rabbitMQHelper.ReceiveMessage<long>(QueueName, ReceivedHandle);
Console.ReadLine();
}
/// <summary>
/// 接收处理
/// </summary>
/// <param name="index"></param>
private static void ReceivedHandle(long index)
{
try
{
Console.WriteLine($"第{index}次开始{DateTime.Now}");
Thread.Sleep(2000);
Console.WriteLine($"第{index}次结束{DateTime.Now}");
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}
到了这里,关于C#使用RabbitMQ发送和接收消息工具类的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!