redis发布订阅广播模式的使用&结合jeecg的Redis网关路由刷新机制
本质和传统的消息发布和订阅机制是差不多的,但是相较于其他几款MQ产品,Redis的使用更加便捷,也更加轻量化,不需要搭建一套繁重的MQ框架。
但是也它致命的缺点,redis的消息不会被持久化,服务器出现问题,消息会丢失,导致数据问题。对于数据一致性要求比较高的场景不适合使用,需要慎重选择。
导致消息丢失的情况:
一般获取消息的客户端(订阅者)会通过while循环不断的向redis服务器请求发布者获取消息,假如发布者在订阅者退出订阅状态时发布了消息,则该消息会丢失。
关于这个订阅者退出状态,值得探讨,这里做一个分析。使用终端模拟时,假如订阅者断开连接后,又重新连接,在这个断开的时间段内,如果存在数据发布,则重新连接的客户端是无法获取到消息的,因为redis服务器认为它是一个纯新的客户端。因此在程序里,必须小心使用redis的订阅连接,当订阅的连接没有主动释放时,也没有执行退出订阅时,数据会源源不断的写入内存,直到所有订阅者取走消息。
优缺点
优点:便捷,轻量化
缺点:
-
不稳定,消息易丢失,导致数据一致性问题,性能不高
-
redis客户端在订阅消息时,要求订阅在发布之前,否则无法订阅到客户端订阅前,已经发布的消息。
-
redis的消息发布与订阅,无法实现高并发和大数据量。前者受限于redis本身的并发量限制和内存大小;后者是因为redis发布消息时,会先将数据推送到每个客户端的连接缓冲区,如果单个消息过大会撑爆缓冲区,导致redis错误,就算redis没有撑爆缓冲区,如果消费者(订阅方)没有及时取走消息,也会因为数据积累而撑爆内存。
-
总结,快捷方便,消息不会持久化,网络不稳定,断开重连,无法接受到已发布的消息,链接没有释放,也没有退出订阅,消息会一直在内存中积压,撑爆内存。
简单使用
命令:
psubscribe 订阅一个或者多个channel,使用正则方式匹配多个
publish 发布消息到指定channel
pubsub 查看订阅与发布系统状态
pubsub channels pattern 列出当前活跃的channel
pubsub numsub channel-1 channel-n 获取指定频道的订阅者数量
pubsub numpat 获取订阅模式的数量
subscribe 订阅一个或者多个channel消息,使用一个或者多个准确的channel名实现订阅
unsubscribe 客户端退订channel
springboot中使用
@Autowired
private RedisTemplate<String, Object> redisTemplate;
BaseMap params = new BaseMap();
params.put("handlerName", "loderRouderHandler");
//刷新网关
redisTemplate.convertAndSend(GlobalConstants.REDIS_TOPIC_NAME, params);
jeecg代码分析:
注册消息监听器适配器
//RedisConfig中配置MessageListenerAdapter适配器,自定义消息处理方法
@Bean
MessageListenerAdapter commonListenerAdapter(RedisReceiver redisReceiver) {
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(redisReceiver, "onMessage");
messageListenerAdapter.setSerializer(this.jacksonSerializer());
return messageListenerAdapter;
}
自定义消息接收器,
###在消息接收器中获取 JeecgRedisListener->onMessage()来获取消息,
###其他模块只需要实现JeecgRedisListener即可
###Object handlerName = params.get(“handlerName”),是消息内容传递过来的,可以动态指定是哪个消息处理类,比如路由处理类就是:GlobalConstants.LODER_ROUDER_HANDLER文章来源:https://www.toymoban.com/news/detail-419490.html
@Component
public class RedisReceiver {
public void onMessage(BaseMap params) {
//获取redis消息处理类名称
Object handlerName = params.get("handlerName");
//通过SpringContextHolder拿到JeecgRedisListener
JeecgRedisListener messageListener = (JeecgRedisListener)SpringContextHolder.getHandler(handlerName.toString(), JeecgRedisListener.class);
if (ObjectUtil.isNotEmpty(messageListener)) {
messageListener.onMessage(params);
}
}
public RedisReceiver() {
}
public boolean equals(final Object o) {
if (o == this) {
return true;
} else if (!(o instanceof RedisReceiver)) {
return false;
} else {
RedisReceiver other = (RedisReceiver)o;
return other.canEqual(this);
}
}
protected boolean canEqual(final Object other) {
return other instanceof RedisReceiver;
}
public int hashCode() {
int result = true;
return 1;
}
public String toString() {
return "RedisReceiver()";
}
}
定义路由监听器,(路由处理器),通过监听redis来实现
@Slf4j
@Component(GlobalConstants.LODER_ROUDER_HANDLER)
public class LoderRouderHandler implements JeecgRedisListener {
@Resource
private DynamicRouteLoader dynamicRouteLoader;
@Override
public void onMessage(BaseMap message) {
dynamicRouteLoader.refresh(message);
}
}
最终通过下面的方法获取路由配置
/**
* 从redis中读取路由配置
*
* @return
*/
private void loadRoutesByRedis(BaseMap baseMap) {
List<MyRouteDefinition> routes = Lists.newArrayList();
configService = createConfigService();
if (configService == null) {
log.warn("initConfigService fail");
}
Object configInfo = redisUtil.get(CacheConstant.GATEWAY_ROUTES);
if (ObjectUtil.isNotEmpty(configInfo)) {
log.info("获取网关当前配置:\r\n{}", configInfo);
JSONArray array = JSON.parseArray(configInfo.toString());
try {
routes = getRoutesByJson(array);
} catch (URISyntaxException e) {
e.printStackTrace();
}
}else{
log.warn("ERROR: 从Redis获取网关配置为空,请确认system服务是否启动成功!");
}
for (MyRouteDefinition definition : routes) {
log.info("update route : {}", definition.toString());
Integer status=definition.getStatus();
if(status.equals(0)){
dynamicRouteService.delete(definition.getId());
}else{
dynamicRouteService.add(definition);
}
}
if(ObjectUtils.isNotEmpty(baseMap)){
String delRouterId = baseMap.get("delRouterId");
if (ObjectUtils.isNotEmpty(delRouterId)) {
dynamicRouteService.delete(delRouterId);
}
}
this.publisher.publishEvent(new RefreshRoutesEvent(this));
}
参考链接:
https://www.cnblogs.com/lovelsl/articles/15272190.html文章来源地址https://www.toymoban.com/news/detail-419490.html
到了这里,关于redis发布订阅广播模式的使用&结合jeecg的Redis网关路由刷新机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!