主题配置和 KafkaTemplate 的使用

这篇具有很好参考价值的文章主要介绍了主题配置和 KafkaTemplate 的使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、主题

1.1、配置主题

  • 在应用程序上下文定义一个 KafkaAdmin Bean, 它可以自动将主题添加到代理。通过这个Bean可以将
    每一个新建的主题 Topic 添加到应用程序上下文中。下面是一个简单的示例:

也可以创建 TopicBuilder 类,使用它创建 Bean 更加简单。

@Bean
public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new KafkaAdmin(configs);
        }

@Bean
public KafkaAdmin.NewTopics topics456() {
        return new NewTopics(
        TopicBuilder.name("defaultBoth")
        .build(),
        TopicBuilder.name("defaultPart")
        .replicas(1)
        .build(),
        TopicBuilder.name("defaultRepl")
        .partitions(3)
        .build());
        }

使用 Spring Boot 时,KafkaAdminbean 会自动注册
默认情况下,代理不可用时会记录一条消息,然后上下文会继续加载。可以以编程方式调用Admin的initialize()方法以稍后重试。
也可将 admin 的fatalIfBrokerNotAvailable属性设置为true。然后上下文无法初始化。

1.2、在运行时检查和创建主题

目前有两种方法来进行操作:

  • createOrModifyTopics
  • describeTopics
    或者使用 AdminClient 来直接使用:
@Autowired
private KafkaAdmin admin;

...

    AdminClient client = AdminClient.create(admin.getConfigurationProperties());
    ...
    client.close();

二、消息发送

2.1、使用 KafkaTemplate

2.1.1、KafkaTemplate 介绍

KafkaTemplate 包装了生产者并提供了将数据发送到 Kafka 主题的便捷方法。

2.1.2、配置 KafkaTemplate

要使用模板,需要配置生产者工厂并在模板的构造函数中提供。

  • 单个生产者配置
@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}
  • 多个生产者配置

    使用来自同一工厂的不同生产者配置创建模板,需要覆盖工厂的ProducerConfig属性。

@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}

@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
return new KafkaTemplate<>(pf,
Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}

然后调用 KafkaTemplate 的方法来使用它。

  • 异步消息发布示例
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    ListenableFuture<SendResult<Integer, String>> future = template.send(record);
    future.addCallback(new KafkaSendCallback<SendResult<Integer, String>>() {

        @Override
        public void onSuccess(SendResult<Integer, String> result) {
            handleSuccess(data);
        }

        @Override
        public void onFailure(KafkaProducerException ex) {
            handleFailure(data, record, ex);
        }

    });
}
  • 阻塞发布示例
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

ExecutionException 在于 KafkaProducerException 属性 failedProducerRecord 中

2.1.3、发布结果查看

  • 异步

发布成功还是失败可以向侦听器注册回调以异步接收发送结果:

ListenableFuture<SendResult<Integer, String>> future = template.send("topic", 1, "thing");
        future.addCallback(new KafkaSendCallback<Integer, String>() {

@Override
public void onSuccess(SendResult<Integer, String> result) {
        ...
        }

@Override
public void onFailure(KafkaProducerException ex) {
        ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
        ...
        }

        });

或者使用 lambda:

ListenableFuture<SendResult<Integer, String>> future = template.send("topic", 1, "thing");
future.addCallback(result -> {
        ...
    }, (KafkaFailureCallback<Integer, String>) ex -> {
            ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
            ...
    });
  • 同步

阻塞发送线程等待结果需要调用 future 的 get()方法,可以使用带超时的 get() 方法。文章来源地址https://www.toymoban.com/news/detail-701683.html

到了这里,关于主题配置和 KafkaTemplate 的使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 多个消费者订阅一个Kafka的Topic(使用@KafkaListener和KafkaTemplate)

    记录 :465 场景 :一个Producer在一个Topic发布消息,多个消费者Consumer订阅Kafka的Topic。每个Consumer指定一个特定的ConsumerGroup,达到一条消息被多个不同的ConsumerGroup消费。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka集群安装 :https://blog.csdn.net/zhangbeizhen18/arti

    2024年02月15日
    浏览(46)
  • Spring for Apache Kafka Deep Dive – Part 2: Apache Kafka and Spring Cloud Stream

    On the heels of part 1 in this blog series, Spring for Apache Kafka – Part 1: Error Handling, Message Conversion and Transaction Support, here in part 2 we’ll focus on another project that enhances the developer experience when building streaming applications on Kafka: Spring Cloud Stream. We will cover the following in this post: Overview of Spring Clo

    2024年02月19日
    浏览(43)
  • flink如何监听kafka主题配置变更

    从前一篇文章我们知道flink消费kafka主题时是采用的手动assign指定分区的方式,这种消费方式是不处理主题的rebalance操作的,也就是消费者组中即使有消费者退出或者进入也是不会触发消费者所消费的分区的,那么疑问就来了,那是否比如kafka主题分区变多,或者新增了满足

    2024年02月14日
    浏览(37)
  • 【项目实战】SpringBoot整合Kafka消息队列(基于KafkaTemplate和@KafkaListener实现)

    Apache Kafka是分布式发布-订阅消息系统。 它最初由LinkedIn公司开发,之后成为Apache项目的一部分。 Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。 Apache Kafka与传统消息系统相比,有以下不同: 它将消息持久化到磁盘,因此可用于批量消

    2023年04月09日
    浏览(43)
  • Spring for Apache Kafka概述和简单入门

    Spring for Apache Kafka 的高级概述以及底层概念和可运行的示例代码。 注意:进行工作开始之前至少要有一个 Apache Kafka 环境 使用 Spring Boot 使用 Spring Boot 时,省略版本,Boot 将自动引入与您的 Boot 版本兼容的正确版本 使用 Spring 使用Spring 时必须要申明使用的版本。 Apache Kafka 客户

    2024年02月09日
    浏览(37)
  • Kafka【问题 02】KafkaTemplate 报错 Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected 问题解决

    主要的报错信息: Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. 和 Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected 报错详情如下: 配置信息: 很显然是配置文件没有生效 😢 原来是一个很高阶( diji )的错误 😄 kafka的配置应该是在sprin

    2024年02月16日
    浏览(34)
  • Windows上安装和配置Apache Kafka

    首先,让我们从Apache Kafka的官方网站下载最新的二进制发行版。您可以在以下网址找到下载链接:Apache Kafka 选择适用于Windows的版本并下载压缩文件。一旦下载完成,将文件解压到您选择的目录中。 接下来,您需要进行一些配置,以确保Kafka在Windows上正常运行。 2.1 配置Kafk

    2024年02月09日
    浏览(37)
  • 在Windows上搭建Kafka环境的步骤,包括安装Java、下载Kafka、配置Zookeeper和Kafka、启动Zookeeper和Kafka、创建主题和生产者/消费者等

    1. 安装Java Kafka需要Java环境支持。可以从Oracle官网下载JDK,或者使用OpenJDK。 2. 下载Kafka 可以从Kafka官网下载Kafka二进制压缩包。解压后可以看到bin、config、libs等目录。 3. 配置Zookeeper Kafka依赖Zookeeper实现分布式协作。可以使用Kafka自带的Zookeeper,也可以独立安装Zookeeper。 如果使

    2024年02月11日
    浏览(42)
  • redis的配置和使用、redis的数据结构以及缓存遇见的常见问题

    目录 1.缓存 2.redis不仅仅可以做缓存,只不过说他的大部分场景,是做缓存。本地缓存重启后缓存里的东西就没有了,但是redis有。 3.redis有几个特性:查询快,但是是放到内存里的〈断电或者重启,数据就丢了),所以他有特定的持久化机制 4.服务器(centos)安装redis 5. redis在

    2024年02月14日
    浏览(42)
  • Spring Boot与Apache Kafka实现高吞吐量消息处理:解决大规模数据处理问题

    现代数据量越来越庞大对数据处理的效率提出了更高的要求。Apache Kafka是目前流行的分布式消息队列之一。Spring Boot是现代Java应用程序快速开发的首选框架。综合使用Spring Boot和Apache Kafka可以实现高吞吐量消息处理。 Apache Kafka采用分布式发布-订阅模式具有高度的可扩展性和可

    2024年02月05日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包