1.Linux上安装Docken
服务器系统版本以及内核版本:cat /etc/redhat-release 查看服务器内核版本:uname -r 安装依赖包:yum install -y yum-utils device-mapper-persistent-data lvm2 设置阿里云镜像源:yum-config-manager --add-repo https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo 安装Docker:yum install -y docker-ce 社区版(Community Edition,缩写为 CE) 企业版(Enterprise Edition,缩写为 EE) 启动docker并设置开机自启: 启动docker命令:systemctl start docker 设置开机自启命令:systemctl enable docker 查看docker版本命令:docker version
删除docker-ce命令:yum remove docker-ce
删除镜像、容器、配置文件等内容
rm -rf /var/lib/containerd
rm -rf /var/lib/docker
----------------------------------------通过docker help命令来查看更多的命令--------------------------------
docker search --镜像名 搜索仓库镜像
docker pull --镜像名 拉取镜像
docker ps 查看目前正在运行的所有容器 (-a 显示包括已经停止的容器)
docker rmi image_id/image_name 删除镜像
docker build 使用Dockerfile创建镜像
docker run 运行容器
docker exec 进入容器中执行命令 (例如:docker exec -it container_id/container_name /bin/bash)
docker logs container_id/container_name 查看容器日志(例如:docker logs -f -t --tail 10 container_id )
docker start container_id/container_name 启动容器
docker restart container_id/container_name 重启容器
docker stop container_id/container_name 停止容器
docker rm container_id/container_name 删除容器(只能删除已停止的容器)
2基于Docken安装RabbitMq
docker启动:systemctl start docker
docker重启:ystemctl restart docker
docker关闭:systemctl stop docker
查看正在运行容器:docker ps
查询Rabbitmq镜像: docker search rabbitmq
安装Rabbitmq镜像:
指定版本:docker pull rabbitmq:3.7.7-management
最新版本:docker pull rabbitmq
创建和启动容器:docker run -d --hostname myrabbitmq --name rabbitmq -p 5672:5672 -p 15673:15672 rabbitmq
-d 后台运行容器;
--hostname 主机名;
--name 指定容器名;
-p 指定服务运行的端口
5672 控制台Web端口号(服务端)
15672 应用访问端口(客户端)
-v 映射目录或文件
-e 指定环境变量(RABBITMQ_DEFAULT_VHOST:默认虚拟机名;RABBITMQ_DEFAULT_USER:默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的密码)
进入容器内部:docker exec -it 容器id /bin/bash
运行:rabbitmq-plugins enable rabbitmq_management
重启rabbitmq:docker start rabbitmq
重启容器:docker restart rabbitmq
停止容器:docker stop rabbitmq
访问:http://ip:15672/
账号密码:guest/guest
其它命令:
列出所有用户:rabbitmqctl list_users
添加用户:rabbitmqctl add_user username password 如:新增一个用户:rabbitmqctl add_user 名称 密码
删除用户:rabbitmqctl delete_user username
修改密码:rabbitmqctl change_password username newpassword
列出用户权限:rabbitmqctl list_user_permissions username
列出虚拟主机上的所有权限:rabbitmqctl list_permissions -p vhostpath
设置用户权限:rabbitmqctl set_permissions -p vhostpath username “.” “.” “.*” 如:设置用户权限:rabbitmqctl set_permissions -p VHostPath User ConfP WriteP ReadP
3.添加用户和设置权限
4.NET中使用RabbitMQ
RabbitMq有7种模式:RabbitMQ Tutorials | RabbitMQ
安装包:RabbitMQ.Client
定义队列和交换机名称
/// <summary> /// 定义队列和交换机名称 /// </summary> public class RabbitConstant { public const string QUEUE_HELLO_WORLD = "helloworld.queue"; public const string QUEUE_SMS = "sms.queue"; public const string EXCHANGE_WEATHER = "weather.exchange"; public const string QUEUE_BAIDU = "baidu.queue"; public const string QUEUE_SINA = "sina.queue"; public const string EXCHANGE_WEATHER_ROUTING = "weather.routing.exchange"; public const string EXCHANGE_WEATHER_TOPIC = "weather.topic.exchange"; }
第一种模式:Hello World
消费者:
using RabbitMQ.Client; using RabbitMQ.Client.Events; public class HelloConsumer { public static void HelloWorldShow() { var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.Port = 5672;//5672是RabbitMQ默认的端口号 factory.UserName = "admin"; factory.Password = "admin"; factory.VirtualHost = "my_vhost"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { /* * 创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列 * 第一个参数:队列名称ID * 第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失 * 第三个参数:是否队列私有化,false则代表所有的消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用 * 第四个:是否自动删除,false代表连接停掉后不自动删除这个队列 * 其他额外参数为null */ channel.QueueDeclare(RabbitConstant.QUEUE_HELLO_WORLD, true, false, false, null); Console.ForegroundColor = ConsoleColor.Cyan; //事件消费者类 EventingBasicConsumer consumers = new EventingBasicConsumer(channel); // 触发事件 consumers.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); // false只是确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine($"Consumer01接收消息:{message}"); }; /* * 从MQ服务器中获取数据 * 创建一个消息消费者 * 第一个参数:队列名 * 第二个参数:是否自动确认收到消息,false代表手动确认消息,这是MQ推荐的做法 * 第三个参数:要传入的IBasicConsumer接口 */ channel.BasicConsume(RabbitConstant.QUEUE_HELLO_WORLD, false, consumers); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } } } }
生产者:
using RabbitMQ.Client; public class HelloProducer { public static void HelloWorldShow() { var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1";//IP factory.Port = 5672;//端口 factory.UserName = "admin";//用户名 factory.Password = "admin";//密码 factory.VirtualHost = "my_vhost";//虚拟主机 // 获取TCP 长连接 using (var connection = factory.CreateConnection()) { // 创建通信“通道”,相当于TCP中的虚拟连接 using (var channel = connection.CreateModel()) { /* * 创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列 * 第一个参数:队列名称ID * 第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失 * 第三个参数:是否队列私有化,false则代表所有的消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用 * 第四个:是否自动删除,false代表连接停掉后不自动删除这个队列 * 其他额外参数为null */ channel.QueueDeclare(RabbitConstant.QUEUE_HELLO_WORLD, true, false, false, null); Console.ForegroundColor = ConsoleColor.Red; string message = "hello CodeMan 666";//要发送的数据 var body = Encoding.UTF8.GetBytes(message); /* * 第一个参数:exchange:交换机,暂时用不到,在进行发布订阅时才会用到 * 第二个参数:路由key * 第三个参数:额外的设置属性 * 第四个参数:最后一个参数是要传递的消息字节数组 */ channel.BasicPublish("", RabbitConstant.QUEUE_HELLO_WORLD, null, body); Console.WriteLine($"producer消息:{message}已发送"); } } } }
-------------------------------------------------------------------------------漂亮的分割线--------------------------------------------------------------------------------------------------
获取ConnectionFactory 对象
/// <summary> /// RabbitMQ连接类 /// </summary> public class RabbitUtils { /// <summary> /// 获取ConnectionFactory对象 /// </summary> /// <returns></returns> public static ConnectionFactory GetConnection() { var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1";//IP地址 factory.Port = 5672;//5672是RabbitMQ默认的端口号 factory.UserName = "admin";//用户名 factory.Password = "admin";//密码 factory.VirtualHost = "my_vhost";//虚拟主机 return factory; } }
/// <summary> /// 发送消息内容类 /// </summary> public class Sms { public string Name { get; set; } public string Mobile { get; set; } public string Content { get; set; } public Sms() { } public Sms(string name, string mobile, string content) { Name = name; Mobile = mobile; Content = content; } }
第二种模式:Work Queues
消费者1
public class SmsReceive { public static void Sender() { var connection = RabbitUtils.GetConnection().CreateConnection(); var channel = connection.CreateModel(); /* * 创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列 * 第一个参数:队列名称ID * 第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失 * 第三个参数:是否队列私有化,false则代表所有的消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用 * 第四个:是否自动删除,false代表连接停掉后不自动删除这个队列 * 其他额外参数为null */ channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null); // 如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者 // basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的 channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Thread.Sleep(30); Console.WriteLine($"SmsSender-发送短信成功:{message}"); channel.BasicAck(ea.DeliveryTag, false); }; channel.BasicConsume(RabbitConstant.QUEUE_SMS, false, consumer); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } }
消费者2
public class SmsReceive { public static void Sender() { var connection = RabbitUtils.GetConnection().CreateConnection(); var channel = connection.CreateModel(); channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null); // 如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者 // basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的 channel.BasicQos(0, 1, false);//处理完一个取一个 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Thread.Sleep(60); Console.WriteLine($"SmsSender-发送短信成功:{message}"); channel.BasicAck(ea.DeliveryTag, false); }; channel.BasicConsume(RabbitConstant.QUEUE_SMS, false, consumer); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } }
生产者
public class SmsSender { public static void Sender() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { /* * 创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列 * 第一个参数:队列名称ID * 第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失 * 第三个参数:是否队列私有化,false则代表所有的消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用 * 第四个:是否自动删除,false代表连接停掉后不自动删除这个队列 * 其他额外参数为null */ channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null); for (int i = 0; i < 100; i++) { Sms sms = new Sms("乘客" + i, "139000000" + i, "您的车票已预定成功"); string jsonSms = JsonConvert.SerializeObject(sms); var body = Encoding.UTF8.GetBytes(jsonSms); /* * 第一个参数:exchange:交换机,暂时用不到,在进行发布订阅时才会用到 * 第二个参数:路由key * 第三个参数:额外的设置属性 * 第四个参数:最后一个参数是要传递的消息字节数组 */ channel.BasicPublish("", RabbitConstant.QUEUE_SMS, null, body); Console.WriteLine($"正在发送内容:{jsonSms}"); } Console.WriteLine("发送数据成功"); } } } }
第三种模式:Publish/Subscribe
消费者1
public class WeatherFanout { public static void Weather() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { //交换机 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER, ExchangeType.Fanout); // 声明队列信息 channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null); /* * queueBind 用于将队列与交换机绑定 * 参数1:队列名 * 参数2:交换机名 * 参数3:路由Key(暂时用不到) */ channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, ""); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); consumer.Received += ((model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"百度收到的气象信息:{message}"); channel.BasicAck(ea.DeliveryTag, false); }); channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } } } }
消费者2
public class WeatherFanout { public static void Weather() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER, ExchangeType.Fanout); // 声明队列信息 channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null); /* * queueBind 用于将队列与交换机绑定 * 参数1:队列名 * 参数2:交换机名 * 参数3:路由Key(暂时用不到) */ channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, ""); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); consumer.Received += ((model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"百度收到的气象信息:{message}"); channel.BasicAck(ea.DeliveryTag, false); }); channel.BasicConsume(RabbitConstant.QUEUE_SINA, false, consumer); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } } } }
生产者
public class WeatherFanout { public static void Weather() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { string message = "20度"; var body = Encoding.UTF8.GetBytes(message); /* * 第一个参数:exchange:交换机,暂时用不到,在进行发布订阅时才会用到 * 第二个参数:路由key * 第三个参数:额外的设置属性 * 第四个参数:最后一个参数是要传递的消息字节数组 */ channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER, "", null, body); Console.WriteLine("天气信息发送成功!"); } } } }
第三种模式:Routing
消费者1
public class WeatherDirect { public static void Weather() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { //交换机 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct); //队列 channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null); /* * queueBind 用于将队列与交换机绑定 * 参数1:队列名 * 参数2:交换机名 * 参数3:路由Key(暂时用不到) */ channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hunan.changsha.20210525"); channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20210525"); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); consumer.Received += ((model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"百度收到的气象信息:{message}"); channel.BasicAck(ea.DeliveryTag, false); }); channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } } } }
消费者2
public class WeatherDirect { public static void Weather() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { //交换机 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct); // 声明队列信息 channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null); /* * queueBind 用于将队列与交换机绑定 * 参数1:队列名 * 参数2:交换机名 * 参数3:路由Key */ channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.xiangyang.20210525"); channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20210525"); channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20210525"); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); consumer.Received += ((model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"新浪收到的气象信息:{message}"); channel.BasicAck(ea.DeliveryTag, false); }); channel.BasicConsume(RabbitConstant.QUEUE_SINA, false, consumer); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } } } }
生产者
public class WeatherDirect { public static void Weather() { Dictionary<string, string> area = new Dictionary<string, string>(); area.Add("china.hunan.changsha.20210525", "中国湖南长沙20210525天气数据"); area.Add("china.hubei.wuhan.20210525", "中国湖北武汉20210525天气数据"); area.Add("china.hubei.xiangyang.20210525", "中国湖北襄阳20210525天气数据"); area.Add("us.cal.lsj.20210525", "美国加州洛杉矶20210525天气数据"); using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { foreach (var item in area) { /* * 第一个参数:exchange:交换机,暂时用不到,在进行发布订阅时才会用到 * 第二个参数:路由key * 第三个参数:额外的设置属性 * 第四个参数:最后一个参数是要传递的消息字节数组 */ channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, item.Key, null, Encoding.UTF8.GetBytes(item.Value)); } Console.WriteLine("气象信息发送成功!"); } } } }
第五章模式:Topics
消费者1
public class WeatherTopic { public static void Weather() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { //交换机 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, ExchangeType.Topic); // 声明队列信息 channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null); /* * queueBind 用于将队列与交换机绑定 * 参数1:队列名 * 参数2:交换机名 * 参数3:路由Key(暂时用不到) */ channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.#");//有关china的所有信息 channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); consumer.Received += ((model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"百度收到的气象信息:{message}"); channel.BasicAck(ea.DeliveryTag, false); }); channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } } } }
消费者2
public class WeatherTopic { public static void Weather() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { //交换机 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, ExchangeType.Topic); // 声明队列信息 channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null); /* * queueBind 用于将队列与交换机绑定 * 参数1:队列名 * 参数2:交换机名 * 参数3:路由Key(暂时用不到) */ channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.hubei.*.20210525");//有关china.hubei.的信息 channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); consumer.Received += ((model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"新浪收到的气象信息:{message}"); channel.BasicAck(ea.DeliveryTag, false); }); channel.BasicConsume(RabbitConstant.QUEUE_SINA, false, consumer); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } } } }
生产者文章来源:https://www.toymoban.com/news/detail-848035.html
public class WeatherTopic { public static void Weather() { Dictionary<string, string> area = new Dictionary<string, string>(); area.Add("china.hunan.changsha.20210525", "中国湖南长沙20210525天气数据"); area.Add("china.hubei.wuhan.20210525", "中国湖北武汉20210525天气数据"); area.Add("china.hubei.xiangyang.20210525", "中国湖北襄阳20210525天气数据"); area.Add("us.cal.lsj.20210525", "美国加州洛杉矶20210525天气数据"); using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { foreach (var item in area) { channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, item.Key, null, Encoding.UTF8.GetBytes(item.Value)); } Console.WriteLine("气象信息发送成功!"); } } } }
文章来源地址https://www.toymoban.com/news/detail-848035.html
到了这里,关于01_在NET中使用RabbitMQ的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!