直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理

这篇具有很好参考价值的文章主要介绍了直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

上一篇文章 SpringCloud网关对WebSocket链接进行负载均衡 中把主要的架子搭建好了,这一篇文章就要开始写业务逻辑了。在分布式系统下,如何达到SpringBoot - WebSocket的使用和聊天室练习的效果。

一. Socket服务整合RabbitMQ

我们页面上,通过WebSocket发送弹幕信息的时候,后端通过@OnMessage注解修饰的函数进行接收。这里我们统一将原始的弹幕消息丢给MQ。让另一个专业的弹幕服务去消费处理。目的也是希望WebSocket服务它只负责消息的传递和WebSocket信息的维护,业务逻辑啥也不做。

1.添加pom依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.配置文件bootstrap.yml,添加RabbitMQ相关配置

server:
  port: 81

spring:
  application:
    name: tv-service-socket
  cloud:
    nacos:
      discovery:
        server-addr: 你的Nacos地址:8848
  rabbitmq:
    username: guest
    password: guest
    # 虚拟主机,默认是/
    virtual-host: /
    # 超时时间
    connection-timeout: 30000
    listener:
      simple:
        # 消费模式,手动
        acknowledge-mode: manual
        # 并发数
        concurrency: 5
        # 最大并发数
        max-concurrency: 10
        # 限流,如果严格控制消费顺序,这里应该填1,数值越大,消费处理速度越快。MQ会把这个数值的消息放到缓存当中。
        # 因此数值越大,内存占用越大,还需要考虑消费的速度
        prefetch: 10
    addresses: 你的RabbitMQ地址:5672

3.RabbitMQ配置类:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author Zong0915
 * @date 2022/12/15 下午1:29
 */
@Configuration
public class RabbitMQConfig {
    @Bean
    public Queue initDirectQueue() {
        return new Queue("originBullet-queue", true);
    }

    @Bean
    DirectExchange initDirectExchange() {
        return new DirectExchange("bulletPreProcessor-exchange", true, false);
    }

    @Bean
    Binding initBindingDirect() {
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("bullet.originMessage");
    }
}

4.写一个简单的消息体OriginMessage,发送到MQ的:

import lombok.Data;

/**
 * @author Zong0915
 * @date 2022/12/15 下午1:30
 */
@Data
public class OriginMessage {
    private String sessionId;
    private String userId;
    private String roomId;
    private String message;
}

5.MQ生产者OriginMessageSender

/**
 * @author Zong0915
 * @date 2022/12/15 下午1:29
 */
@Component
public class OriginMessageSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(OriginMessage originMessage) {
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());// 唯一ID
        Map<String, Object> map = new HashMap<>();
        map.put("message", JSONObject.toJSONString(originMessage));
        // 发送给消息预处理队列
        rabbitTemplate.convertAndSend("bulletPreProcessor-exchange",// 交换机名称
                "bullet.originMessage",// 路由Key
                map, correlationData);
    }
}

6.我们再对WebSocket的监听类做一下小改动,将收到的消息,封装一下,然后调用生产者的API即可。只需要注意一下多例下属性的注入方式是怎么写的即可

import kz.cache.SocketCache;
import kz.entity.OriginMessage;
import kz.producer.OriginMessageSender;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
/**
 * @author Zong0915
 * @date 2022/12/9 下午3:45
 */
@Component
@ServerEndpoint("/websocket/live/{roomId}/{userId}")
@Slf4j
@Getter
public class BulletScreenServer {
    /**
     * 多例模式下的赋值方式
     */
    private static OriginMessageSender originMessageSender;

    /**
     * 多例模式下的赋值方式
     */
    @Autowired
    private void setOriginMessageSender(OriginMessageSender originMessageSender) {
        BulletScreenServer.originMessageSender = originMessageSender;
    }

    private static final AtomicLong count = new AtomicLong(0);

    private Session session;
    private String sessionId;
    private String userId;
    private String roomId;

    /**
     * 打开连接
     * @param session
     * @OnOpen 连接成功后会自动调用该方法
     */
    @OnOpen
    public void openConnection(Session session, @PathParam("roomId") String roomId, @PathParam("userId") String userId) {
        // 如果是游客观看视频,虽然有弹幕,但是没有用户信息,所以需要用try
        count.incrementAndGet();
        log.info("*************WebSocket连接次数: {} *************", count.longValue());
        this.userId = userId;
        this.roomId = roomId;
        // 保存session相关信息到本地
        this.sessionId = session.getId();
        this.session = session;
        SocketCache.put(sessionId, this);
    }

    /**
     * 客户端刷新页面,或者关闭页面,服务端断开连接等等操作,都需要关闭连接
     */
    @OnClose
    public void closeConnection() {
        SocketCache.remove(sessionId);
    }

    /**
     * 客户端发送消息给服务端
     * @param message
     */
    @OnMessage
    public void onMessage(String message) {
        if (StringUtils.isBlank(message)) {
            return;
        }
        // 将消息丢给MQ,业务上的处理什么也不管,交给弹幕业务来处理,并且达到削峰的目的
        originMessageSender.send(buildMessage(message));
    }

    private OriginMessage buildMessage(String message) {
        OriginMessage originMessage = new OriginMessage();
        originMessage.setMessage(message);
        originMessage.setRoomId(roomId);
        originMessage.setSessionId(sessionId);
        originMessage.setUserId(userId);
        return originMessage;
    }
}

备注:记得将另一个Socket项目也改造成同样的代码。

二. 弹幕服务创建

2.1 创建一个公共maven项目

我们创建一个maven项目:service-bulletcommon。先看下最终的项目架构:
直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理

1.pom依赖添加一些常用的工具:

<groupId>bullet-service</groupId>
<artifactId>service-bulletcommon</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.12.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.79</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.24</version>
    </dependency>
    <dependency>
        <groupId>commons-collections</groupId>
        <artifactId>commons-collections</artifactId>
        <version>3.2.2</version>
    </dependency>
</dependencies>

2.创建一个常量定义类SocketConstants

/**
 * @author Zong0915
 * @date 2022/12/15 下午3:59
 */
public class SocketConstants {
    /**
     * 这条消息是否处理过
     */
    public static final String CORRELATION_SET_PRE = "Correlation_Set_";
    /**
     * 同一个房间里面有哪些SessionID
     */
    public static final String ROOM_LIVE_USER_SET_PRE = "ROOM_LIVE_USER_Set_";

    public static final String MESSAGE = "message";
    public static final String ID = "id";
    /**
     * 原始消息所在队列
     */
    public static final String ORIGIN_BULLET_QUEUE = "originBullet-queue";
    /**
     * 广播队列A
     */
    public static final String BULLET_SOCKET_QUEUE_A = "bulletSocket-queueA";
    /**
     * 广播队列B
     */
    public static final String BULLET_SOCKET_QUEUE_B = "bulletSocket-queueB";
    /**
     * 弹幕预处理交换机
     */
    public static final String BULLET_PRE_PROCESSOR_EXCHANGE = "bulletPreProcessor-exchange";
    /**
     * 弹幕广播交换机
     */
    public static final String BULLET_FANOUT_EXCHANGE = "bulletFanOut-exchange";
    /**
     * 弹幕预处理路由Key
     */
    public static final String BULLET_ORIGIN_MESSAGE_ROUTE_KEY = "bullet.originMessage";
}

3.创建一个消息传输体OriginMessage

import lombok.Data;

/**
 * @author Zong0915
 * @date 2022/12/15 下午2:07
 */
@Data
public class OriginMessage {
    private String sessionId;
    private String userId;
    private String roomId;
    private String message;
}

2.2 弹幕服务项目创建

1.我们创建一个maven项目:service-bulletscreen。先看下最终的项目架构:
直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理

1.pom文件:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.2.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        <version>2.2.1.RELEASE</version>
        <exclusions>
            <exclusion>
                <artifactId>archaius-core</artifactId>
                <groupId>com.netflix.archaius</groupId>
            </exclusion>
            <exclusion>
                <artifactId>commons-io</artifactId>
                <groupId>commons-io</groupId>
            </exclusion>
            <exclusion>
                <artifactId>commons-lang3</artifactId>
                <groupId>org.apache.commons</groupId>
            </exclusion>
            <exclusion>
                <artifactId>fastjson</artifactId>
                <groupId>com.alibaba</groupId>
            </exclusion>
            <exclusion>
                <artifactId>guava</artifactId>
                <groupId>com.google.guava</groupId>
            </exclusion>
            <exclusion>
                <artifactId>httpclient</artifactId>
                <groupId>org.apache.httpcomponents</groupId>
            </exclusion>
            <exclusion>
                <artifactId>servo-core</artifactId>
                <groupId>com.netflix.servo</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
        <version>2.6.7</version>
        <exclusions>
            <exclusion>
                <artifactId>log4j-api</artifactId>
                <groupId>org.apache.logging.log4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>bullet-service</groupId>
        <artifactId>service-bulletcommon</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
</dependencies>

2.application.properties

spring.application.name=tv-service-bulletscreen
spring.cloud.nacos.discovery.server-addr=你的Nacos地址:8848

3.bootstrap.yml文件:

server:
  port: 83

spring:
  application:
    name: tv-service-bulletscreen
  redis:
    database: 0 # Redis数据库索引(默认为0)
    host: 你的Redis地址 # Redis的服务地址
    port: 6379 # Redis的服务端口
    password: 密码
    jedis:
      pool:
        max-active: 8 # 连接池最大连接数(使用负值表示没有限制)
        max-wait: -1 # 连接池最大阻塞等待时间(使用负值表示没有限制)
        max-idle: 8 # 连接池中的最大空闲连接
        min-idle: 0 # 连接池中的最小空闲链接
    timeout: 30000 # 连接池的超时时间(毫秒)
  cloud:
    nacos:
      discovery:
        server-addr: 你的Nacos地址:8848
  rabbitmq:
    username: guest
    password: guest
    # 虚拟主机,默认是/
    virtual-host: /
    # 超时时间
    connection-timeout: 30000
    listener:
      simple:
        # 消费模式,手动
        acknowledge-mode: manual
        # 并发数
        concurrency: 5
        # 最大并发数
        max-concurrency: 10
        # 限流,如果严格控制消费顺序,这里应该填1,数值越大,消费处理速度越快。MQ会把这个数值的消息放到缓存当中。
        # 因此数值越大,内存占用越大,还需要考虑消费的速度
        prefetch: 10
    addresses: 你的RabbitMQ地址:5672

4.Redis配置类RedisConfig

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {
    /**
     * 实例化 RedisTemplate 对象
     *
     * @return
     */
    @Bean
    public RedisTemplate<String, Object> functionDomainRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        initDomainRedisTemplate(redisTemplate, redisConnectionFactory);
        return redisTemplate;
    }
    /**
     * 设置数据存入 redis 的序列化方式,并开启事务
     *
     * @param redisTemplate
     * @param factory
     */
    private void initDomainRedisTemplate(RedisTemplate<String, Object> redisTemplate, RedisConnectionFactory factory) {
        //如果不配置Serializer,那么存储的时候缺省使用String,如果用User类型存储,那么会提示错误User can't cast to String!
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());

        // 开启事务
        redisTemplate.setEnableTransactionSupport(true);
        redisTemplate.setConnectionFactory(factory);
    }

    @Bean
    @ConditionalOnMissingBean(StringRedisTemplate.class)
    public StringRedisTemplate stringRedisTemplate(
            RedisConnectionFactory redisConnectionFactory) {
        StringRedisTemplate template = new StringRedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }
}

2.2.1 创建队列和广播型交换机

创建一个广播模式的交换机bulletFanOut-exchange:其实用direct也可以,因为我只要监听的队列用同一个即可,这里只是进行一个模拟。
直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理

分别为两个Socket服务创建个队列,用来接收处理好的消息(练习下广播模式):

  • bulletSocket-queueA
  • bulletSocket-queueB

再分别为他们和上述创建好的交换机进行绑定。
直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理

我们的弹幕服务主要做两件事:

  • 监听预处理队列,数据来自:originBullet-queue
  • 将处理完的消息通过广播,发送给bulletSocket-queueA/B两个队列。

RabbitMQ配置类如下:

import kz.common.SocketConstants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author Zong0915
 * @date 2022/12/15 下午1:29
 */
@Configuration
public class RabbitMQConfig {
    @Bean
    public Queue initDirectQueue() {
        return new Queue(SocketConstants.ORIGIN_BULLET_QUEUE, true);
    }
	
    @Bean
    public Queue initFanoutSocketQueueA() {
        return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_A, true);
    }

    @Bean
    public Queue initFanoutSocketQueueB() {
        return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_B, true);
    }

    @Bean
    DirectExchange initDirectExchange() {
        return new DirectExchange(SocketConstants.BULLET_PRE_PROCESSOR_EXCHANGE, true, false);
    }

    @Bean("fanoutExchange")
    FanoutExchange initFanoutExchange() {
        return new FanoutExchange(SocketConstants.BULLET_FANOUT_EXCHANGE, true, false);
    }

    @Bean
    Binding initBindingDirect() {
        return BindingBuilder.bind(initDirectQueue()).to(initDirectExchange()).with(SocketConstants.BULLET_ORIGIN_MESSAGE_ROUTE_KEY);
    }

    @Bean
    Binding initBindingFanoutA(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(initFanoutSocketQueueA()).to(fanoutExchange);
    }

    @Bean
    Binding initBindingFanoutB(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(initFanoutSocketQueueB()).to(fanoutExchange);
    }
}

2.2.2 生产者发送最终弹幕数据

创建FanoutMessageProducer类:记得向我们上面绑定的广播交换机发送数据。

import com.alibaba.fastjson.JSONObject;
import kz.entity.OriginMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * @author Zong0915
 * @date 2022/12/15 下午2:51
 */
@Component
public class FanoutMessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(OriginMessage originMessage) {
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());// 唯一ID
        Map<String, Object> map = new HashMap<>();
        map.put("message", JSONObject.toJSONString(originMessage));

        rabbitTemplate.convertAndSend("bulletFanOut-exchange",// 交换机名称
                "",// 路由Key
                map, correlationData);
    }
}

2.2.3 消费者监听原始弹幕数据

创建OriginMessageConsumer类:

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import kz.service.BulletScreenService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

/**
 - @author Zong0915
 - @date 2022/12/15 下午1:57
 */
@Component
@Slf4j
public class OriginMessageConsumer {
    @Autowired
    private BulletScreenService bulletScreenService;

    /**
     * 处理原始消息
     *
     * @param testMessage Map类型的消息体
     * @param headers     消息头
     * @param channel     消息所在的管道
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "originBullet-queue", durable = "true"),
                    // 默认的交换机类型就是direct
                    exchange = @Exchange(name = "bulletPreProcessor-exchange", type = "direct"),
                    key = "bullet.originMessage"
            )
    )
    @RabbitHandler
    public void onOriginMessage(@Payload Map testMessage, @Headers Map<String, Object> headers,
                                Channel channel) throws IOException {
        log.info("***********消费开始*************");
        log.info("消费体:{}", JSONObject.toJSONString(testMessage));
        bulletScreenService.processMessage(testMessage, headers, channel);
    }
}

2.创建BulletScreenService类用于原始弹幕的业务处理,主要考虑的几个点:

  • 消息的合法性校验。
  • 消息的幂等性保证,这里用了Redis做个存储。
  • 将原始数据处理完后,在丢给MQ进行广播。
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import kz.common.SocketConstants;
import kz.entity.OriginMessage;
import kz.producer.FanoutMessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @author Zong0915
 * @date 2022/12/9 下午3:45
 */
@Service
@Slf4j
public class BulletScreenService {
    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private FanoutMessageProducer fanoutMessageProducer;

    @Async
    public void processMessage(Map testMessage, Map<String, Object> headers,
                               Channel channel) throws IOException {
        OriginMessage originMessage = getOriginMessage(testMessage);
        // 合法性校验
        if (!validMessage(testMessage, headers, originMessage)) {
            return;
        }
        // 处理消息
        log.info("***********业务处理,弹幕: {}***********", originMessage.getMessage());
        String correlationId = headers.get(SocketConstants.ID).toString();
        // 存入Redis并设置过期时间1天
        redisTemplate.opsForSet().add(SocketConstants.CORRELATION_SET_PRE + originMessage.getRoomId(), correlationId);
        redisTemplate.expire(SocketConstants.CORRELATION_SET_PRE + originMessage.getRoomId(), 1, TimeUnit.DAYS);
        // 将处理好的消息发送给MQ,通过广播队列,将消息发送给所有的Socket服务,一般这里还会对originMessage进行一些二次封装
        // 本案例就不做处理了,原样返回
        fanoutMessageProducer.send(originMessage);
        // 确认消息
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);
    }

    public OriginMessage getOriginMessage(Map testMessage) {
        String messageJson = (String) testMessage.get(SocketConstants.MESSAGE);
        if (StringUtils.isBlank(messageJson)) {
            return null;
        }
        OriginMessage originMessage = JSONObject.parseObject(messageJson, OriginMessage.class);
        return originMessage;
    }

    /**
     * 对消息进行合法性校验
     */
    public boolean validMessage(Map testMessage, Map<String, Object> headers, OriginMessage originMessage) {
        // 判空
        if (testMessage == null || testMessage.size() == 0 || originMessage == null) {
            return false;
        }
        if (headers == null || headers.size() == 0) {
            return false;
        }
        // 幂等性校验,如果消息已经被消费过了,那么这个弹幕消息就不应该被二次消费,这个消息就直接把他处理掉
        UUID correlationId = (UUID) headers.get(SocketConstants.ID);
        Boolean exist = redisTemplate.opsForSet().isMember(SocketConstants.CORRELATION_SET_PRE + originMessage.getRoomId(), correlationId.toString());
        return !Optional.ofNullable(exist).orElse(false);
    }
}

最后就是启动类BulletScreenApplication

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.scheduling.annotation.EnableAsync;

/**
 * @author Zong0915
 * @date 2022/12/10 下午9:44
 */
@SpringBootApplication
@EnableDiscoveryClient
@EnableAsync
public class BulletScreenApplication {
    public static void main(String[] args) {
        SpringApplication.run(BulletScreenApplication.class, args);
    }
}

2.3 Socket服务监听弹幕数据并返回前端

记得在pom依赖中引入上面的公共包:

<dependency>
   <groupId>bullet-service</groupId>
    <artifactId>service-bulletcommon</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>

2.3.1 配置类

RabbitMQ配置类增加下队列和交换机的配置信息:绑定bulletSocket-queueA

@Bean
public Queue initFanoutSocketQueueA() {
    return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_A, true);
}

@Bean("fanoutExchange")
FanoutExchange initFanoutExchange() {
    return new FanoutExchange(SocketConstants.BULLET_FANOUT_EXCHANGE, true, false);
}

@Bean
Binding initBindingFanoutA(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(initFanoutSocketQueueA()).to(fanoutExchange);
}

另一个Socket项目,添加以下配置:绑定bulletSocket-queueB

@Bean
public Queue initFanoutSocketQueueB() {
    return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_B, true);
}

@Bean("fanoutExchange")
FanoutExchange initFanoutExchange() {
    return new FanoutExchange(SocketConstants.BULLET_FANOUT_EXCHANGE, true, false);
}

@Bean
Binding initBindingFanoutA(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(initFanoutSocketQueueB()).to(fanoutExchange);
}

再写一个缓存工具类,通过直播间号获得同一个直播间下的所有WebSocket信息:

public class SocketCache {
	public static List<BulletScreenServer> getSocketGroupByRoomId(String roomId) {
        ArrayList<BulletScreenServer> res = new ArrayList<>();
        if (StringUtils.isBlank(roomId)) {
            return res;
        }
        for (Map.Entry<Integer, ConcurrentHashMap<String, BulletScreenServer>> hashMapEntry : CACHE_SEGMENT.entrySet()) {
            ConcurrentHashMap<String, BulletScreenServer> map = hashMapEntry.getValue();
            if (map == null || map.size() == 0) {
                continue;
            }
            for (BulletScreenServer server : map.values()) {
                if (server.getSession().isOpen() && StringUtils.equals(roomId, server.getRoomId())) {
                    res.add(server);
                }
            }
        }
        return res;
    }
}

2.3.2 消费者

重点就是消费者的业务代码了,对最终的弹幕数据进行广播,创建FanOutMessageConsumer类:

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import kz.cache.SocketCache;
import kz.common.SocketConstants;
import kz.entity.OriginMessage;
import kz.service.BulletScreenServer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
 * @author Zong0915
 * @date 2022/12/15 下午1:57
 */
@Component
@Slf4j
public class FanOutMessageConsumer {

    /**
     * 处理弹幕消息,开始广播
     *
     * @param testMessage Map类型的消息体
     * @param headers     消息头
     * @param channel     消息所在的管道
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "bulletSocket-queueA", durable = "true"),
                    // 默认的交换机类型就是direct
                    exchange = @Exchange(name = "bulletFanOut-exchange", type = "fanout")
            )
    )
    @RabbitHandler
    public void onOriginMessage(@Payload Map testMessage, @Headers Map<String, Object> headers, Channel channel) throws IOException {
        log.info("***********消费开始, Socket服务A接收到广播消息*************");
        log.info("消费体:{}", JSONObject.toJSONString(testMessage));
        OriginMessage originMessage = getOriginMessage(testMessage);
        if (originMessage == null) {
            return;
        }
        // 根据roomID去找到同一个直播间下的所有用户并广播消息
        List<BulletScreenServer> socketGroupByRoomId = SocketCache.getSocketGroupByRoomId(originMessage.getRoomId());
        for (BulletScreenServer bulletScreenServer : socketGroupByRoomId) {
            bulletScreenServer.getSession().getBasicRemote().sendText(JSONObject.toJSONString(originMessage));
        }
        // 确认消息
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);
    }

    public OriginMessage getOriginMessage(Map testMessage) {
        String messageJson = (String) testMessage.get(SocketConstants.MESSAGE);
        if (StringUtils.isBlank(messageJson)) {
            return null;
        }
        OriginMessage originMessage = JSONObject.parseObject(messageJson, OriginMessage.class);
        return originMessage;
    }
}

另一个Socket服务则改一下消费者的监听队列和日志内容即可:

直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理

2.4 测试

打开同一个直播间的两个用户,让两个WebSocket正好建立到不同的服务器上:
直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理
此时Socket服务A:
直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理

Socket服务B:
直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理
页面A中随便发送一条弹幕:
直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理
页面B中随便发送一条弹幕:
直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理

1.前端发送一条弹幕,后端监听到,开始向预处理队列丢消息。
直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理
2.service-bulletscreen服务,监听到预处理队列数据,开始进行处理。

直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理
3.经过一系列校验和幂等性处理之后,将处理完的弹幕通过交换机发送给广播队列:
直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理
4.Socket服务B接收到消息:
直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理

Socket服务A接收到广播消息:
直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理

5.前端页面展示:

页面A:
直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理

页面B:
直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理

到这里,一个聊天服务就完成了。不过大家也看到在线人数这块咱没做。可以用Redis缓存来记录每个直播间的人数。这个功能放到下一篇文章来讲。文章来源地址https://www.toymoban.com/news/detail-402619.html

到了这里,关于直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 202、RabbitMQ 之 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型---fanout类型就是广播类型

    就是声明一个 fanout 类型的 Exchange 来分发消息。消费者进行消费 fanout 类型就是广播模式 。 fanout 类型 的 Exchange 不会判断消息的路由key,直接将消息分发给绑定到该Exchange的所有队列。 生产者发送一条消息到fanout类型的Exchange后,绑定到该Exchange的所有队列都会收到该消息的

    2024年02月07日
    浏览(38)
  • .Net 6 下WorkerService+RabbitMq实现消息的异步发布订阅

            近期项目里有需要用到RabbitMq实现一些业务,学习整理之后在此记录一下,如有问题或者不对的地方,欢迎留言指正。 注意: 多线程消息发布时,应避免多个线程使用同一个IModel实例,必须保证Imodel被一个线程独享,如果必须要多个线程访问呢一个实例的话,则可以

    2024年01月18日
    浏览(52)
  • Java中如何使用消息队列实现异步(ActiveMQ,RabbitMQ,Kafka)

    在 Java 中,可以使用消息队列实现异步处理。下面是一个简单的示例代码,用于说明如何使用 ActiveMQ 实现消息队列异步处理: 添加 ActiveMQ 依赖 在 pom.xml 文件中添加以下依赖: 创建消息队列 创建一个名为 “TestQueue” 的消息队列,并配置 ActiveMQ 连接信息: 创建消息消费者

    2024年02月16日
    浏览(57)
  • logback整合rabbitmq实现消息记录日志

    logback.xml文件配置 yml文件配置 代码配置rabbitmq信息 测试客户端 此外,如果要对日志进行链路标记,可以是用MDC

    2024年02月05日
    浏览(45)
  • Springboot整合RabbitMQ消息中间件

    spring-boot-rabbitmq–消息中间件整合 前言:RabbitMQ的各种交换机说明 1、直连交换机 生产者发布消息时必须带着routing-key,队列绑定到交换机时必须指定binding-key ,且routing-key和binding-key必须完全相同,如此才能将消息路由到队列中 直连交换机通常用来循环分发任务给多个workers,

    2024年02月11日
    浏览(47)
  • 消息队列——spring和springboot整合rabbitmq

    目录 spring整合rabbitmq——生产者 rabbitmq配置文件信息 倒入生产者工程的相关代码 简单工作模式 spring整合rabbitmq——消费者 spring整合rabbitmq——配置详解 SpringBoot整合RabbitMQ——生产者  SpringBoot整合RabbitMQ——消费者   使用原生amqp来写应该已经没有这样的公司了 创建两个工程

    2024年02月16日
    浏览(54)
  • Spring Boot 整合 RabbitMQ 实现延迟消息

    消息队列(Message Queuing,简写为 MQ)最初是为了解决金融行业的特定业务需求而产生的。慢慢的,MQ 被应用到了更多的领域,然而商业 MQ 高昂的价格让很多初创公司望而却步,于是 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)应运而生。 随着 AMQP 草案的发布,两个月

    2024年04月08日
    浏览(50)
  • springboot整合rabbitmq 实现消息发送和消费

    Spring Boot提供了RabbitMQ的自动化配置,使得整合RabbitMQ变得非常容易。 首先,需要在pom.xml文件中引入amqp-client和spring-boot-starter-amqp依赖: 接下来需要在application.properties文件中配置RabbitMQ连接信息: 然后编写消息发送者: 其中,my-exchange和my-routing-key是需要自己定义的交换机和

    2024年02月07日
    浏览(42)
  • SpringBoot 整合RabbitMq 自定义消息监听容器来实现消息批量处理

    RabbitMQ是一种常用的消息队列,Spring Boot对其进行了深度的整合,可以快速地实现消息的发送和接收。在RabbitMQ中,消息的发送和接收都是异步的,因此需要使用监听器来监听消息的到来。Spring Boot中提供了默认的监听器容器,但是有时候我们需要自定义监听器容器,来满足一

    2024年02月16日
    浏览(50)
  • 视频号直播弹幕采集

    训练地址:https://www.qiulianmao.com websocket逆向 http拦截 websocket拦截 视频号直播弹幕采集 实战一:Http轮询弹幕拦截 更新中

    2024年02月06日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包