部署Kafka+ZK及其日志采集实战(系统版本:linux_CentOs_7.8)

这篇具有很好参考价值的文章主要介绍了部署Kafka+ZK及其日志采集实战(系统版本:linux_CentOs_7.8)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

  • 部署ZK

    docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
    
  • 部署Kafka

    	docker run -d --name xdclass_kafka \
    	-p 9092:9092 \
    	-e KAFKA_BROKER_ID=0 \
    	--env KAFKA_HEAP_OPTS=-Xmx256M \
    	--env KAFKA_HEAP_OPTS=-Xms128M \
    	-e KAFKA_ZOOKEEPER_CONNECT=[内网ip]:2181 \
    	-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://[外网ip]:9092 \
    	-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:2.13-2.7.0
    
  • 采用Slf4j采集日志(lombok)

  • 需求

    • 控制台输出访问日志,方便测试
    • 业务数据实际输出到kafka
    • 常用的框架 log4j、logback、self4j等
  • log4j、logback、self4j 之间有啥关系

    • SLF4J(Simple logging Facade for Java) 门面设计模式 |外观设计模式

      • 把不同的日志系统的实现进行了具体的抽象化,提供统一的日志使用接口

      • 具体的日志系统就有log4j,logback等;

      • logback也是log4j的作者完成的,有更好的特性,可以取代log4j的一个日志框架, 是slf4j的原生实现

      • log4j、logback可以单独的使用,也可以绑定slf4j一起使用

    • 编码规范建议不直接用log4j、logback的API,应该用self4j, 日后更换框架所带来的成本就很低

  • 依赖引入

    <!-- 代码自动生成依赖 end-->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!--Springboot项目整合spring-kafka依赖包配置-->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    
  • kafka配置application.properties

    #----------kafka配置--------------
    spring.kafka.bootstrap-servers=[外网ip]:9092
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    
  • logback.xml配置

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
        <property name="LOG_HOME" value="./data/logs/link" />
    
        <!--采用打印到控制台,记录日志的方式-->
        <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern>
            </encoder>
        </appender>
    
        <!-- 采用保存到日志文件 记录日志的方式-->
        <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <file>${LOG_HOME}/link.log</file>
            <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                <fileNamePattern>${LOG_HOME}/link-%d{yyyy-MM-dd}.log</fileNamePattern>
            </rollingPolicy>
            <encoder>
                <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern>
            </encoder>
        </appender>
    
    
        <!-- 指定某个类单独打印日志 -->
        <logger name="net.xdclass.service.impl.LogServiceImpl"
                level="INFO" additivity="false">
            <appender-ref ref="rollingFile" />
            <appender-ref ref="console" />
        </logger>
    
        <root level="info" additivity="false">
            <appender-ref ref="console" />
        </root>
    
    </configuration>
    
  • LogServiceImpl

    @Service
    @Slf4j
    public class LogServiceImpl implements LogService {
    	
    	// Kafka:topic
        private static final String TOPIC_NAME = "ods_link_visit_topic";
    
        @Autowired
        private KafkaTemplate kafkaTemplate;
    
        /**
         * 记录日志
         *
         * @param request
         * @param shortLinkCode
         * @param accountNo
         * @return
         */
        @Override
        public void recodeShortLinkLog(HttpServletRequest request, String shortLinkCode, Long accountNo) {
            // ip、 浏览器信息
            String ip = CommonUtil.getIpAddr(request);
            // 全部请求头
            Map<String, String> headerMap = CommonUtil.getAllRequestHeader(request);
    
            Map<String,String> availableMap = new HashMap<>();
            availableMap.put("user-agent",headerMap.get("user-agent"));
            availableMap.put("referer",headerMap.get("referer"));
            availableMap.put("accountNo",accountNo.toString());
    
            LogRecord logRecord = LogRecord.builder()
                    //日志类型
                    .event(LogTypeEnum.SHORT_LINK_TYPE.name())
                    //日志内容
                    .data(availableMap)
                    //客户端ip
                    .ip(ip)
                    // 时间
                    .ts(CommonUtil.getCurrentTimestamp())
                    //业务唯一标识(短链码)
                    .bizId(shortLinkCode).build();
    
            String jsonLog = JsonUtil.obj2Json(logRecord);
    
            //打印日志 in 控制台
            log.info(jsonLog);
    
            // 发送kafka
            kafkaTemplate.send(TOPIC_NAME,jsonLog);
    
        }
    }
    
  • kafka命令

    ```
    创建topic
    ./kafka-topics.sh --create --zookeeper 172.17.0.1:2181 --replication-factor 1 --partitions 1 --topic ods_link_visit_topic
    
    查看topic
    ./kafka-topics.sh --list --zookeeper 172.17.0.1:2181
    
    删除topic
    ./kafka-topics.sh --zookeeper 172.17.0.1:2181 --delete --topic ods_link_visit_topic
    
    消费者消费消息
    ./kafka-console-consumer.sh --bootstrap-server 192.168.75.146:9092 --from-beginning --topic ods_link_visit_topic
    
    生产者发送消息
    ./kafka-console-producer.sh --broker-list 192.168.75.146:9092  --topic ods_link_visit_topic
    ```
    
    
  • 测试文章来源地址https://www.toymoban.com/news/detail-657677.html

    @Controller
    @Slf4j
    public class LinkApiController {
        @Autowired
        private ShortLinkService shortLinkService;
    
        @Autowired
        private LogService logService;
    
    
        @GetMapping(path = "/test")
        public void dispatch(HttpServletRequest request, HttpServletResponse response) {
    
            logService.recodeShortLinkLog(request, shortLinkCode, shortLinkVO.getAccountNo());
            
        }
    
    

到了这里,关于部署Kafka+ZK及其日志采集实战(系统版本:linux_CentOs_7.8)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • springboot整合ELK+kafka采集日志

    在分布式的项目中,各功能模块产生的日志比较分散,同时为满足性能要求,同一个微服务会集群化部署,当某一次业务报错后,如果不能确定产生的节点,那么只能逐个节点去查看日志文件;logback中RollingFileAppender,ConsoleAppender这类同步化记录器也降低系统性能,综上一些

    2024年02月15日
    浏览(42)
  • flink日志实时采集写入Kafka/ElasticSearch

    由于公司想要基于flink的日志做实时预警功能,故需要实时接入,并刷入es进行分析。 日志接入必须异步,不能影响服务性能 kafka集群宕机,依旧能够提交flink任务且运行任务 kafka集群挂起恢复,可以依旧续写实时运行日志 在类上加上@Plugin注解,标记为自定义appender 在类加上

    2024年02月08日
    浏览(54)
  • Filebeat+Kafka+ELK日志采集(五)——Elasticsearch

    1、下载 2、解压: 3、配置 启动Elasticsearch,进入/bin目录下 ./elasticsearch ,不出意外的外会报以下错误: 报错1:能打开的文件或进程数太低。 解决方法: 修改/etc/security/limits.conf 配置文件,添加配置如下: 报错2: 解决方法: 修改/etc/sysctl.conf 配置文件,添加配置如下: 修

    2024年02月05日
    浏览(58)
  • filebeat采集日志数据到kafka(一)(filebeat->kafka->logstash->es)

    一、filebeat安装 filebeat-kafka版本适配 1、安装包下载 https://www.elastic.co/cn/downloads/past-releases#filebeat 解压 2、新建yml配置文件,如test.yml 3、Filebeat启动和停止 启动:./filebeat -c test.yml 停止:kill -9 PID

    2024年02月16日
    浏览(89)
  • 【数仓】通过Flume+kafka采集日志数据存储到Hadoop

    【数仓】基本概念、知识普及、核心技术 【数仓】数据分层概念以及相关逻辑 【数仓】Hadoop软件安装及使用(集群配置) 【数仓】Hadoop集群配置常用参数说明 【数仓】zookeeper软件安装及集群配置 【数仓】kafka软件安装及集群配置 【数仓】flume软件安装及配置 【数仓】flum

    2024年03月17日
    浏览(59)
  • 【ELK企业级日志分析系统】部署Filebeat+Kafka+Logstash+Elasticsearch+Kibana集群详解(EFLFK)

    参见安装与部署ELK详解 参见安装与部署EFLK详解 参见安装与部署Zookeeper集群详解 1.1.1 为什么需要消息队列(MQ) MQ(Message Queue)主要原因是由于 在高并发环境下,同步请求来不及处理,请求往往会发生阻塞 。比如大量的并发请求,访问数据库,导致行锁表锁,最后请求线程会

    2024年02月16日
    浏览(49)
  • 3节点ubuntu24.04服务器docker-compose方式部署高可用elk+kafka日志系统并接入nginx日志

    节点名称 IP 部署组件及版本 配置文件路径 机器CPU 机器内存 机器存储 Log-001 10.10.100.1 zookeeper:3.4.13 kafka:2.8.1 elasticsearch:7.7.0 logstash:7.7.0 kibana:7.7.0 zookeeper:/data/zookeeper kafka:/data/kafka elasticsearch:/data/es logstash:/data/logstash kibana:/data/kibana 2*1c/16cores 62g 50g 系统 800g 数据盘 Log-002 10.10.10

    2024年04月27日
    浏览(46)
  • 日志采集传输框架之 Flume,将监听端口数据发送至Kafka

    1、简介                 Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传 输的系统。Flume 基于流式架构,主要有以下几个部分组成。  主要组件介绍: 1)、 Flume Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的。Agent 主

    2024年01月22日
    浏览(58)
  • 五.实战软件部署 1-3实战章节-前言&MYSQL 5.7版本在centos系统安装&MYSQL 8.0版本在centos系统安装

    目录 五.实战软件部署 1-实战章节-前言 五.实战软件部署 2-MYSQL 5.7版本在centos系统安装 1-配置yum仓库 2-使用yum安装mysql 3-安装完成后,启动mysql并配置开机自启动 4-检查mysql的运行状态 --配置 1-获取mysql的初识密码 2-登录mysql数据库系统 3-修改root用户密码 4-配置root的简单密码

    2024年02月21日
    浏览(49)
  • Flume实战篇-采集Kafka到hdfs

    记录Flume采集kafka数据到Hdfs。 主要是自定义下Flume读取event头部的时间。 将打好的包放入/opt/module/flume/lib文件夹下 [root@ lib]$ ls | grep interceptor flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar 配置下flume的jvm    上面配置消费的形式是earliest,如果重新启动以后,他会从最新的位置开

    2024年02月06日
    浏览(54)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包