环境搭建
前置条件
-
JDK
安装教程自行百度-这个比较简单。 -
zookeeper
-
zookeeper安装参考地址((2条消息) 快速搭建-分布式远程调用框架搭建-dubbo+zookper+springboot demo 演示_康世行的博客-CSDN博客)
1 ,复制 conf 文件夹下面的 zoo_sample.cfg 改名为 zoo.cfg 即可。因为没有配置文件,zookeeper 无法启动 2 创建 dataDir 的临时目录 mkdir -p /temp/zookeeper //配置文件参考下面截图 修改完配置文件之后进行启动 3 ,启动 sh zkServer.sh start
-
修改zookeeper配合文件
-
启动成功
ps aux|grep zookeeper
-
开放端口号
1 开放2181 端口 1.1 查看已经开发的端口 ,避免端口冲突 firewall-cmd --list-ports 1.2 开放2181 端口 firewall-cmd --zone=public --add-port=2181/tcp --permanent 2 重启防火墙 使用规则生效 firewall-cmd --reload 2(因为我使用的是腾讯云,所以还得把腾讯云的控制台防火墙端口放 开 2181
安装kafka 2.12.x 版本
-
下载kafka安装包
cd /opt wget http://archive.apache.org/dist/kafka/2.8.2/kafka_2.12-2.8.2.tgz
-
安装遇到的问题(由于网站证书不安全导致)
-
解决方案
sudo yum install -y ca-certificates //继续使用weget进行下载
-
下载成功
-
-
安装
-
解压
tar zxvf kafka_2.12-2.8.2.tgz
-
进入kafka目录
cd kafka_2.12-2.8.2/ 创建 logs 目录 mkdir logs
-
修改配置文件
# 修改以下配置 # 1.broker.id : 配置的是集群环境,要求每台kafka都有唯一的brokerid # 2.log.dir : 数据存放的目录 # 3.zookeeper.connect : zookeeper连接池地址信息(zookeeper集群) # 4.delete.topic.enable : 是否直接删除topic # 5.host.name : 主机名称 # 6.listeners=PLAINTEXT://server1:9092 vim /opt/kafka_2.12-2.8.1/config/server.properties advertised.listeners=PLAINTEXT://:9092 //在配置文件把这行注释解开 log.dirs=/opt/kafka_2.12-2.8.1/logs zookeeper.connect=server1:2181,server2:2181,server3:2181 # 文件尾部添加以下内容 delete.topic.enable=true # 退出并保存
-
启动
./kafka-server-start.sh -daemon ../config/server.properties //后台启动 ps aux|grep kafka //查询kafka 运行状态
-
-
使用示例(发送消息)
-
服务器端测试kafka发送消息和消费消息
-
创建topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic
-
查看已经创建的topic
./kafka-topics.sh --list --zookeeper localhost:2181
-
发送消息
./kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic
-
消费消息
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --from-beginning
-
-
代码测试
pom
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.8.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>springBoot-kafka-demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>springBoot-kafka-demo</name> <description>springBoot-kafka-demo</description> <properties> <java.version>1.8</java.version> <fastjson.version>1.2.58</fastjson.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- kafkfa --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>${fastjson.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
yml
server: port: 8081 spring: application: name: kafka-demo kafka: bootstrap-servers: 124.222.227.132:9092 consumer: group-id: kafka-demo-kafka-group key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
controller
package com.example.springbootkafkademo.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @Description 测试发送消息 * @ClassName TestController * @Author 康世行 * @Date 22:09 2023/2/5 * @Version 1.0 **/ @RestController @RequestMapping("/test") public class TestController { @Autowired private KafkaTemplate<String,String> kafkaTemplate; @GetMapping("/send/{msg}/{topic}") public String sendMessage(@PathVariable("msg") String msg,@PathVariable("topic") String topic){ //发送消息到kafka kafkaTemplate.send(topic,msg); return "发送成功!"; } }
service
package com.example.springbootkafkademo.service; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; /** * @Description 监听发送的消息 * @ClassName HelloListener * @Author 康世行 * @Date 22:14 2023/2/5 * @Version 1.0 **/ @Service public class HelloListener { /** * 消费者端:指定监听话题 * * @param consumerRecord 监听到数据 */ @KafkaListener(topics = {"testTopic"}) public void handlerMsg(ConsumerRecord<String, String> consumerRecord) { System.out.println("接收到消息:消息值:" + consumerRecord.value() + ", 消息偏移量:" + consumerRecord.offset()); } }
测试
127.0.0.1:8081/test/send/测试kafka发送消息32/testTopic
测试结果
``文章来源:https://www.toymoban.com/news/detail-400594.html
感谢阅读~~,希望对您有帮助。文章来源地址https://www.toymoban.com/news/detail-400594.html
到了这里,关于ELK分布式日志收集快速入门-(一)-kafka单体篇的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!