spring boot学习第八篇:kafka

这篇具有很好参考价值的文章主要介绍了spring boot学习第八篇:kafka。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

1、安装kafka

1.1确认jdk是否安装OK

1.2下载&&安装kafka

1.3验证kafka

2、连接kafka

3、在java中操作kafka


1、安装kafka

1.1确认jdk是否安装Ok

java -version

spring boot学习第八篇:kafka,Spring Boot,spring boot,java,kafka

1.2下载&&安装kafka

wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz

spring boot学习第八篇:kafka,Spring Boot,spring boot,java,kafka

mkdir zk-3.4.14
tar -xvzf zookeeper-3.4.14.tar.gz -C /home/lighthouse/zk-3.4.14

spring boot学习第八篇:kafka,Spring Boot,spring boot,java,kafka

配置

进入该目录下的conf文件夹中。

zoo_sample.cfg是一个配置文件的样本,需要将这个文件复制并重命名为zoo.cfg:   

cp zoo_sample.cfg zoo.cfg

修改配置文件:

vi zoo.cfg

spring boot学习第八篇:kafka,Spring Boot,spring boot,java,kafka

配置环境变量, 使用vim打开etc目录下的profile文件:vim /etc/profile

在末尾配置环境变量,这里需要写入的是:

export ZOOKEEPER_HOME=/home/lighthouse/zk-3.4.14/zookeeper-3.4.14

export PATH=$PATH:$ZOOKEEPER_HOME/bin

写入信息并保存后,需要使配置文件生效,所用的命令为:source /etc/profile

启动zookeeper, 由于配置了环境变量,可以在系统中的任意目录执行启动zookeeper的命令,其执行的实际上是zookeeper的bin文件夹中的zkServer.sh的命令:zkServer.sh start

Zookeeper启动成功:

spring boot学习第八篇:kafka,Spring Boot,spring boot,java,kafka

下载kafka2.2.1:

wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz

spring boot学习第八篇:kafka,Spring Boot,spring boot,java,kafka

解压:

tar -zxvf kafka_2.12-2.2.1.tgz

启动:

nohup bin/kafka-server-start.sh config/server.properties > output.txt &

spring boot学习第八篇:kafka,Spring Boot,spring boot,java,kafka

其中server.properties文件内容如下:

spring boot学习第八篇:kafka,Spring Boot,spring boot,java,kafka

1.3验证kafka

执行命令:bin/kafka-topics.sh –version

看不到版本号

spring boot学习第八篇:kafka,Spring Boot,spring boot,java,kafka

2、连接kafka,并执行命令

2.1创建topic:执行命令:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

spring boot学习第八篇:kafka,Spring Boot,spring boot,java,kafka

2.2查看topic:执行命令:

bin/kafka-topics.sh --list --zookeeper localhost:2181 

spring boot学习第八篇:kafka,Spring Boot,spring boot,java,kafka

bin/kafka-topics.sh --list --zookeeper 43.138.0.199:2181

spring boot学习第八篇:kafka,Spring Boot,spring boot,java,kafka

2.3使用kafka-console-producer.sh 发送消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

spring boot学习第八篇:kafka,Spring Boot,spring boot,java,kafka

 bin/kafka-console-producer.sh --broker-list 43.138.0.199:9092 --topic test

spring boot学习第八篇:kafka,Spring Boot,spring boot,java,kafka

2.4使用kafka-console-consumer.sh消费消息:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

spring boot学习第八篇:kafka,Spring Boot,spring boot,java,kafka

bin/kafka-console-consumer.sh --bootstrap-server 43.138.0.199:9092 --topic test --from-beginning

spring boot学习第八篇:kafka,Spring Boot,spring boot,java,kafka

3、在java中操作kafka

pom.xml增加如下依赖:

<dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>1.0.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>1.0.1</version>
    </dependency>
  </dependencies>

Producer.java代码如下:

package com.hmblogs.backend.util;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Logger;

import java.util.Properties;

/******************************************************
 ****** @ClassName   : Producer.java
 ****** @author      : milo ^ ^
 ****** @date        : 2018 03 14 11:34
 ****** @version     : v1.0.x
 *******************************************************/
public class Producer {

    static Logger log = Logger.getLogger(Producer.class);

    private static final String TOPIC = "test";
    private static final String BROKER_LIST = "43.138.0.199:9092";
    private static KafkaProducer<String,String> producer = null;

    /*
    初始化生产者
     */
    static {
        Properties configs = initConfig();
        producer = new KafkaProducer<String, String>(configs);
    }

    /*
    初始化配置
     */
    private static Properties initConfig(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        return properties;
    }

    public static void main(String[] args) throws InterruptedException {
        //消息实体
        ProducerRecord<String , String> record = null;
        for (int i = 0; i < 3; i++) {
            record = new ProducerRecord<String, String>(TOPIC, "value"+(int)(10*(Math.random())));
            //发送消息
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (null != e){
                        System.out.println("send error" + e.getMessage());
                    }else {
                        System.out.println(String.format("offset:%s,partition:%s",recordMetadata.offset(),recordMetadata.partition()));
                    }
                }
            });
        }
        producer.close();
    }
}

执行报错,如下:

spring boot学习第八篇:kafka,Spring Boot,spring boot,java,kafka

send errorExpiring 3 record(s) for test-0: 30034 ms has passed since batch creation plus linger time
send errorExpiring 3 record(s) for test-0: 30034 ms has passed since batch creation plus linger time
send errorExpiring 3 record(s) for test-0: 30034 ms has passed since batch creation plus linger time

搜索资料,尝试解决

既然本地调不通,那我就发到linux机器里面去调试

改成ProducerController

spring boot学习第八篇:kafka,Spring Boot,spring boot,java,kafka

代码如下:

package com.hmblogs.backend.controller;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Properties;

@RestController
@Slf4j
public class ProducerController {

    private static final String TOPIC = "test";
    private static final String BROKER_LIST = "43.138.0.199:9092";
    private static KafkaProducer<String,String> producer = null;

    /**
     * sendMessage
     * @return
     */
    @GetMapping(value = "/sendMessage")
    public void redisTestLock(){
        log.info("sendMessage");
        Properties configs = initConfig();
        producer = new KafkaProducer<String, String>(configs);
        send();
    }

    /*
    初始化配置
     */
    private Properties initConfig(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        return properties;
    }

    private void send() {
        //消息实体
        ProducerRecord<String , String> record = null;
        for (int i = 0; i < 3; i++) {
            record = new ProducerRecord<String, String>(TOPIC, "value:"+(int)(10*(Math.random())));
            //发送消息
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (null != e){
                        log.error("send error:" + e.getMessage());
                    }else {
                        log.info(String.format("offset:%s,partition:%s",recordMetadata.offset(),recordMetadata.partition()));
                    }
                }
            });
        }
        producer.close();
    }
}

 执行clean install命令,打包成jar文件,上传到云主机里,然后启动

java -jar hmblogs.jar

spring boot学习第八篇:kafka,Spring Boot,spring boot,java,kafka

访问GET接口

http://43.138.0.199:8081/sendMessage

查看hmblogs服务的日志 

spring boot学习第八篇:kafka,Spring Boot,spring boot,java,kafka 然后,重新查看该topic的消息

spring boot学习第八篇:kafka,Spring Boot,spring boot,java,kafka

消费的验证也改成通过调用接口来实现

ConsumerController代码如下:

package com.hmblogs.backend.controller;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Collections;
import java.util.Properties;

@RestController
@Slf4j
public class ConsumerController {

    private static final String TOPIC = "test";
    private static final String BROKER_LIST = "43.138.0.199:9092";
    private static KafkaConsumer<String,String> consumer = null;

    /**
     * consumeMessage
     * @return
     */
    @GetMapping(value = "/consumeMessage")
    public void consumeKafkaMessage(){
        log.info("consumeMessage");
        Properties configs = initConfig();
        consumer = new KafkaConsumer<String, String>(configs);
        consumer.subscribe(Collections.singletonList(TOPIC));
        send();
    }

    private static Properties initConfig(){
        Properties properties = new Properties();
        properties.put("bootstrap.servers",BROKER_LIST);
        properties.put("group.id","0");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("enable.auto.commit", "true");
        properties.setProperty("auto.offset.reset", "earliest");
        return properties;
    }


    private void send() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10);
            for (ConsumerRecord<String, String> record : records) {
                log.info("Received message: key={}, value={}, partition={}, offset={}\n",
                        record.key(), record.value(), record.partition(), record.offset());
            }
        }
    }
}

 然后clean install,发到云主机里,调用如下GET接口:

http://43.138.0.199:8081/consumeMessage

hmblogs.log日志显示内容如下截图:

spring boot学习第八篇:kafka,Spring Boot,spring boot,java,kafka文章来源地址https://www.toymoban.com/news/detail-798838.html

到了这里,关于spring boot学习第八篇:kafka的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring Boot 整合 Kafka

    环境:自行创建 Spring Boot 项目,添加测试依赖,并启动 Zookeeper 和 kafka 服务。 注意:Zookeeper 默认好像占用 8080 端口,自己注意端口占用问题。 1. 添加依赖 2. 添加配置 3. 创建消息生产者 4. 创建消息消费者 5. 消息发送测试

    2023年04月11日
    浏览(36)
  • Spring Boot集成Kafka详解

    Spring Boot是一个用于构建独立的、生产级的Java应用程序的框架,而Kafka是一种高吞吐量的分布式发布订阅消息系统。在本文中,我们将详细解释如何在Spring Boot项目中集成Kafka。 1. 添加依赖 首先,我们需要在项目的pom.xml文件中添加Spring Boot和Kafka的依赖。 2. 配置Kafka 接下来,

    2024年02月09日
    浏览(40)
  • Spring Boot Kafka Example

    作者:禅与计算机程序设计艺术 Kafka是一个分布式消息系统,它可以实现消息的持久化、高并发量处理以及实时的可靠传输。相比于其他消息队列中间件(例如RabbitMQ、ActiveMQ),其最大的优点在于它提供的跨越语言的API支持,支持多种编程语言的客户端。作为一种轻量级的分

    2024年02月07日
    浏览(42)
  • 在Spring Boot微服务集成spring-kafka操作Kafka集群

    记录 :461 场景 :在Spring Boot微服务集成spring-kafka-2.8.2操作Kafka集群。使用KafkaTemplate操作Kafka集群的生产者Producer。使用@KafkaListener操作Kafka集群的消费者Consumer。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka集群安装 :https://blog.csdn.net/zhangbeizhen18/article/details

    2024年02月10日
    浏览(50)
  • 轻松上手 Spring Boot & Kafka 实战!

    2、解压包 tar -zxvf kafka_2.11-1.0.0.tgzmv kafka_2.11-1.0.0 kafka1mv kafka_2.11-1.0.0 kafka2mv kafka_2.11-1.0.0 kafka3 3、创建ZK集群 修改ZK配置文件:kafka1-3/config/zookeeper.properties分别修改对应的参数。 dataDir=/usr/local/kafka/zookeeper1 dataLogDir=/usr/local/kafka/zookeeper/log clientPort=2181 maxClientCnxns=0 tickTime=2000

    2024年04月12日
    浏览(43)
  • spring boot配置双Kafka方法

    第一步:application.yml的配置 第二步:配置config 注意!注意!注意!!!代码中的一些字段名自己改一下。xxxx之类的换成自己的就行 第三步: 直接在你要用到的类中直接引用就行。 跟着以上三步走就可以简单的配置两个Kafka了,还有跟高级的spring切面切点的方法作者还没有

    2024年02月11日
    浏览(40)
  • Spring Boot集成kafka的相关配置

    额外依赖只需要这一个,kafka-client 不是springboot 的东西,那是原生的 kafka 客户端, kafka-test也不需要,是用代码控制broker的东西。 也可以用java类Config 方式配置,如果没有特殊要求,可以只用spring配置的方式 注意加上@Component,被spring管理监听才有效 注意这里不能用@Value注解

    2024年02月07日
    浏览(46)
  • Spring Boot+Kafka实战生产级Kafka消费组

    作者:禅与计算机程序设计艺术 Kafka是一个开源分布式消息系统,最初由LinkedIn开发,之后成为Apache项目的一部分。Kafka主要用于大数据实时流处理,具有低延迟、高吞吐量等特点。本文将会从基本概念、术语说明、原理及应用场景三个方面对Kafka进行详细介绍。 Kafka作为一个

    2024年02月10日
    浏览(34)
  • 在Spring Boot微服务集成Kafka客户端(spring-kafka)操作Kafka

    记录 :457 场景 :在Spring Boot微服务集成Kafka客户端spring-kafka-2.8.2操作Kafka。使用Spring封装的KafkaTemplate操作Kafka生产者Producer。使用Spring封装的@KafkaListener操作Kafka的消费者Consumer。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka安装 :https://blog.csdn.net/zhangbeizhen1

    2024年02月09日
    浏览(57)
  • Spring Boot配置多个Kafka数据源

    application.properties配置文件如下 1.第一个kakfa 2.第二个kakfa 备注: 生产者消费者代码参考链接,开发同学需要以实际情况按要求自己变更下代码即可: Spring Boot 集成多个 Kafka_springboot集成多个kafka_//承续缘_纪录片的博客-CSDN博客

    2024年02月07日
    浏览(73)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包