RabbitMQ帮助类的封装

这篇具有很好参考价值的文章主要介绍了RabbitMQ帮助类的封装。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ帮助类的封装

基本部分

public class RabbitMQInvoker
{

    #region Identy 
    private static IConnection _CurrentConnection = null;
    private readonly string _HostName = null;
    private readonly string _UserName = null;
    private readonly string _Password = null;
    #endregion

    public RabbitMQInvoker(string hostName = "localhost", string userName = "guest", string password = "guest")
    {
        this._HostName = hostName;
        this._UserName = userName;
        this._Password = password;
    }

    ......
}

初始化链接

	#region 初始化链接 
    private static object RabbitMQInvoker_InitLock = new object();
    private void InitConnection()
    {
        if (_CurrentConnection == null || !_CurrentConnection.IsOpen)
        {
            lock (RabbitMQInvoker_InitLock)
            {
                if (_CurrentConnection == null || !_CurrentConnection.IsOpen)
                {
                    var factory = new ConnectionFactory()
                    {
                        HostName = this._HostName,
                        Password = this._Password,
                        UserName = this._UserName
                    };
                    _CurrentConnection = factory.CreateConnection();
                }
            }
        }
    }
    #endregion

初始化交换机

	#region 初始化交换机 
    private static Dictionary<string, bool> RabbitMQInvoker_ExchangeQueue = new Dictionary<string, bool>();
    private static object RabbitMQInvoker_BindQueueLock = new object();
    /// <summary>
    /// 必须先声明exchange--检查+初始化
    /// </summary>
    /// <param name="rabbitMQConsumerModel"></param>
    private void InitExchange(string exchangeName)
    {
        if (!RabbitMQInvoker_ExchangeQueue.ContainsKey($"InitExchange_{exchangeName}"))//没用api确认
        {
            lock (RabbitMQInvoker_BindQueueLock)
            {
                if (!RabbitMQInvoker_ExchangeQueue.ContainsKey($"InitExchange_{exchangeName}"))
                {
                    this.InitConnection();
                    using (IModel channel = _CurrentConnection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
                    }
                    RabbitMQInvoker_ExchangeQueue[$"InitExchange_{exchangeName}"] = true;
                }
            }
        }
    }

    /// <summary>
    /// 初始化绑定关系
    /// </summary>
    /// <param name="rabbitMQConsumerModel"></param>
    private void InitBindQueue(RabbitMQConsumerModel rabbitMQConsumerModel)
    {
        if (!RabbitMQInvoker_ExchangeQueue.ContainsKey($"InitBindQueue_{rabbitMQConsumerModel.ExchangeName}_{rabbitMQConsumerModel.QueueName}"))
        {
            lock (RabbitMQInvoker_BindQueueLock)
            {
                if (!RabbitMQInvoker_ExchangeQueue.ContainsKey($"InitBindQueue_{rabbitMQConsumerModel.ExchangeName}_{rabbitMQConsumerModel.QueueName}"))
                {
                    this.InitConnection();
                    using (IModel channel = _CurrentConnection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: rabbitMQConsumerModel.ExchangeName, type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
                        channel.QueueDeclare(queue: rabbitMQConsumerModel.QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                        channel.QueueBind(queue: rabbitMQConsumerModel.QueueName, exchange: rabbitMQConsumerModel.ExchangeName, routingKey: string.Empty, arguments: null);
                    }
                    RabbitMQInvoker_ExchangeQueue[$"InitBindQueue_{rabbitMQConsumerModel.ExchangeName}_{rabbitMQConsumerModel.QueueName}"] = true;
                }
            }
        }
    }
    #endregion

发送信息

	#region 发送消息
    /// <summary>
    /// 只管exchange---
    /// 4种路由类型?
    /// 
    /// Send前完成交换机初始化
    /// </summary>
    /// <param name="exchangeName"></param>
    /// <param name="message">建议Json格式</param>
    public void Send(string exchangeName, string message)
    {
        this.InitExchange(exchangeName);

        if (_CurrentConnection == null || !_CurrentConnection.IsOpen)
        {
            this.InitConnection();
        }
        using (var channel = _CurrentConnection.CreateModel())//开辟新的信道通信
        {
            try
            {
                channel.TxSelect();//开启Tx事务---RabbitMQ协议级的事务-----强事务

                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: exchangeName,
                                     routingKey: string.Empty,
                                     basicProperties: null,
                                     body: body);
                channel.TxCommit();//提交
                Console.WriteLine($" [x] Sent {body.Length}");
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                Console.WriteLine($"【{message}】发送到Broker失败!{ex.Message}");
                channel.TxRollback(); //事务回滚--前面的所有操作就全部作废了。。。。
            }
        }
    }
    #endregion

接收信息

	#region Receive
    /// <summary>
    /// 注册处理动作
    /// </summary>
    /// <param name="rabbitMQConsumerMode"></param>
    /// <param name="func"></param>
    public void RegistReciveAction(RabbitMQConsumerModel rabbitMQConsumerMode, Func<string, bool> func)
    {
        this.InitBindQueue(rabbitMQConsumerMode);

        Task.Run(() =>
        {
            using (var channel = _CurrentConnection.CreateModel())
            {
                var consumer = new EventingBasicConsumer(channel);
                channel.BasicQos(0, 0, true);
                consumer.Received += (sender, ea) =>
                {
                    string str = Encoding.UTF8.GetString(ea.Body.ToArray());
                    if (func(str))
                    {
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);//确认已消费
                    }
                    else
                    {
                        channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);//放回队列--重新包装信息,放入其他队列
                    }
                };
                channel.BasicConsume(queue: rabbitMQConsumerMode.QueueName,
                                     autoAck: false,//不ACK
                                     consumer: consumer);
                Console.WriteLine($" Register Consumer To {rabbitMQConsumerMode.ExchangeName}-{rabbitMQConsumerMode.QueueName}");
                Console.ReadLine();
                Console.WriteLine($" After Register Consumer To {rabbitMQConsumerMode.ExchangeName}-{rabbitMQConsumerMode.QueueName}");
            }
        });
    }
    #endregion

文章来源地址https://www.toymoban.com/news/detail-600194.html

到了这里,关于RabbitMQ帮助类的封装的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • ASP.Net Core 中startup 类的configservice方法的作用?

    简述: ConfigureServices ,就是配置服务器的DI容器,可以添加一些service进入依赖注入容器。 详解: 把需要的中间件等一些东西添加到DI容器 最后都是添加到IServiceCollection里面 比如 对于.AddProfileService() 已经内置了一个默认实现IProfileService接口的一个类 默认会注入内置的(Defau

    2024年02月16日
    浏览(50)
  • .net使用RabbitMQ小记

    使用RabbitMQ的优点         1.性能全面,rabbitmq性能比较全面,是消息中间件的首选         2.高并发,rabbitmq实现语言是天生就具备高并发高可用的erlang语言         3.任务异步处理,将不需要同步处理的并且耗时长的操作由消息队列通知消息接受方进行异步处理,提高了应

    2024年02月11日
    浏览(39)
  • .NET CORE消息队列RabbitMQ

    目录 1.消息队列概述 2.消息队列的特点及应用场景 3.RabbitMQ 3.1 初始RabbitMQ 3.2 Docker安装RabbitMQ 3.3 RabbitMQ中的六大队列模式 3.3.1 简单队列模式: 最简单的工作队列,其中一个消息生产者,一个消息消费者,一个队列。也称为点对点模式。  3.3.2 工作队列:一个消息生产者,一

    2024年04月28日
    浏览(34)
  • .NET 中使用RabbitMQ初体验

    逛园子的时候看到一篇.NET 学习RabbitMq的文章(视频地址和文章地址放在文章底部了),写的不错,我也来实现一下。 我是把RabbitMQ放在服务器的,然后先说一下如何部署它。 注意:在使用到RabbitMQ的项目中需要安装Nuget包 添加management才能有web控制台 ip地址加15672端口访问 拉

    2024年02月16日
    浏览(51)
  • 01_在NET中使用RabbitMQ

    1.Linux上安装Docken   删除docker-ce命令:yum remove docker-ce   删除镜像、容器、配置文件等内容   rm -rf /var/lib/containerd   rm -rf /var/lib/docker   docker search --镜像名 搜索仓库镜像   docker pull --镜像名 拉取镜像   docker ps 查看目前正在运行的所有容器 (-a 显示包括已经停止的容器)  

    2024年04月11日
    浏览(31)
  • .Net全网最简RabbitMQ操作【强烈推荐】

    【前言】 本文自1年前的1.0版本推出以来,已被业界大量科技公司采用。同时也得到了.Net圈内多位大佬的关注+推荐,文章也被多家顶级.Net/C#公众号转载。 现在更新到了7.0版本,更好的服务各位.Neter。   【正文】 支持.Net/.Net Core/.Net Framework,可以部署在Docker, Windows, Linux, Ma

    2024年02月08日
    浏览(46)
  • rabbitmq载在.net中批量消费的问题记录

    最近遇到了一个问题,在使用rabbitmq的时候出现了丢消息、消息重复消费等一系列的问题,使用的是.net框架,背景是高并发压力下的mq消费,按理说即使队列中堆了几百条消息,我客户端可以同处理5个消息。 原因是多线程同时处理时导致的内存混乱。 官方文档已经解释的很

    2024年02月10日
    浏览(31)
  • .NET中使用RabbitMQ延时队列和死信队列

    延时队列是RabbitMQ中的一种特殊队列,它可以在消息到达队列后延迟一段时间再被消费。 延时队列的实现原理是通过使用消息的过期时间和死信队列来实现。当消息被发送到延时队列时,可以为消息设置一个过期时间,这个过期时间决定了消息在延时队列中等待的时间。如果

    2024年02月15日
    浏览(40)
  • .Net6 记一次RabbitMq消息订阅/发布优化

             首先介绍一下项目情况,项目需要设备在线实时采集,最高采集频率为1次/秒,设备上传数据时,协议规定的是10条/包,服务端通过rabbitMq接收消息,并进行存储、预警、推送等进行多层处理,因为web端要求数据实时展示,且延时不得超过1分钟,因数据量较大,

    2024年01月18日
    浏览(46)
  • (五)「消息队列」之 RabbitMQ 主题(使用 .NET 客户端)

    先决条件 本教程假设 RabbitMQ 已安装并且正在 本地主机 的标准端口( 5672 )上运行。如果您使用了不同的主机、端口或凭证,则要求调整连接设置。 获取帮助 如果您在阅读本教程时遇到问题,可以通过邮件列表或者 RabbitMQ 社区 Slack 与 RabbitMQ 官方取得联系。 在上一篇教程中

    2024年02月16日
    浏览(54)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包