Java整合Kafka实现生产及消费

这篇具有很好参考价值的文章主要介绍了Java整合Kafka实现生产及消费。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前提条件

  • 搭建Kafka环境,参考Kafka集群环境搭建及使用
  • Java环境:JDK1.8
  • Maven版本:apache-maven-3.6.3
  • 开发工具:IntelliJ IDEA

项目环境

  1. 创建maven项目。
  2. pom.xml文件中引入kafka依赖。
<dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
</dependencies>

创建Topic

创建topic命名为testtopic并指定2个分区。

./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic testtopic --partitions 2

生产消息

public class Producer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 生产参数配置
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        int i=0;
        while (true) {
            //生产消息
            Future<RecordMetadata> future = kafkaProducer.send(new ProducerRecord<String, String>("testtopic", "key"+i, "value"+i));
            //获取生产的数据信息
            RecordMetadata recordMetadata = future.get();
            System.out.println("time:"+recordMetadata.timestamp()+" key:"+i+" value:"+i+" partition:"+recordMetadata.partition()+" offset:"+recordMetadata.offset());
            Thread.sleep(1000);
            i+=1;
        }
    }
}

生产者参数配置

// ACK机制,默认为1 (0,1,-1)
properties.setProperty(ProducerConfig.ACKS_CONFIG, "");
// Socket发送消息缓冲区大小,默认为128K,设置为-1代表操作系统的默认值
properties.setProperty(ProducerConfig.SEND_BUFFER_CONFIG, ""); 
// Socket接收消息缓冲区大小,默认为32K,设置为-1代表操作系统的默认值
properties.setProperty(ProducerConfig.RECEIVE_BUFFER_CONFIG, ""); 
// 生产者客户端发送消息的最大值,默认1M
properties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, ""); 
// 发送消息异常时重试次数,默认为0
properties.setProperty(ProducerConfig.RETRIES_CONFIG, "");   
// 重试间隔时间,默认100
properties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "");    
// 生产消息自定义分区策略类
properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, "");
// 开启幂等 ,默认true
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "");

更多配置信息查看ProducerConfig类

生产自定义分区策略

  1. 创建分区策略类,实现org.apache.kafka.clients.producer.Partitioner接口,编写具体策略。
public class PartitionPolicy implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = this.nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return ((PartitionInfo)availablePartitions.get(part)).partition();
            } else {
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }


    private int nextValue(String topic) {
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }

        return counter.getAndIncrement();
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}
  1. 参数配置。
properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionPolicy.class.getName());

生产到指定分区

ProducerRecord有指定分区的构造方法,设置分区号
public ProducerRecord(String topic, Integer partition, K key, V value)

Future<RecordMetadata> future = kafkaProducer.send(new ProducerRecord<String, String>("testtopic", 1, "key"+i, "value"+i));

消费消息

public class Consumer {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //约定的编解码
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
        //默认为自动提交
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        //当设置为自动提交时,默认5秒自动提交
        //properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
        //
        //properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "5000");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        //订阅topic
        kafkaConsumer.subscribe(Arrays.asList("testtopic"));
        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        ConsumerRecords<String, String> records = null;
        while (assignment.size() == 0) {
            records = kafkaConsumer.poll(Duration.ofMillis(100));
            assignment = kafkaConsumer.assignment();
        }
        /*//1.根据时间戳获取 offset,设置 offset
        Map<TopicPartition, Long> offsetsForTimes=new HashMap<>();
        for (TopicPartition topicPartition : assignment) {
            offsetsForTimes.put(topicPartition,1669972273941L);
        }
        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = kafkaConsumer.offsetsForTimes(offsetsForTimes);
        offsetAndTimestampMap.forEach((tp,offsettime)->{
            kafkaConsumer.seek(tp,offsettime.offset());
        });*/
        /*//2.指定从头开始消费
        kafkaConsumer.seekToBeginning(assignment);*/
        /*//3.指定从某offset开始消费
        kafkaConsumer.seek(tp,0);*/
        while (true) {
            if (records.isEmpty()) {
                Thread.sleep(3000);
            } else {
                System.out.printf("records count:" + records.count());
                Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
                while (iterator.hasNext()) {
                    ConsumerRecord<String, String> record = iterator.next();
                    System.out.println(" time:" + record.timestamp() + " key:" + record.key() + " value:" + record.value() + " partition:" + record.partition() + " offset:" + record.offset());
                }
                kafkaConsumer.commitSync();
            }
            records = kafkaConsumer.poll(Duration.ofMillis(0));
        }
    }
}

消费参数配置

// 消费者必须指定一个消费组
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
// 消费者每次最多POLL的数量
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "");
// 消费者POLL的时间间隔
properties.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_DOC, "");
// 设置是否自动提交,默认为true
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "");  
// 如果是自动提交,默认5s后提交,会发生丢失消息和重复消费情况
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "");   
// 当一个新的消费组或者消费信息丢失后,在哪里开始进行消费。earliest:消费最早的消息。latest(默认):消费最近可用的消息。none:没有找到消费组消费数据时报异常。

更多配置信息查看ConsumerConfig类

offset设置方式

如代码所示,设置offset的几种方式:

  • 指定 offset,需要自己维护 offset,方便重试。
  • 指定从头开始消费。
  • 指定 offset 为最近可用的 offset (默认)。
  • 根据时间戳获取 offset,设置 offset。

代码仓库

https://gitee.com/codeWBG/learn_kafka文章来源地址https://www.toymoban.com/news/detail-441357.html

到了这里,关于Java整合Kafka实现生产及消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring Boot 整合kafka:生产者ack机制和消费者AckMode消费模式、手动提交ACK

    Kafka 生产者的 ACK 机制指的是生产者在发送消息后,对消息副本的确认机制。ACK 机制可以帮助生产者确保消息被成功写入 Kafka 集群中的多个副本,并在需要时获取确认信息。 Kafka 提供了三种 ACK 机制的配置选项,分别是: acks=0:生产者在成功将消息发送到网络缓冲区后即视

    2024年02月04日
    浏览(51)
  • Docker搭建Kafka教程(python使用kafka基础用法,生产消费)

    Docker支持多种网络模式,包括bridge(桥接模式)、host(主机模式)、overlay(覆盖网络模式)等。 Bridge(桥接模式):这是Docker默认的网络模式。在桥接模式下,每个Docker容器都运行在自己的虚拟网络接口上,并且这些接口通过一个桥接器连接在一起。Docker服务端启动时会自

    2024年02月03日
    浏览(40)
  • Springboot最简单的实战介绍 整合kafka-生产者与消费者(消息推送与订阅获取)

    #spring.kafka.bootstrap-servers=123.xxx.x.xxx:19092,123.xxx.x.xxx:19093,123.xxx.x.xxx:19094 spring.kafka.bootstrap-servers=192.168.x.xxx:9092 #=============== producer生产者 ======================= spring.kafka.producer.retries=0 spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 spring.kafka.producer.key-serializer=org.ap

    2024年04月09日
    浏览(42)
  • 【JAVA】生产环境kafka重复消费问题记录

    业务系统每周都有定时任务在跑,由于是大任务因此采用分而治之思想将其拆分为多个分片小任务采用 kafka异步队列消费 的形式来减少服务器压力,每个小任务都会调用后台的c++算法,调用完成之后便会回写数据库的成功次数。今天观测到定时任务的分片小任务存在被重复消

    2024年04月12日
    浏览(41)
  • Java轻松使用Kafka生产者,消费者

    Java轻松使用Kafka生产者,消费者 一、环境说明 项目中需要下面的依赖: ( 版本自定义 ) 2. yml配置文件设置 1. 简单生产者的书写: 1. 简单消费者的书写:   注:多消费者时,需要对应kafka中配置的分区;多少的Partition就有多少个消费者,以免资源浪费

    2024年02月15日
    浏览(52)
  • golang整合rabbitmq-实现创建生产者绑定交换机-创建消费者消费完整代码

    1,在生产者端初始化mq连接 2,创建生产者 3,另起一个go服务进行消费者消费 后面将会发布golang整合es操作的文章

    2024年01月25日
    浏览(55)
  • 【Kafka】Ubuntu 部署kafka中间件,实现Django生产和消费

    原文作者 :我辈李想 版权声明 :文章原创,转载时请务必加上原文超链接、作者信息和本声明。 队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入

    2024年02月16日
    浏览(47)
  • java:Kafka生产者推送数据与消费者接收数据(参数配置以及案例)

    bootstrap.servers :Kafka集群中的Broker列表,格式为host1:port1,host2:port2,…。生产者会从这些Broker中选择一个可用的Broker作为消息发送的目标Broker。 acks :Broker对消息的确认模式。可选值为0、1、all。0表示生产者不会等待Broker的任何确认消息;1表示生产者会等待Broker的Leader副本确认

    2024年02月16日
    浏览(44)
  • 2.搭建Fabric区块链网络环境——前提条件和fabric的安装

    这些前提条件的满足确保了你可以顺利地搭建和运行 Fabric 区块链网络,并进行链码的开发、部署和执行。 安装 Docker:确保系统上已经安装了 Docker,并且 Docker 服务正在运行。 Docker:Fabric 使用 Docker 容器化技术来部署和管理区块链网络的各个组件。Docker 提供了一个轻量级、

    2024年02月11日
    浏览(52)
  • 从生产者-消费者模型中学习互斥量,锁,条件变量

    经典的并发控制模型 主要是练习 mutex unique_lock conditional_variable [[20 原子操作]] 1 mutex 互斥量 mutex 是一种互斥的同步原语,用于保护共享资源的访问,确保在同一时间只有一个线程可以访问共享资源。通过对互斥量加锁和解锁,可以实现对共享资源的独占访问。 2 shared_mutex 共

    2024年02月06日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包