大数据之Kafka————java来实现kafka相关操作

这篇具有很好参考价值的文章主要介绍了大数据之Kafka————java来实现kafka相关操作。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、在java中配置pom

 <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.8.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.12</artifactId>
      <version>2.8.0</version>
    </dependency>
  </dependencies>

二、生产者方法

(1)、Producer

Java中写在生产者输入内容在kafka中可以让消费者提取

[root@kb144 config]# kafka-console-consumer.sh --bootstrap-server 192.168.153.144:9092 --topic kb22

package nj.zb.kb22.Kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.Scanner;

/**
 * 用java生产消息 在xshell消费消息
 */
public class MyProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        //生产者的配置文件
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.153.144:9092");
        //key的序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //value的序列化
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        /**
         * ack应答机制
         * 0
         * 1
         * all
        */
       properties.put(ProducerConfig.ACKS_CONFIG,"1");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        Scanner scanner = new Scanner(System.in);
        while (true){
            System.out.println("请输入kafka的内容");

            String msg =scanner.next();
            ProducerRecord<String,String> record = new ProducerRecord<String, String>("kb22",msg);
            producer.send(record);
        }
    }
}

(2)、Producer进行多线程操作

  生产者多线程是一种常见的技术实践,可以提高消息生产的并发性和吞吐量。通过将消息生产任务分配给多个线程来并行地发送消息,可以有效地利用系统资源,加快消息的发送速度。

package nj.zb.kb22.Kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MyProducer2 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {//i代表线程
            Thread thread =new Thread(new Runnable() {
                @Override
                public void run() {
                    Properties properties = new Properties();
                    
  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.153.144:9092");
   properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);              
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
  properties.put(ProducerConfig.ACKS_CONFIG,"0");
  KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
                    //多线程操作 j代表消息
                    for (int j = 0; j < 100; j++) {
                        String msg=Thread.currentThread().getName()+" "+ j;
                        System.out.println(msg);
                        ProducerRecord<String, String> re = new ProducerRecord<String, String>("kb22", msg);
                        producer.send(re);

                    }

                }
            });
            executorService.execute(thread);
        }
        executorService.shutdown();
        while (true){
            if (executorService.isTerminated()){
                System.out.println("game over");
                break;
            }

        }
    }
}

三、消费者方法

(1)、Consumer

通过java来实现消费者

package nj.zb.kb22.Kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MyConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.153.144:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);

        //设置拉取信息后是否自动提交(kafka记录当前app是否已经获取到此信息),false 手动提交 ;true 自动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");

        /**
         * earliest 第一条数据开始拉取(当前应该没有获取过此topic信息)
         * latest 获取最新的数据(当前没有获取过此topic信息)
         * none
         * group消费者分组的概念
         */
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"GROUP3");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        //创建好kafka消费者对象后,订阅消息,指定消费的topic
        consumer.subscribe(Collections.singleton("kb22"));

        while (true){
            Duration mills = Duration.ofMillis(100);
            ConsumerRecords<String, String> records = consumer.poll(mills);
            for (ConsumerRecord<String,String> record:records){
                String topic = record.topic();
                int partition = record.partition();
                long offset = record.offset();
                String key = record.key();
                String value = record.value();
                long timestamp = record.timestamp();
                System.out.println("topic:"+topic+"\tpartition"+partition+"\toffset"+offset+"\tkey"+key+"\tvalue"+value+"\ttimestamp"+timestamp);
            }
            //consumer.commitAsync();//手动提交
        }
    }
}

(2)、设置多人访问文章来源地址https://www.toymoban.com/news/detail-674255.html

package nj.zb.kb22.Kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MyConsumerThread {
    //模仿多人访问
    public static void main(String[] args) {
        for (int i = 0; i <3; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    Properties properties = new Properties();

                    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.153.144:9092");
                    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
                    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);

                    //设置拉取信息后是否自动提交(kafka记录当前app是否已经获取到此信息),false 手动提交 ;true 自动提交
                    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");

                    /**
                     * earliest 第一条数据开始拉取(当前应该没有获取过此topic信息)
                     * latest 获取最新的数据(当前没有获取过此topic信息)
                     * none
                     * group消费者分组的概念
                     */
                    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
                    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"GROUP3");

                    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

                    consumer.subscribe(Collections.singleton("kb22"));
                    while (true){
                        //poll探寻数据
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                        for (ConsumerRecord<String,String>record:records){
                            String topic = record.topic();
                            int partition = record.partition();
                            long offset = record.offset();
                            String key = record.key();
                            String value = record.value();
                            long timestamp = record.timestamp();
                            String name = Thread.currentThread().getName();
                            System.out.println("name"+name
                                    +"\ttopic:"+topic
                                    +"\tpartition" +partition
                                    +"\toffset"+offset
                                    +"\tkey"+key
                                    +"\tvalue"+value
                                    +"\ttimestamp"+timestamp
                            );
                        }
                    }
                }
            }).start();

        }
    }
}

到了这里,关于大数据之Kafka————java来实现kafka相关操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Kafka】Java实现数据的生产和消费

    Kafka 是由 LinkedIn 公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的 基于发布订阅模式的消息引擎系统 。 Broker:消息中间件处理节点,一个Kafka节点就是一个Broker,一个或者多个Broker可以组成一个Kafka集群; T

    2023年04月19日
    浏览(46)
  • 【Flink-Kafka-To-Mongo】使用 Flink 实现 Kafka 数据写入 Mongo(根据对应操作类型进行增、删、改操作,写入时对时间类型字段进行单独处理)

    需求描述: 1、数据从 Kafka 写入 Mongo。 2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。 3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。 4、Kafka 数据为 Json 格式,获取到的数据根据操作类型字段进行增删改操作。 5、读取时使用自定义 Source,写

    2024年02月22日
    浏览(50)
  • java操作kafka读写操作

    1,前提,kafka 的 server.properties里面开通了 2,防火墙 3,java代码 生产者

    2024年02月13日
    浏览(38)
  • Java代码操作Kafka

    生产者主要的对象有: KafkaProducer , ProducerRecord 。 其中KafkaProducer 是用于发送消息的类 , ProducerRecord 类 用于封装Kafka的消息 。  KafkaProducer 的创建需要指定的参数和含义: 参数 说明 bootstrap.servers 配置生产者如何与broker建立连接。该参数设置的是初始化参数。如果生产者需

    2024年02月13日
    浏览(39)
  • 使用Kafka客户端(spring-kafka)的Java API操作Kafka的Topic

    记录 :458 场景 :在Spring Boot微服务集成Kafka客户端spring-kafka-2.8.2操作Kafka的Topic的创建和删除。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka安装 :https://blog.csdn.net/zhangbeizhen18/article/details/129071395 1.微服务中 配置Kafka信息 1.1在pom.xml添加依赖 pom.xml文件: 解析

    2024年02月09日
    浏览(41)
  • 使用Kafka客户端(kafka-clients)的Java API操作Kafka的Topic

    记录 :460 场景 :在Spring Boot微服务集成Kafka客户端kafka-clients-3.0.0操作Kafka的Topic的创建和删除。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka安装 :https://blog.csdn.net/zhangbeizhen18/article/details/129071395 1.微服务中 配置Kafka信息 1.1在pom.xml添加依赖 pom.xml文件: 解析

    2024年02月09日
    浏览(69)
  • 使用kafka-clients的Java API操作Kafka集群的Topic

    记录 :464 场景 :在Spring Boot微服务集成Kafka客户端kafka-clients-3.0.0操作Kafka集群的Topic的创建和删除。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka集群安装 :https://blog.csdn.net/zhangbeizhen18/article/details/131156084 1.微服务中 配置Kafka信息 1.1在pom.xml添加依赖 pom.xml文

    2024年02月09日
    浏览(41)
  • 使用spring-kafka的Java API操作Kafka集群的Topic

    记录 :462 场景 :在Spring Boot微服务集成spring-kafka-2.8.2操作Kafka集群的Topic的创建和删除。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka集群安装 :https://blog.csdn.net/zhangbeizhen18/article/details/131156084 1.微服务中 配置Kafka信息 1.1在pom.xml添加依赖 pom.xml文件: 解析:

    2024年02月10日
    浏览(42)
  • Doris系列之导入Kafka数据操作

    注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Spark Flink Kafka Hbase Hive Flume等等~写的都是纯干货,各种顶会的论文解读,一起进步。 今天和大家分享一下Doris系列之

    2024年02月08日
    浏览(42)
  • Flume采集数据到Kafka操作详解

    目录 一、创建一个Kafka主题 二、配置Flume 三、开启Flume 四、开启Kafka消费者 五、复制文件到Flume监控的source目录下 六、查看Flume是否能够成功采集 七、采集后查看Kafka消费者主题 八、采集数据错误解决办法 1.Ctrl+C关闭flume 2.删除出错的topic并重新创建 3.删除对应Flume文件中指定

    2024年02月09日
    浏览(61)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包