Spring for Apache Kafka概述和简单入门

这篇具有很好参考价值的文章主要介绍了Spring for Apache Kafka概述和简单入门。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、概述

Spring for Apache Kafka 的高级概述以及底层概念和可运行的示例代码。

二、准备工作

注意:进行工作开始之前至少要有一个 Apache Kafka 环境

2.1、依赖

  • 使用 Spring Boot
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

使用 Spring Boot 时,省略版本,Boot 将自动引入与您的 Boot 版本兼容的正确版本

  • 使用 Spring
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.7.14</version>
</dependency>

使用Spring 时必须要申明使用的版本。

2.2、版本兼容性

  • Apache Kafka 客户端 2.7.0 或 2.8.0

  • Spring 框架 5.3.x 或 Spring Boot 2.7.x

  • 最低 Java 版本:8

在 Spring Boot 应用程序中使用 Apache Kafka 时, Apache Kafka 依赖项版本由 Spring Boot 的依赖项管理确定。
想要使用不同于Spring Boot版本的 Apache Kafka时需要覆盖所有关联的依赖项。
尤其在使用嵌入式 Kafka 代理时特别要注意。

2.3、依赖覆盖

在 Spring Boot 应用程序中使用 Apache Kafka 时, Apache Kafka 依赖项版本由 Spring Boot 的依赖项管理确定。
如果要使用kafka-clients或的不同版本kafka-streams(例如 2.x), 则需要覆盖所有关联的依赖项。
尤其是在 spring-kafka-test 中使用嵌入式 Kafka 代理时。

并非所有的 Spring Boot都会向下兼容

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.14</version>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>2.7.14</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>{kafka-version}</version>
</dependency>

<!-- 可选  仅在使用 kafka 流时需要 -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>{kafka-version}</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>{kafka-version}</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>{kafka-version}</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>{kafka-version}</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>

三、Spring Boot消费者

3.1、应用程序

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1")
                .partitions(10)
                .replicas(1)
                .build();
    }

    @KafkaListener(id = "myId", topics = "topic1")
    public void listen(String in) {
        System.out.println(in);
    }

}

3.2、配置项

spring.kafka.consumer.auto-offset-reset=earliest //从提交的offset开始消费;无提交的offset时,从头开始消费

四、Srping Boot生产者

4.1、应用程序

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1")
                .partitions(10)
                .replicas(1)
                .build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("topic1", "test");
        };
    }

}

五、不使用 Spring Boot

在不使用 Spring Boot 时必须定义几个基础的Bean。

@Configuration
@EnableKafka
public class Config {

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProps());
    }

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // ...
        return props;
    }

    @Bean
    public Sender sender(KafkaTemplate<Integer, String> template) {
        return new Sender(template);
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(senderProps());
    }

    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //...
        return props;
    }

    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
        return new KafkaTemplate<Integer, String>(producerFactory);
    }

}

在 Spring 上下文之外创建侦听器容器,则必须满足容器实现的所有接口,否则有些功能会出现异常工作。

不包括 Spring Boot的完整示例如下:文章来源地址https://www.toymoban.com/news/detail-701680.html

public class Sender {

	public static void main(String[] args) {
		AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
		context.getBean(Sender.class).send("test", 42);
	}

	private final KafkaTemplate<Integer, String> template;

	public Sender(KafkaTemplate<Integer, String> template) {
		this.template = template;
	}

	public void send(String toSend, int key) {
		this.template.send("topic1", key, toSend);
	}

}

public class Listener {

    @KafkaListener(id = "listen1", topics = "topic1")
    public void listen1(String in) {
        System.out.println(in);
    }

}

@Configuration
@EnableKafka
public class Config {

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProps());
    }

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // ...
        return props;
    }

    @Bean
    public Sender sender(KafkaTemplate<Integer, String> template) {
        return new Sender(template);
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(senderProps());
    }

    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //...
        return props;
    }

    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
        return new KafkaTemplate<Integer, String>(producerFactory);
    }

}

到了这里,关于Spring for Apache Kafka概述和简单入门的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【SpringBoot】| Spring Boot 概述和入门程序剖析

    目录 一:Spring Boot 入门 1. Spring能做什么? 2. SpringBoot特点 3. 如何学习SpringBoot 4. 创建Spring Boot项目 Maven的配置 入门案例: SpringBoot中几个重要的注解 5. 了解自动配置原理 依赖管理 自动配置 6. SpringBoot核心配置文件 多环境测试 自定义配置 7. SpringBoot中使用JSP(了解) 8. S

    2024年02月06日
    浏览(54)
  • Spring Boot与Apache Kafka实现高吞吐量消息处理:解决大规模数据处理问题

    现代数据量越来越庞大对数据处理的效率提出了更高的要求。Apache Kafka是目前流行的分布式消息队列之一。Spring Boot是现代Java应用程序快速开发的首选框架。综合使用Spring Boot和Apache Kafka可以实现高吞吐量消息处理。 Apache Kafka采用分布式发布-订阅模式具有高度的可扩展性和可

    2024年02月05日
    浏览(52)
  • kafka--技术文档--spring-boot集成基础简单使用

            查阅了很多资料了解到,使用了spring-boot中整合的kafka的使用是被封装好的。也就是说这些使用其实和在linux中的使用kafka代码的使用其实没有太大关系。但是逻辑是一样的。这点要注意! 核心配置为: 如果在下面规定了spring-boot的版本那么就不需要再使用版本号,如

    2024年02月11日
    浏览(48)
  • Spring Security 6.x 系列【1】基础篇之概述及入门案例

    有道无术,术尚可求,有术无道,止于术。 本系列Spring Boot 版本 3.0.4 本系列Spring Security 版本 6.0.2 源码地址:https://gitee.com/pearl-organization/study-spring-security-demo 本系列基于最新 Spring Boot 3.0 + Spring Security 6.0 版本,由浅入深,从实战到源码分析,详细讲解各种 Spring Security 的使用

    2024年02月06日
    浏览(59)
  • Spring Security OAuth2.0(3):Spring Security简单入门

    Spring Security 快速入门。 本章代码已分享至Gitee:https://gitee.com/lengcz/security-spring-security qquad Spring Secutiry 是一个能够为基于Spring的企业应用系统提供声明式的安全访问控制解决方案的安全框架。由于它是Spring生态系统的一员,因此它伴随着整个Spring生态系统不断修正、升级,

    2024年02月13日
    浏览(50)
  • Spring Boot快速入门:构建简单的Web应用

      Spring Boot是一个用于简化Spring应用程序开发的框架,它通过提供开箱即用的配置和一组常用的功能,使得构建高效、可维护的应用变得非常容易。在本篇博客中,我们将一步步地介绍如何快速入门Spring Boot,并构建一个简单的Web应用。 步骤1:准备开发环境 Java Development

    2024年02月07日
    浏览(65)
  • 快速入门使用spring详细步骤(介绍、导入依赖、第一个简单程序)

    目录 一、spring介绍 二、spring使用步骤 (一)创建maven项目  (二) maven项目导入spring依赖 (三)开始编写第一个spring程序 三、新篇章之springboot(额外篇) spring是作为Java EE企业级开发很好的一个框架,这篇文章就来讲解一下怎么使用spring。要使用spring,现在一般都是 使用

    2024年02月04日
    浏览(51)
  • UI for Apache Kafka

    文章Overview of UI Tools for Monitoring and Management of Apache Kafka Clusters | by German Osin | Towards Data Science中介绍了8种常见的kafka UI工具,这些产品的核心功能对比信息如下图所示, 通过对比发现 UI for Apache Kafka 功能齐全且免费,因此可以作为我们的首选。本文通过二进制jar包的方式进行安

    2024年02月04日
    浏览(41)
  • Splunk Connect for Kafka – Connecting Apache Kafka with Splunk

    1: 背景: 1: splunk 有时要去拉取kafka 上的数据: 下面要用的有用的插件:Splunk Connect for Kafka 先说一下这个Splunk connect for kafka 是什么: Spunk Connect for Kafka is a “sink connector” built on the Kafka Connect framework for exporting data from Kafka topics into Splunk. With a focus on speed and reliability, include

    2024年02月05日
    浏览(40)
  • Spring学习笔记之spring概述

    Spring是一个轻量级的控制反转和面向切面的容器框架 Spring最初的出现是为了解决EJB臃肿的设计,以及难以测试等问题。 Spring为了简化开发而生,让程序员只需关注核心业务的实现,尽可能的不再关注非业务逻辑代码(事务控制,安全日志等) 八大模块中有两大核心模块,

    2024年02月14日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包