kafka生产者消费者练习

这篇具有很好参考价值的文章主要介绍了kafka生产者消费者练习。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

需求:写一个生产者,不断的去生产用户行为数据,写入到kafka的一个topic中
生产的数据格式: 造数据
{“guid”:1,“eventId”:“pageview”,“timestamp”:1637868346789} isNew = 1
{“guid”:1,“eventId”:“addcard”,“timestamp”:1637868347625} isNew = 0
{“guid”:2,“eventId”:“collect”,“timestamp”:16378683463219}
{“guid”:3,“eventId”:“paid”,“timestamp”:16378683467829}

再写一个消费者,不断的从kafka中消费上面的用户行为数据,做一个统计
1.每5s输出一次当前来了多少用户(去重) uv
2.将每条数据添加一个字段来标识,如果这个用户的id是第一次出现,那么就标注1,否则就是0

首先添加依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>

    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.8.1</version>
    </dependency>


    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.22</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.roaringbitmap/RoaringBitmap -->
    <dependency>
        <groupId>org.roaringbitmap</groupId>
        <artifactId>RoaringBitmap</artifactId>
        <version>0.9.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>31.1-jre</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.28</version>
    </dependency>

生产者代码示例:

package com.doitedu;

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * 验证数据:
 * 创建topic
 * kafka-topics.sh --create --topic event-log --zookeeper linux01:2181 --partitions 3 --replication-factor 3
 * 搞个消费者消费数据
 * kafka-console-consumer.sh  --bootstrap-server linux01:9092 --topic event-log
 * {"eventId":"zTUAbXcWbn","guid":7170,"timeStamp":1659944455262}
 * {"eventId":"KSzaaNmczb","guid":9743,"timeStamp":1659944455823}
 * {"eventId":"FNUERLlCNu","guid":7922,"timeStamp":1659944456295}
 * {"eventId":"VmXVJHlpOF","guid":2505,"timeStamp":1659944458267}
 * {"eventId":"pMIHwLzSIE","guid":7668,"timeStamp":1659944460088}
 * {"eventId":"ZvGYIvmKTx","guid":3636,"timeStamp":1659944460461}
 * {"eventId":"jBanTDSlCO","guid":3468,"timeStamp":1659944460787}
 * {"eventId":"vXregpYeHu","guid":1107,"timeStamp":1659944462525}
 * {"eventId":"PComosCafr","guid":7765,"timeStamp":1659944463640}
 * {"eventId":"xCHFOYIJlb","guid":3443,"timeStamp":1659944464697}
 * {"eventId":"xDToApWwFo","guid":5034,"timeStamp":1659944465953}
 */
public class Exercise_kafka编程练习 {
    public static void main(String[] args) throws InterruptedException {
        MyData myData = new MyData();
        myData.genData();
    }
}

class MyData{
    KafkaProducer<String, String> producer = null;
    public MyData(){
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092,linux02:9092,linux03:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new KafkaProducer<String, String>(props);
    }
    
    public void genData() throws InterruptedException {
        UserEvent userEvent = new UserEvent();
        while (true){
            //造数据
            userEvent.setGuid(RandomUtils.nextInt(0,10000));
            userEvent.setEventId(RandomStringUtils.randomAlphabetic(10));
            userEvent.setTimeStamp(System.currentTimeMillis());
            String json = JSON.toJSONString(userEvent);
            //数据造完了就往kafka中写
            ProducerRecord<String, String> stringProducerRecord = new ProducerRecord<>("event-log", json);
            Thread.sleep(RandomUtils.nextInt(200,1000));
            producer.send(stringProducerRecord);
        }
    }
}
/*
{"guid":1,"eventId":"pageview","timestamp":1637868346789}
{"guid":1,"eventId":"addcard","timestamp":1637868347625}
{"guid":2,"eventId":"collect","timestamp":16378683463219}
 */
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
class UserEvent{
    private Integer guid;
    private String eventId;
    private long timeStamp;
}

消费者代码示例:用hashset来实现:

package com.doitedu;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

/**
 * 分两步走:
 * 第一步:一个消费者不断的去消费数据
 * 第二步:5分钟计算一次,返回用户数这个结果
 */
public class Exercise_consumerDemo {
    public static void main(String[] args) {
        HashSet<Integer> set = new HashSet<>();
        new Thread(new ConsumerThread(set)).start();
        //定时的任务调度
        Timer timer = new Timer();
        //调度,第一个参数,你给我一个任务,
        //第二个参数代表过多久之后我开始执行任务
        //第三个参数代表每隔多久执行一次
        timer.schedule(new ConsumerTask(set),5000,10000);

    }
}

class ConsumerThread implements Runnable {
    HashSet<Integer> set = null;
    KafkaConsumer<String, String> consumer = null;

    public ConsumerThread(HashSet<Integer> set) {
        this.set = set;
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test001");

        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("event-log"));
    }
    /**
     * 重写run方法的话,我需要在里面实现什么逻辑?
     * 消费者消费数据,拿到数据以后,只需要获取到用户id
     * 将用户id写到hashset集合里面
     */
    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
            for (ConsumerRecord<String, String> record : records) {
                String json = record.value();
                UserEvent userEvent = JSON.parseObject(json, UserEvent.class);
                Integer guid = userEvent.getGuid();
                set.add(guid);
            }
        }
    }
}

class ConsumerTask extends TimerTask {
    HashSet<Integer> set = null;

    public ConsumerTask(HashSet<Integer> set) {
        this.set = set;
    }
    /**
     * 这里面就是返回的一个用户数
     */
    @Override
    public void run() {
        int userCount = set.size();
        System.out.println(System.currentTimeMillis() + ",截至到当前为止的一个用户数为:"+userCount);
    }
}

用hashset来实现很显然会出问题,如果数据量一直往上增长,会出现oom的问题,而且占用资源越来越多,影响电脑性能!!!
方案二:将HashSet改成bitMap来计数,就很完美,大逻辑不变,小逻辑就是将HashMap改成bitMap

package com.doitedu;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.roaringbitmap.RoaringBitmap;

import java.time.Duration;
import java.util.*;

/**
 * 分两步走:
 * 第一步:一个消费者不断的去消费数据
 * 第二步:5分钟计算一次,返回用户数这个结果
 */
public class BitMap_consumerDemo {
    public static void main(String[] args) {
        //原来我用的是Hashset来记录,现在我用RoaringBitmap来记录
        RoaringBitmap bitMap = RoaringBitmap.bitmapOf();

        new Thread(new BitMapConsumerThread(bitMap)).start();
        //定时的任务调度
        Timer timer = new Timer();
        timer.schedule(new BitMapConsumerTask(bitMap),1000,5000);

    }
}

class BitMapConsumerThread implements Runnable {
    RoaringBitmap bitMap = null;
    KafkaConsumer<String, String> consumer = null;

    public BitMapConsumerThread(RoaringBitmap bitMap) {
        this.bitMap = bitMap;
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test001");

        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("event-log"));
    }
    /**
     * 重写run方法的话,我需要在里面实现什么逻辑?
     * 消费者消费数据,拿到数据以后,只需要获取到用户id
     * 将用户id写到hashset集合里面
     */
    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
            for (ConsumerRecord<String, String> record : records) {
                String json = record.value();
                UserEvent userEvent = JSON.parseObject(json, UserEvent.class);
                Integer guid = userEvent.getGuid();
                bitMap.add(guid);
            }
        }
    }
}


class BitMapConsumerTask extends TimerTask {
    RoaringBitmap bitMap = null;

    public BitMapConsumerTask(RoaringBitmap bitMap) {
        this.bitMap = bitMap;
    }
    /**
     * 这里面就是返回的一个用户数
     */
    @Override
    public void run() {
        int userCount = bitMap.getCardinality();
        System.out.println(System.currentTimeMillis() + ",截至到当前为止的一个用户数为:"+userCount);
    }
}

需求二:判断来没来过的问题,可以用bitmap来搞,当然还可以用布隆过滤器来搞文章来源地址https://www.toymoban.com/news/detail-480447.html

package com.doitedu;

import com.alibaba.fastjson.JSON;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;

/**
 * 用布隆过滤器来判定是否重复,当然,bitMap也是可以操作的
 */
public class BloomFilter_consumerDemo {
    public static void main(String[] args) {

        BloomFilter<Long> longBloomFilter = BloomFilter.create(Funnels.longFunnel(), 100000);

        new Thread(new BloomFilterConsumerThread(longBloomFilter)).start();
    }
}

class BloomFilterConsumerThread implements Runnable {
    BloomFilter<Long> longBloomFilter = null;
    KafkaConsumer<String, String> consumer = null;

    public BloomFilterConsumerThread(BloomFilter<Long> longBloomFilter) {
        this.longBloomFilter = longBloomFilter;
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test001");
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("event-log"));
    }

    /**
     * 重写run方法的话,我需要在里面实现什么逻辑?
     * 消费者消费数据,拿到数据以后,只需要获取到用户id
     * 将用户id写到hashset集合里面
     */
    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
            for (ConsumerRecord<String, String> record : records) {
                String json = record.value();
                UserEvent userEvent = JSON.parseObject(json, UserEvent.class);
                Integer guid = userEvent.getGuid();
                boolean flag = longBloomFilter.mightContain((long) guid);
                if (flag) {
                    userEvent.setIsNew(0);
                } else {
                    userEvent.setIsNew(1);
                }
                //判断完成以后,得把他加进去
                longBloomFilter.put((long) guid);
                System.out.println(JSON.toJSONString(userEvent));
            }
        }
    }
}

到了这里,关于kafka生产者消费者练习的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Java轻松使用Kafka生产者,消费者

    Java轻松使用Kafka生产者,消费者 一、环境说明 项目中需要下面的依赖: ( 版本自定义 ) 2. yml配置文件设置 1. 简单生产者的书写: 1. 简单消费者的书写:   注:多消费者时,需要对应kafka中配置的分区;多少的Partition就有多少个消费者,以免资源浪费

    2024年02月15日
    浏览(43)
  • kafka生产者和消费者(python版)

    生产者 消费者 消费者中的组名主要用户针对主题的偏移量进行更改,也涉及到主题中分区的问题, kafka工具类 此工具类基本上拿过去就可以用 疑问 当消费者链接kafka时发现topic没有未读的消息怎样退出呢,默认是在一直等待,但是我期望没有要读的消息的时候直接退出即可

    2024年02月16日
    浏览(30)
  • Kafka:主题创建、分区修改查看、生产者、消费者

    1.创建主题 2.查看所有主题 3.查看详细主题 序号从0开始计算 Partition:分区数,该主题有3个分区 Replica:副本数,该主题有3个副本 Leader:副本数中的主的序号,生产消费的对象 1.修改分区数 修改的分区数量不可以小于或者等于当前主题分区的数量,否则会报错 在根目录kaf

    2024年02月11日
    浏览(78)
  • Kafka官方生产者和消费者脚本简单使用

    怎样使用Kafka官方生产者和消费者脚本进行消费生产和消费?这里假设已经下载了kafka官方文件,并已经解压. 这就可以见到测试kafka对应topic了.

    2024年02月04日
    浏览(35)
  • Kafka系列:查看Topic列表、消息消费情况、模拟生产者消费者

    执行topic删除命令时,出现提示 这条命令其实并不执行删除动作,仅仅是在zookeeper上标记该topic要被删除而已,同时也提醒用户一定要提前打开delete.topic.enable开关,否则删除动作是不会执行的。 解决办法: a)在server.properties中设置delete.topic.enable参数为ture b)如下操作: 1.登

    2023年04月26日
    浏览(37)
  • Linux安装Kafka,创建topic、生产者、消费者

    1.创建安装目录/usr/local/kafka mkdir /usr/local/kafka 2.进入安装包目录 cd /usr/local/kafka  3.下载安装包 wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz 4.解压安装包 tar -zxvf kafka_2.12-3.3.1.tgz 5.进入cd kafka_2.12-3.3.1目录 cd kafka_2.12-3.3.1/ 6.修改zookeeper配置 cat ./config/zookeeper.properties | grep

    2023年04月17日
    浏览(33)
  • 探究:kafka生产者/消费者与多线程安全

    目录 1. 多线程安全 1.1. 生产者是多线程安全的么? 1.1. 消费者是多线程安全的么? 2. 消费者规避多线程安全方案 2.1. 每个线程维护一个kafkaConsumer 2.2. [单/多]kafkaConsumer实例 + 多worker线程 2.3.方案优缺点对比         Kafka生产者是 线程安全 的,可以在多个线程中共享一个

    2023年04月26日
    浏览(75)
  • Kafka 架构深度解析:生产者(Producer)和消费者(Consumer)

    Apache Kafka 作为分布式流处理平台,其架构中的生产者和消费者是核心组件,负责实现高效的消息生产和消费。本文将深入剖析 Kafka 架构中生产者和消费者的工作原理、核心概念以及高级功能。 1 发送消息到 Kafka Kafka 生产者负责将消息发布到指定的主题。以下是一个简单的生

    2024年02月03日
    浏览(37)
  • kafka配置大全broker、topic、生产者和消费者等配置介绍

    每个kafka broker中配置文件 server.properties 默认必须配置的属性如下: **bootstrap.servers** - 指定生产者客户端连接kafka集群所需的broker地址列表,格式为host1:port1,host2:port2,可以设置一个或多个。这里并非需要所有的broker地址,因为生产者会从给定的broker里寻找其它的broker。 **key

    2024年02月05日
    浏览(38)
  • 07、Kafka ------ 消息生产者(演示 发送消息) 和 消息消费者(演示 监听消息)

    简单来说,就是一个数据项。 ▲ 消息就是 Kafka 所记录的数据节点,消息在 Kafka 中又被称为记录(record)或事件(event)。 从存储上来看,消息就是存储在分区文件(有点类似于List)中的一个数据项,消息具有 key、value、时间戳 和 可选的元数据头。 ▲ 下面是一个示例事件

    2024年01月20日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包