spring-kafka之请求响应模式

这篇具有很好参考价值的文章主要介绍了spring-kafka之请求响应模式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

        kafka是一款性能强劲的分布式流式处理软件,被广泛用于大数据应用场景。所以很多小伙伴对kafka肯定不会陌生,但是kafka的请求响应模式估计使用的却不一定很多。首先简单唠叨下什么是请求响应模式,这个类似于http请求一样发出请求能够在一个请求中返回结果,所以这种场景跟小伙伴大部分使用kafka的场景肯定不大一样,但是这种模式却可以简化下述场景的使用:

场景:数据删除校验

        随着微服务化的发展,很多数据不再像最初的单库模式一样都放在一个数据库实例里,并不是所有的服务都在一个服务里。所以删除某些数据时无法再像单体模式一样进行本地校验,也有有人会说可以通过微服务调用的方式来进行校验,确实微服务确实可以实现删除校验的功能,但是当服务越来越多并且逐渐增长的时候就会出现问题,导致被依赖方需要频繁的调整代码,因为依赖这个基础服务的其他服务一直在变化。所以这个是否使用mq进行数据校验的解耦就成为一种很好的替代方案。相信大部分人使用mq实现该功能的方案就是创建两个topic:请求topic以及响应topic,基础服务删除数据前向请求topic发送数据,服务依赖方收到对应的删除校验请求后判断该服务是否有数据依赖删除的基础数据,如果则向响应topic发送数据。

        我们可以看到上面的交互用到了两个topic,并且鉴于上述响应的异步性,删除校验端需要启动异步处理等待响应的返回,同时需要启动超时检测机制(不能一直等待),这种双topic确实可以解决这种删除校验的逻辑,但是实现比较繁琐,今天咱们就来探索下kafka另外一个处理方式:请求响应模式,看下这种模式如何简化处理流程的。

        kafka实现请求响应在spring框架下很容易实现,ReplyingKafkaTemplate这个类就可以实现该功能,废话不多说,直接给出实例代码:


    @Autowired
    private ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate;
    
    @Test
    public void sendReplyMsg() throws JsonProcessingException, InterruptedException {
        doSendReplyMsg("si", "li");
        doSendReplyMsg("san", "zhang");
        TimeUnit.SECONDS.sleep(50);
    }
    
    public void doSendReplyMsg(String key, String msg) throws JsonProcessingException, InterruptedException {
        ProducerRecord<String, String> request = new ProducerRecord<>("reply", key, msg);
        // 发送数据并设置3秒的响应超时时间,这个时间可以根据需要自己设定
        RequestReplyFuture<String, String, String> future = replyingKafkaTemplate.sendAndReceive(request,
                Duration.of(3, ChronoUnit.SECONDS));
        future.addCallback(new ListenableFutureCallback<ConsumerRecord<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
            
            }
            
            @Override
            public void onSuccess(ConsumerRecord<String, String> result) {
                log.info("发送数据:key:{},value:{}收到响应:{}", key, msg, result.value());
            }
        });
    }
@Slf4j
@EnableKafka
@Configuration
public class KafkaConfig {
    
    /**
    ** 定义ReplyingKafkaTemplate.
    **/
    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyKafkaTemplate(ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {
        ReplyingKafkaTemplate<String, String, String> replyTemplate = new ReplyingKafkaTemplate<>(pf, repliesContainer);
        replyTemplate.setDefaultReplyTimeout(Duration.ofSeconds(3));
        replyTemplate.setSharedReplyTopic(true);
        return replyTemplate;
    }
    
    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConsumerFactory<String, String> cf) {
        ContainerProperties containerProperties = new ContainerProperties("repliesTopic");
        return new ConcurrentMessageListenerContainer<>(cf, containerProperties);
    }
    
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }
}

上述代码为kafka消息生产者相关,下面是kafka消费者代码:

@Component
public class KafkaListeners {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    @KafkaListener(topics = "reply", groupId = "111")
    public void listen(ConsumerRecord<String, String> record, @Headers MessageHeaders headers) {
        log.info("listen收到数据");
        Object obj = headers.get(KafkaHeaders.REPLY_TOPIC);
        if (null != obj) {
            String replyTopic = new String((byte[]) obj);
            RecordHeaders headers1 = new RecordHeaders();
            // 返回数据里面需要在header中增加kafka_correlationId
            headers1.add(KafkaHeaders.CORRELATION_ID, record.headers().lastHeader(KafkaHeaders.CORRELATION_ID).value());
            ProducerRecord<String, String> record1 = new ProducerRecord<>(replyTopic, record.partition(), record.key(),
                    record.value() + record.key(), headers1);
            kafkaTemplate.send(record1);
        }
    }
}

注意上述备注部分,kafka消费端需要在kafka的ProducerRecord header中增加kafka_correlationId,而且该字段需要跟发送方发送的kafka_correlationId值保持一致,这也是生产端进行消息匹配的值。

但需要注意的是及时采用的是kafka的topic模式,多个消费者可能都会响应,但是生产端在收到一个数据后就不再接收后续消费者发送的响应,ReplyingKafkaTemplate的源码可以参考:ReplyingKafkaTemplate源码分析_PolarisHuster的博客-CSDN博客文章来源地址https://www.toymoban.com/news/detail-508615.html

到了这里,关于spring-kafka之请求响应模式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring-Kafka 发送消息的两种写法

    本文主要是使用 Java 语言中 spring-kafka 依赖 对 Kafka 进行使用。 使用以下依赖对 Kafka 进行操作: 需要更改版本的话,可以前往:Maven 仓库 创建项目,先创建一个简单的 Maven 项目,删除无用的包、类之后,使用其作为一个父级项目。 以下内容如果在项目启动时报这个错: 把

    2024年01月20日
    浏览(44)
  • 浅析Spring-kafka源码——消费者模型的实现

    SpringBoot项目中的消费端实现而言,Spring-kafka没有用原生的ConsumerConnector,,而是借助原生client的拉取消息功能做了自己的消费模型的实现,提供了@KafkaListener注解这种方式实现消费。 开发中在使用Spring-kafka时,一般也都是通过使用@KafkaListener注解的方法来实现消息监听和消费。

    2024年02月09日
    浏览(55)
  • 使用spring-kafka的Java API操作Kafka集群的Topic

    记录 :462 场景 :在Spring Boot微服务集成spring-kafka-2.8.2操作Kafka集群的Topic的创建和删除。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka集群安装 :https://blog.csdn.net/zhangbeizhen18/article/details/131156084 1.微服务中 配置Kafka信息 1.1在pom.xml添加依赖 pom.xml文件: 解析:

    2024年02月10日
    浏览(42)
  • Spring-Kafka如何实现批量消费消息并且不丢失数据

    先给答案: 某个业务对象由多张表关联而成,要创建该对象需要向多张表插入数据,基于canal的监控就会有多次该对象的变更记录,而Kafka消费的时候也会多次处理同一个对象(虽然不同表,但是同一个对象的不同部分),原有的Kafka消费者是一次处理一条,这将造成重复对同

    2024年02月13日
    浏览(46)
  • 使用Kafka客户端(spring-kafka)的Java API操作Kafka的Topic

    记录 :458 场景 :在Spring Boot微服务集成Kafka客户端spring-kafka-2.8.2操作Kafka的Topic的创建和删除。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka安装 :https://blog.csdn.net/zhangbeizhen18/article/details/129071395 1.微服务中 配置Kafka信息 1.1在pom.xml添加依赖 pom.xml文件: 解析

    2024年02月09日
    浏览(41)
  • Spring请求与响应——请求

    不知道是否还对这个图有印象,我们该开始理解一下交互问题了。 我们举个示例来理解吧(我们用下面这个结构来举例): @@RequestMapping() 还记得什么是映射路径嘛?这玩意不能重名哦~,比如下面这种情况就会报错 我们修改也很简单,按照名字和路径给完整就好了比如:

    2024年02月01日
    浏览(28)
  • 【Spring】Spring MVC请求响应

    访问不同的路径, 就是发送不同的请求。在发送请求时, 可能会带⼀些参数, 所以学习Spring的请求, 主要是学习如何传递参数到后端以及后端如何接收。 传递参数,我们通过postman测试。 正常传递: 可以看到, 后端程序正确拿到了name参数的值。 Spring MVC 会根据⽅法的参数名, 找

    2024年02月08日
    浏览(45)
  • spring mvc 请求与响应

    我是南城余!阿里云开发者平台专家博士证书获得者! 欢迎关注我的博客!一同成长! 一名从事运维开发的worker,记录分享学习。 专注于AI,运维开发,windows Linux 系统领域的分享! 知识库链接: 请求与响应 · 语雀 1. 请求映射路径 get请求是路径传参,而post请求是body传参

    2024年01月25日
    浏览(53)
  • 【性能测试】运维测试01之性能测试整体认知包括:TPS、请求响应时间、事务响应时间、并发用户数、吞吐量、吞吐率、点击率、资源使用率等性能指标详细介绍

    性能测试整体认知包括:TPS、请求响应时间、事务响应时间、并发用户数、吞吐量、吞吐率、点击率、资源使用率。 1.1 需求一 1.熟悉Linux、windows等操作系统,熟悉shell脚本; ⒉.熟悉jvm调优, tomcat调优等基础策略 3.熟悉mysq数据库,熟练掌握javascript、java、python、groovy等至少一门

    2024年02月16日
    浏览(40)
  • Spring Boot拓展XML格式的请求和响应

    在我们开发过程中,我们经常使用的参数绝大多少事HTML和JSON格式的请求和响应处理,但是我们在实际开发过程中,我们可能经历一些,比如对于XML格式的请求,我们在后端应该如何接收,并且如何将XML格式的参数变成对象,然后返回一个XML对象呢? 实现原理 我们在处理XM

    2024年02月07日
    浏览(31)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包