C#使用RabbitMQ发送和接收消息工具类

这篇具有很好参考价值的文章主要介绍了C#使用RabbitMQ发送和接收消息工具类。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

下面是一个简单的 C# RabbitMQ 发送和接收消息的封装工具类的示例代码:

工具类

通过NuGet安装RabbitMQ.Client文章来源地址https://www.toymoban.com/news/detail-512662.html

using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace WorkerService1
{
    public class RabbitMQHelper : IDisposable
    {
        private readonly ConnectionFactory _factory;
        private IConnection _connection;
        private IModel _channel;
        public RabbitMQHelper()
        {
            // 设置连接参数
            _factory = new ConnectionFactory() { HostName = "localhost", Port = 5672, UserName = "guest", Password = "guest" };
        }

        /// <summary>
        /// 发送消息
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="queueName"></param>
        /// <param name="message"></param>
        public void SendMessage<T>(string queueName, T message)
        {
            try
            {
                InitConnection();

                // 声明队列
                _channel.QueueDeclare(queue: queueName,
                    durable: true,// 设置为true表示队列是持久化的
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);

                var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));

                _channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);

            }
            catch (Exception ex)
            {
                Console.WriteLine("Failed to send message: {0}", ex.Message);
            }
        }

        /// <summary>
        /// 接收消息
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="queueName"></param>
        /// <param name="messageHandler"></param>
        public void ReceiveMessage<T>(string queueName, Action<T> messageHandler)
        {
            try
            {
                InitConnection();

                // 声明队列(接收需声明队列,否则队列不存在时,无法接收消息)
                _channel.QueueDeclare(queue: queueName,
                    durable: true, // 设置为true表示队列是持久化的
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);

                //设置消费者数量(并发度),每个消费者每次只能处理一条消息
                _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                // 创建消费者
                var consumer = new EventingBasicConsumer(_channel);
                consumer.Received += (model, ea) =>
                {
                    try
                    {
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());

                        var convertedMessage = JsonConvert.DeserializeObject<T>(message);

                        //委托方法
                        messageHandler.Invoke(convertedMessage);

                        // 消息处理成功,确认消息
                        _channel.BasicAck(ea.DeliveryTag, false);
                    }
                    catch (Exception ex)
                    {
                        // 消息处理异常,确认消息
                        _channel.BasicAck(ea.DeliveryTag, false);
                    }
                };

                _channel.BasicConsume(queue: queueName,
                    autoAck: false,// 设置为true表示自动确认消息
                    consumer: consumer);

            }
            catch (Exception ex)
            {
                Console.WriteLine("Failed to receive message: {0}", ex.Message);
            }
        }

        /// <summary>
        /// 初始化链接
        /// </summary>
        private void InitConnection()
        {
            if (_connection == null || !_connection.IsOpen)
            {
                _connection = _factory.CreateConnection();
                _channel = _connection.CreateModel();
            }
        }

        /// <summary>
        /// 释放资源
        /// </summary>
        public void Dispose()
        {
            _channel?.Close();
            _channel?.Dispose();
            _connection?.Close();
            _connection?.Dispose();
        }
    }
}

使用示例

using System;
using System.Text;
using System.Threading.Tasks;
using WorkerService1;

public class Program
{
    private static string QueueName = "myqueue_key";
    public static void Main()
    {

        var rabbitMQHelper = new RabbitMQHelper();
        for (long i = 0; i < 30; i++)
        {
            rabbitMQHelper.SendMessage(QueueName, i);
        }

        rabbitMQHelper.ReceiveMessage<long>(QueueName, ReceivedHandle);

        Console.ReadLine();
    }

    /// <summary>
    /// 接收处理
    /// </summary>
    /// <param name="index"></param>
    private static void ReceivedHandle(long index)
    {
        try
        {
            Console.WriteLine($"第{index}次开始{DateTime.Now}");
            Thread.Sleep(2000);
            Console.WriteLine($"第{index}次结束{DateTime.Now}");
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}

到了这里,关于C#使用RabbitMQ发送和接收消息工具类的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ如何保证消息的发送和接收

    一、RabbitMQ如何保证消息的发送和接收 1.ConfirmCallback方法 ConfirmCallback是一个回调接口,消息发送到broker后触发回调,确认消息是否到达broker服务器,也就是只确认消息是否正确到达Exchange交换机中。 2.ReturnCallback方法 通过实现ReturnCallback接口,启动消息失败返回,此接口是在交

    2024年02月15日
    浏览(34)
  • rabbitMQ:绑定Exchange发送和接收消息(direct)

    AMQP 协议中的核心思想就是生产者和消费者的解耦,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息 发送到一个交换机。先由 Exchange 来接收,然后 Exchange 按照特定的策略转发到 Queue 进行存储。Exchange 就类似于一个交换机,

    2024年02月15日
    浏览(24)
  • 「RabbitMQ」实现消息确认机制以确保消息的可靠发送、接收和拒收

    目录 介绍 方案 配置手动确认 使用 「Bean 」 配置RabbitMQ的属性 确定消费、拒绝消费、拒绝消费进入死信队列 模拟生产者发送消息①         RabbitMQ 的消息确认机制应用场景非常广泛,尤其是在需要确保消息可靠性和避免消息丢失的场合下更为重要,例如:金融系统、电

    2024年02月08日
    浏览(28)
  • 207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器)

    1、ContentUtil 先定义常量 2、RabbitMQConfig 创建队列的两种方式之一: 配置式: 在容器中配置 org.springframework.amqp.core.Queue 类型的Bean,RabbitMQ将会自动为该Bean创建对应的队列。 就是在配置类中创建一个生成消息队列的@Bean。 问题: 用 @Configuration 注解声明为配置类,但是项目启动

    2024年02月06日
    浏览(41)
  • RabbitMQ-同步和异步通讯、安装和入门案例、SpringAMQP(5个消息发送接收Demo,jackson消息转换器)

    微服务间通讯有同步和异步两种方式: 同步通讯:就像打电话,需要实时响应。 异步通讯:就像发邮件,不需要马上回复。 两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。 1.

    2024年02月11日
    浏览(23)
  • html写一个向flask_socketio发送消息和接收消息并显示在页面上

    以下是一个简单的HTML页面,它包含一个输入框、一个发送按钮和一个显示区域。用户可以在输入框中输入消息,点击发送按钮,然后这个消息会被发送到 Flask-SocketIO 服务器。当服务器回应消息时,它会在页面的显示区域显示出来。 此外,Flask-SocketIO 服务器端代码可以处理客

    2024年02月11日
    浏览(24)
  • 利用消息中间件RabbitMQ创建队列以及扇出(Fanout)、订阅(Direct)、主题(Topic)交换机来完成消息的发送和监听接收(完整版)

    目录 一、前期项目环境准备 1.1父项目以及子项目 1.2配置pom.xml 1.3配置application.yml 二、扇出(Fanout) 交换机实现消息的发送和接收 2.1编写子项目consumer(消费者,接收消息)的代码实现扇出(Fanout)交换机接收消息 2.1.1consumer子项目结构 2.1.2FanoutConfig类的实现扇出(Fanout)交

    2024年02月05日
    浏览(37)
  • 实战指南:使用Spring Boot实现消息的发送和接收

    当涉及到消息发送和接收的场景时,可以使用Spring Boot和消息中间件RabbitMQ来实现。下面是一个简单的示例代码,展示了如何在Spring Boot应用程序中创建消息发送者和接收者,并发送和接收一条消息。 首先,你需要进行以下准备工作 确保你已经安装了Java和Maven,并设置好相应

    2024年02月11日
    浏览(39)
  • RabbitMQ系列(5)--使用Java实现RabbitMQ的消费者接收消息

    前言:先简单了解RabbitMQ的工作过程,方便后续开发理清思路 简略: 详细: 1、新建消费者类 效果图: 2、编写消费者消费消息的代码 例: 3、查看代码运行结果 运行代码后如果有输出生产者发送的”Hello World”信息,则证明消费者消费消息成功 4、在web页面上查看队列的消

    2024年02月06日
    浏览(32)
  • 使用Java服务器实现UDP消息的发送和接收(多线程)

    在本篇博客中,我们将介绍如何使用Java服务器来实现UDP消息的发送和接收,并通过多线程的方式来处理并发请求。UDP(User Datagram Protocol)是一种无连接、不可靠的传输协议,适合于实时性要求高的应用场景,如实时游戏、语音通信等。 步骤: 首先,我们需要导入Java提供的

    2024年02月12日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包