Linux安装RabbitMQ,怎么使用RabbitMQ

这篇具有很好参考价值的文章主要介绍了Linux安装RabbitMQ,怎么使用RabbitMQ。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.为什么使用MQ(消息队列)、RabbitMQ特点

MQ(消息队列),典型的生产者消费者模式,生产者不断向消息队列发送消息,消费者不断从消息队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松实现了系统之间的解耦合。通过高效的可靠的消息传递机制进行系统之间的通信来实现分布式系统。

RabbitMQ官网:https://www.rabbitmq.com/
RabbitMQ官方文档:http://www.weicot.com/dev/guides/v2.0/install-gde/prereq/install-rabbitmq.html
RabbitMQ的Java客户端文档:https://www.rabbitmq.com/api-guide.html

RabbitMQ是实现了高级消息队列(AMQP)的开源消息代理软件(消息中间件).RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

优点:

  1. 异步处理:消息队列的异步处理机制,不需要立即处理共享数据带来的业务,可以通过消息队列来控制业务的执行。
  2. 解耦:传统的开发模式中,各个模块之间相互调用,数据共享,每个模块都需要关注其他模块是否挂掉。使用消息队列,可以将共享数据放在消息队列中,新增业务模块,业务模块可以订阅该消息,对原有的系统业务没有任何影响,降低了各个系统之间的耦合度,提高系统的可扩展性。
  3. 流量削峰:对服务器的请求通过消息队列再次拦截处理。
  4. 日志处理:通过对消息队列发送消息,来处理日志。
  5. 消息通讯:

缺点:

  1. 系统可用性降低
  2. 系统的复杂度提高
  3. 消息一致性问题
  4. 消息顺序消费问题
  5. 消息重复问题

Kafka、RocketMQ、RabbitMQ、ActiveMQ之间对比

Kafka RocketMQ RabbitMQ ActiveMQ
单机吞吐量 10万级 10万级 万级 万级
开发语言 Scala Java Erlang Java
高可用 分布式架构 分布式架构 主从架构 主从架构
性能 ms ms us ms
功能 简单的MQ功能 分布式扩展性好,解决了顺序消息 并发高,性能高,延时低 功能完备

2.RabbitMQ的基本概念

  • Broker:消息队列服务器实体。
  • Exchange:消息交换机,消息接收规则,消息发送到哪个队列。
  • Queue:消息的载体,存放消息,队列。
  • Binding:绑定Exchange和Queue路由绑定
  • Routing key:路由关键字,Exchange根据这个投递消息到哪个队列
  • Channel:消息通道,客户端每个连接可以建立通道,一个通道代表一次会话任务。
  • Vhost:一个Broker可以有多个Vhost,Vhost可以有不同的Exchange、Queue,不同模块对应不同的Vhost。
  • Producer:消息生产者。
  • Consumer:消息消费者。

3.Linux系统安装RabbitMQ

Erlang版本和RabbitMQ版本对应关系:https://www.rabbitmq.com/which-erlang.html
Erlang下载官网:https://www.erlang.org/downloads

3.1Erlang环境安装

rpm -qa | grep erlang | xargs rpm -e --nodeps #卸载之前安装的erlang
find [path] -name [filename] 查找文件
netstat -anop | grep 80 #查看80相关的端口是否被占用

yum install -y gcc gcc-c++ unixODBC-devel  openssl-devel ncurses-devel #安装编译器
wget https://github.com/erlang/otp/releases/download/OTP-23.3.4.2/otp_src_23.3.4.2.tar.gz
tar -zxvf otp_src_23.3.4.2.tar.gz #解压文件
mv otp_src_23.3.4.2 /usr/local/ #移动解压文件
cd /usr/local/otp_src_23.3.4.2/ #进入解压的目录
mkdir ../erlang #新建安装目录
./configure --prefix=/usr/local/erlang #配置安装路径
make install #安装
echo 'export PATH=/usr/local/erlang/bin:$PATH' >> /etc/profile # 配置erlang环境变量
source /etc/profile #更新配置文件
erl -v # 测试是否安装成功

linux 启动rabbitmq,java-rabbitmq,rabbitmq,linux

3.2RabbiMQ安装

wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.10.6/rabbitmq-server-3.10.6-1.el8.noarch.rpm #下载安装包
rpm -ivh rabbitmq-server-3.10.6-1.el8.noarch.rpm #如果提示安装的erlang环境低,但是自己安装erlang环境是正确的是,添加参数--nodeps
rpm -ivh --nodeps rabbitmq-server-3.10.6-1.el8.noarch.rpm
systemctl start rabbitmq-server
systemctl status rabbitmq-server.service #查看rabbitmq启动状态
journalctl -xe 查询启动日志,发现没有erl环境
vim /usr/lib/rabbitmq/bin/rabbitmq-server #添加erl环境至文件中
export PATH=$PATH:/usr/local/erlang/bin
systemctl start rabbitmq-server # 启动rabbitmq
systemctl status rabbitmq-server #查看rabbitmq状态

注意:在没有安装可视化插件时,访问端口号没有页面

linux 启动rabbitmq,java-rabbitmq,rabbitmq,linux
linux 启动rabbitmq,java-rabbitmq,rabbitmq,linux
linux 启动rabbitmq,java-rabbitmq,rabbitmq,linux

3.3启动rabbitmq并且配置可视化web插件

systemctl start rabbitmq-server
rabbitmq-plugins enable rabbitmq_management #执行插件安装命令
systemctl restart rabbitmq-server #重启服务

#默认本地登陆账户guest 密码guest,远程无法访问

以系统命令方式启动,自动为节点命名为rabbit@hostname,会自动在/var/lib/rabbitmq/mnesia目录下生成对应的文件。使用rabbitmqctl stop关闭服务,会自动删除对应的rabbit@hostnane.pid文件,里面对应的每次会不一样。使用systemctl stop rabbitmq-server也会删除pid文件。这里的pid文件和redis启动生成的pid文件应该类似。
linux 启动rabbitmq,java-rabbitmq,rabbitmq,linux

linux 启动rabbitmq,java-rabbitmq,rabbitmq,linux

3.4账户管理

rabbitmqctl add_user <username> <password> #添加账号rabbitmqctl add_user admin <Aliyunoycm1234>
rabbitmqctl set_user_tags <username> <administrator> #设置账号adminstrator权限,只有4种权限
rabbitmqctl set_permissions -p / <username>".*" ".*" ".*" #授予文件管理权限
rabbitmqctl change_password <username> <password> #修改密码
rabbitmqctl delete_user <username> #删除用户
rabbitmqctl list_users #查看所用用户
rabbitmqctl add_user admin 1234 #添加一个用户
rabbitmqctl set_user_tags admin administrator #设置权限

设置权限对应失败,会导致无法登陆,提示Not management user(非管理用户)
用户权限

  1. administrator
  2. monitoring
  3. policymaker
  4. managment
  5. none

3.5卸载RabbitMQ

linux 启动rabbitmq,java-rabbitmq,rabbitmq,linux

yum命令安装的软件,yum remove 软件名字
rpm命令安装的软件,rpm -e --nodeps 软件名 --nodeps表示忽略依赖关系卸载
tar包安装,使用make uninstall 软件名

yum list | grep rabbitmq #查出yum安装的软件
yum list | grep rabbitmq | xargs yum remove

linux 启动rabbitmq,java-rabbitmq,rabbitmq,linux

3.6 docker安装RabbitMQ

docker pull rabbitmq:management #拉取镜像
docker run -di --name my-rabbitmq -e RABBITMQ_DEFUALT_USER=oycm -e RABBITMQ_DEFAULT_PASS=admin1234 -p 15672:15672 -p 5672:5672 -p 25672:25672 rabbitmq:management #-e配置,并不能直接访问可视化界面
# 15672是可视化web端口
# 5672是RabbitMQ消息接收中心端口,消息获取和发送的端口

3.7 RabbitMQ-server命令

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged #查看所有队列中的消息确认情况
rabbitmqctl list_exchanges #查看server的交换机name,type
rabbitmqctl list_bindings #查看绑定关系

4.RabbitMQ的6种消息发布模式

4.1、HelloWorld(Simple模式)

linux 启动rabbitmq,java-rabbitmq,rabbitmq,linux

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client  客户端依赖 -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.15.0</version>
</dependency>

linux 启动rabbitmq,java-rabbitmq,rabbitmq,linux

linux 启动rabbitmq,java-rabbitmq,rabbitmq,linux

连接工具类

public class FactoryUtil {
    public static ConnectionFactory newFactory(){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("43.143.86.174");
        factory.setUsername("admin");
        factory.setPassword("oycm1234");
        factory.setVirtualHost("/");
        factory.setPort(5672);
        return factory;
    }
}

Connection conn = factory.newConnection(addrArr);
//可以指定连接时使用的端点列表。将使用第一个可到达的端点。在连接失败的情况下,使用端点列表使应用程序可以在原始节点关闭时连接到不同的节点。

//Uri连接
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
factory.newConnection();
amqp_URI = "amqp://" + amqp_authority + [ "/" vhost ] [ "?" query ];
amqp_authority = [amqp_userinfo"@"] + host + [":" port];
amqp_userinfo = username [":" password];
//eg:amqp://admin:1234@hostName(ip):5672/%2f; %2f代表/

消息生产者

public class HelloWorld {

    private final static String QUEUE_NAME = "hello_world";

    public static void main(String[] args) {

        ConnectionFactory factory = FactoryUtil.newFactory();
        try(
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel()
        ){
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);

            Scanner in = new Scanner(System.in);
            while (in.hasNext()){
                String message = in.next();
                channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
                System.out.println(message + ",线程===>" + Thread.currentThread().getName());
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }
    }
}
//通过输入流不断开连接,可以一直往RabbitMQ-server发送消息

消息消费者

public class HelloWorldReceive {

    private final static String QUEUE_NAME = "hello_world";

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = FactoryUtil.newFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        System.out.println("等待接收消息");

        DeliverCallback deliverCallback = (String consumerTag, Delivery message)->{
            System.out.println("接收的消息是:======> " + new String(message.getBody()));
            System.out.println("comsumerTag ===> " + consumerTag);
            System.out.println("线程 ==>" + Thread.currentThread().getName());
        };

        channel.basicConsume(QUEUE_NAME,deliverCallback,(consumerTag)->{
            System.out.println("取消回调 ===> " + consumerTag);
            System.out.println("线程 ===> " + Thread.currentThread().getName());
        });
		System.out.println("main 方法结束");
    }
}
/*通过接收消息,对线程名字的输入,channel.basicConsume会异步开启一个线程池,去监听RabbitMQ-server指定的队列,根据
发生的事件,指定调用哪个回调方法。

而且在这个生产者和消费者之间,消息被消费者接收之后,消息现在的状态是Unacked(没有被确定消费了的),这个时候关掉
consumer,再次启动就会发现还会继续消费一次。这与调用basicConsume方法传递的参数有关,也就是方法重载,不同方法不同效
果。
*/

4.2、Work queues(工作模式)

linux 启动rabbitmq,java-rabbitmq,rabbitmq,linux
Work queues(工作模式)是,多个worker处理一个对应队列的消息,可以通过设置prefetch设置每个worker处理的消息数量来控制。默认情况下是轮询调度,不管消费者的处理能力,平均分配消息,如果有个worker处理能力快,消息处理完了,也不会处理多的消息(因为消息事先已经分配好了)。
设置的prefetch对应消费者的消费能力,加入总共3条消息,a消费者设置3,b消费者设置1,这是a消费者虽然处理消息能力慢,但是还是会获取2条消息。

  1. 消息持久化(队列的持久化,消息持久化)
  2. 消息持久化的可靠性(publisher confirm),确定消息发布成功
  3. 使用消息确认和预取计数,您可以设置工作队列,来确保能顺序消费。

生产者生产消息

public class Task {
    private final static String QUEUE_NAME = "worker";
    public static void main(String[] args) {
        ConnectionFactory factory = FactoryUtil.newFactory();
        try(Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()
        ){
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            Scanner in = new Scanner(System.in);
            while (in.hasNext()){
                System.out.println(in.next());
                for (int i = 1; i < 11; i++) {
                    channel.basicPublish("",QUEUE_NAME,null,Integer.toString(i).getBytes());
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }

    }
}

worker1消费者

public class Worker1 {
    private final static String QUEUE_NAME = "worker";
    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = FactoryUtil.newFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.basicQos(1);
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        DeliverCallback deliverCallback = (consumerTag,message)->{
            String m = new String(message.getBody());
            try {
                Thread.sleep(Integer.parseInt(m)*20000);
                System.out.println("消息:" + m + ",线程===> " + Thread.currentThread().getName());
               	//手动确认消息使用的通道必须和监听队列的通道是一致的,不然会报错并且关闭通道
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        };
        channel.basicConsume(QUEUE_NAME,false,deliverCallback,(consumerTag)->{
            System.out.println("取消回调");
        });

    }
}

worker2消费者

public class Worker2 {
    private final static String QUEUE_NAME = "worker";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = FactoryUtil.newFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag,message)->{
            String m = new String(message.getBody());
            try {
                Thread.sleep(Integer.parseInt(m)*10000);
                System.out.println("消息:" + m + ",线程===> " + Thread.currentThread().getName());
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        };
        channel.basicConsume(QUEUE_NAME,false,deliverCallback,(consumerTag)->{
            System.out.println("取消回调");
        });

    }
}

4.3、Publish/Subscribe(发布订阅模式)

linux 启动rabbitmq,java-rabbitmq,rabbitmq,linux

客户端不直接将消息发送至队列,而是发送到交换机,交换机通过和队列的绑定关系,再将消息发送到队列。发布订阅模式要声明的交换机是fanout模式,在将创建的队列和交换机绑定关系到对应的路由key(空串就可以了),发送消息至交换机绑定的路由key,就可以实现,一次发送,多次接收。排他队列不能在连接中删除

Publish生产者

public class Publisher {
    private static final String EXCHANGE_NAME = "fanout_";
    public static void main(String[] args) {
        ConnectionFactory factory = FactoryUtil.newFactory();
        try(Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()
        ){
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
            Scanner in = new Scanner(System.in);
            while (in.hasNext()){
                String str = in.nextLine();
                channel.basicPublish(EXCHANGE_NAME,"",null,str.getBytes());//这里不声明队列关系,通过消费者声明队列并绑定关系
            }

        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }
    }
}

Subscribe订阅者

public class Subscribe1 {
    private static final String EXCHANGE_NAME = "fanout_";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = FactoryUtil.newFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        String queue = channel.queueDeclare().getQueue();//声明临时队列,不持久,排他,自动删除
        channel.queueBind(queue,EXCHANGE_NAME,"");
        DeliverCallback deliverCallback = ((consumerTag, message) -> {
            System.out.println(new String(message.getBody()) + ", 线程====>" + Thread.currentThread().getName());
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        });
        channel.basicConsume(queue,false,deliverCallback,(consumerTag)->{
            System.out.println("回调");
        });
    }
}

public class Subscribe2 {
    private static final String EXCHANGE_NAME = "fanout_";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = FactoryUtil.newFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        String queue = channel.queueDeclare().getQueue();//声明临时队列,不持久,排他,自动删除
        channel.queueBind(queue,EXCHANGE_NAME,"");
        DeliverCallback deliverCallback = ((consumerTag, message) -> {
            System.out.println(new String(message.getBody()) + ", 线程====>" + Thread.currentThread().getName());
        });
        channel.basicConsume(queue,false,deliverCallback,(consumerTag)->{
            System.out.println("回调");
        });
    }
}

4.4、Routing(路由模式)

linux 启动rabbitmq,java-rabbitmq,rabbitmq,linux
路由key模式是对发布订阅模式的补充,如果队列绑定的key对应的都相同,那往这个交换机绑定的队列发送消息就和发布订阅模式没有什么不同,direct模式可以声明多种路由key和队列之间的关系。
linux 启动rabbitmq,java-rabbitmq,rabbitmq,linux
生产者

public class Direct {
    private final static String EXCHANGE_NAME = "direct_";

    public static void main(String[] args) {
        ConnectionFactory factory = FactoryUtil.newFactory();
        try(Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()
        ){
            channel.exchangeDeclare(EXCHANGE_NAME,"direct");
            Scanner in = new Scanner(System.in);
            while (in.hasNext()){
                String str = in.nextLine();
                if (str.contains("error")){
                    channel.basicPublish(EXCHANGE_NAME,"error",null,str.getBytes());//控制台输入包含error,发送到error路由键
                }else {
                    channel.basicPublish(EXCHANGE_NAME,"log",null,str.getBytes());
                }
            }

        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }
    }
}

消费者

public class DirectReceive {
    private final static String EXCHANGE_NAME = "direct_";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = FactoryUtil.newFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("log",true,false,false,null);
        channel.queueDeclare("error",true,false,false,null);
        channel.queueBind("log",EXCHANGE_NAME,"log");
        channel.queueBind("error",EXCHANGE_NAME,"error");
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String str = new String(delivery.getBody());
            if (str.contains("error")){
                System.out.println(str);
                System.exit(0);//error队列接收到消息退出
            }else {
                System.out.println(str + " ===== " + Thread.currentThread().getName());
            }
        };

        new Thread(()->{
            try {
                Connection connection1 = factory.newConnection();
                Channel channel1 = connection1.createChannel();
                channel1.basicConsume("log",true,deliverCallback,(c)->{});
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }
        }).start();

        new Thread(()->{
            try {
                Connection connection2 = factory.newConnection();
                Channel channel2 = connection2.createChannel();
                channel2.basicConsume("error",true,deliverCallback,(c)->{});
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }
        }).start();
    }
}

4.5、Topics(Topic主题模式)

linux 启动rabbitmq,java-rabbitmq,rabbitmq,linux
虽然路由键kern.critical和topic_4和交换机的绑定都对应上了,但是这个队列只能接收1条消息。
生产者

public class Topic {
    private static final String EXCHANGE_NAME = "topic_";

    public static void main(String[] args) {
        ConnectionFactory factory = FactoryUtil.newFactory();
        try(Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()
        ){
            channel.exchangeDeclare(EXCHANGE_NAME,"topic");
            channel.queueDeclare("topic_1",true,false,false,null);
            channel.queueDeclare("topic_2",true,false,false,null);
            channel.queueDeclare("topic_3",true,false,false,null);
            channel.queueDeclare("topic_4",true,false,false,null);
            channel.queueBind("topic_1",EXCHANGE_NAME,"#");
            channel.queueBind("topic_2",EXCHANGE_NAME,"kern.*");
            channel.queueBind("topic_3",EXCHANGE_NAME,"*.critical");
            channel.queueBind("topic_4",EXCHANGE_NAME,"kern.*");
            channel.queueBind("topic_4",EXCHANGE_NAME,"*.critical");
            Scanner in = new Scanner(System.in);
            while (in.hasNext()){
                String str = in.nextLine();
                channel.basicPublish(EXCHANGE_NAME,"kern.critical",null,str.getBytes());
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }
    }
}

消费者

public class TopicReceive {
    private static final String EXCHANGE_NAME = "topic_";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = FactoryUtil.newFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        channel.queueDeclare("topic_1",true,false,false,null);
        channel.queueDeclare("topic_2",true,false,false,null);
        channel.queueDeclare("topic_3",true,false,false,null);
        channel.queueDeclare("topic_4",true,false,false,null);

        DeliverCallback deliverCallback = ((consumerTag, message) -> {
            String msg = new String(message.getBody());
            System.out.println("消息:"+msg + " . Envelope:" + message.getEnvelope());
            System.out.println(Thread.currentThread().getName());
        });

        channel.basicConsume("topic_1",true,deliverCallback,(consumerTag) ->{});
        channel.basicConsume("topic_2",true,deliverCallback,(consumerTag) ->{});
        channel.basicConsume("topic_3",true,deliverCallback,(consumerTag) ->{});
        channel.basicConsume("topic_4",true,deliverCallback,(consumerTag) ->{});
    }

}

4.6、Publish Confirms(发布确认模式)

发布者确认模式是RabbitMQ为了实现可靠发布的扩展,在通道上启用发布者确认时,客户端发布的消息将由代理异步确认。

单条消息确认

public class PublishConfirmSync {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = FactoryUtil.newFactory();
        try(Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()
        ){
            channel.confirmSelect();
            channel.queueDeclare("queue",true,false,false,null);
            channel.queuePurge("queue");
            String queue = "queue";
            System.out.println(LocalDateTime.now());
            for (int i = 0; i < 200; i++) {
                channel.basicPublish("",queue,null,(""+i).getBytes());
                channel.waitForConfirmsOrDie();
            }
            System.out.println(LocalDateTime.now());
        }

    }
}
//发送消息非常慢,200条消息都需要15s

批量消息确认

public class PublishConfirmSync {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = FactoryUtil.newFactory();
        try(Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()
        ){
            channel.confirmSelect();
            channel.queueDeclare("queue",true,false,false,null);
            channel.queuePurge("queue");
            int batchSize = 100;
            int count = 0;
            System.out.println(LocalDateTime.now());
            for (int i = 0; i < 500; i++) {
                channel.basicPublish("","queue",null,(""+i).getBytes());
                count++;
                if (count == batchSize){
                    channel.waitForConfirmsOrDie(5000);//批量确认
                    count = 0;
                }
            }
            if (count > 0){
                channel.waitForConfirmsOrDie(5000);
            }
            System.out.println(LocalDateTime.now());
        }

    }
}
//无法确定哪次发送消息出现问题

异步确定消息

public class PublishConfirmASync {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = FactoryUtil.newFactory();
        try(Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()
        ){
            String queue = UUID.randomUUID().toString();
            channel.queueDeclare(queue, false, false, true, null);
            channel.confirmSelect();//开启发布确认模式
            ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
			//处理单次确认或者多次确认,类似basicAck手动确认,false确认一条消息,true确认这个之前的所有消息
            ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
                System.out.println(sequenceNumber + "=======" + multiple);
                //为什么成功会有false,false只表示确认一条消息,true表示确认这个序列之前的所有消息
                if (multiple) {
                    //headMap(key,boolean),key表示一个标志位,true表示是否包含这个标志位
                    ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);
                    confirmed.clear();
                } else {
                    outstandingConfirms.remove(sequenceNumber);
                }
            };

            channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
                System.out.println(sequenceNumber + "=======" + multiple);
                String body = outstandingConfirms.get(sequenceNumber);
                System.err.format(
                        "Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
                        body, sequenceNumber, multiple
                );
                cleanOutstandingConfirms.handle(sequenceNumber, multiple);
            });

            long start = System.nanoTime();
            for (int i = 0; i < 50000; i++) {
                String body = String.valueOf(i);
                outstandingConfirms.put(channel.getNextPublishSeqNo(), body);
                //getNextPublishSeqNo()发布确认模式下的下一条消息的序列号,其他模式下一直为0
                channel.basicPublish("", "worker", null, body.getBytes());
            }

            if (!waitUntil(Duration.ofSeconds(60), () -> outstandingConfirms.isEmpty())) {
                throw new IllegalStateException("All messages could not be confirmed in 60 seconds");
            }

            long end = System.nanoTime();

            System.out.println(Duration.ofNanos(end - start).toMillis());
        }
    }

    static boolean waitUntil(Duration timeout, BooleanSupplier condition) throws InterruptedException {
        int waited = 0;
        while (!condition.getAsBoolean() && waited < timeout.toMillis()) {
            Thread.sleep(100L);
            waited = +100;
        }
        return condition.getAsBoolean();
    }
}

5.实现RPC(Remote Procedure Call)调用

linux 启动rabbitmq,java-rabbitmq,rabbitmq,linux
客户端

public class RpcClient {
    public static void main(String[] args) {
        ConnectionFactory factory = FactoryUtil.newFactory();
        try(Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()
        ){
            channel.queueDeclare("rpc_queue",true,false,false,null);
            Scanner in = new Scanner(System.in);
            while (in.hasNext()){
                System.out.println("远程调用结果:" + call(channel, in.next()) + "============" +Thread.currentThread().getName());
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static String call(Channel channel,String message) throws IOException, ExecutionException, InterruptedException {

        String callbackQueueName = channel.queueDeclare().getQueue();
        AMQP.BasicProperties props = new AMQP.BasicProperties()
                .builder().
                replyTo(callbackQueueName).correlationId(UUID.randomUUID().toString())
                .build();
        channel.basicPublish("","rpc_queue",props,message.getBytes());
        final CompletableFuture<String> response = new CompletableFuture<>();
        String ctag = channel.basicConsume(callbackQueueName,true,(consumerTag, delivery) ->{
            if (delivery.getProperties().getCorrelationId().equals(props.getCorrelationId())){
                response.complete(new String(delivery.getBody(),"UTF-8"));
            }
        },consumerTag -> {} );
        String result = response.get();
        System.out.println("ctag=========>" +ctag);
        channel.basicCancel(ctag);
        return result;
    }
}

远程调用端

public class RpcServer {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = FactoryUtil.newFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("rpc_queue",true,false,false,null);
        channel.queuePurge("rpc_queue");//清除队列内容
        channel.basicQos(1);
        System.out.println("等待RPC远程调用");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                    .correlationId(delivery.getProperties().getCorrelationId())
                    .build();
            String response = "";
            try {
                String message = new String(delivery.getBody(),"UTF-8");
                System.out.println(message);
                int i = Integer.parseInt(message);
                response += fib(i);
                System.out.println(response);
            }catch (RuntimeException e){
                System.out.println(e);
            }finally {
                channel.basicPublish("",delivery.getProperties().getReplyTo(),props,response.getBytes());
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
            }
        };
        channel.basicConsume("rpc_queue",false,deliverCallback,consumerTag -> {});
    }

    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }
}

6.RabbitMQ手动确认消息的三种方式

6.1、basicAck

void basicAck(long deliveryTag, boolean multiple) throws IOException;
//deliverTag表示消息的标志,监听获取的消息里面会有整个属性,如果multiple=false确认消息,只会确认一条消息
//multiple是true表示批量确认消息,可以自己传递一个固定的值测
情况1:channel.basicQos(10),channel.basciAck(10,true)
队列中有足够的消息被消费,消费者能读取到10(有时候会是20),确认10条消息,但是消费之后就会造成阻塞(channel被关闭)。
队列中没有10条消息,消费者能读到所有消息,确认0条消息,也是阻塞,稍后队列中有足够的消息也不能被消费。
队列中0条消息,消费者等待接收消息,如果能接收到消息大于10,则读取10条确认10条。没有10就不会确认。回到类似情况2。
通过可视化界面发现,channel是关闭的,connection没有关闭。

情况2:channel.basicQos(9),channel.basicAck(10,true)
队列中有足够的数据,消费者能读取9条数据,确认0条消息。阻塞断开channel连接。
队列中没有9条数据,消费者能读到所有信息,确认0条消息。
队列中0条数据,消费者开始没有数据,只能接收一批消息,后面断开连接。

情况3:channel.basicQos(11),channel.basic(10,true),
队列中有足够的数据,消费者能读取至少11条数据,确认10条消息。阻塞断开channel连接。(队列中消息低于deliver的2倍,能读取19)
队列中没有11条数据,消费者能读到所有消息,确认0条消息。
队列中0条数据,一批消息数量大,则读取11,确认10,小于qos的2()19,能读取全部数据

true的使用类似生产者publish confrim模式下的异步确认,为true表示批量确认消息发布成功。总之在使用固定值或者消息中的属性值来手动确定消息,如果deliveryTag不符合,则断开channel连接。文章来源地址https://www.toymoban.com/news/detail-519715.html

6.2 basicNack

void basicNack(long deliveryTag, boolean multiple, boolean requeue)
//deliveryTag通道获取消息的标记
//multiple true表示拒绝所有消息 false仅代表拒绝当前消息
//requeue true表示重新排队 false表示发送到死信队列或者丢弃(被监听的队列需要绑定死信交换机)
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");//死信交换机
args.put( "x-dead-letter-routing-key" , "some-routing-key" );//死信交换机路由的key

channel.queueDeclare("myqueue", false, false, false, args);
使用multiple为true的情况类似ack。

6.3basicReject

void basicReject(long deliveryTag, boolean requeue) throws IOException;
//deliveryTag通道获取消息的标记
//requeue true表示重新排队 false表示发送到死信队列或者丢弃(被监听的队列需要绑定死信交换机)
使用情况和上面requeue一致。

到了这里,关于Linux安装RabbitMQ,怎么使用RabbitMQ的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • docker启动rabbitmq及使用

    docker search rabbitmq:management docker pull rabbitmq:management docker run -d --hostname localhost --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management docker logs rabbitmq http://localhost:15672 账户密码默认:guest 运行该方法,可以看到控制台的打印 name=hello的队列收到Message 启动工作线程 启动发送线程,此时

    2023年04月26日
    浏览(80)
  • RabbitMQ学习(二)——Linux下安装RabbitMQ

    1、 先去官网下载RabbitMQ 下载地址 :Downloading and Installing RabbitMQ — RabbitMQ 选择对应的系统版本点击下载,下载后会得到 .rpm 文件   2、下载Erlang RabbitMQ是采用 Erlang语言开发的,所以系统环境必须提供 Erlang环境,需要是安装 Erlang Erlang 和 RabbitMQ 版本对照:RabbitMQ Erlang Version

    2024年02月08日
    浏览(39)
  • 【RabbitMQ】Linux系统服务器安装RabbitMQ

    首先应该下载erlang,rabbitmq运行需要有erland环境。 官网地址:https://www.erlang.org/downloads 下载rabbitmq 官网环境:https://www.rabbitmq.com/download.html 注意:el7对应centos7,el8对应centos8,centos7用erlang23版本或者23以下版本,centos8用erlang24版本。 博主的系统是centos 7的所以下载的是el7的 1、

    2024年02月14日
    浏览(40)
  • Linux系统安装RabbitMQ

    说明:本次使用centos7.9 安装虚拟机. 在线安装依赖环境: 根据课前提供的资料,上传如下三个rpm文件 erlang-18.3-1.el7.centos.x86_64.rpm socat-1.7.3.2-5.el7.lux.x86_64.rpm rabbitmq-server-3.6.5-1.noarch.rpm 1、安装erlang-18.3-1.el7.centos.x86_64.rpm 如果不是采用CentOS-7.iso安装的系统,则有可能出现如下错误

    2024年02月13日
    浏览(45)
  • Linux安装RabbitMQ

    初始环境:CnetOS7 JDK11 创建/opt/rabbitmq目录,进入该目录 2.1下载 下载比较慢,建议从本地上传 2.2解压 2.3进入解压目录 2.4指定安装目录 2.5编译与安装 3.1配置 进入erlang安装目录下的bin,erlang安装到了 /usr/local/erlang下 3.2刷新环境变量 查看是否安装成功:任意位置输入erl  只要没

    2024年02月09日
    浏览(30)
  • linux快速安装Rabbitmq

    linux快速安装Rabbitmq 准备yum仓库 准备仓库文件 在rabbitmq.repo中填入如下内容 安装 启动 关闭防火墙 启动RabbitMQ的WEB管理控制台 添加admin用户,并赋予权限 浏览器打开管理控制台IP:15672 ​ ​

    2024年02月14日
    浏览(35)
  • Linux RabbitMQ 安装及卸载

    RabbitMQ是用Erlang编写的,所以需要先安装Erlang的编译环境 注意 Erlang和RabbitMQ的版本是有一些版本匹配关系的,如果不匹配会导致RabbitMQ无法启动 3.1、配置开机启动 6.1、报错解决 6.2、报错解决  6.3、重启成功无法访问端口

    2024年02月20日
    浏览(32)
  • Linux下安装RabbitMQ教程

    官方安装指南:https://www.rabbitmq.com/install-rpm.html 我们将要安装的RabbitMQ的版本是3.8.2 el/7/rabbitmq-server-3.8.2-1.el7.noarch.rpm - rabbitmq/rabbitmq-server · packagecloud 不需要单独安装Erlang环境。 2. 环境配置: 前提:在一个新建的阿里云的Cent OS 7.6上安装,不要对yum换源,否则可能会安装失败

    2024年02月14日
    浏览(33)
  • RabbitMQ详解(一):Linux安装

    消息队列 是在消息的传输过程中保存消息的容器。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。 RabbitMQ 基于AMQP(高级消息队列协议)基础上完成的, erlang 语言开发的企业消息系统,是当前最主流

    2024年02月13日
    浏览(28)
  • 在Linux下安装RabbitMQ

    erlang网站主页:https://www.erlang.org/ 下载地址:https://erlang.org/download/otp_src_25.0.tar.gz RabbitMQ网站主页: https://www.rabbitmq.com/ 下载地址: https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.10.4/rabbitmq-server-generic-unix-3.10.4.tar.xz  新建一个文件夹 对 erlang 进行安装环境的配置 运行下面

    2024年02月15日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包