}
System.out.println(“发送数据成功”);
channel.close();
connection.close();
}
}
消费者一:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
- 消费者1
*/
public class ConsumerOne {
public static void main(String[] args) throws Exception {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SENDER_CONTENT, false, false, false, null);
//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
channel.basicQos(1);//处理完一个取一个
channel.basicConsume(RabbitConstant.QUEUE_SENDER_CONTENT , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String jsonSMS = new String(body);
System.out.println(“ConsumerOne-发送成功:” + jsonSMS);
try {
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
//确认签收
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
消费者二:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
- 消费者2
*/
public class ConsumerTwo {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SENDER_CONTENT, false, false, false, null);
//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
channel.basicQos(1);//处理完一个取一个
channel.basicConsume(RabbitConstant.QUEUE_SENDER_CONTENT , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String jsonSMS = new String(body);
System.out.println(“ConsumerTwo-发送成功:” + jsonSMS);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//确认签收
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
消费者三:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
- 消费者3
*/
public class ConsumerThree {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SENDER_CONTENT, false, false, false, null);
//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
channel.basicQos(1);//处理完一个取一个
channel.basicConsume(RabbitConstant.QUEUE_SENDER_CONTENT , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String jsonSMS = new String(body);
System.out.println(“ConsumerThree-发送成功:” + jsonSMS);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
//确认签收
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
这里对每个消费者设置不同的休眠时间演示每个消费者处理业务的时间不同,查看消息消费的情况
可以看出消费者一消费的最多,消费者三消费的最少,因为代码中设置了这个
channel.basicQos(1);//处理完一个取一个
消费者处理完一个消息后(确认后),在从队列中获取一个新的。
Pub/Sub 订阅模式 :
为了区分好理解,我每个模式都去创建一个虚拟机,这里我先去rabbitMq管控页面创建一个虚拟机
创建一个交换机:这里用广播模式作为交换机的类型用来演示
修改工具类的虚拟机
生产者:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.util.Scanner;
/**
- 发布者
*/
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = RabbitUtils.getConnection();
//键盘输入
String input = new Scanner(System.in).next();
Channel channel = connection.createChannel();
//第一个参数交换机名字 其他参数和之前的一样
channel.basicPublish(RabbitConstant.EXCHANGE_CONTENT,“” , null , input.getBytes());
channel.close();
connection.close();
}
}
消费者一:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
- 消费者1
*/
public class ConsumerOne {
public static void main(String[] args) throws IOException {
//获取TCP长连接
Connection connection = RabbitUtils.getConnection();
//获取虚拟连接
final Channel channel = connection.createChannel();
//声明队列信息
channel.queueDeclare(RabbitConstant.QUEUE_ONE, false, false, false, null);
//queueBind用于将队列与交换机绑定
//参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到)
channel.queueBind(RabbitConstant.QUEUE_ONE, RabbitConstant.EXCHANGE_CONTENT, “”);
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_ONE , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(“消费者一收到信息:” + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
消费者二:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
- 消费者2
*/
public class ConsumerTwo {
public static void main(String[] args) throws IOException {
//获取TCP长连接
Connection connection = RabbitUtils.getConnection();
//获取虚拟连接
final Channel channel = connection.createChannel();
//声明队列信息
channel.queueDeclare(RabbitConstant.QUEUE_TWO, false, false, false, null);
//queueBind用于将队列与交换机绑定
//参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到)
channel.queueBind(RabbitConstant.QUEUE_TWO, RabbitConstant.EXCHANGE_CONTENT, “”);
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_TWO , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(“消费者二收到信息:” + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
演示效果:
Routing 路由模式:
为了区分好理解,我每个模式都去创建一个虚拟机,这里我先去rabbitMq管控页面创建一个虚拟机
修改工具类的虚拟机
创建交换机:这里的交换机type类型一定要改成routing模式,如果还是广播模式的fanout的话,跟上面发布和订阅模式出现的效果会是一样的。
错误实例:
正确的实例:
生产者:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
/**
- 发布者
*/
public class Producer {
public static void main(String[] args) throws Exception {
Map area = new LinkedHashMap<String, String>();
area.put(“routing.one.a.20201127”, “中国湖南长沙20201127私密数据”);
area.put(“routing.one.d.20201128”, “中国河北石家庄20201128私密数据”);
area.put(“routing.two.b.20201127”, “中国湖北武汉20201127私密数据”);
area.put(“routing.two.e.20201128”, “中国湖北武汉20201128私密数据”);
area.put(“routing.three.c.20201127”, “中国湖南株洲20201128私密数据”);
area.put(“routing.three.f.20201128”, “中国河南郑州20201128私密数据”);
area.put(“us.one.a.20201127”, “美国加州洛杉矶20201127私密数据”);
area.put(“us.two.b.20201128”, “美国加州洛杉矶20201128私密数据”);
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
while (itr.hasNext()) {
Map.Entry<String, String> me = itr.next();
//第一个参数交换机名字 第二个参数作为 消息的routing key
channel.basicPublish(RabbitConstant.EXCHANGE_CONTENT_ROUTING,me.getKey() , null , me.getValue().getBytes());
}
channel.close();
connection.close();
}
}
消费者一:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
- 消费者1
*/
public class ConsumerOne {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_ONE, false, false, false, null);
//queueBind用于将队列与交换机绑定
//参数1:队列名 参数2:交互机名 参数三:路由key
channel.queueBind(RabbitConstant.QUEUE_ONE, RabbitConstant.EXCHANGE_CONTENT_ROUTING, “routing.one.a.20201127”);
channel.queueBind(RabbitConstant.QUEUE_ONE, RabbitConstant.EXCHANGE_CONTENT_ROUTING, “us.one.a.20201127”);
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_ONE , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(“消费者1收到信息:” + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
消费者二:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
- 消费者2
*/
public class ConsumerTwo {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_TWO, false, false, false, null);
//queueBind用于将队列与交换机绑定
//参数1:队列名 参数2:交互机名 参数三:路由key
channel.queueBind(RabbitConstant.QUEUE_TWO, RabbitConstant.EXCHANGE_CONTENT_ROUTING, “routing.one.d.20201128”);
channel.queueBind(RabbitConstant.QUEUE_TWO, RabbitConstant.EXCHANGE_CONTENT_ROUTING, “routing.two.e.20201128”);
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_TWO , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(“消费者2收到信息:” + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
效果:
路由模式需要消费者指定路由key
Topics 通配符模式:
为了区分好理解,我每个模式都去创建一个虚拟机,这里我先去rabbitMq管控页面创建一个虚拟机
修改工具类的虚拟机
创建交互机,类型为topic
生产者:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
/**
- 发布者
*/
public class Producer {
public static void main(String[] args) throws Exception {
Map area = new LinkedHashMap<String, String>();
area.put(“routing.one.a.20201127”, “中国湖南长沙20201127私密数据”);
area.put(“routing.one.d.20201128”, “中国河北石家庄20201128私密数据”);
area.put(“routing.two.b.20201127”, “中国湖北武汉20201127私密数据”);
area.put(“routing.two.e.20201128”, “中国湖北武汉20201128私密数据”);
area.put(“routing.three.c.20201127”, “中国湖南株洲20201128私密数据”);
area.put(“routing.three.f.20201128”, “中国河南郑州20201128私密数据”);
area.put(“us.one.a.20201127”, “美国加州洛杉矶20201127私密数据”);
area.put(“us.two.b.20201128”, “美国加州洛杉矶20201128私密数据”);
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
while (itr.hasNext()) {
Map.Entry<String, String> me = itr.next();
//第一个参数交换机名字 第二个参数作为 消息的routing key
channel.basicPublish(RabbitConstant.EXCHANGE_CONTENT_TOPIC,me.getKey() , null , me.getValue().getBytes());
}
channel.close();
connection.close();
}
}
消费者一:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
- 消费者1
*/
public class ConsumerOne {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_ONE, false, false, false, null);
//queueBind用于将队列与交换机绑定
//参数1:队列名 参数2:交互机名 参数三:路由key
channel.queueBind(RabbitConstant.QUEUE_ONE, RabbitConstant.EXCHANGE_CONTENT_TOPIC, “..*.20201127”);
// channel.queueBind(RabbitConstant.QUEUE_ONE, RabbitConstant.EXCHANGE_CONTENT_TOPIC, “us.two.b.20201128”);
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_ONE , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(“消费者1收到信息:” + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
消费者二:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
- 消费者2
*/
public class ConsumerTwo {
public static void main(String[] args) throws IOException {
//获取TCP长连接
Connection connection = RabbitUtils.getConnection();
//获取虚拟连接
final Channel channel = connection.createChannel();
//声明队列信息
channel.queueDeclare(RabbitConstant.QUEUE_TWO, false, false, false, null);
//指定队列与交换机以及routing key之间的关系
channel.queueBind(RabbitConstant.QUEUE_TWO, RabbitConstant.EXCHANGE_CONTENT_TOPIC, “us.#”);
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_TWO , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(“消费者2收到信息:” + new String(body));
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。
深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!
由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!
如果你觉得这些内容对你有帮助,可以扫码获取!!(备注Java获取)
知其然不知其所以然,大厂常问面试技术如何复习?
1、热门面试题及答案大全
面试前做足功夫,让你面试成功率提升一截,这里一份热门350道一线互联网常问面试题及答案助你拿offer
2、多线程、高并发、缓存入门到实战项目pdf书籍
3、文中提到面试题答案整理
4、Java核心知识面试宝典
覆盖了JVM 、JAVA集合、JAVA多线程并发、JAVA基础、Spring原理、微服务、Netty与RPC、网络、日志、Zookeeper、Kafka、RabbitMQ、Hbase、MongoDB 、Cassandra、设计模式、负载均衡、数据库、一致性算法 、JAVA算法、数据结构、算法、分布式缓存、Hadoop、Spark、Storm的大量技术点且讲解的非常深入
《一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码》,点击传送门即可获取!
由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!
如果你觉得这些内容对你有帮助,可以扫码获取!!(备注Java获取)
知其然不知其所以然,大厂常问面试技术如何复习?
1、热门面试题及答案大全
面试前做足功夫,让你面试成功率提升一截,这里一份热门350道一线互联网常问面试题及答案助你拿offer
[外链图片转存中…(img-2SMWTO66-1711807198038)]
2、多线程、高并发、缓存入门到实战项目pdf书籍
[外链图片转存中…(img-cYdKwk9R-1711807198038)]
[外链图片转存中…(img-XDssikVI-1711807198038)]
[外链图片转存中…(img-uChXg9xG-1711807198039)]
3、文中提到面试题答案整理
[外链图片转存中…(img-f6916taY-1711807198039)]
4、Java核心知识面试宝典
覆盖了JVM 、JAVA集合、JAVA多线程并发、JAVA基础、Spring原理、微服务、Netty与RPC、网络、日志、Zookeeper、Kafka、RabbitMQ、Hbase、MongoDB 、Cassandra、设计模式、负载均衡、数据库、一致性算法 、JAVA算法、数据结构、算法、分布式缓存、Hadoop、Spark、Storm的大量技术点且讲解的非常深入
[外链图片转存中…(img-DMtRv7cH-1711807198039)]
[外链图片转存中…(img-TttZr13M-1711807198039)]文章来源:https://www.toymoban.com/news/detail-849186.html
[外链图片转存中…(img-Xe8bgYg8-1711807198040)]
《一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码》,点击传送门即可获取!文章来源地址https://www.toymoban.com/news/detail-849186.html
到了这里,关于RabbitMQ:第一章:6 种工作模式以及消息确认机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!