文章来源:https://www.toymoban.com/news/detail-575143.html
需求分析
- 及时信息传递:消息推送功能能够确保网站向用户发送及时的重要信息,包括新闻更新、促销活动、账户状态变更等。这样可以增强用户体验,同时也提高用户对网站的参与度。
- 个性化定制:消息推送功能可以根据用户的偏好和兴趣来定制推送内容,使用户能够接收到与其相关和感兴趣的信息。这样可以提高用户满意度和参与度。
- 提高用户参与度:通过定期发送通知和提醒,消息推送功能可以吸引用户回归网站,并参与到活动中去。
- 提醒和警示功能:消息推送可以用作提醒和警示的工具,例如,提醒用户交易状态、密码更改请求、订单跟踪等。这样可以确保用户及时了解和处理相关事务。
- 营销和推广:消息推送还可以用于营销和推广活动,向用户发送促销信息、新产品推介等,从而增加销售和推广效果。
解决方案
- 实时性:使用WebSocket可以实现双向通信,而不需要客户端主动发送请求获取消息。一旦有新消息需要推送,服务器可以立即将消息推送给客户端,实现实时的消息传递。
- 高效性:相比传统的轮询机制,WebSocket的长连接机制可以减少不必要的网络请求和服务器负载。而使用Redis作为消息推送的中间件,可以利用其高性能的内存数据库特性,快速读写消息数据,提高消息推送的效率和响应速度。
- 可扩展性:WebSocket和Redis具有良好的可扩展性。WebSocket协议在现代浏览器中得到广泛支持,可以与各种后端框架和技术进行集成。而Redis作为高性能的分布式内存数据库,具备分布式部署和数据持久化等特性,可以满足大规模应用的消息推送需求。
- 灵活性:WebSocket和Redis的结合可以实现灵活的消息推送方式。通过使用Redis的发布/订阅功能,可以支持多种消息推送模式,如一对一推送、一对多推送和广播推送。根据实际需求,可以选择合适的推送模式,提供个性化的消息推送服务。
- 实现简单:WebSocket和Redis都有成熟的实现和丰富的文档资源,使用它们来实现消息推送相对来说比较简单。WebSocket提供基于事件的API,可方便地处理连接和消息的发送和接收。而Redis提供了易于使用的Pub/Sub功能,可以轻松地进行消息的发布和订阅。
实现步骤
架构图
文章来源地址https://www.toymoban.com/news/detail-575143.html
配置websocket请求地址
@Configuration
@EnableWebSocket
public class WebsocketConfiguration implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// webSocket通道
// 指定处理器和路径
registry.addHandler(new WebSocketHandler(), "/websocket")
// 指定自定义拦截器
.addInterceptors(new WebSocketInterceptor())
// 允许跨域
.setAllowedOrigins("*");
// sockJs通道
registry.addHandler(new WebSocketHandler(), "/sock-js")
.addInterceptors(new WebSocketInterceptor())
.setAllowedOrigins("*")
// 开启sockJs支持
.withSockJS();
}
}
配置websocket连接前置和连接关闭监听
public class WebSocketInterceptor implements HandshakeInterceptor {
private final RedisUtil redisUtil = GetBeanUtils.getBean(RedisUtil.class);
private final JwtTokenUtils jwtTokenUtils = GetBeanUtils.getBean(JwtTokenUtils.class);
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler webSocketHandler, Map<String, Object> map) {
if (request instanceof ServletServerHttpRequest) {
ServletServerHttpRequest serverHttpRequest = (ServletServerHttpRequest) request;
// TODO: 2022/12/15 握手前置监听 可以验证token是否有效
String authorization = serverHttpRequest.getServletRequest().getParameter("Authorization");
Claims allClaimsFromToken = jwtTokenUtils.getAllClaimsFromToken(authorization);
if (authorization != null
&& jwtTokenUtils.validateToken(authorization, allClaimsFromToken.getSubject())
&& !redisUtil.hasKey(RedisConstant.EXPIRATION_TOKEN.concat(CommonConstant.COLON).concat(authorization))) {
map.put(UserInfo.ID, allClaimsFromToken.get(UserInfo.ID, String.class));
return true;
}
}
return false;
}
@Override
public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
// TODO: 2022/12/15 断开连接监听
}
}
配置websocket处理程序
@Slf4j
public class WebSocketHandler extends AbstractWebSocketHandler {
/**
* 存储sessionId和webSocketSession
* 需要注意的是,webSocketSession没有提供无参构造,不能进行序列化,也就不能通过redis存储
* 在分布式系统中,要想别的办法实现webSocketSession共享(解决方案:采用redis发布订阅功能实现)
*/
private static final Map<String, WebSocketSession> SESSION_MAP = new ConcurrentHashMap<>();
private static final Map<String, String> USERID_MAP = new ConcurrentHashMap<>();
private final StringRedisTemplate stringRedisTemplate = GetBeanUtils.getBean(StringRedisTemplate.class);
/**
* webSocket连接创建后调用
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) {
// 获取参数
String userId = String.valueOf(session.getAttributes().get(UserInfo.ID));
USERID_MAP.put(userId, session.getId());
SESSION_MAP.put(session.getId(), session);
}
/**
* 接收到消息会调用
*/
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) {
try {
//登录消息不返回信息
if (message.getPayload().equals(NewsEnum.LOGIN.getCode())){
return;
}
JSONObject jsonObject = JSONObject.parseObject(message.getPayload().toString());
MessageBo messageBo = jsonObject.toJavaObject(MessageBo.class);
sendMessage(messageBo);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
/**
* 连接出错会调用
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) {
SESSION_MAP.remove(session.getId());
USERID_MAP.values().remove(session.getId());
}
/**
* 连接关闭会调用
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
SESSION_MAP.remove(session.getId());
USERID_MAP.values().remove(session.getId());
}
@Override
public boolean supportsPartialMessages() {
return false;
}
/**
* 其他模块调用该方法
* @param messageDto 消息模型
*/
public void sendMessage(MessageDto messageDto){
MessageBo messageBo = new MessageBo();
BeanUtils.copyProperties(messageDto, messageBo);
sendMessage(messageBo);
}
/**
* 后端发送消息
*/
public void sendMessage(MessageBo messageBo) {
if (messageBo.isPublish()) {
//发送消息需要通过redis发布出去
publish(messageBo);
} else if (messageBo.getUserIdList().isEmpty()) {
//未指定用户,发送给所有在线的客户
USERID_MAP.forEach((username, sessionId) -> {
send(sessionId, messageBo.getMessage());
});
} else {
//根据指定客户筛选出客户对应的sessionId集合
List<String> sessionIdList = USERID_MAP.entrySet().stream()
.filter(entry -> messageBo.getUserIdList().contains(entry.getKey()))
.map(Map.Entry::getValue)
.collect(Collectors.toList());
//发布消息
sessionIdList.forEach(sessionId -> send(sessionId, messageBo.getMessage()));
}
}
/**
* 指定用户发送消息
*
* @param sessionId 消息
*/
@SneakyThrows
private void send(String sessionId, String msg) {
WebSocketSession session = SESSION_MAP.get(sessionId);
if (session != null) {
session.sendMessage(new TextMessage(msg));
}
}
/**
* redis发布
*
* @param messageBo 消息
*/
private void publish(MessageBo messageBo) {
// 每个消息只需要发布一次
messageBo.setPublish(false);
stringRedisTemplate.convertAndSend(ConstantConfiguration.CHANNEL_1, JSON.toJSONString(messageBo));
}
}
配置redis交换机
public class ConstantConfiguration {
public static final String CHANNEL_1 = "CHANNEL_1";
}
配置redis订阅监听
@Slf4j
@Component
public class MessageSubListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] bytes) {
log.info("消费消息:".concat(message.toString()));
WebSocketHandler webSocketHandler = new WebSocketHandler();
MessageBo messageBo = JSONObject.toJavaObject(JSON.parseObject(message.toString()), MessageBo.class);
webSocketHandler.sendMessage(messageBo);
}
}
配置redis发布监听
@Configuration
public class RedisPubListenerConfig {
public static final String ONMESSAGE = "ONMESSAGE";
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 可以添加多个 messageListener,配置不同的交换机
container.addMessageListener(listenerAdapter, new PatternTopic(ConstantConfiguration.CHANNEL_1));
return container;
}
/**
* 消息适配器
*
* @param receiver 接收者
* @return {@link MessageListenerAdapter}
*/
@Bean
MessageListenerAdapter listenerAdapter(MessageSubListener receiver) {
return new MessageListenerAdapter(receiver, ONMESSAGE);
}
}
到了这里,关于消息推送(websocket)集群化解决方案的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!