使用kafka-clients的Java API操作Kafka集群的Topic

这篇具有很好参考价值的文章主要介绍了使用kafka-clients的Java API操作Kafka集群的Topic。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

记录:464

场景:在Spring Boot微服务集成Kafka客户端kafka-clients-3.0.0操作Kafka集群的Topic的创建和删除。

版本:JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。

Kafka集群安装:https://blog.csdn.net/zhangbeizhen18/article/details/131156084

1.微服务中配置Kafka信息

1.1在pom.xml添加依赖

pom.xml文件:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.0.0</version>
</dependency>

解析:使用原生的kafka-clients,版本:3.0.0。操作kafka集群的Topic。

2.使用AdminClient创建Kafka集群的Topic

AdminClient全称:org.apache.kafka.clients.admin.AdminClient

(1)示例代码

@RestController
@RequestMapping("/hub/example/cluster/topic")
@Slf4j
public class UseKafkaClusterTopicController {
  //定义Kafka的Topic
  private final String topicName = "hub-topic-city-info-002";
  @GetMapping("/f01_1")
  public Object f01_1() {
      try {
          //1.配置Kafka集群
          Map<String, Object> configs = new HashMap<>();
          Collection<String> cluster = Lists.newArrayList("192.168.19.161:29092",
                  "192.168.19.162:29092",
                  "192.168.19.163:29092");
          configs.put("bootstrap.servers", cluster);
          //2.创建客户端AdminClient
          AdminClient adminClient = KafkaAdminClient.create(configs);
          //3.获取Kafka集群中Topic清单
          Set<String> topicSet = adminClient.listTopics().names().get();
          log.info("在Kafka集群已建Topic数量: {} ,清单:", topicSet.size());
          topicSet.forEach(System.out::println);
          //4.在Kafka集群创建Topic
          if (!topicSet.contains(topicName)) {
              log.info("新建Topic: {}", topicName);
              // Topic名称,分区Partition数目,复制因子(replication Factor)
              NewTopic newTopic = new NewTopic(topicName, 1, (short) 1);
              Collection<NewTopic> newTopics = Lists.newArrayList(newTopic);
              adminClient.createTopics(newTopics);
              ThreadUtil.sleep(1000);
              topicSet = adminClient.listTopics().names().get();
              log.info("创建后,在Kafka集群已建Topic数量: {} ,清单:", topicSet.size());
              topicSet.forEach(System.out::println);
          }
      } catch (Exception e) {
          log.info("创建Topic异常.");
          e.printStackTrace();
      }
      return "创建成功";
  }
}

(2)解析代码

操作Kafka集群的Topic需要先创建AdminClient,使用AdminClient的API创建Topic。

创建Topic一般只需指定Topic名称,分区Partition数目,复制因子(replication Factor)就行。

3.使用AdminClient删除Kafka集群的Topic

AdminClient全称:org.apache.kafka.clients.admin.AdminClient

(1)示例代码

@RestController
@RequestMapping("/hub/example/cluster/topic")
@Slf4j
public class UseKafkaClusterTopicController {
  //定义Kafka的Topic
  private final String topicName = "hub-topic-city-info-002";
  @GetMapping("/f01_2")
  public Object f01_2() {
      try {
          //1.获取Kafka集群配置信息
          Map<String, Object> configs = new HashMap<>();
          Collection<String> cluster = Lists.newArrayList("192.168.19.161:29092",
                  "192.168.19.162:29092",
                  "192.168.19.163:29092");
          configs.put("bootstrap.servers", cluster);
          //2.创建客户端AdminClient
          AdminClient adminClient = KafkaAdminClient.create(configs);
          //3.获取Kafka集群中Topic清单
          Set<String> topicSet = adminClient.listTopics().names().get();
          log.info("在Kafka集群已建Topic数量: {} ,清单:", topicSet.size());
          topicSet.forEach(System.out::println);
          //4.在Kafka集群删除Topic
          if (topicSet.contains(topicName)) {
              log.info("删除Topic: {}", topicName);
              Collection<String> topics = Lists.newArrayList(topicName);
              DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topics);
              deleteTopicsResult.all().get();
              ThreadUtil.sleep(1000);
              topicSet = adminClient.listTopics().names().get();
              log.info("删除后,在Kafka集群已建Topic数量: {} ,清单:", topicSet.size());
              topicSet.forEach((topic) -> {
                  System.out.println(topic);
              });
          }
      } catch (Exception e) {
          log.info("删除Topic异常.");
          e.printStackTrace();
      }
      return "删除成功";
  }
}

(2)解析代码

操作Kafka集群的Topic需要先创建AdminClient,使用AdminClient的API删除Topic。

创建Topic一般只需指定Topic名称就行。

4.测试

创建请求RUL:http://127.0.0.1:18210/hub-210-kafka/hub/example/cluster/topic/f01_1

删除请求RUL:http://127.0.0.1:18210/hub-210-kafka/hub/example/cluster/topic/f01_2

以上,感谢。

2023年6月18日文章来源地址https://www.toymoban.com/news/detail-490081.html

到了这里,关于使用kafka-clients的Java API操作Kafka集群的Topic的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 在Spring Boot微服务集成Kafka客户端(kafka-clients)操作Kafka

    记录 :459 场景 :在Spring Boot微服务集成Kafka客户端kafka-clients-3.0.0操作Kafka。使用kafka-clients的原生KafkaProducer操作Kafka生产者Producer。使用kafka-clients的原生KafkaConsumer操作Kafka的消费者Consumer。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka安装 :https://blog.csdn.ne

    2024年02月12日
    浏览(52)
  • 使用spring-kafka的Java API操作Kafka集群的Topic

    记录 :462 场景 :在Spring Boot微服务集成spring-kafka-2.8.2操作Kafka集群的Topic的创建和删除。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka集群安装 :https://blog.csdn.net/zhangbeizhen18/article/details/131156084 1.微服务中 配置Kafka信息 1.1在pom.xml添加依赖 pom.xml文件: 解析:

    2024年02月10日
    浏览(44)
  • 使用Kafka客户端(spring-kafka)的Java API操作Kafka的Topic

    记录 :458 场景 :在Spring Boot微服务集成Kafka客户端spring-kafka-2.8.2操作Kafka的Topic的创建和删除。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka安装 :https://blog.csdn.net/zhangbeizhen18/article/details/129071395 1.微服务中 配置Kafka信息 1.1在pom.xml添加依赖 pom.xml文件: 解析

    2024年02月09日
    浏览(44)
  • 通过Java client访问Kafka

    1. Install Kafka 1) download kafka binary from https://kafka.apache.org/downloads 2) extract binary 2. Start Kafka 1) start zookeeper in daemon mode 2) start kafka server in daemon mode 3. Test Kafka 1) create a topic 2) producer events 3) consumer events 4. Access Kafka from Java client 1) download kafka client binary from https://jar-download.com/artifacts

    2024年02月05日
    浏览(44)
  • 在Spring Boot微服务集成spring-kafka操作Kafka集群

    记录 :461 场景 :在Spring Boot微服务集成spring-kafka-2.8.2操作Kafka集群。使用KafkaTemplate操作Kafka集群的生产者Producer。使用@KafkaListener操作Kafka集群的消费者Consumer。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka集群安装 :https://blog.csdn.net/zhangbeizhen18/article/details

    2024年02月10日
    浏览(52)
  • 使用flink的sql-client.sh,测试mysql-->kafka-->kafka-->mysql实时流

    目录 1. 环境介绍 2. mysql建表 3. flinksql建表 3.1 进入flinksql客户端  ​3.2 配置输出格式 ​3.3 flink建表 3.4 任务流配置 4. 测试 4.1 插入测试数据 4.2 查看结果表数据​ 4.3 新增测试数据 4.4 再次查看结果表数据 服务 版本 zookeeper 3.8.0 kafka 3.3.1 flink 1.13.5 mysql 5.7.34 jdk 1.8 scala 2.12 连接器

    2024年02月11日
    浏览(41)
  • 【Kafka】Docker安装kafka&java kafka api

    docker依赖于zookeeper,首先安装zookeeper 1 拉取镜像 2 创建network 在启动之前,先指定一个网络 3 启动容器 启动zookeeper容器 测试是否成功 进入zookeeper 执行代码 1 拉取kafka镜像 2 启动kafka容器 进入kafka 3 创建topic -- 创建topic 查看topic -- 分区topic 4 创建生产者 -- 生产者 5 创建消费者

    2024年02月13日
    浏览(53)
  • Kafka集群安装部署(超详细操作演示)—— Linux

    Kafka 是一款 分布式的 、 去中心化的 、 高吞吐低延迟 、 订阅模式 的消息队列系统。 同 RabbitMQ 一样, Kafka 也是 消息队列 。不过 RabbitMQ 多用于 后端系统 ,因其更加专注于消息的 延迟和容错 。 Kafka 多用于 大数据体系 ,因其更加专注于 数据的吞吐能力 。 Kafka 多数都是运

    2024年02月03日
    浏览(74)
  • kafka生产者api和数据操作

    发送流程 消息发送过程中涉及到两个线程—— main线程和Sender线程 main线程 使用serializer(并非java默认)序列化数据,使用partitioner确认发送分区 在main线程中创建了一个双端队列RecordAccumulator,main线程将批次数据发送给RecordAccumulator。 创建批次数据是从内存池中分配内存,在

    2024年02月13日
    浏览(34)
  • 最新版ES8的client API操作 Elasticsearch Java API client 8.0

    作者:ChenZhen 本人不常看网站消息,有问题通过下面的方式联系: 邮箱:1583296383@qq.com vx: ChenZhen_7 我的个人博客地址:https://www.chenzhen.space/🌐 版权:本文为博主的原创文章,本文版权归作者所有,转载请附上原文出处链接及本声明。📝 如果对你有帮助,请给一个小小的s

    2024年02月04日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包