Kafka3.1部署和Topic主题数据生产与消费

这篇具有很好参考价值的文章主要介绍了Kafka3.1部署和Topic主题数据生产与消费。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


前言

本章节主要讲述Kafka3.1X版本在Windows11主机下部署以及JAVA对Kafka应用:

一、Kafka3.1X版本在Windows11主机部署

1.安装JDK配置环境变量

2.Zookeeper(zookeeper-3.7.1)
zk
部署后的目录位置:D:\setup\apache-zookeeper-3.7.1

3.安装Kafka3.1X
3.1 下载包(kafka_2.12-3.1.2.tgz)
Kafka
Kafka3.1部署和Topic主题数据生产与消费,消息队列,kafka,kafka-clients,kafka部署,Window环境,kafka-clients读取
3.2、 解压并进入Kafka目录:
根目录:D:\setup\kafka3.1.2

3、 编辑config/server.properties文件
注意 log.dirs=D:\setup\kafka3.1.2\logs 为根目录下的\logs

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://localhost:9092
log.dirs=D:\\setup\\kafka3.1.2\\logs

4.运行Zookeeper
Zookeeper安装目录D:\setup\apache-zookeeper-3.7.1\bin,按下Shift+右键,选择“打开命令窗口”选项,打开命令行

  .\zkServer.cmd;

Kafka3.1部署和Topic主题数据生产与消费,消息队列,kafka,kafka-clients,kafka部署,Window环境,kafka-clients读取
5.运行Kafka
Kafka安装目录D:\setup\kafka3.1.2,按下Shift+右键,选择“打开命令窗口”选项,打开命令行

.\bin\windows\kafka-server-start.bat .\config\server.properties

Kafka3.1部署和Topic主题数据生产与消费,消息队列,kafka,kafka-clients,kafka部署,Window环境,kafka-clients读取

二、Kafk生产Topic主题数据

1.kafka生产数据

创建Topic主题heima

.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --create --topic heima --partitions 2 --replication-factor 1
Created topic heima.

查看Topic主题heima

.\bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092  --topic heima

Kafka3.1部署和Topic主题数据生产与消费,消息队列,kafka,kafka-clients,kafka部署,Window环境,kafka-clients读取
Topic主题heima生产数据

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic heima

在 > 符号后输入数据:

{"mobilePhone":"186xxxx1234","roleCode":"super_admin_xxx"}

Kafka3.1部署和Topic主题数据生产与消费,消息队列,kafka,kafka-clients,kafka部署,Window环境,kafka-clients读取

2.JAVA kafka客户端消费数据

2.1 pom.xml文件配置kafka客户端-kafka-clients-2.0.1版本

        <!-- kafka客户端 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.1</version>
        </dependency>

2.2 JAVA数据读取文件

package com.ems.mgr.web.controller.thirdparty;
import com.alibaba.fastjson.JSONObject;
import com.ems.mgr.common.utils.spring.SpringUtils;
import com.ems.mgr.system.service.ISysUserService;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Kafka服务器操作与数据读取
 */
public class KafkaUtilDemo {
    public static final Logger log = LoggerFactory.getLogger(KafkaUtilDemo.class);
    public static final Properties props = new Properties();
//    protected ISysUserService userService = SpringUtils.getBean(ISysUserService.class);

    public static void init(String kafakservers) {
        // 配置Kafka消费者属性
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafakservers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    }

    /**
     * 持续监听并处理kafa消息,当手机号mobilePhone非空时进入数据同步操作
     * @param kafaktopic
     * @return
     */
    public static String poll(String kafaktopic) {
        String msg = "";
        try {
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList(kafaktopic));
            log.info("Kafka消费者订阅指定主题,持续监听并处理消息");
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(60000));
                for (ConsumerRecord<String, String> record : records) {
                    log.info("offset = " + record.offset() + ",key = " + record.key() + ",value = " + record.value());
                    msg = record.value();
                    if (!StringUtils.isBlank(record.value())) {
                        JSONObject jsonObject = JSONObject.parseObject(record.value());
                        String mobilePhone = jsonObject.getString("mobilePhone");
                        if (StringUtils.isBlank(mobilePhone)) {
                            log.error("Kafka消费者手机号mobilePhone为空");
                        } else {
                            KafkaUtilDemo kafkaUtil = new KafkaUtilDemo();
                            kafkaUtil.syncSystemInfoTask(jsonObject);
                        }
                    }
                }
            }
        } catch (Exception e) {
            log.error("Kafka消费者订阅指定主题,持续监听并处理消息 error msg=" + e.getMessage());
        }
        return msg;
    }

    public boolean syncSystemInfoTask(JSONObject jsonObject) {
        boolean repsBln = true;
        try {
            String mobilePhone = jsonObject.getString("mobilePhone");
            String roleType = jsonObject.getString("roleType");
            String roleCode = jsonObject.getString("roleCode");
            log.info("业务数据同步操作................");
        } catch (Exception e) {
            repsBln = false;
            log.error("Kafka消费者同步入库异常,error msg=" + e.getMessage());
        }
        return repsBln;
    }

    public static void main(String[] args) {
        try {
            String kafakservers = "localhost:9092";
            String kafaktopic = "heima";
            init(kafakservers);
            poll(kafaktopic);
        } catch (Exception e) {
            log.error("error msg=" + e.getMessage());
        }
    }

}

3 执行KafkaUtilDemo 文件,查看消费数据。
Kafka3.1部署和Topic主题数据生产与消费,消息队列,kafka,kafka-clients,kafka部署,Window环境,kafka-clients读取

总结

pom.xml文件在引入spring-kafka 会由于版本问题出现


org.apache.kafka
kafka-clients
2.0.1
文章来源地址https://www.toymoban.com/news/detail-701679.html

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.2.8.RELEASE</version>
    </dependency>

到了这里,关于Kafka3.1部署和Topic主题数据生产与消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka如何动态消费新增topic主题

    一、解决痛点 使用spring-kafka客户端,每次新增topic主题,都需要硬编码客户端并重新发布服务,操作麻烦耗时长。kafkaListener虽可以支持通配符消费topic,缺点是并发数需要手动改并且重启服务 。对于业务逻辑相似场景,创建新主题动态监听可以用kafka-batch-starter组件 二、组件

    2023年04月21日
    浏览(41)
  • SpringBoot 2.2.5 整合RabbitMQ,实现Topic主题模式的消息发送及消费

    1、simple简单模式 消息产生着§将消息放入队列 消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端

    2024年02月02日
    浏览(47)
  • Linux安装Kafka,创建topic、生产者、消费者

    1.创建安装目录/usr/local/kafka mkdir /usr/local/kafka 2.进入安装包目录 cd /usr/local/kafka  3.下载安装包 wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz 4.解压安装包 tar -zxvf kafka_2.12-3.3.1.tgz 5.进入cd kafka_2.12-3.3.1目录 cd kafka_2.12-3.3.1/ 6.修改zookeeper配置 cat ./config/zookeeper.properties | grep

    2023年04月17日
    浏览(50)
  • 消息队列Pulsar入门(一) 生产者/消费者/Topic详解,附源码演示

    部署问题 连接Pulsar 创建方式 简单方法创建 loadConf自定义配置创建 Pulsar官网 发送模式 同步发送 异步发送 访问方式/发送方式 Share模式(默认情况) 请注意: Exclusive WaitForExclusive 创建方式 简单方法创建 监听器方法创建 loadConf自定义配置创建 多主题订阅 传入List数组的多主题订阅

    2024年02月08日
    浏览(44)
  • kafka 基础概念、命令行操作(查看所有topic、创建topic、删除topic、查看某个Topic的详情、修改分区数、发送消息、消费消息、 查看消费者组 、更新消费者的偏移位置)

    kafka官网 Broker   一台kafka服务器就是一个broker,可容纳多个topic。一个集群由多个broker组成; Producer   生产者,即向kafka的broker-list发送消息的客户端; Consumer   消费者,即向kafka的broker-list订阅消息的客户端; Consumer Group   消费者组是 逻辑上的一个订阅者 ,由多个

    2024年02月01日
    浏览(61)
  • kafka配置大全broker、topic、生产者和消费者等配置介绍

    每个kafka broker中配置文件 server.properties 默认必须配置的属性如下: **bootstrap.servers** - 指定生产者客户端连接kafka集群所需的broker地址列表,格式为host1:port1,host2:port2,可以设置一个或多个。这里并非需要所有的broker地址,因为生产者会从给定的broker里寻找其它的broker。 **key

    2024年02月05日
    浏览(48)
  • Kafka:主题创建、分区修改查看、生产者、消费者

    1.创建主题 2.查看所有主题 3.查看详细主题 序号从0开始计算 Partition:分区数,该主题有3个分区 Replica:副本数,该主题有3个副本 Leader:副本数中的主的序号,生产消费的对象 1.修改分区数 修改的分区数量不可以小于或者等于当前主题分区的数量,否则会报错 在根目录kaf

    2024年02月11日
    浏览(87)
  • JAVA实时获取kafka各个主题下分区消息的消费情况

    通过指定 主题 和 消费者组 调用方法,实时查看主题下分区消息的消费情况(消息总数量、消费消息数量、未消费的消息数量)。

    2024年02月13日
    浏览(63)
  • 07、Kafka ------ 消息生产者(演示 发送消息) 和 消息消费者(演示 监听消息)

    简单来说,就是一个数据项。 ▲ 消息就是 Kafka 所记录的数据节点,消息在 Kafka 中又被称为记录(record)或事件(event)。 从存储上来看,消息就是存储在分区文件(有点类似于List)中的一个数据项,消息具有 key、value、时间戳 和 可选的元数据头。 ▲ 下面是一个示例事件

    2024年01月20日
    浏览(46)
  • Kafka3.0.0版本——生产者 数据去重

    1.1、至少一次 至少一次(At Least Once )的含义 生产者发送数据到kafka集群,kafka集群至少接收到一次数据。 至少一次的条件: ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2 1.2、最多一次 最多一次(At Most Once )的含义 生产者发送数据到kafka集群,

    2024年02月01日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包