Java对接kafka简单示例

这篇具有很好参考价值的文章主要介绍了Java对接kafka简单示例。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Java可以使用Apache Kafka提供的kafka-clients库来对接Kafka。下面是一个简单的示例代码,展示了如何使用Java对接Kafka并发送和接收消息: 首先,确保已经在项目中添加了kafka-clients库的依赖。

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
public class KafkaExample {
    private static final String TOPIC = "test-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    
    public static void main(String[] args) {
        // 生产者示例
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        
        Producer<String, String> producer = new KafkaProducer<>(producerProps);
        
        String message = "Hello, Kafka!";
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message);
        
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    System.err.println("Failed to send message: " + exception.getMessage());
                } else {
                    System.out.printf("Sent message: topic=%s, partition=%d, offset=%d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            }
        });
        
        producer.close();
        
        // 消费者示例
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Collections.singletonList(TOPIC));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Received message: topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}

以上代码演示了如何使用Kafka的生产者将消息发送到指定的topic,以及如何使用消费者从指定的topic接收消息。请根据实际情况修改TOPICBOOTSTRAP_SERVERS变量的值。

如果你使用Maven构建你的项目,可以在项目的pom.xml文件中添加以下依赖来引入Kafka客户端库:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

然后,你可以在你的Java代码中使用Kafka的API来对接Kafka。以下是一个示例的pom.xml文件,展示了如何添加Kafka依赖:

<project>
    <!-- 其他配置 -->
    <dependencies>
        <!-- Kafka依赖 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version>
        </dependency>
    </dependencies>
    <!-- 其他配置 -->
</project>

请注意,上面的示例使用了Kafka的2.8.0版本,你可以根据实际情况选择合适的版本。 添加依赖后,你可以在你的Java代码中使用Kafka的API,如上面的示例代码所示。记得在你的Java文件中引入相关的类,例如:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;

这样,你就可以使用Maven管理你的项目的依赖,并使用Kafka的API对接Kafka。

目录

Java对接Kafka简单示例

步骤一:引入Kafka客户端库

步骤二:创建Kafka生产者

步骤三:创建Kafka消费者

总结


Kafka是一种分布式流处理平台,最初由LinkedIn开发并开源。它以高吞吐量、可扩展性和容错性为特点,用于处理大规模的实时数据流。 Kafka的设计目标是提供一种高效的、可持久化的、分布式发布-订阅消息系统。它采用了分布式、分区和复制的架构,可以同时处理大量的实时数据流,并将数据持久化存储在集群中,以便后续的数据分析和处理。 Kafka的核心概念包括以下几个部分:

  1. Topic(主题):消息的发布和订阅都是基于主题进行的,每个主题可以分为多个分区。
  2. Partition(分区):主题可以被划分为多个分区,每个分区是一个有序的日志文件,用于存储消息。分区可以分布在不同的Kafka集群节点上,实现数据的分布式存储和处理。
  3. Producer(生产者):生产者负责向Kafka集群发送消息,并将消息发布到指定的主题和分区。
  4. Consumer(消费者):消费者从Kafka集群订阅主题,并消费分区中的消息。消费者可以以不同的方式进行消息消费,如批量消费、实时消费等。
  5. Broker(代理服务器):Kafka集群中的每个节点都被称为代理服务器,负责存储和处理消息。
  6. ZooKeeper(动物管理员):Kafka使用ZooKeeper来进行集群的协调和管理,包括领导者选举、分区分配和集群状态的维护等。 Kafka广泛应用于大数据领域,特别适用于实时数据流处理、日志收集和数据管道等场景。它可以与其他大数据生态系统工具(如Hadoop、Spark、Flink等)无缝集成,实现高效的数据处理和分析。

Java对接Kafka简单示例

Kafka是一种高吞吐量的分布式消息队列系统,常用于处理大规模的实时数据流。在Java中,我们可以使用Kafka提供的客户端库来对接Kafka,并进行消息的发送和接收。以下是一个简单的Java对接Kafka的示例。

步骤一:引入Kafka客户端库

首先,我们需要在Java项目中引入Kafka客户端库。可以通过Maven或Gradle等构建工具来添加Kafka依赖项。例如,使用Maven可以在​​pom.xml​​中添加以下依赖项:

xmlCopy code<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

步骤二:创建Kafka生产者

接下来,我们需要创建一个Kafka生产者来发送消息。首先,我们需要设置Kafka的相关配置,如Kafka服务器的地址和端口号,以及消息的序列化方式等。然后,我们可以创建一个​​Producer​​对象,并使用​​send()​​方法来发送消息。以下是一个简单的示例代码:

javaCopy codeimport org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerExample {
    public static void main(String[] args) {
        // 设置Kafka配置
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 创建Kafka生产者
        Producer<String, String> producer = new KafkaProducer<>(properties);
        // 发送消息
        String topic = "example-topic";
        String key = "key1";
        String value = "Hello, Kafka!";
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    System.err.println("Failed to send message: " + exception.getMessage());
                } else {
                    System.out.println("Sent message: topic = " + metadata.topic() +
                            ", partition = " + metadata.partition() +
                            ", offset = " + metadata.offset());
                }
            }
        });
        // 关闭Kafka生产者
        producer.close();
    }
}

步骤三:创建Kafka消费者

除了发送消息,我们还可以创建一个Kafka消费者来接收消息。类似地,我们需要设置Kafka的相关配置,并使用​​poll()​​方法来获取消息。以下是一个简单的示例代码:

javaCopy codeimport org.apache.kafka.clients.consumer.*;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
    public static void main(String[] args) {
        // 设置Kafka配置
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "example-group");
        // 创建Kafka消费者
        Consumer<String, String> consumer = new KafkaConsumer<>(properties);
        // 订阅主题
        String topic = "example-topic";
        consumer.subscribe(Collections.singletonList(topic));
        // 拉取消息
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: topic = " + record.topic() +
                        ", partition = " + record.partition() +
                        ", offset = " + record.offset() +
                        ", key = " + record.key() +
                        ", value = " + record.value());
            }
        }
    }
}

总结

通过以上示例,我们可以看到Java如何对接Kafka并进行简单的消息发送和接收。使用Kafka可以实现高吞吐量的消息处理,并且具有良好的可扩展性和容错性。通过学习Kafka的使用,我们可以更好地应用它来处理实时数据流和构建大规模的分布式系统。文章来源地址https://www.toymoban.com/news/detail-860945.html

到了这里,关于Java对接kafka简单示例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Java Socket几个简单的入门示例

    Socket难的地方是服务端的编写,首先要合理地管理客户端连接,能让客户端持续不断地连接进来。其次每个连接的读写不能互相干扰,不能因为一个连接在传输数据,别的连接就得挂着。搞定了这两点,基本上就解决了Socket编程80%的问题。 以下是根据个人经验,写了几个示例

    2024年02月06日
    浏览(57)
  • JAVA连接Kafka及SSL认证

    1、Maven驱动(注意一定要对应自己的Kafka版本) 2、生产者生产数据 2.1 普通方式创建Producer 2.2 ssl加密和认证创建Producer(Plain) 2.3 ssl加密和认证创建Producer(Plain使用配置文件方式) kafka_client_jaas_plain配置文件信息: 具体代码实现: 2.4 ssl加密和认证创建Producer(Scram) 2.5 ssl加密和认证创

    2024年02月12日
    浏览(42)
  • Java动态创建kafka并保持连接长期可用

    首先了解下kafka是怎么初始化的,看源码,就可以了解项目启动的时候,它就读取配置文件里面的kafka配置,然后初始化一个KafkaTemplate连接,然后接下来使用kafka推送或监听,就是这一次连接的。读取的主机和端口是在“bootstrap-servers”后面的那一串配置。后面就是一些生产者

    2024年02月07日
    浏览(45)
  • Kafka 认证一:PlainLoginModule 认证及 Java 连接测试

    背景 项目需求是:通过 Web 页面配置 Kafka 信息并存储,且支持安全认证模式。 本文介绍 Kafka 帐号密码认证的完整实践流程,自己实践一遍,才能穿起各个概念。Kafka 的 Plain 简单文本认证方式比较简单,只需要 Kafka 服务端维护用户列表,客户端同样的安全认证配置即可。 本

    2023年04月09日
    浏览(37)
  • Kafka-入门及简单示例

    结果 一个生产者 一个消费者 生产者在某些事件时触发消息产生 消费者根据topic监听事件 如果有生产者产生消息 自动进行消费

    2024年02月05日
    浏览(36)
  • kafka简单介绍和代码示例

    “这是一篇理论文章,给大家讲一讲kafka” 在大数据领域开发者常常会听到MQ这个术语,该术语便是消息队列的意思, Kafka是分布式的发布—订阅消息系统。它最初由LinkedIn(领英)公司发布,使用Scala语言编写,与2010年12月份开源,成为Apache的顶级项目。Kafka是一个高吞吐量的、

    2024年01月17日
    浏览(32)
  • Java连接rabbitMQ(三步超简单)

    rabbitMQ安装教程网上特别多就不多赘述,这里主要说一下怎么去连接 创建一个Maven项目,打开pom.xml,添加两个依赖,并更新Maven。 在src-main-java中新建一个文件夹utils,在此文件夹中添加class:rabbitMQUtils 需要注意的是要将ip、用户名、密码更换成自己的,如果是在服务器上装的

    2024年02月13日
    浏览(34)
  • Java----使用eureka进行注册连接(微服务简单实现)

            当采用微服务架构时,各个业务流程被逐一分解,虽说是分解,但还是要进行连接的,最简单的就是使用http请求,将他们联系起来,通过给容器注入restTemplate,然后使用内置的方法进行请求,但是在请求过程中,难免就会涉及请求地址,请求路径之类,而这些变

    2024年02月15日
    浏览(31)
  • Flink连接Hbase时的kafka报错:java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/ThreadUtils

    书接上文 【Flink实时数仓】需求一:用户属性维表处理-Flink CDC 连接 MySQL 至 Hbase 实验及报错分析http://t.csdn.cn/bk96r 我隔了一天跑Hbase中的数据,发现kafka报错,但是kafka在这个代码段中并没有使用,原因就是我在今天的其他项目中添加的kafka依赖导致了冲突。 注释掉kafka依赖,

    2024年02月04日
    浏览(49)
  • 【Java】使用PowerMockito mock static方法/new对象/mock对象的public或private方法的简单示例

    1.1 打桩类的public static方法 测试用例中如果需要对public静态方法的打桩,针对测试类增加注解@RunWith(PowerMockRunner.class)同时针对静态方法所在的类增加注解@PrepareForTest({StaticMethod.class}),接着在测试用例调用方法之前增加 PowerMockito.mockStatic(StaticMethod.class); PowerMockito.when(StaticMet

    2024年01月24日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包