需求背景:
用户认证中心(Authorization center简称ac)使用jwt实现用户请求身份认证,需要支持多副本部署。系统架构如下:
用户登录后生成jwt,纵向需要通过socket长连接把jwt下发到应用集成层ws,ws再把jwt下发到应用。前端请求各应用时可以在应用的filter中校验jwt是否有效,无效则向上询问ws jwt是否有效,无效再请求ac jwt是否有效。
所以,用户登录请求通过负载均衡落到ac副本1(简称ac1)后,ac1生成jwt,除了纵向下发之外,还需要横向同步到ac2 ac3,ac2和ac3再纵向同步jwt,实现全平台的单点登录。
具体需求:
ac1发送消息到rabbit mq,其他的所有副本ac2和ac3消费消息。
实现方案:
1.添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.添加rabbitMQ配置
spring:
application:
name: xxx
rabbitmq:
addresses: x.x.x.x:5672
username: admin
password: admin
virtual-host: /
# 启用消息发布ack
publisher-confirm-type: correlated
# 启用发布返回
publisher-returns: true
template:
#启用强制信息;默认false
mandatory: true
retry:
# 发送重试是否可用
enabled: true
#最大重试次数
max-attempts: 3
#消费端配置
listener:
simple:
missing-queues-fatal: false
#最大/最小的消费者数量
concurrency: 1
max-concurrency: 8
#表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
acknowledge-mode: manual
#监听器抛出异常而拒绝的消息是否被重新放回队列,默认值为true。
default-requeue-rejected: true
#每次从mq取消息的条数
prefetch: 1
retry:
enabled: true
#最大重试次数
max-attempts: 5
#最大重试时间间隔
max-interval: 32000
#第一次和第二次尝试发布或传递消息之间的间隔
initial-interval: 2000
#应用于上一重试间隔的乘数
multiplier: 2
3.登录service中发送mq(jwt),即mq provider
@Autowired
private RabbitTemplate rabbitTemplate;
public void login(Token token){
//如果是自己发的mq则不需要消费,直接return
if(cache.contains(token.getJwt())return;
//下发token到ws层
server.getNamespace("/ws1").getBroadcastOperations().sendEvent("LOGIN_EVENT",token,
broadcastAckCallback);
//获取redis中注册的ac节点信息,如果节点数大于1则需要发送mq同步token
int acNodeCount= redisTemplate.boundValueOps(nodeKey).get();
if(acNodeCount>1){
rabbitTemplate.convertAndSend(""ac-topic-exchange"",
AcConstant.LOGIN_ROUTING_KEY,
token);
}
4.MQ consumer
通过@Queue注解中的value属性,用spring spel表达式,在每次启动ac的时候生成一个带有随机字符串的名字,绑定到ac的topic exchange。这样,启动3个ac副本,就有3个queue绑定到了ac的exchange,mq message发送到topic exchange, 通过routing key分发到所有符合规则的queue,就能实现所有副本消费同一条消息。
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue(value = "ac-login-queue- #{T(System).currentTimeMillis()}"),
exchange = @Exchange(value = "ac-topic-exchange",
type = ExchangeTypes.TOPIC),
key = "ac.login.#")
},
ackMode = "MANUAL")
@RabbitHandler
public void loginConsumer(Token token, Channel channel, Message message) throws IOException {
log.info("【loginConsumer】 监听到其他ac节点的登录事件,jwt:{}",token.getJwt());
long messageId = message.getMessageProperties().getDeliveryTag();
try{
listenLogin();
//手动ack
channel.basicAck(messageId,false);
}catch (Exception e){
//失败ack,消息重新入队
log.error("loginConsumer消费失败:{}",e.getCause().getMessage());
channel.basicNack(messageId,false,true);
}
}
文章来源:https://www.toymoban.com/news/detail-767380.html
登出时的实现逻辑相同。 文章来源地址https://www.toymoban.com/news/detail-767380.html
到了这里,关于springboot+RabbitMQ实现一条消息被所有consumer消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!