通过Java client访问Kafka

这篇具有很好参考价值的文章主要介绍了通过Java client访问Kafka。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1. Install Kafka
1) download kafka binary from https://kafka.apache.org/downloads
2) extract binary

$ tar xvf kafka_2.13-3.3.1.tgz -C ~/bigdata/

2. Start Kafka
1) start zookeeper in daemon mode

$ cd ~/bigdata/kafka_2.13-3.3.1
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
$ netstat -lnpt | grep -i TCP | grep `jps | grep -w QuorumPeerMain | awk '{print $1}'` | grep "LISTEN"
tcp6       0      0 :::45835                :::*                    LISTEN      568684/java         
tcp6       0      0 :::2181                 :::*                    LISTEN      568684/java

2) start kafka server in daemon mode

$ bin/kafka-server-start.sh -daemon config/server.properties
$ netstat -lnpt | grep -i TCP | grep `jps | grep -w Kafka | awk '{print $1}'` | grep "LISTEN"
tcp6       0      0 :::33011                :::*                    LISTEN      569177/java         
tcp6       0      0 :::9092                 :::*                    LISTEN      569177/java

3. Test Kafka
1) create a topic

$ bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
Created topic test.
$ bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092
Topic: test	TopicId: oLdPl33IR7KZFGmrURMKFw	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

2) producer events

$ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
>hello world
>good morning
>cheer
>...
input ^C to break

3) consumer events

$ bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092 
hello world
good morning
cheer
...
input ^C to break

4. Access Kafka from Java client
1) download kafka client binary from https://jar-download.com/artifacts/org.apache.kafka/kafka-clients/3.3.1

$ tar xvf kafka-client-3.3.1.tgz -C ~/learn/java/java8/lib/

2) Write the kafka Java client

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class Kafka {
    static class KfkProducer {
        Producer<String, String> producer;

        KfkProducer(String host, int port) {
            Properties props = new Properties();
            props.put("bootstrap.servers", String.format("%s:%d", host, port));
//            props.put("linger.ms", 1);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<>(props);
        }

        void close() {
            producer.close();
        }

        RecordMetadata send(String topic, String key, String value) {
            Future<RecordMetadata> result = producer.send(new ProducerRecord<>(topic, key, value));
            RecordMetadata meta = null;
            try {
                meta = result.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            return meta;
        }
    }

    static class KfkConsumer {
        KafkaConsumer<String, String> consumer;

        KfkConsumer(String host, int port, List<String> topics) {
            Properties props = new Properties();
            props.put("bootstrap.servers", String.format("%s:%d", host, port));
            props.put("group.id", "group01");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(topics);
        }

        void close() {
            consumer.close();
        }

        List<List<Object>> poll(int num) {
            List<List<Object>> result = new ArrayList<>();
            while (result.size() < num) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    result.add(Arrays.asList(record.offset(), record.key(), record.value()));
                }
            }
            return result;
        }
    }

    public static void main(String... argv) {
        KfkProducer producer = new KfkProducer("localhost", 9092);
        for (int i = 0; i < 5; i ++) {
            System.out.println(producer.send("test", "" + (i % 5), Integer.toString(i)));
        }
        producer.close();

        KfkConsumer consumer = new KfkConsumer("localhost", 9092, Arrays.asList("test"));
        List<List<Object>> records = consumer.poll(5);
        for (List<Object> record: records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    (long)(record.get(0)), record.get(1), record.get(2));
        }
        consumer.close();
    }
}

3) compile the client

$ javac -cp "lib/kafka-client-3.3.1/*" Kafka.java

4) run the client

$ java -cp "lib/kafka-client-3.3.1/*:." Kafka
test-0@8
test-0@9
test-0@10
test-0@11
test-0@12
offset = 8, key = 0, value = 0
offset = 9, key = 1, value = 1
offset = 10, key = 2, value = 2
offset = 11, key = 3, value = 3
offset = 12, key = 4, value = 4

Note: following scripts can be used to stop servers and clean all created events as need

$ bin/kafka-server-stop.sh
$ bin/zookeeper-server-stop.sh
$ rm -rf /tmp/kafka-logs /tmp/zookeeper /tmp/kraft-combined-logs

Reference: https://kafka.apache.org/quickstart文章来源地址https://www.toymoban.com/news/detail-447647.html

到了这里,关于通过Java client访问Kafka的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 聊聊kafka client性能调优及kafka最佳实践

    这里是 weihubeats ,觉得文章不错可以关注公众号 小奏技术 ,文章首发。拒绝营销号,拒绝标题党 最近在使用 kafka 的时候遇到了一些性能问题。 所以就打算研究下 kafka 相关的性能优化方案。 client 主要分两个 producer consumer producer 主要是有两个核心参数 batch.size linger.ms batch.s

    2024年02月03日
    浏览(41)
  • kerberos认证Flink的kafka connector和kafka client配置

    1. kafka配置文件 kafka jaas必须配置,如果缺少,则报一下错误。 对于Flink只能通过配置 java.security.auth.login.config 的方式。 jaas配置 1.1 方式一: System.setProperty配置系统变量: kafka_client_jaas_keytab.conf文件内容如下: 1.2 方法二:在IDEA中添加jvm参数: 注意:将参数添加至kafka 的pr

    2024年02月04日
    浏览(39)
  • 在Spring Boot微服务集成Kafka客户端(kafka-clients)操作Kafka

    记录 :459 场景 :在Spring Boot微服务集成Kafka客户端kafka-clients-3.0.0操作Kafka。使用kafka-clients的原生KafkaProducer操作Kafka生产者Producer。使用kafka-clients的原生KafkaConsumer操作Kafka的消费者Consumer。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka安装 :https://blog.csdn.ne

    2024年02月12日
    浏览(49)
  • kafka client for go

    关于 go 的 kafka client 有很多开源项目,例如 sarama: 具有完整协议支持的纯 Go 实现。包括消费者和生产者实施,支持 GZIP 和 Snappy 压缩。 confluent-kafka-go: Confluent 的 Golang Kafka 客户端包装了 librdkafka C 库,提供完整的 Kafka 协议支持,具有出色的性能和可靠性。提供了高级生产者和

    2024年02月03日
    浏览(39)
  • 在Spring Boot微服务集成kafka-clients操作Kafka集群

    记录 :463 场景 :在Spring Boot微服务集成kafka-clients-3.0.0操作Kafka集群。使用kafka-clients的原生KafkaProducer操作Kafka集群生产者Producer。使用kafka-clients的原生KafkaConsumer操作Kafka集群的消费者Consumer。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka集群安装 :https://bl

    2024年02月09日
    浏览(47)
  • Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作

    现有一电商网站数据文件,名为buyer_favorite1,记录了用户对商品的收藏数据,数据以“t”键分割,数据内容及数据格式如下: 项目环境说明 开启hadoop集群,zookeeper服务,开启kafka服务。再另开启一个窗口,在/apps/kafka/bin目录下创建一个topic。 1、新创一个文件folder命名为li

    2024年02月13日
    浏览(55)
  • 使用flink的sql-client.sh,测试mysql-->kafka-->kafka-->mysql实时流

    目录 1. 环境介绍 2. mysql建表 3. flinksql建表 3.1 进入flinksql客户端  ​3.2 配置输出格式 ​3.3 flink建表 3.4 任务流配置 4. 测试 4.1 插入测试数据 4.2 查看结果表数据​ 4.3 新增测试数据 4.4 再次查看结果表数据 服务 版本 zookeeper 3.8.0 kafka 3.3.1 flink 1.13.5 mysql 5.7.34 jdk 1.8 scala 2.12 连接器

    2024年02月11日
    浏览(41)
  • Ubuntu22.04 install Kafka

    kafka quickstart  install kafka

    2024年02月09日
    浏览(32)
  • Failed to construct kafka producer at org.apache.kafka.clients.producer.KafkaProducer

    报错信息如下: 在网上找了很久的解决方案,也没找到个所以然,可能是我能力不足没理解到,后来我尝试clean下项目,竟然报错了 提示我pom.xml中有错误,我看了看,唯一有可能的是新导入的一个依赖去掉了版本号,我加上版本号后又重新clean下,成功了,, 然后,我重启

    2024年02月05日
    浏览(37)
  • kafka消费报错, org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since

    问题: 在有大量消息需要消费时,消费端出现报错:org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which t

    2024年03月23日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包