docker启动rabbitmq及使用

这篇具有很好参考价值的文章主要介绍了docker启动rabbitmq及使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

搜索rabbitmq镜像

docker search rabbitmq:management

docker启动rabbitmq及使用

下载镜像

docker pull rabbitmq:management

docker启动rabbitmq及使用

启动容器

docker run -d --hostname localhost --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management

docker启动rabbitmq及使用
docker启动rabbitmq及使用

打印容器

docker logs rabbitmq

docker启动rabbitmq及使用
docker启动rabbitmq及使用

访问RabbitMQ Management

http://localhost:15672
账户密码默认:guest
docker启动rabbitmq及使用

编写生产者类

package com.xun.rabbitmqdemo.example;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();
        /**
         * 生成一个queue队列
         * 1、队列名称 QUEUE_NAME
         * 2、队列里面的消息是否持久化(默认消息存储在内存中)
         * 3、该队列是否只供一个Consumer消费 是否共享 设置为true可以多个消费者消费
         * 4、是否自动删除 最后一个消费者断开连接后 该队列是否自动删除
         * 5、其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        String message = "Hello world!";
        /**
         * 发送一个消息
         * 1、发送到哪个exchange交换机
         * 2、路由的key
         * 3、其他的参数信息
         * 4、消息体
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println(" [x] Sent '"+message+"'");

        channel.close();
        connection.close();
    }
}

运行该方法,可以看到控制台的打印
docker启动rabbitmq及使用
name=hello的队列收到Message
docker启动rabbitmq及使用

消费者

package com.xun.rabbitmqdemo.example;

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setConnectionTimeout(600000);//milliseconds
        factory.setRequestedHeartbeat(60);//seconds
        factory.setHandshakeTimeout(6000);//milliseconds
        factory.setRequestedChannelMax(5);
        factory.setNetworkRecoveryInterval(500);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        System.out.println("Waiting for messages. ");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        channel.basicConsume(QUEUE_NAME,true,consumer);
    }
}

docker启动rabbitmq及使用
docker启动rabbitmq及使用

工作队列

RabbitMqUtils工具类

package com.xun.rabbitmqdemo.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMqUtils {
    public static Channel getChannel() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

启动2个工作线程

package com.xun.rabbitmqdemo.workQueue;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;

public class Work01 {
    private static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收消息:"+receivedMessage);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        System.out.println("C1 消费者启动等待消费....");
        /**
         * 消费者消费消息
         * 1、消费哪个队列
         * 2、消费成功后是否自动应答
         * 3、消费的接口回调
         * 4、消费未成功的接口回调
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}
package com.xun.rabbitmqdemo.workQueue;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;

public class Work02 {
    private static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收消息:"+receivedMessage);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        System.out.println("C2 消费者启动等待消费....");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

启动工作线程
docker启动rabbitmq及使用

启动发送线程

package com.xun.rabbitmqdemo.workQueue;

import com.rabbitmq.client.Channel;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import java.util.Scanner;

public class Task01 {
    private static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception{
        try(Channel channel= RabbitMqUtils.getChannel();){
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //从控制台接收消息
            Scanner scanner = new Scanner(System.in);
            while(scanner.hasNext()){
                String message = scanner.next();
                channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
                System.out.println("发送消息完成:"+message);
            }
        }
    }
}

启动发送线程,此时发送线程等待键盘输入
docker启动rabbitmq及使用
发送4个消息
docker启动rabbitmq及使用
docker启动rabbitmq及使用
docker启动rabbitmq及使用
可以看到2个工作线程按照顺序分别接收message。

消息应答机制

rabbitmq将message发送给消费者后,就会将该消息标记为删除。
但消费者在处理message过程中宕机,会导致消息的丢失。
因此需要设置手动应答。

生产者

import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import java.util.Scanner;

public class Task02 {
    private static final String TASK_QUEUE_NAME = "ack_queue";
    public static void main(String[] args) throws Exception{
        try(Channel channel = RabbitMqUtils.getChannel()){
            channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
            Scanner scanner = new Scanner(System.in);
            System.out.println("请输入信息");
            while(scanner.hasNext()){
                String message = scanner.nextLine();
                channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());
                System.out.println("生产者task02发出消息"+ message);
            }
        }
    }
}

消费者

package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import com.xun.rabbitmqdemo.utils.SleepUtils;

public class Work03 {
    private static final String ACK_QUEUE_NAME = "ack_queue";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("Work03 等待接收消息处理时间较短");
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody());
            SleepUtils.sleep(1);
            System.out.println("接收到消息:"+message);
            /**
             * 1、消息的标记tag
             * 2、是否批量应答
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        //采用手动应答
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}
package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import com.xun.rabbitmqdemo.utils.SleepUtils;

public class Work04 {
    private static final String ACK_QUEUE_NAME = "ack_queue";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("Work04 等待接收消息处理时间较长");
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody());
            SleepUtils.sleep(30);
            System.out.println("接收到消息:"+message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        //采用手动应答
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

工具类SleepUtils

package com.xun.rabbitmqdemo.utils;
public class SleepUtils {
    public static void sleep(int second){
        try{
            Thread.sleep(1000*second);
        }catch (InterruptedException _ignored){
            Thread.currentThread().interrupt();
        }
    }
}

模拟
docker启动rabbitmq及使用
docker启动rabbitmq及使用
docker启动rabbitmq及使用
work04等待30s后发出ack
docker启动rabbitmq及使用
在work04处理message时手动停止线程,可以看到message:dd被rabbitmq交给了work03
docker启动rabbitmq及使用
docker启动rabbitmq及使用
docker启动rabbitmq及使用

不公平分发

上面的轮询分发,生产者依次向消费者按顺序发送消息,但当消费者A处理速度很快,而消费者B处理速度很慢时,这种分发策略显然是不合理的。
不公平分发:

int prefetchCount = 1;
channel.basicQos(prefetchCount);

通过此配置,当消费者未处理完当前消息,rabbitmq会优先将该message分发给空闲消费者。
docker启动rabbitmq及使用文章来源地址https://www.toymoban.com/news/detail-425652.html

到了这里,关于docker启动rabbitmq及使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Docker安装RabbitMQ镜像

    步骤1: 拉取镜像 步骤2: 运行 -e:设置环境变量: RABBITMQ_DEFAULT_USER:指定web管理平台的用户名 RABBITMQ_DEFAULT_PASS:指定web管理平台的用户名 如果不指定,则默认使用guest/guest(默认guest无法远程登陆,只能localhost:15672登陆) 小插曲: 以管理员身份运行一下命令,然后再执行

    2024年02月14日
    浏览(26)
  • Docker命令---搜索镜像

    使用docker命令搜索镜像。 以搜索ElasticSearch镜像为例

    2024年01月22日
    浏览(23)
  • Docker系列第03部分:列出镜像+搜索镜像+拉取镜像+删除镜像

    Docker镜像是由文件系统叠加而成(是一种文件的存储形式)。最底端是一个文件引导系统,即bootfs,这很像典型的Linux/Unix的引导文件系统。Docker用户几乎永远不会和引导系统有什么交互。实际上,当一个容器启动后,它将会被移动到内存中,而引导文件系统则会被卸载,以留

    2024年02月09日
    浏览(25)
  • K8S部署后的使用:dashboard启动、使用+docker镜像拉取、容器部署(ubuntu环境+gpu3080+3主机+部署深度学习模型)

    0、k8s安装、docker安装 参考:前两步Ubuntu云原生环境安装,docker+k8s+kubeedge(亲测好用)_爱吃关东煮的博客-CSDN博客_ubantu部署kubeedge  配置节点gpu: K8S调用GPU资源配置指南_思影影思的博客-CSDN博客_k8s 使用gpu 1、重置和清除旧工程:每个节点主机都要运行 2、部署新的k8s项目:

    2023年04月20日
    浏览(48)
  • 【RabbitMQ】RabbitMQ 集群的搭建 —— 基于 Docker 搭建 RabbitMQ 的普通集群,镜像集群以及仲裁队列

    在RabbitMQ中,有不同的集群模式,包括普通模式、镜像模式和仲裁队列。每种模式具有不同的特点和应用场景。 普通集群,也称为标准集群(classic cluster),具备以下特征: 在集群的各个节点之间共享部分数据,包括交换机和队列的元信息,但不包括队列中的消息。 当访问

    2024年02月04日
    浏览(50)
  • docker容器启动rabbitmq

    在RabbitMQ的官方文档中,讲述了两种集群的配置方式: 普通模式:普通模式集群不进行数据同步,每个MQ都有自己的队列、数据信息(其它元数据信息如交换机等会同步)。例如我们有2个MQ:mq1,和mq2,如果你的消息在mq1,而你连接到了mq2,那么mq2会去mq1拉取消息,然后返回

    2024年03月15日
    浏览(62)
  • docker 启动镜像命令

    Docker 的启动命令用于启动 Docker 容器。这些命令可以从基本的 docker run 命令扩展到包括多个选项和参数,以满足不同的需求。以下是一些常用的 Docker 启动命令和选项的示例: 启动一个新容器 : IMAGE :指定要运行的镜像。 COMMAND :在容器内执行的命令。 ARG :命令的参数。

    2024年03月16日
    浏览(46)
  • 启动docker镜像

    将文件从宿主机拷贝到docker里 在宿主机里面执: 从docker里面拷文件到宿主机 在宿主机里面执: 停止、启动、杀死、重启一个容器

    2024年02月16日
    浏览(25)
  • Docker启动rabbitmq最详细步骤

    一 概况 rabbitmq官网 用docker启动rabbitmq是比较方便的。官方也给出了镜像。我们只需要几个简单的命令,就可以将rabbitmq容器启动起来。 二 查找rabbitmq镜像 三 拉取rabbitmq镜像 四 启动容器 -d 表示后台运行 -p 表示端口映射 五 装载可视化插件 六 改密码 设置我们的用户密码 七

    2024年02月11日
    浏览(45)
  • Docker 镜像 用普通用户启动服务

    Docker是一种用于构建、封装和分发应用程序的开源平台。它利用容器化技术将应用程序及其依赖项打包到一个可移植的容器中,从而实现快速部署和可伸缩性。 在Docker中,通过使用Docker镜像可以创建容器,镜像是容器化过程中的一个关键概念,它是一个只读的文件系统,其中

    2024年04月13日
    浏览(22)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包