fanout发布订阅模式
基本用法
生产者
import json
import rabbitmq
# 建立连接
credentials = rabbitmq.PlainCredentials(
'zhangdapeng',
'zhangdapeng520',
) # mq用户名和密码
connection_target = rabbitmq.ConnectionParameters(
host='127.0.0.1',
port=5672,
virtual_host='/',
credentials=credentials,
)
connection = rabbitmq.BlockingConnection(connection_target)
# 队列信息
exchange_name = "user_manager_fanout"
queue_name = "user_manager_fanout"
# 创建管道
channel = connection.channel()
# 声明一个交换机
channel.exchange_declare(exchange=exchange_name, exchange_type=rabbitmq.ExchangeType.fanout)
# 向队列中写入数据
user = {"id": 1, "name": "张三", "age": 23}
message = json.dumps(user, ensure_ascii=True)
channel.basic_publish(
exchange=exchange_name,
routing_key=queue_name, # 队列名
body=message.encode('utf8'),
properties=rabbitmq.BasicProperties(delivery_mode=2), # 声明消息在队列中持久化
)
print(message)
# 关闭连接
connection.close()
消费者
import rabbitmq
import json
# 创建连接
credentials = rabbitmq.PlainCredentials(
'zhangdapeng',
'zhangdapeng520',
)
target = rabbitmq.ConnectionParameters(
host='127.0.0.1',
port=5672,
virtual_host='/',
credentials=credentials,
)
connection = rabbitmq.BlockingConnection(target)
# 创建管道
channel = connection.channel()
# 队列信息
exchange_name = "user_manager_fanout"
queue_name = "user_manager_fanout"
# 绑定交换机
channel.exchange_declare(
exchange=exchange_name,
exchange_type=rabbitmq.ExchangeType.fanout,
)
# 绑定队列
result = channel.queue_declare(
queue=queue_name,
exclusive=True,
)
channel.queue_bind(
exchange=exchange_name,
queue=queue_name,
)
def callback(ch, method, properties, body):
"""每次接收到消息的消费回调方法"""
ch.basic_ack(delivery_tag=method.delivery_tag)
data = body.decode("utf8")
print(json.loads(data))
# 开始消费
channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=False,
)
try:
channel.start_consuming()
finally:
connection.close()
简化代码
生产者
import rabbitmq
# 建立连接
connection = rabbitmq.get_connection()
# 队列信息
exchange_name = "user_manager_fanout"
queue_name = "user_manager_fanout"
# 创建管道
channel = connection.channel()
# 声明一个交换机
channel.exchange_declare(exchange=exchange_name, exchange_type=rabbitmq.ExchangeType.fanout)
# 向队列中写入数据
user = {"id": 1, "name": "张三", "age": 23}
rabbitmq.send_json(channel, user, exchange_name, queue_name)
# 关闭连接
connection.close()
消费者
import rabbitmq
import json
# 创建连接
connection = rabbitmq.get_connection()
# 创建管道
channel = connection.channel()
# 队列信息
exchange_name = "user_manager_fanout"
queue_name = "user_manager_fanout"
# 绑定交换机
channel.exchange_declare(
exchange=exchange_name,
exchange_type=rabbitmq.ExchangeType.fanout,
)
# 绑定队列
result = channel.queue_declare(
queue=queue_name,
exclusive=True,
)
channel.queue_bind(
exchange=exchange_name,
queue=queue_name,
)
def callback(ch, method, properties, body):
"""每次接收到消息的消费回调方法"""
print(rabbitmq.receive_json(ch, method, body))
# 开始消费
rabbitmq.consume(connection, queue_name, callback)
进一步简化代码
生产者
import rabbitmq
# 建立连接
connection = rabbitmq.get_connection()
# 队列信息
exchange_name = "user_manager_fanout"
queue_name = "user_manager_fanout"
# 创建管道
channel = rabbitmq.get_fanout_channel(connection, exchange_name)
# 向队列中写入数据
user = {"id": 1, "name": "张三", "age": 23}
rabbitmq.send_json(channel, user, exchange_name, queue_name)
# 关闭连接
connection.close()
消费者
import rabbitmq
# 创建连接
connection = rabbitmq.get_connection()
# 队列信息
exchange_name = "user_manager_fanout"
queue_name = "user_manager_fanout"
# 创建管道
channel = rabbitmq.get_fanout_channel(connection, exchange_name, queue_name)
def callback(ch, method, properties, body):
"""每次接收到消息的消费回调方法"""
print(rabbitmq.receive_json(ch, method, body))
# 开始消费
rabbitmq.consume(connection, queue_name, callback)
文章来源地址https://www.toymoban.com/news/detail-798574.html
文章来源:https://www.toymoban.com/news/detail-798574.html
到了这里,关于Python如何操作RabbitMQ实现fanout发布订阅模式?有录播直播私教课视频教程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!