Linux下RabbitMQ

这篇具有很好参考价值的文章主要介绍了Linux下RabbitMQ。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

安装

网址 rabbitmq官网
需要下载一个市erlang环境,因为rabbitmq是用erlang开发的。
·erlang-21.3-1.el7.x86_64.rpm
·rabbitmq-server-3.8.8-1.el7.noarch.rpm

上传

上传到/usr/local下
安装命令:

rpm -ivh erlang-21.3-1.el7.x86_64.rpm #erlang环境
yum install socat -y #socat依赖
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm #rabbitmq安装

一些常用命令

添加开机启动 RabbitMQ 服务
chkconfig rabbitmq-server on
启动服务
/sbin/service rabbitmq-server start
查看服务状态
/sbin/service rabbitmq-server status
停止服务(选择执行)
/sbin/service rabbitmq-server stop
开启 web 管理插件
rabbitmq-plugins enable rabbitmq_management

rabbitmq有时候登录会出现用户权限问题导致无法登录
添加新用户:

创建账号
rabbitmqctl add_user admin 123
设置用户角色
rabbitmqctl set_user_tags admin administrator#超级管理员
设置用户权限
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限
当前用户和角色
rabbitmqctl list_users
关闭应用的命令为
rabbitmqctl stop_app
清除的命令为
rabbitmqctl reset
重新启动命令为
rabbitmqctl start_app

依赖引入:

<!--
指定 jdk
编译版本-->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!--rabbitmq
依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--
操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>

测试代码

生产者:

public class Producer {
    public static final String QUEUE_NAME="hello";
    public static final String HOST="47.99.113.73";
    public static final String USER_NAME="admin";
    public static final String PASSWORD="123";
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        /**
         * @param queue the name of the queue
         * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
         * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
         * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
         * @param arguments other properties (construction arguments) for the queue
         * */

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
         /**
         * @param exchange the exchange to publish the message to
         * @param routingKey the routing key
          *@param mandatory true if the 'mandatory' flag is to be set
        *  @param immediate true if the 'immediate' flag is to be
                 set. Note that the RabbitMQ server does not support this flag.
          @param props other properties for the message - routing headers etc
          @param body the message body
                 * */
         String message="hello world!";
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("发送消息完毕!");
    }
}

需要打开服务器5672端口,否则会出现连接超时。15672和5672的区别
有几个细节:ConnectionFactory和channel实现了java.lang.AutoCloseable接口不需要手动关闭;如果队列不存在就会自己创建;有时候会出现消息生产失败也可能是内存大小问题,默认是至少有200M磁盘空闲,可以通过设置disk_free_limit来修改参数。
为什么尝试使用自动关闭资源而不是手动关闭?
“By doing so we would simply make the program move on, close everything, and exit! This would be awkward because we want the process to stay alive while the consumer is listening asynchronously for messages to arrive.”
消费者:

public class Consumer {
    public static final String QUEUE_NAME="hello";
    public static final String HOST="47.99.113.73";
    public static final String USER_NAME="admin";
    public static final String PASSWORD="123";
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);
        Channel channel = factory.newConnection().createChannel();
        /**
         * @param queue the name of the queue
         * @param autoAck true if the server should consider messages
         *      acknowledged once delivered; false if the server should expect
         *      explicit acknowledgements
         * @param deliverCallback callback when a message is delivered
         * @param shutdownSignalCallback callback when the channel/connection is shut down
         * */
        channel.basicConsume(QUEUE_NAME, true,
                (String consumerTag, Delivery message)->{
                    //处理消息
                    System.out.println("message:"+new String(message.getBody()));
                },
                (String consumerTag)->{
                    System.out.println("消息被中断;"+consumerTag);
                });
        System.out.println("消息接收完毕!");
    }
}

工作队列

The main idea behind Work Queues (aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead we schedule the task to be done later. We encapsulate a task as a message and send it to a queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers the tasks will be shared between them.
工作队列(又称:任务队列)的主要思想是避免立即做一个资源密集型的任务,而不得不等待它完成。相反,我们将任务安排在以后完成。我们将一个任务封装为一条消息,并将其发送到一个队列中。一个在后台运行的工作者进程将弹出任务,并最终执行工作。当你运行许多工作者时,任务将在他们之间共享。
简单来说就是解决大量消息被轮流发放处理。
生产者:

/**
 * 工作队列生产者
 * */
public class Sender {
    public static final String QUEUE_NAME="hello";
    public static void main(String[] args) {
        Channel channel = ChannelUtil.getChannel();
        Scanner sc = new Scanner(System.in);
        while (sc.hasNext()){
            String next = sc.next();
            try {
                channel.queueDeclare(QUEUE_NAME, false,false, false, null);
                channel.basicPublish("", QUEUE_NAME,null, next.getBytes() );
                System.out.println("工作队列1已经发送消息"+next);
            }catch (Exception e){
                System.out.println("发送异常:"+e.getMessage());
            }
        }

    }
}

开启两个消费者:

/**
 * 工作队列处理者,处理生产者产生的大量消息
 * */
public class Worker {
    public static final String QUEUE_NAME="hello";
    public static void main(String[] args) {
        new Thread(()->{
            Channel channel = ChannelUtil.getChannel();

            try {
                channel.basicConsume(QUEUE_NAME, false,
                        (String consumerTag, Delivery message)->{
                            System.out.println("队列1:"+new String(message.getBody()));
                            /**
                             * @param1:消息标记,哪个消息应答了
                             * @param2:取消应答同一信道所有消息
                             * */
                            channel.basicAck(message.getEnvelope().getDeliveryTag(),false );
                        },
                        (String consumerTag)->{});
            }catch (Exception e){
                System.out.println("消息消费异常");
            }
        },"线程1").start();
        new Thread(()->{
            Channel channel = ChannelUtil.getChannel();
            try {
                channel.basicConsume(QUEUE_NAME, false,
                        (String consumerTag, Delivery message)->{
                            System.out.println("队列2:"+new String(message.getBody()));
                            /**
                             * @param1:消息标记,哪个消息应答了
                             * @param2:取消应答同一信道所有消息
                             * */
                            channel.basicAck(message.getEnvelope().getDeliveryTag(),false );
                        },
                        (String consumerTag)->{});
            }catch (Exception e){
                System.out.println("消息消费异常");
            }
        },"线程2").start();
    }
}

rabbitmq消息重新分发机制

在手动应答下,当某个消费者在接收消息后出现宕机等突发情况造成消息丢失但由于,这时候rabbitmq没有接收到手动应答的信号,并没有将该消息丢弃而是转发给其他消费者。

rabbitmq消息持久化

将第二个参数durable改为true即可。

channel.queueDeclare(QUEUE_NAME, true,false, false, null);
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, next.getBytes() );

不公平分发

默认情况下是轮询分发,某些情况下,某些消费者处理消息比较快,使用不公平分发策略消息处理更快一点。

channel.basicQos(1);

预取值

在消费者中提前向其信道中所放的数据数量。指定消费者消费消息数量。

@param prefetchCount maximum number of messages that the server
 will deliver, 0 if unlimited
channel.basicQos(prefetchCount);

未完…文章来源地址https://www.toymoban.com/news/detail-694502.html

到了这里,关于Linux下RabbitMQ的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Linux】分布式版本控制工具git

    ​ ​📝个人主页:@Sherry的成长之路 🏠学习社区:Sherry的成长之路(个人社区) 📖专栏链接:Linux 🎯 长路漫漫浩浩,万事皆有期待 上一篇博客:【Linux】Linux调试器 - gdb 大家可能用过 Github, Gitee 等代码托管平台。实际上这些平台都是可以通过 git 来进行管理的,而 git 本身

    2024年02月07日
    浏览(45)
  • 【Linux】分布式存储系统 Ceph应用

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 1、创建一个名为 rbd-demo 的专门用于 RBD 的存储池 2、将存储池转换为 RBD 模式 3、初始化存储池 4、创建镜像 5、镜像管理 6、Linux客户端使用 7、快照管理 8、镜像的导出导入 1、对象存储概念 对象存储(

    2024年02月16日
    浏览(53)
  • 基于Linux的Hadoop伪分布式安装

    1.1 创建新用户(需注意权限问题:切换为root用户) 1.2 添加新用户hadoop,并设置相关信息(一直回车默认就可以) 1.3 退出当前用户登录hadoop用户(或直接在Ubuntu中切换用户即可) 1.4 以管理员身份(root用户)执行指令visudo,来修改配置 visudo打开的是 /etc/sudoers 文件,修改该

    2024年02月03日
    浏览(41)
  • linux系统zabbix监控分布式监控的部署

    zabbix server端监控到大量zabbix agent端,这样会使zabbix server端压力过大,使用zabbix proxy进行分布式监控 安装工具 安装mysql 导入数据结构 配置proxy端 浏览器配置 这个时候创建主机就可以选择proxy端 选择刚刚创建的agent代理程序

    2024年02月20日
    浏览(50)
  • Linux环境搭建Hadoop及完全分布式集群

    Hadoop是一个开源的分布式计算框架,旨在处理大规模数据集和进行并行计算。核心包括两个组件:HFDS、MapReduce。 配置方案 各虚拟机的用户名分别为test0、test1、test2,主机名为hadoop100、hadoop101、hadoop102 虚拟机的分配如下: hadoop100:NameNode + ResourceManager hadoop101:DataNode + NodeM

    2024年03月23日
    浏览(41)
  • LAXCUS分布式操作系统相比LINUX的优势

    我们官网上一位网友的私信提问:LAXCUS分布式操作系统和LINUX操作系统相比,有什么优势? 答: LAXCUS分布式操作系统做为一种新型的多机操作系统,是操作系统家族的新物种。它最大的特点是聚合能力,能够将海量的物理计算机集合到一起,变成一台物理分散逻辑统一的单台

    2024年02月07日
    浏览(51)
  • Linux | 分布式版本控制工具Git【版本管理 + 远程仓库克隆】

    本文来为读者介绍一下分布式版本控制工具Git,可能你听说过Gitee/GitHub,里面都带有git的字样,那它们和Git之间有什么关联呢❓ Git又是何物,让我们一起走进本文的学习📖 Git(读音为/gɪt/)是一个 开源的分布式版本控制系统 ,可以有效、高速地处理从很小到非常大的项目

    2024年02月01日
    浏览(59)
  • Linux 部署 MinIO 分布式对象存储 & 配置为 typora 图床

    MinIO 是一款高性能的对象存储系统,它可以用于大规模的 AI/ML、数据湖和数据库工作负载。它的 API 与Amazon S3 云存储服务完全兼容,可以在任何云或本地基础设施上运行。MinIO 是 开源软件 ,也提供商业许可和支持 MinIO 的特点有: 简单 :MinIO 的安装和使用都非常简单,只需

    2024年02月07日
    浏览(59)
  • Linux-一篇文章,速通Hadoop集群之伪分布式,完全分布式,高可用搭建(附zookeeper,jdk介绍与安装)。

    文章较长,附目录,此次安装是在VM虚拟环境下进行。文章第一节主要是介绍Hadoop与Hadoop生态圈并了解Hadoop三种集群的区别,第二节和大家一起下载,配置Linux三种集群以及大数据相关所需的jdk,zookeeper,只需安装配置的朋友可以直接跳到文章第二节。同时,希望我的文章能帮

    2024年03月19日
    浏览(53)
  • Linux分布式应用 Zabbix监控软件实例:监控NGINX的性能

        测试 访问设置的网站 获取NGINX状态统计 过滤结果获取需要的值     服务器端验证     ​​​创建模板 点击左边菜单栏【配置】中的【模板】,点击【创建模板】 【模板名称】设置成 Template NGINX Status 【可见的名称】设置成 Template NGINX Status 【群组】选择 Template 【描述

    2024年02月15日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包