docker安装kafka,并集成springboot进行测试

这篇具有很好参考价值的文章主要介绍了docker安装kafka,并集成springboot进行测试。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

大家好,今天我们开始学习kafka中间件,今天我们改变一下策略,不刷视频学习,改为实践学习,在网上找一些案例功能去做,来达到学习实践的目的。

首先,是安装相关组件。

1. docker安装安装

1.1 yum-utils软件包

yum install -y yum-utils

1.2 设置阿里云镜像


yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

1.3 安装docker

yum install docker-ce docker-ce-cli containerd.io 

1.4 启动docker

systemctl start docker

1.5 测试

docker version
docker run hello-world
docker images

至此,docker就安装完毕了。接下来就是安装zookeeper和kafka了,我这里用的是kafka2.x的版本,因此需要结合zookeeper去是使用。现在最新的kafka3.x已经可以抛弃zookeeper去单独使用了,小伙伴们有兴趣的话可以自己去动手安装实践下。

2. 安装zookeeper和kafka

2.1 docker安装zookeeper

docker pull wurstmeister/zookeeper

2.2 启动zookeeper

docker run -d --name zookeeper -p 2181:2181 -e TZ="Asia/Shanghai" --restart always wurstmeister/zookeeper 

2.3 docker查看zookeeper容器是否启动

docker ps

docker安装kafka,并集成springboot进行测试

 出现以上信息,就代表zookeeper已经安装并启动成功。

2.4 安装kafka

docker pull wurstmeister/kafka

2.5 启动kafka

docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=124.223.205.125:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://124.223.205.125:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" -e TZ="Asia/Shanghai" wurstmeister/kafka 

2.6 用docker ps查看kafka是否启动

docker安装kafka,并集成springboot进行测试

出现以上信息,就代表kafka启动成功了。

下来就测试一下

3. 发送消息和消费消息

3.1 进入kafka容器

docker exec -it 容器id /bin/bash

cd /opt/kafka_2.13-2.8.1/bin/

 3.2 连接生产者

./kafka-console-producer.sh --broker-list localhost:9092 --topic shopping

接下来就可以发送消息了。

docker安装kafka,并集成springboot进行测试 3.3 另起一个窗口,重复3.1的动作进入kafka容器,然后连接消费者

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic shopping --from-beginning

这是就能就收消息了。

docker安装kafka,并集成springboot进行测试

 到达这里,我们的kafka就安装并测试成功了。

4. 接下来我们就创建Springboot工程来连接kafka进行消息的生产和消费

4.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.volga</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- 阿里巴巴 fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

4.2 我们创建一个订单的实体类

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {
    /**
     * 订单id
     */
    private long orderId;
    /**
     * 订单号
     */
    private String orderNum;
    /**
     * 订单创建时间
     */
    private LocalDateTime createTime;
}

4.3 创建生产者

@Component
@Slf4j
public class KafkaProvider {
    /**
     * 消息 TOPIC
     */
    private static final String TOPIC = "shopping";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {
        // 构建一个订单类
        Order order = Order.builder()
                .orderId(orderId)
                .orderNum(orderNum)
                .createTime(createTime)
                .build();

        // 发送消息,订单类的 json 作为消息体
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send(TOPIC, JSONObject.toJSONString(order));

        // 监听回调
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.info("生产者产生消息 失败 ## Send message fail ...");
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("生产者产生消息 成功 ## Send message success ...");
            }
        });
    }
}

4.4 创建消费者

@Component
@Slf4j
public class KafkaConsumer {
    @KafkaListener(topics = "shopping", groupId = "group_id") //这个groupId是在yml中配置的
    public void consumer(String message) {
        log.info("消费者消费信息 ## consumer message: {}", message);
    }
}

4.5 创建测试类

@SpringBootTest
public class SpringBootKafakaApplicationTests {
    @Autowired
    private KafkaProvider kafkaProvider;

    @Test
    public void sendMessage() throws InterruptedException {
        System.out.println("是否为空??+"+kafkaProvider);
        // 发送 10 个消息
        for (int i = 0; i < 10; i++) {
            long orderId = i+1;
            String orderNum = UUID.randomUUID().toString();
            kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());
        }
        TimeUnit.MINUTES.sleep(1);
    }
}

4.6 要创建一个Application方法,不然项目会启动报错

@SpringBootApplication
public class KafkaApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class,args);
    }
}

4.7 配置application.yml

spring:
  kafka:
    # 指定 kafka 地址,我这里部署在的虚拟机,开发环境是Windows,kafkahost是虚拟机的地址, 若外网地址,注意修改为外网的IP( 集群部署需用逗号分隔)
    bootstrap-servers: 服务器ip:9092
    consumer:
      # 指定 group_id
      group-id: group_id
      auto-offset-reset: earliest
      # 指定消息key和消息体的序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # 发生错误后,消息重发的次数。
      retries: 0
      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 指定消息key和消息体的序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringSerializer
      value-deserializer: org.apache.kafka.common.serialization.StringSerializer
    listener:
      # 在侦听器容器中运行的线程数。
      concurrency: 5
      #listner负责ack,每调用一次,就立即commit
      ack-mode: manual_immediate
      missing-topics-fatal: false

以上就创建项目成功了,我们运行测试方法,就能获取kafka中的消息了。

### 生产消息

docker安装kafka,并集成springboot进行测试

 ### 消费消息

docker安装kafka,并集成springboot进行测试

这里就是简单实现了kafka的消息生产和消费,后续的kafka复杂场景的实现会持续更新。

我是空谷有来人,谢谢支持。 文章来源地址https://www.toymoban.com/news/detail-424508.html

到了这里,关于docker安装kafka,并集成springboot进行测试的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot3.1.7集成Kafka和Kafka安装

    我们在很多系统开发都需要用到消息中间件,目前来说Kafka凭借其优秀的性能,使得它的使用率已经是名列前茅了,所以今天我们将它应用到我们的系统 在使用一个中间件一定要考虑版本的兼容性,否则后面会遇到很多问题,首先我们打开Spring的官网:Spring for Apache Kafka Spr

    2024年01月23日
    浏览(43)
  • 使用TestContainers在Docker中进行集成测试

    现代软件应用很少独立工作。典型的应用程序会与几个外部系统进行通信,如: 数据库、 消息系统、 缓存提供商 其他第三方服务。 你应该编写测试确保一切正常运行。 单元测试 有助于隔离地测试业务逻辑,不涉及任何外部服务。它们易于编写并提供几乎即时的反馈。 有了

    2024年02月08日
    浏览(40)
  • SpringBoot 如何使用 EmbeddedDatabaseBuilder 进行数据库集成测试

    在开发 SpringBoot 应用程序时,我们通常需要与数据库进行交互。为了确保我们的应用程序在生产环境中可以正常工作,我们需要进行数据库集成测试,以测试我们的应用程序是否能够正确地与数据库交互。在本文中,我们将介绍如何使用 SpringBoot 中的 EmbeddedDatabaseBuilder 来进行

    2024年02月16日
    浏览(56)
  • SpringBoot 如何使用 TestRestTemplate 进行 RESTful API 集成测试

    在使用 SpringBoot 开发 RESTful API 的过程中,我们需要进行集成测试,以确保 API 的正确性和可用性。而 TestRestTemplate 是 Spring Framework 提供的一个工具类,可以用来进行 RESTful API 的集成测试。在本文中,我们将介绍如何使用 TestRestTemplate 进行 RESTful API 集成测试。 TestRestTemplate 是

    2024年02月13日
    浏览(75)
  • SpringBoot 如何使用 TestEntityManager 进行 JPA 集成测试, 如何使用

    Spring Boot 是一个非常流行的 Java Web 开发框架,它简化了开发过程,提高了开发效率。在开发过程中,我们通常需要使用 JPA 操作数据库,为了保证代码的质量和正确性,我们需要进行集成测试。TestEntityManager 是 Spring Boot 提供的用于 JPA 集成测试的工具,它可以模拟 EntityManag

    2024年02月13日
    浏览(68)
  • Docker 安装kafka 并创建topic 进行消息通信

            Apache Kafka是一个分布式流处理平台,用于构建高性能、可扩展的实时数据流应用程序。本文将介绍如何使用Docker容器化技术来安装和配置Apache Kafka。 1、kafka安装必须先安装Zookpper 2、下载镜像 3、查看下载好的镜像 4、启动Kafka 5、查看是否创建好Kafka容器 6、进入到

    2024年03月15日
    浏览(44)
  • MinIO【部署 01】MinIO安装及SpringBoot集成简单测试

    下载 https://min.io/download#/linux; 安装文档 https://min.io/docs/minio/linux/index.html。 工作台详细使用文档 https://min.io/docs/minio/linux/administration/minio-console.html#minio-console 登录页面: 登录成功: Java Quickstart Guide https://min.io/docs/minio/linux/developers/java/minio-java.html#minio-java-quickstart Java SDK htt

    2024年02月11日
    浏览(35)
  • Docker安装ClickHouse22.6.9.11并与SpringBoot、MyBatisPlus集成

    上一篇文章CentOS6.10上离线安装ClickHouse19.9.5.36并修改默认数据存储目录记录了在旧版的操作系统上直接安装低版本 ClickHouse (脱胎于俄罗斯头号搜索引擎的技术)的过程,开启远程访问并配置密码; 其实通过 Docker 运行 ClickHouse 是我在2022年10月左右在虚拟机上实验的,当时

    2024年02月09日
    浏览(39)
  • kafka:java集成 kafka(springboot集成、客户端集成)

    摘要 对于java的kafka集成,一般选用springboot集成kafka,但可能由于对接方kafka老旧、kafka不安全等问题导致kafak版本与spring版本不兼容,这个时候就得自己根据kafka客户端api集成了。 一、springboot集成kafka 具体官方文档地址:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/

    2023年04月22日
    浏览(62)
  • 经实测利用POSTMAN根本无法进行并发测试,大家不要再被一些搬运工给误导了

    以下为我的实测记录 一、先上我测试的接口代码,就是一个redis的tryLock分布式锁的获取,接口在获取到锁后,线程sleep了5秒,此时线程是不释放锁的,那按道理第二个请求在这个时间进来,是获取不到锁的,但结果却不是这样的 二、按照网上的那些博文,postman操作步骤如下

    2024年02月11日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包