Springboot最简单的实战介绍 整合kafka-生产者与消费者(消息推送与订阅获取)

这篇具有很好参考价值的文章主要介绍了Springboot最简单的实战介绍 整合kafka-生产者与消费者(消息推送与订阅获取)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

指定kafka 代理地址,可以多个

#spring.kafka.bootstrap-servers=123.xxx.x.xxx:19092,123.xxx.x.xxx:19093,123.xxx.x.xxx:19094

spring.kafka.bootstrap-servers=192.168.x.xxx:9092

#=============== producer生产者 =======================

spring.kafka.producer.retries=0

每次批量发送消息的数量

spring.kafka.producer.batch-size=16384

缓存容量

spring.kafka.producer.buffer-memory=33554432

指定消息key和消息体的编解码方式

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer消费者 =======================

指定默认消费者group id

spring.kafka.consumer.group-id=test-app

spring.kafka.consumer.auto-offset-reset=earliest

spring.kafka.consumer.enable-auto-commit=true

spring.kafka.consumer.auto-commit-interval=100ms

指定消息key和消息体的编解码方式

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

#spring.kafka.consumer.bootstrap-servers=192.168.8.111:9092

#spring.kafka.consumer.zookeeper.connect=192.168.8.103:2181

#指定tomcat端口

server.port=8063

application.yml:

spring:

KAFKA

kafka:

ָkafka服务器地址,可以指定多个

bootstrap-servers: 123.xxx.x.xxx:19092,123.xxx.x.xxx:19093,123.xxx.x.xxx:19094

#=============== producer生产者配置 =======================

producer:

retries: 0

每次批量发送消息的数量

batch-size: 16384

缓存容量

buffer-memory: 33554432

ָ指定消息key和消息体的编解码方式

key-serializer: org.apache.kafka.common.serialization.StringSerializer

value-serializer: org.apache.kafka.common.serialization.StringSerializer

#=============== consumer消费者配置 =======================

consumer:

#指定默认消费者的group id

group-id: test-app

#earliest

#当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

#latest

#当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

#none

#topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

auto-offset-reset: latest

enable-auto-commit: true

auto-commit-interval: 100ms

#指定消费key和消息体的编解码方式

key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

好了,配置工作准备完毕。

我们先来搞Kafka的生产者,也就是负责推送消息的模块:

创建一个类, 叫KafkaSender(注解不能少,留意代码),

package com.kafkademo.producer;

import com.google.gson.Gson;

import com.google.gson.GsonBuilder;

import com.sun.scenario.effect.impl.sw.sse.SSEBlend_SRC_OUTPeer;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.support.SendResult;

import org.springframework.stereotype.Component;

import org.springframework.util.concurrent.ListenableFuture;

import org.springframework.util.concurrent.ListenableFutureCallback;

/**

  • Hello!

  • Created By JCccc on 2018/11/24

  • 11:25

*/

@Component

public class KafkaSender {

@Autowired

private KafkaTemplate<String, Object> kafkaTemplate;

private final Logger logger = LoggerFactory.getLogger(KafkaSender.class);

public void send(String topic, String taskid, String jsonStr) {

//发送消息

ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, taskid, jsonStr);

future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {

@Override

//推送成功

public void onSuccess(SendResult<String, Object> result) {

logger.info(topic + " 生产者 发送消息成功:" + result.toString());

}

@Override

//推送失败

public void onFailure(Throwable ex) {

logger.info(topic + " 生产者 发送消息失败:" + ex.getMessage());

}

});

}

}

以上就是kafka生产者了,到此刻,你已经可以开始往kafka服务器推送消息了

事不宜迟,我们立马试试:

创建个controller,搞个接口试试推送下消息,

@GetMapping(“/sendMessageToKafka”)

public String sendMessageToKafka() {

Map<String,String> messageMap=new HashMap();

messageMap.put(“message”,“我是一条消息”);

String taskid=“123456”;

String jsonStr=JSONObject.toJSONString(messageMap);

//kakfa的推送消息方法有多种,可以采取带有任务key的,也可以采取不带有的(不带时默认为null)

kafkaSender.send(“testTopic”,taskid,jsonStr);

return “hi guy!”;

}

自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。
springboot 设置kafka生产者只发送一次,2024年程序员学习,spring boot,kafka,linq
springboot 设置kafka生产者只发送一次,2024年程序员学习,spring boot,kafka,linq
springboot 设置kafka生产者只发送一次,2024年程序员学习,spring boot,kafka,linq
springboot 设置kafka生产者只发送一次,2024年程序员学习,spring boot,kafka,linq
springboot 设置kafka生产者只发送一次,2024年程序员学习,spring boot,kafka,linq
springboot 设置kafka生产者只发送一次,2024年程序员学习,spring boot,kafka,linq

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新

如果你觉得这些内容对你有帮助,可以添加V获取:vip1024b (备注Java)
springboot 设置kafka生产者只发送一次,2024年程序员学习,spring boot,kafka,linq

最后

针对以上面试题,小编已经把面试题+答案整理好了

springboot 设置kafka生产者只发送一次,2024年程序员学习,spring boot,kafka,linq

springboot 设置kafka生产者只发送一次,2024年程序员学习,spring boot,kafka,linq

springboot 设置kafka生产者只发送一次,2024年程序员学习,spring boot,kafka,linq

面试专题

springboot 设置kafka生产者只发送一次,2024年程序员学习,spring boot,kafka,linq

除了以上面试题+答案,小编同时还整理了微服务相关的实战文档也可以分享给大家学习

springboot 设置kafka生产者只发送一次,2024年程序员学习,spring boot,kafka,linq

springboot 设置kafka生产者只发送一次,2024年程序员学习,spring boot,kafka,linq

springboot 设置kafka生产者只发送一次,2024年程序员学习,spring boot,kafka,linq
转存中…(img-vLBijKnN-1711994053434)]

[外链图片转存中…(img-9KgrPEIU-1711994053434)]

[外链图片转存中…(img-40pAgAhG-1711994053434)]

面试专题

[外链图片转存中…(img-0bSNx7H2-1711994053435)]

除了以上面试题+答案,小编同时还整理了微服务相关的实战文档也可以分享给大家学习

[外链图片转存中…(img-WLGSGgNJ-1711994053435)]

[外链图片转存中…(img-lL1RhZ10-1711994053435)]

[外链图片转存中…(img-uG2StbjW-1711994053436)]文章来源地址https://www.toymoban.com/news/detail-845716.html

到了这里,关于Springboot最简单的实战介绍 整合kafka-生产者与消费者(消息推送与订阅获取)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【项目实战】Kafka 生产者写入分区的策略

    👉 博主介绍 : 博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家,WEB架构师,阿里云专家博主,华为云云享专家,51CTO TOP红人 Java知识图谱点击链接: 体系化学习Java(Java面试专题) 💕💕 感兴趣的同学可以收藏关注下 , 不然下次找不到哟

    2024年02月16日
    浏览(35)
  • Kafka官方生产者和消费者脚本简单使用

    怎样使用Kafka官方生产者和消费者脚本进行消费生产和消费?这里假设已经下载了kafka官方文件,并已经解压. 这就可以见到测试kafka对应topic了.

    2024年02月04日
    浏览(49)
  • kafka生产者和消费者配置介绍

    每个kafka broker中配置文件 server.properties 默认必须配置的属性如下: **bootstrap.servers** - 指定生产者客户端连接kafka集群所需的broker地址列表,格式为host1:port1,host2:port2,可以设置一个或多个。这里并非需要所有的broker地址,因为生产者会从给定的broker里寻找其它的broker。 **key

    2024年02月12日
    浏览(49)
  • Spring Boot 整合kafka:生产者ack机制和消费者AckMode消费模式、手动提交ACK

    Kafka 生产者的 ACK 机制指的是生产者在发送消息后,对消息副本的确认机制。ACK 机制可以帮助生产者确保消息被成功写入 Kafka 集群中的多个副本,并在需要时获取确认信息。 Kafka 提供了三种 ACK 机制的配置选项,分别是: acks=0:生产者在成功将消息发送到网络缓冲区后即视

    2024年02月04日
    浏览(54)
  • kafka配置大全broker、topic、生产者和消费者等配置介绍

    每个kafka broker中配置文件 server.properties 默认必须配置的属性如下: **bootstrap.servers** - 指定生产者客户端连接kafka集群所需的broker地址列表,格式为host1:port1,host2:port2,可以设置一个或多个。这里并非需要所有的broker地址,因为生产者会从给定的broker里寻找其它的broker。 **key

    2024年02月05日
    浏览(47)
  • 【RabbitMQ实战】 03 SpringBoot RabbitMQ生产者和消费者示例

    上一节我们写了一段原生API来进行生产和消费的例子。实际上SpringBoot对原生RabbitMQ客户端做了二次封装,让我们使用API的代价更低。 依赖引入 RabbitMQ的配置如下 每个配置的具体含义,详见配置 代码说明 使用RabbitTemplate可以发送消息 这个Controller定义了一个发送的接口,调用

    2024年02月07日
    浏览(39)
  • Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解

    kafka尚硅谷视频: 10_尚硅谷_Kafka_生产者_原理_哔哩哔哩_bilibili ​      1. producer初始化:加载默认配置,以及配置的参数,开启网络线程      2. 拦截器拦截      3. 序列化器进行消息key, value序列化      4. 进行分区      5. kafka broker集群 获取metaData      6. 消息缓存到

    2024年02月11日
    浏览(48)
  • [kafka消息生产被阻塞] - 如何解决Kafka生产者阻塞的问题

    [kafka消息生产被阻塞] - 如何解决Kafka生产者阻塞的问题 Kafka是一个高度可扩展的分布式流平台,用于构建实时数据管道和流处理应用程序。作为一个广泛使用的消息代理系统,Kafka在数据传输方面表现出色,但是在极端情况下,它可能会出现生产者阻塞的问题。这可能会导致

    2024年02月11日
    浏览(49)
  • 三、Kafka生产者1---Kafka生产者初始化-new KafkaProducer

    概述 本文主要是分享Kafka初始化生产者的 大体过程 初始化过程中会新建很多对象,本文暂先分享部分对象 1.分区器---Partitioner partitioner 2.重试时间---long retryBackoffMs 3.序列化器---SerializerK keySerializer,SerializerV valueSerializer 4.拦截器--- ListProducerInterceptorK, V interceptorList 5.累加器-

    2024年03月14日
    浏览(63)
  • Spring整合RabbitMQ——生产者

    添加依赖坐标,在producer和consumer模块的pom文件中各复制一份。 配置producer的配置文件 配置producer的xml配置文件 编写测试类发送消息

    2024年02月07日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包