kafka之java客户端实战

这篇具有很好参考价值的文章主要介绍了kafka之java客户端实战。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1. kafka的客户端

        Kafka提供了两套客户端API,HighLevel API和LowLevel API。 HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,但是使用起来非常复杂,也更容易出错。只在极少数对性能要求非常极致的场景才会偶尔使用。我们的重点是HighLeve API 。

2. 基础客户端的使用

Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可:

  <dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka_2.13</artifactId>
   <version>3.4.0</version>
  </dependency>

2.1 如何发消息

        现在, 我们使用Kafka提供的Producer类,如何发送消息。

2.1.1 单项发送消息

代码:

public class MyProducerTest {
    private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";
    private static final String TOPIC = "disTopic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //PART1:设置发送者相关属性
        Properties props = new Properties();
        // 此处配置的是kafka的端口
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");
        // 配置key的序列化类
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        // 配置value的序列化类
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        Producer<String,String> producer = new KafkaProducer<>(props);
        for(int i = 0; i < 5; i++) {
            //Part2:构建消息
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);
            //Part3:发送消息
            //单向发送:不关心服务端的应答。
            producer.send(record);
            System.out.println("message "+i+" sended");
        }
        //消息处理完才停止发送者。
        producer.close();
    }
}

执行结果:

kafka之java客户端实战,Kafka,kafka,java,分布式

2.1.2 同步发送

代码:

public class MyProducerTest {
    private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";
    private static final String TOPIC = "disTopic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //PART1:设置发送者相关属性
        Properties props = new Properties();
        // 此处配置的是kafka的端口
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");
        // 配置key的序列化类
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        // 配置value的序列化类
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        Producer<String,String> producer = new KafkaProducer<>(props);
        for(int i = 0; i < 5; i++) {
            //Part2:构建消息
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);
            //Part3:发送消息
            //同步发送:获取服务端应答消息前,会阻塞当前线程。
            RecordMetadata recordMetadata = producer.send(record).get();
            String topic = recordMetadata.topic();
            int partition = recordMetadata.partition();
            long offset = recordMetadata.offset();
            String message = recordMetadata.toString();
            System.out.println("message:["+ message+"] sended with topic:"+topic+"; partition:"+partition+ ";offset:"+offset);

        }
        //消息处理完才停止发送者。
        producer.close();
    }
}

执行结果:

kafka之java客户端实战,Kafka,kafka,java,分布式

 2.1.2 异步发送 

代码:

public class MyProducerTest {
    private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";
    private static final String TOPIC = "disTopic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //PART1:设置发送者相关属性
        Properties props = new Properties();
        // 此处配置的是kafka的端口
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");
        // 配置key的序列化类
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        // 配置value的序列化类
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        Producer<String,String> producer = new KafkaProducer<>(props);
        CountDownLatch latch = new CountDownLatch(5);
        for(int i = 0; i < 5; i++) {
            //Part2:构建消息
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);
            //Part3:发送消息
            //异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if(null != e){
                        System.out.println("消息发送失败,"+e.getMessage());
                        e.printStackTrace();
                    }else{
                        String topic = recordMetadata.topic();
                        long offset = recordMetadata.offset();
                        String message = recordMetadata.toString();
                        System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset);
                    }
                    latch.countDown();
                }
            });
        }
        //消息处理完才停止发送者。
        latch.await();
        //消息处理完才停止发送者。
        producer.close();
    }
}

执行结果:

kafka之java客户端实战,Kafka,kafka,java,分布式

2.1.3 总结 

​ 从上述示例中,我们可以总结出,构建Producer分为三个步骤:

  1. 设置Producer核心属性 :Producer可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTSTRAP_SERVERS_CONFIG属性,显然就是指发送者要将消息发到哪个Kafka集群上。这是每个Producer必选的属性。在ProducerConfig中,对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。
  2. 构建消息:Kafka的消息是一个Key-Value结构的消息。其中,key和value都可以是任意对象类型。其中,key主要是用来进行Partition分区的,业务上更关心的是value。
  3. 使用Producer发送消息:通常用到的就是单向发送、同步发送和异步发送者三种发送方式。

2.2 如何消费消息

        接下来可以使用Kafka提供的Consumer类,快速消费消息。

2.2.1 消费消息

代码:

public class MyConsumerTest {
    private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";
    private static final String TOPIC = "disTopic";

    public static void main(String[] args) {
        //PART1:设置发送者相关属性
        Properties props = new Properties();
        //kafka地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        //每个消费者要指定一个group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        //key序列化类
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //value序列化类
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC));

        while (true) {
            //PART2:拉取消息
            // 100毫秒超时时间
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));
            //PART3:处理消息
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("partition = "+record.partition()+"offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());
            }


            //提交offset,消息就不会重复推送。
            consumer.commitSync(); //同步提交,表示必须等到offset提交完毕,再去消费下一批数据。
//            consumer.commitAsync(); //异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。
        }
    }
}

2.2.2 总结

​ 整体来说,Consumer同样是分为三个步骤:

  1. 设置Consumer核心属性 :可选的属性都可以由ConsumerConfig类管理。在这个类中,同样对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。同样BOOTSTRAP_SERVERS_CONFIG是必须设置的属性。
  2. 拉取消息:Kafka采用Consumer主动拉取消息的Pull模式。consumer主动从Broker上拉取一批感兴趣的消息。
  3. 处理消息,提交位点:消费者将消息拉取完成后,就可以交由业务自行处理对应的这一批消息了。只是消费者需要向Broker提交偏移量offset。如果不提交Offset,Broker会认为消费者端消息处理失败了,还会重复进行推送。

3. 客户端核心参数与客户端机制

3.1 消费者分组消费机制

3.2 生产者拦截器机制

3.3 消息序列化机制

3.4 消息分区路由机制

3.5 生产者消息缓存机制

3.6 发送应答机制

3.7 生产者消息幂等性

3.8 生产者消息事务

内容更新中文章来源地址https://www.toymoban.com/news/detail-798891.html

到了这里,关于kafka之java客户端实战的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【实践篇】Redis最强Java客户端(四)之Ression分布式集合使用指南

    前两章我们了解了《【实践篇】Redis最强Java客户端(一)之Redisson入门介绍》和《【实践篇】Redis最强Java客户端(二)之Redisson基础概念》 本章第四章主要介绍Ression分布式集合使用指南。 上一章《Redisson 7种分布式锁使用指南》回顾。 本章我们介绍了在Redisson中实现的各种分布式集

    2024年02月09日
    浏览(47)
  • 【实践篇】Redis最强Java客户端(三)之Redisson 7种分布式锁使用指南

    前两章我们了解了《【实践篇】Redis最强Java客户端(一)之Redisson入门介绍》和《【实践篇】Redis最强Java客户端(二)之Redisson基础概念》本章第三章主要介绍Redisson的七种分布式锁,分别是简单锁、公平锁、可重入锁、红锁、读写锁、信号量和闭锁。下面是每种锁的基本概念、使用

    2024年02月09日
    浏览(51)
  • 【kafka】Java客户端代码demo:自动异步提交、手动同步提交及提交颗粒度、动态负载均衡

    kafka版本为3.6,部署在3台linux上。 maven依赖如下: 生产者、消费者和topic代码如下: 这里先简单解释一下, kafka的topic只是一个逻辑上的概念,实际上的物理存储是依赖分布在broker中的分区partition来完成的 。kafka依赖的zk中有一个 __consumer_offsets [1]话题,存储了所有consumer和g

    2024年01月19日
    浏览(53)
  • 分布式软件架构——客户端缓存

    当万维网刚刚出现的时候,浏览器的缓存机制差不多就已经存在了。在 HTTP 协议设计之初,人们便确定了服务端与客户端之间“无状态”(Stateless)的交互原则,即要求客户端的每次请求是独立的,每次请求无法感知、也不能依赖另一个请求的存在,这既简化了 HTTP 服务器的

    2024年02月12日
    浏览(43)
  • kafka 02——三个重要的kafka客户端

    请参考下面的文章: Kafka 01——Kafka的安装及简单入门使用. AdminClient API: 允许管理和检测Topic、Broker以及其他Kafka对象。 Producer API: 发布消息到一个或多个API。 Consumer API: 订阅一个或多个Topic,并处理产生的消息。 如下: 完整的pom 关于配置,可参考官网: https://kafka.apa

    2024年02月13日
    浏览(45)
  • Kafka-客户端使用

    Kafka提供了两套客户端API,HighLevel API和LowLevel API。 HighLevel API 封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,但是使用起来

    2024年02月22日
    浏览(51)
  • kafka客户端工具(Kafka Tool)的安装

    官方下载 根据不同的系统下载对应的版本,点击下载后双击,如何一直下一步,安装 kafka环境搭建请参考:CentOS 搭建Kafka集群 (1)连接kafka (2)简单使用  

    2024年04月23日
    浏览(69)
  • Kafka增加安全验证安全认证,SASL认证,并通过spring boot-Java客户端连接配置

    公司Kafka一直没做安全验证,由于是诱捕程序故需要面向外网连接,需要增加Kafka连接验证,保证Kafka不被非法连接,故开始研究Kafka安全验证 使用Kafka版本为2.4.0版本,主要参考官方文档 官网对2.4版本安全验证介绍以及使用方式地址: https://kafka.apache.org/24/documentation.html#secu

    2024年02月01日
    浏览(62)
  • kafka客户端应用参数详解

    Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可: 1、消息发送者主流程  然后可以使用Kafka提供的Producer类,快速发送消息。 ​ 整体来说,构建Producer分为三个步骤: 设置Producer核心属性  :Producer可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTST

    2024年02月07日
    浏览(52)
  • 【Kafka】Kafka客户端认证失败:Cluster authorization failed.

    kafka客户端是公司内部基于spring-kafka封装的 spring-boot版本:3.x spring-kafka版本:2.1.11.RELEASE 集群认证方式:SASL_PLAINTEXT/SCRAM-SHA-512 经过多年的经验,以及实际验证,配置是没问题的,但是业务方反馈用相同的配置,还是报错! 封装的kafka客户端版本过低,高版本的配置项:secu

    2024年01月17日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包