RabbitMQ消息的链路跟踪

这篇具有很好参考价值的文章主要介绍了RabbitMQ消息的链路跟踪。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景

TraceId能标记一次请求的调用链路,在我们排查问题的时候十分重要。系统引入MQ后,MQ消息默认不带TraceId,所以消息发送和处理的链路就断了。下面分享如何对业务逻辑无感的方式,将TraceId带到消费端。

难点

RabbitMQ的Message对象可以在属性上设置头信息,所以携带TraceId的位置有了,问题是怎么无感的方式设置和获取TraceId?

Spring RabbitMQ拦截器

在Spring里使用RabbitMQ本身没有拦截器,但是有一个消息处理器,可以在发送和接收消息之前对消息进行处理。里面有3个重载的方法,对原始消息进行转换。我们可以借助这个处理器,在Message对象里加上TraceId。

public interface MessagePostProcessor {

	/**
	 * Change (or replace) the message.
	 * @param message the message.
	 * @return the message.
	 * @throws AmqpException an exception.
	 */
	Message postProcessMessage(Message message) throws AmqpException;

	/**
	 * Change (or replace) the message and/or change its correlation data. Only applies to
	 * outbound messages.
	 * @param message the message.
	 * @param correlation the correlation data.
	 * @return the message.
	 * @since 1.6.7
	 */
	default Message postProcessMessage(Message message, Correlation correlation) {
		return postProcessMessage(message);
	}

	/**
	 * Change (or replace) the message and/or change its correlation data. Only applies to
	 * outbound messages.
	 * @param message the message.
	 * @param correlation the correlation data.
	 * @param exchange the exchange to which the message is to be sent.
	 * @param routingKey the routing key.
	 * @return the message.
	 * @since 2.3.4
	 */
	default Message postProcessMessage(Message message, Correlation correlation, String exchange, String routingKey) {
		return postProcessMessage(message, correlation);
	}

}

发送消息时,携带TraceId

Spring默认使用RabbitTemplate来发送消息,RabbitTemplate的send方法在发送消息之前,会调用beforePublishPostProcessors来处理Message对象。beforePublishPostProcessors集合里存的就是MessagePostProcessor对象,它会在发消息之前执行:

public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message,
			boolean mandatory, @Nullable CorrelationData correlationData) throws IOException {
	// ...
	// 核心代码
	if (this.beforePublishPostProcessors != null) {
		for (MessagePostProcessor processor : this.beforePublishPostProcessors) {
			messageToUse = processor.postProcessMessage(messageToUse, correlationData, exch, rKey);
		}
	}
	// ...

	sendToRabbit(channel, exch, rKey, mandatory, messageToUse);

	// ...
}

我们要做的是在注册RabbitTemplate的时候加上处理TraceId的逻辑

@Bean
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate();
    configurer.configure(template, connectionFactory);

    template.setBeforePublishPostProcessors(new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
			// 这个方法没调用,调用的是下面那个方法
            return message;
        }

        @Override
        public Message postProcessMessage(Message message, Correlation correlation, String exchange, String routingKey) {
            String requestId = MDC.get(MdcConstants.REQUESTID);
            if (StringUtils.isEmpty(requestId)) {
                requestId = StringUtils.random();
            }
            message.getMessageProperties().setHeader(RabbitMQConstants.HEADER_REQUESTID, requestId);
            return message;
        }
    });
    return template;
}

消费消息时,获取TraceId

当我们通过@RabbitListener注册consumer时,Spring会通过org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#mainLoop方法,不断从consumer队列里拿到消息。在把消息交给@RabbitListener标注的对象前,也会对Message对象进行处理。这里的处理器是存在org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#afterReceivePostProcessors属性上。

private void doExecuteListener(Channel channel, Object data) {
	if (data instanceof Message) {
		Message message = (Message) data;
		if (this.afterReceivePostProcessors != null) {
			for (MessagePostProcessor processor : this.afterReceivePostProcessors) {
				message = processor.postProcessMessage(message);
				if (message == null) {
					throw new ImmediateAcknowledgeAmqpException(
							"Message Post Processor returned 'null', discarding message");
				}
			}
		}
		// ...

		invokeListener(channel, message);
	}
	else {
		invokeListener(channel, data);
	}
}

怎么设置SimpleMessageListenerContainer#afterReceivePostProcessors的值?

SimpleMessageListenerContainer对象是由SimpleRabbitListenerContainerFactory工厂对象创建,在创建SimpleMessageListenerContainer对象时,会把工厂里的属性拷贝过来,afterReceivePostProcessors就是通过工厂拷过来。所以我们直接设置SimpleRabbitListenerContainerFactory的afterReceivePostProcessors值就可以。

@Bean(name = {"rabbitListenerContainerFactory"})
@ConditionalOnProperty(
        prefix = "spring.rabbitmq.listener",
        name = {"type"},
        havingValue = "simple",
        matchIfMissing = true
)
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory, ObjectProvider<ContainerCustomizer<SimpleMessageListenerContainer>> simpleContainerCustomizer) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    simpleContainerCustomizer.ifUnique(factory::setContainerCustomizer);

    factory.setAfterReceivePostProcessors(message -> {
        Object requestId = message.getMessageProperties().getHeader(RabbitMQConstants.HEADER_REQUESTID);
        if (StringUtils.isEmpty(requestId)) {
            requestId = StringUtils.random();
        }
        MDC.put(MdcConstants.REQUESTID, String.valueOf(requestId));
        return message;
    });
    return factory;
}

这样,我们就能在@RabbitListener的消费逻辑里拿到TraceId。文章来源地址https://www.toymoban.com/news/detail-729844.html

到了这里,关于RabbitMQ消息的链路跟踪的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式搜索引擎(Elastic Search)+消息队列(RabbitMQ)部署(商城4)

    1、全文搜索 Elastic search可以用于实现全文搜索功能,例如商城中对商品搜索、搜索、分类搜索、订单搜索、客户搜索等。它支持复杂的查询语句、中文分词、近似搜索等功能,可以快速地搜索并返回匹配的结果。 2、日志分析 Elastic search可以用于实现实时日志分析,例

    2024年02月04日
    浏览(51)
  • 链路追踪详解(三):分布式链路追踪标准的演进

    目录 Google Dapper Twitter Zipkin Uber Jaeger OpenTracing 和 OpenCensus OpenTelemetry 小结 分布式链路追踪是现代云计算和微服务架构中一个关键技术,可以让开发者和运维团队理解和监控服务请求在复杂系统中的完整流转路径。分布式链路追踪技术的发展经历了从早期的专有解决方案到现代

    2024年02月05日
    浏览(44)
  • 分布式链路追踪专栏,Spring Cloud Sleuth:分布式链路追踪之通信模型设计

    Spring Cloud Sleuth  赋予分布式跟踪的  Spring Boot  自动配置的一键解决方案。 Spring Cloud Sleuth  是基于  Brave  的封装,也是很多公司采用开源加自研的最佳解决方案。 那么从作为架构师或者技术专家如何去借鉴优秀框架的设计理念和思想,本次  Chat  将开启作者既分布式链路

    2024年01月19日
    浏览(66)
  • 链路追踪详解(四):分布式链路追踪的事实标准 OpenTelemetry 概述

    目录 OpenTelemetry 是什么? OpenTelemetry 的起源和目标 OpenTelemetry 主要特点和功能 OpenTelemetry 的核心组件 OpenTelemetry 的工作原理 OpenTelemetry 的特点 OpenTelemetry 的应用场景 小结 OpenTelemetry 是一个为实现可观测性的开源的框架和工具集,用于创建和管理遥测数据,例如 traces,、metric

    2024年02月04日
    浏览(48)
  • 分布式系统中的分布式链路追踪与分布式调用链路

    本文分享自天翼云开发者社区《分布式系统中的分布式链路追踪与分布式调用链路》,作者:c****w 在分布式系统中,由于服务间的调用关系复杂,需要实现分布式链路追踪来跟踪请求在各个服务中的调用路径和时间消耗。这对问题排查和性能监控都很重要。 常用的分布式链

    2024年01月19日
    浏览(59)
  • 分布式链路追踪

    随着互联网业务快速扩展,软件架构也日益变得复杂,为了适应海量用户高并发请求,系统中越来越多的组件开始走向分布式化,如单体架构拆分为微服务、服务内缓存变为分布式缓存、服务组件通信变为分布式消息,这些组件共同构成了繁杂的分布式网络。 在大型系统的微

    2024年02月16日
    浏览(43)
  • 进阶分布式链路追踪

                            https://item.jd.com/14337086.html​编辑https://item.jd.com/14337086.html “ RocketMQ 消息中间件实战派上下册”是我既“ Spring Cloud Alibaba 微服务架构实战派上下册”之后,又一本历时超过 1 年半的巨无霸技术实战类型的书籍。 为了提高读者阅读本书的体验性,本书

    2024年02月02日
    浏览(46)
  • 分布式链路追踪概述

    随着系统设计变得日趋复杂,越来越多的组件开始走向分布式化,如微服务、分布式数据库、分布式缓存等,使得后台服务构成了一种复杂的分布式网络。往往前端的一个请求需要经过多个微服务、跨越多个数据中心才能最终获取到结果,如下图 并且随着业务的不断扩张,服

    2024年02月13日
    浏览(40)
  • RabbitMQ:高效传递消息的魔法棒,一篇带你助力构建可靠的分布式系统(上篇)

    MQ是消息队列( Message Queue )的缩写,是一种在应用程序之间传递消息的技术。通常用于 分布式系统 或 异步通信 中,其中 发送者 将消息放入队列,而 接收者 从队列中获取消息。 这种异步通信模式允许发送者和接收者在不需要实时连接的情况下进行通信,从而提高了应用

    2024年02月15日
    浏览(48)
  • 什么是分布式链路追踪

    随着互联网业务快速扩展,软件架构也日益变得复杂,为了适应海量用户高并发请求,系统中越来越多的组件开始走向分布式化,如单体架构拆分为微服务、服务内缓存变为分布式缓存、服务组件通信变为分布式消息,这些组件共同构成了繁杂的分布式网络。 在大型系统的微

    2024年02月16日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包