Kafka入门, 消费者组案例(十九)

这篇具有很好参考价值的文章主要介绍了Kafka入门, 消费者组案例(十九)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

pom 文件

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>

独立消费者案例(订阅主语)

Kafka入门, 消费者组案例(十九),kafka,linq,分布式
在消费者API代码中,必须配置消费者id。命令行启动消费者不填写消费者组id会被自动填写随机得消费者组id

package com.longer.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer {
    public static void main(String[] args) {
        //创建消费者的配置对象
        Properties properties=new Properties();
        //2、给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //配置序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //配置消费者组(组名任意起名)必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        //创建消费者对象
        KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
        //注册要消费的主题
        ArrayList<String> topics=new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);
        while (true){
            //设置1s中消费一批数据
            ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
            //打印消费到的数据
            for(ConsumerRecord<String,String> record:consumerRecords){
                System.out.println(record);
            }
        }
    }
}

发送的信息
Kafka入门, 消费者组案例(十九),kafka,linq,分布式
消费信息(因为我发了好多次)
Kafka入门, 消费者组案例(十九),kafka,linq,分布式

独立消费者案例

Kafka入门, 消费者组案例(十九),kafka,linq,分布式

package com.longer.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerPartition {
    public static void main(String[] args) {
        Properties properties=new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //配置消费者组(组名任意起名)必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        KafkaConsumer kafkaConsumer=new KafkaConsumer(properties);
        //消费某个注意的某个分区数据
        ArrayList<TopicPartition> topicPartitions=new ArrayList<>();
        topicPartitions.add(new TopicPartition("first",0));
        kafkaConsumer.assign(topicPartitions);
        while (true){
            ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

Kafka入门, 消费者组案例(十九),kafka,linq,分布式

Kafka入门, 消费者组案例(十九),kafka,linq,分布式
发送信息发现只消费0分区的信息
Kafka入门, 消费者组案例(十九),kafka,linq,分布式

消费者组案例

Kafka入门, 消费者组案例(十九),kafka,linq,分布式

package com.longer.consumer.group;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer1 {
    public static void main(String[] args) {
        //创建消费者的配置对象
        Properties properties=new Properties();
        //2、给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //配置序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //配置消费者组(组名任意起名)必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        //创建消费者对象
        KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
        //注册要消费的主题
        ArrayList<String> topics=new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);
        while (true){
            //设置1s中消费一批数据
            ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
            //打印消费到的数据
            for(ConsumerRecord<String,String> record:consumerRecords){
                System.out.println(record);
            }
        }
    }
}

复制三分,然后运行
Kafka入门, 消费者组案例(十九),kafka,linq,分布式Kafka入门, 消费者组案例(十九),kafka,linq,分布式

Kafka入门, 消费者组案例(十九),kafka,linq,分布式
Kafka入门, 消费者组案例(十九),kafka,linq,分布式文章来源地址https://www.toymoban.com/news/detail-526742.html

到了这里,关于Kafka入门, 消费者组案例(十九)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka3.0.0版本——消费者(独立消费者消费某一个主题中某个分区数据案例__订阅分区)

    1.1、案例需求 创建一个独立消费者,消费firstTopic主题 0 号分区的数据,所下图所示: 1.2、案例代码 生产者往firstTopic主题 0 号分区发送数据代码 消费者消费firstTopic主题 0 分区数据代码 1.3、测试 在 IDEA 中执行消费者程序,如下图: 在 IDEA 中执行生产者程序 ,在控制台观察

    2024年02月09日
    浏览(45)
  • Kafka入门, 消费者工作流程(十八)

    pull(拉)模式: consumer采用从broker中主动拉取数据。 Kafka采用这种方式。 push(推)模式: Kafka没有采用这种方式,因为由broker决定消息发送速率,很难适应所有消费者的速率。例如推送速度是50m/s,consumer1,consumer2就来不及处理消息 pull模式不足之处是,如果kafka没有数据,消费者

    2024年02月13日
    浏览(70)
  • Kafka入门,漏消费和重复消费, 消费者事务,数据积压(二十四)

    重复消费:已经消费了数据,但是offset没提交。 漏消费:先提交offset后消费,有可能会造成数据得漏消费 如果向完成consumer端得进准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将kafka的offset保存到支持事务的自定义介质(比如MySQ

    2024年02月15日
    浏览(42)
  • Kafka 入门到起飞 - Kafka怎么做到保障消息不会重复消费的? 消费者组是什么?

    消费者 : 1、订阅Topic(主题) 2、从订阅的Topic消费(pull)消息, 3、将消费消息的offset(偏移量)保存在Kafka内置的一Topic名字是_consumer_offsets的主题中,在Kafka的logs文件下能看到这👟文件,存放的是消息的偏移量数据 消费者组 : 1、订阅同一个Topic的消费者可以加入到一个

    2024年02月15日
    浏览(42)
  • Kafka 入门到起飞系列 - 消费者组管理、位移管理

    消费者组 - Consumer Group 上文我们已经讲过消费者组了,我们知道消费组的存在可以保证一个主题下一个分区的消息只会被组内一个消费者消费,从而避免了消息的重复消费 消费者组是Kafka 提供的可扩展且具有容错性的消费者机制 消费组有一个或多个消费者,消费者可以是一

    2024年02月15日
    浏览(45)
  • java:Kafka生产者推送数据与消费者接收数据(参数配置以及案例)

    bootstrap.servers :Kafka集群中的Broker列表,格式为host1:port1,host2:port2,…。生产者会从这些Broker中选择一个可用的Broker作为消息发送的目标Broker。 acks :Broker对消息的确认模式。可选值为0、1、all。0表示生产者不会等待Broker的任何确认消息;1表示生产者会等待Broker的Leader副本确认

    2024年02月16日
    浏览(47)
  • 13、Kafka ------ kafka 消费者API用法(消费者消费消息代码演示)

    消费者API的核心类是 KafkaConsumer,它提供了如下常用方法: 下面这些方法都体现了Kafka是一个数据流平台,消费者通过这些方法可以从分区的任意位置、重新开始读取数据。 根据KafkaConsumer不难看出,使用消费者API拉取消息很简单,基本只要几步: 1、创建KafkaConsumer对象,创建

    2024年04月11日
    浏览(49)
  • 分布式 - 消息队列Kafka:Kafka消费者和消费者组

    1. Kafka 消费者是什么? 消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者

    2024年02月13日
    浏览(45)
  • 【Kafka】Kafka消费者

    pull(拉)模式:consumer采用从broker中主动拉取数据。 Kafka采用这种方式。 push(推)模式:Kafka没有采用这种方式,因为由broker决定消息发送速率,很难适应所有的消费者的消费速率。例如推送的速度是50m/s,consumer1和consumer2旧来不及处理消息。 pull模式不足之处是,如果Kafka没有数

    2024年02月13日
    浏览(49)
  • kafka配置多个消费者groupid kafka多个消费者消费同一个partition(java)

    kafka是由Apache软件基金会开发的一个开源流处理平台。kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 kafka中partition类似数据库中的分表数据,可以起到水平扩展数据的目的,比如有a,b,c,d,e,f 6个数据,某个topic有两个partition,一

    2024年01月22日
    浏览(86)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包