spring cloud steam 整合kafka 进行消息发送与接收

这篇具有很好参考价值的文章主要介绍了spring cloud steam 整合kafka 进行消息发送与接收。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

spring cloud steam :
Binder和Binding
Binder是SpringCloud Stream的一个抽象概念,是应用与消息中间件之间的粘合剂,目前SpringCloud Stream实现了Kafka和RabbitMQ的binder
Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。
整合配置yml文件:

spring:
  cloud:
    #    function:
    #      definition: testChannel
    stream:
      default-binder: kafka #默认的binder是kafka(粘合剂粘合的类型为kafka)

      #可以动态绑定的目标列表(如:动态路由),如果设置,则只能绑定列出的目的地
      #     dynamic-destinations:
      #绑定信息
      bindings:
        #消息报错后,数据保存的topic
        error:
          destination: myError
        testChannel-in-0:
          #消费者
          consumer:
            #            bindingName: myInConsumer
            #消费者并发 默认为1
            concurrency: 1
            #是否分区接收数据 默认false
            partitioned: false
            #头信息模式,设置为raw时,禁用输入头文件解析。仅适用于不支持消息头的消息中间件,并且需要头部嵌入。入站数据来自外部Spring Cloud Stream应用程序时很有用。
            header-mode: headers
            #重试次数
            max-attempts: 3
            #初始回退间隔时间
            back-off-initial-interval: 1000
            #最大回退间隔时间
            back-off-max-interval: 10000
            #回退倍数
            back-off-multiplier: 2.0
            #大于0时,表示允许自定义该消费者的实例索引,-1时使用spring.cloud.stream.instance-index
            instance-index: -1
            #大于0时表示自定义消费者实例技术,-1时默认使用spring.cloud.stream.instanceCount
            instance-count: -1
            content-type: application/json

          #生产者
          producer:
            #一个确定如何分配出站数据的SpEL表达式
            partition-key-expression: headers.cs
            #一个PartitionKeyExtractorStrategy实现。如果设置,或者如果设置了partitionKeyExpression,则该通道上的出站数据将被分区,并且partitionCount必须设置为大于1的值才能生效。这两个选项是相互排斥的。
            #partition-key-extractor-class:
            #一个PartitionSelectorStrategy实现。与partitionSelectorExpression相互排斥。如果没有设置,则分区将被选为hashCode(key) % partitionCount,其中key通过partitionKeyExpression或partitionKeyExtractorClass计算。
            #partition-selector-class:
            #partition-selector-expression:
            #如果启用分区,则数据的目标分区数。如果生产者被分区,则必须设置为大于1的值。在Kafka,解释为提示; 而是使用更大的和目标主题的分区计数。
            partition-count: 3
            #消息发送失败的处理逻辑默认是关闭的   test.errors
            error-channel-enabled: true
          destination: test #目标主题 相当于kafka的topic
          binder: kafka  #粘合器
          content-type: application/json
          #          content-type: text/html
          group: group2
        testChannel-out-0:
          #消费者
          consumer:
            #            bindingName: myOutConsumer
            #消费者并发 默认为1
            concurrency: 1
            #是否分区接收数据 默认false
            partitioned: false
            #头信息模式,设置为raw时,禁用输入头文件解析。仅适用于不支持消息头的消息中间件,并且需要头部嵌入。入站数据来自外部Spring Cloud Stream应用程序时很有用。
            header-mode: headers
            #重试次数
            max-attempts: 3
            #初始回退间隔时间
            back-off-initial-interval: 1000
            #最大回退间隔时间
            back-off-max-interval: 10000
            #回退倍数
            back-off-multiplier: 2.0
            #大于0时,表示允许自定义该消费者的实例索引,-1时使用spring.cloud.stream.instance-index
            instance-index: -1
            #大于0时表示自定义消费者实例技术,-1时默认使用spring.cloud.stream.instanceCount
            instance-count: -1

          #          #生产者
          #          producer:
          #            #一个确定如何分配出站数据的SpEL表达式
          #            partition-key-expression: headers.cs
          #            #一个PartitionKeyExtractorStrategy实现。如果设置,或者如果设置了partitionKeyExpression,则该通道上的出站数据将被分区,并且partitionCount必须设置为大于1的值才能生效。这两个选项是相互排斥的。
          #            #partition-key-extractor-class:
          #            #一个PartitionSelectorStrategy实现。与partitionSelectorExpression相互排斥。如果没有设置,则分区将被选为hashCode(key) % partitionCount,其中key通过partitionKeyExpression或partitionKeyExtractorClass计算。
          #            #partition-selector-class:
          #            #partition-selector-expression:
          #            #如果启用分区,则数据的目标分区数。如果生产者被分区,则必须设置为大于1的值。在Kafka,解释为提示; 而是使用更大的和目标主题的分区计数。
          #            partition-count: 3
          #            #消息发送失败的处理逻辑默认是关闭的   test.errors
          #            error-channel-enabled: true
          destination: test #本例子创建了另外一个topic (test1)用于区分不同的功能区分。
          binder: kafka
          content-type: application/json
          group: group1

      #          producer:
      #            error-channel-enabled: true
      ##            partitionSelectorName: customPartitionSelector
      ##            partitionKeyExtractorName: customPartitionKeyExtractor
      #            partitionCount: 3
      #            partitionKeyExpression: headers.cs
      ##            partition-key-extractor-name: customPartitionKeyExtractor
      ##            partition-selector-name: customPartitionSelector
      binders:
        kafka:
          binder:
            #kafka brokers,默认localhost
            brokers: localhost
            #kafka 端口号,默认9092
            default-broker-port: 9092
            #kafka zk节点,默认localhost
            zk-nodes: localhost
            #zookeeper 端口
            default-zk-port: 2181
            #配置,map
            #configuration:
            #自定义标题列表
            headers:  cs
            #偏移量保存时间(ms)窗口,0:忽略,默认10000(ms)
            offset-update-time-window: 10000
            #偏移量保存次数,与时间窗口互斥
            offset-update-count: 0
            #broker 需要的ack数量
            required-acks: 1
            #只有设置autoCreateTopics或autoAddPartitions才有效
            min-partition-count: 1
            #自动创建topic时 生成的副本数量
            replication-factor: 1
            #自动创建主题
            auto-create-topics: true
            #如果设置为true,则绑定器将根据需要创建新的分区。如果设置为false,则绑定器将依赖于已配置的主题的分区大小。如果目标主题的分区计数小于预期值,则绑定器将无法启动。
            auto-add-partitions: true
            #socket 缓冲区大小
            socket-buffer-size: 2097152

            bootstrap-servers: 127.0.0.1:9092 #kafka服务地址,集群部署的时候需要配置多个

            #配置,map
            configuration:
              acks: -1
              key:
                serializer: org.apache.kafka.common.serialization.StringSerializer
              #            value:
              #              serializer: org.apache.kafka.common.serialization.StringSerializer
              max:
                poll:
                  records: 200
              retries: 3
              session:
                timeout:
                  ms: 40000   # 每次消费的处理时间
          #绑定
          bindings:
            testChannel-out-put:
              #消费者
              consumer:
                #主题分区消费者组成员之间自动平衡
                auto-rebalance-enabled: true
                #自动提交偏移量
                auto-commit-offset: true
#                auto-commit-on-error:
                #连接恢复尝试之间的间隔,以毫秒为单位。
                recovery-interval: 5000
                #是否将消费者偏移量重置为start-offset提供的值
                reset-offsets: false
                #新组的起始偏移量,或resetOffsets为true时的起始偏移量。允许的值:earliest,latest,默认值:null(相当于earliest)
                start-offset: earliest
                enable-dlq: false
                #              configuration:
                #接收错误消息的DLQ主题的名称。默认值:null(如果未指定,将导致错误的消息将转发到名为error:<destination>:<group>的主题)。:
              #              dlq-name:
              #生产者
              producer:
                #              configuration:
                buffer-size: 16348
                #生产者是否是同步的
                sync: true
                #生产者在发送之前等待多长时间,以便允许更多消息在同一批次中累积。(通常,生产者根本不等待,并且简单地发送在先前发送进行中累积的所有消息。)非零值可能会以延迟为代价增加吞吐量。
                batch-timeout: 0
                #                key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
                #                value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
                client-id: producer1
              health-timeout: 60

消息生产者:


@Component
public class MessageProducer {
    private final StreamBridge streamBridge;

    @Autowired(required = false)
    private BinderAwareChannelResolver resolver;


    public MessageProducer(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    public String resolverSendMessage(String messages) {
        SendMessageDto sendMessageDto = new SendMessageDto();
        sendMessageDto.setIp(UUID.randomUUID().toString());
        sendMessageDto.setMessage(messages);
        sendMessageDto.setTiem(new Date().toString());
        MessageBuilder<SendMessageDto> stringMessageBuilder = MessageBuilder.withPayload(sendMessageDto);
        stringMessageBuilder.setHeader("cs","1");
//        stringMessageBuilder.setHeader(KafkaHeaders.MESSAGE_KEY,"1233322");
//        Message<SendMessageDto> build = ;
        GenericMessage stringMessage = (GenericMessage) stringMessageBuilder.build();
        resolver.resolveDestination("testChannel-in-0").send(stringMessage);
        return "yes!";

    }

    public String send(String messages) {
        SendMessageDto sendMessageDto = new SendMessageDto();
        sendMessageDto.setIp(UUID.randomUUID().toString());
        sendMessageDto.setMessage(messages);
        sendMessageDto.setTiem(new Date().toString());
//        String s = JSON.toJSONString(sendMessageDto);
        MessageBuilder<SendMessageDto> stringMessageBuilder = MessageBuilder.withPayload(sendMessageDto);
        stringMessageBuilder.setHeader("cs","1");
//        stringMessageBuilder.setHeader(KafkaHeaders.MESSAGE_KEY,"1233322");
//        Message<SendMessageDto> build = ;
        GenericMessage stringMessage = (GenericMessage) stringMessageBuilder.build();
        streamBridge.send("testChannel-in-0", stringMessage);
        return "发送消息: " + messages;
    }
}

消息消费者

//注意这里采用的是函数式编程,向spring 容器中注入名为testChannel 的bean,  应为高版本的spring cloud steam 弃用了
//@StreamListener ,@Input等注解,而是提倡函数式接口
//testChannel 与生产者写入消息的通道名“testChannel-in-0”  有所差异,-in-0是spring cloud steam存在的默认规则
 @Bean(name = "testChannel")
    Consumer<SendMessageDto> testChannel( ) {
        return str ->  {
            System.out.println("消费者处理消息:" +str );
        };
    }

如果需要指定消息的分区,需要在配置文件中自定义分区的计算逻辑属性为:文章来源地址https://www.toymoban.com/news/detail-687305.html

partition-key-expression: headers.cs

到了这里,关于spring cloud steam 整合kafka 进行消息发送与接收的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Spring连载】使用Spring访问 Apache Kafka(三)----接收消息

    当你使用消息监听器容器,你必须提供一个监听器接收数据。现在有8个受支持的接口做为消息监听器。见以下代码: MessageListener:,当使用自动提交或者容器管理的提交方法之一时,使用MessageListener接口处理从kafka consumer的poll方法收到的单个ConsumerRecord实例。 AcknowledgingMes

    2024年02月01日
    浏览(39)
  • Springboot Kafka整合(开发实例、连接、配置TOPICS、发送消息)—官方原版

    Spring for Apache Kafka项目将Spring的核心概念应用于基于Kafka的消息传递解决方案的开发。我们提供了一个“模板”作为发送消息的高级抽象。 本快速教程适用于以下版本: Apache Kafka 客户端 3.3.x Spring Framework 6.0.x 最低 Java 版本:17 以下是一个不使用Spring Boot的应用程序示例;它既

    2024年02月06日
    浏览(62)
  • 实战:Spring Cloud Stream消息驱动框架整合rabbitMq

    相信很多同学都开发过WEB服务,在WEB服务的开发中一般是通过缓存、队列、读写分离、削峰填谷、限流降级等手段来提高服务性能和保证服务的正常投用。对于削峰填谷就不得不用到我们的MQ消息中间件,比如适用于大数据的kafka,性能较高支持事务活跃度高的rabbitmq等等,MQ的

    2024年02月08日
    浏览(45)
  • MQTT协议-EMQX技术文档-spring-boot整合使用--发送接收-消费

    MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅模式的通信协议,它与MQ(Message Queue,消息队列)有一定的关联,但二者并不完全相同。 MQTT是一种轻量级的通信协议,专门为在物联网(IoT)设备之间的消息传递而设计。它运行在TCP协议之上,以“发布-订阅”模式进行

    2024年02月12日
    浏览(37)
  • Spring Cloud Stream 4.0.4 rabbitmq 发送消息多function

    注意当多个消费者时,需要添加配置项:spring.cloud.function.definition 启动日志 交换机名称对应: spring.cloud.stream.bindings.demo-in-0.destination配置项的值 队列名称是交换机名称+分组名 http://localhost:8080/sendMsg?delay=10000name=zhangsan 问题总结 问题一 解决办法: 查看配置是否正确: spring

    2024年02月19日
    浏览(43)
  • 第三章 Spring Boot 整合 Kafka消息队列 消息者

    第一章 Kafka 配置部署及SASL_PLAINTEXT安全认证 第二章  Spring Boot 整合 Kafka消息队列 生产者 第三章  Spring Boot 整合 Kafka消息队列 消息者         Kafka 是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。本文主是基于Spirng Boot封装了Apache 的

    2024年02月22日
    浏览(44)
  • 【Spring Boot】集成Kafka实现消息发送和订阅

    最近忙着搞低代码开发,好久没新建spring项目了,结果今天心血来潮准备建个springboot项目 注意Type选Maven,java选8,其他默认 点下一步后完成就新建了一个spring boot项目,配置下Maven环境,主要是settings.xml文件,里面要包含阿里云仓库,不然可能依赖下载不下来 在maven配置没问

    2024年02月09日
    浏览(47)
  • 13.Spring 整合 Kafka + 发送系统通知 + 显示系统通知

    目录 1.Spring 整合 Kafka 2.发送系统通知 2.1 封装事件对象  2.2 开发事件的生产者和消费者 2.3 触发事件:在评论、点赞、关注后通知​编辑 3.显示系统通知 3.1 通知列表 3.1.1 数据访问层 3.1.2 业务层 3.1.3 表现层 3.2 开发通知详情 3.2.1 开发数据访问层 3.1.2 业务层 3.1.3 表现层 3.3 未

    2024年02月03日
    浏览(44)
  • 使用Spring Boot和Kafka实现消息发送和订阅

    最近忙着搞低代码开发,好久没新建spring项目了,结果今天心血来潮准备建个springboot项目 注意Type选Maven,java选8,其他默认 点下一步后完成就新建了一个spring boot项目,配置下Maven环境,主要是settings.xml文件,里面要包含阿里云仓库,不然可能依赖下载不下来 在maven配置没问

    2024年02月11日
    浏览(38)
  • 使用Spring Boot和Kafka实现消息订阅和发送

    最近忙着搞低代码开发,好久没新建spring项目了,结果今天心血来潮准备建个springboot项目 注意Type选Maven,java选8,其他默认 点下一步后完成就新建了一个spring boot项目,配置下Maven环境,主要是settings.xml文件,里面要包含阿里云仓库,不然可能依赖下载不下来 在maven配置没问

    2024年02月11日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包