日志=》kafka》ELK

这篇具有很好参考价值的文章主要介绍了日志=》kafka》ELK。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

kELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana;
Elasticsearch是个开源分布式搜索引擎,提供搜集、分析、存储数据三大功能;它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。
Logstash 主要是用来日志的搜集、分析、过滤日志的工具,支持大量的数据获取方式。一般工作方式为c/s架构,client端安装在需要收集日志的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch上去。
Kibana 也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助汇总、分析和搜索重要数据日志

logstach(日志收集)->Elasticsearch(日志存储和搜索)->Kibana(查看日志,可视化)

为什么要使用elk?
        ELK 组件在海量日志系统的运维中,可用于解决以下主要问题:- 分布式日志数据统一收集,实现集中式查询和管理
故障排查
安全信息和事件管理
报表功能

我们为什么用kafka,一定要通过kafka吗 
    不是,可以直接logback到ELK的,但是为什么使用kafka接收日志呢,是为了减少logstash对于日志进入时的压力。kafka的特性使用过的人应该都清楚,拥有这10W级别每秒的单机吞吐量,所以很适合作为数据来源缓冲区。

logback.xml

 <!-- kafkaAppender 输出日志到kafka -->
    <appender name="kafkaAppender" 
         class="com.td.ai.frame.uni.platform.oaudit.unify.config.KafkaAppender">
        <bootstrapServers>kafka-servers</bootstrapServers>
        <topic>kafka-topic</topic>
    </appender>

 <!-- 要输出日志的类 -->
    <logger name="logKafka" level="info">
        <appender-ref ref="kafkaAppender"/>
    </logger>
    <!-- 异步传递策略,建议选择异步,不然连接kafka失败,会阻挡服务启动 -->
    <appender name="Async" class="ch.qos.logback.classic.AsyncAppender">
        <appender-ref ref="kafkaAppender"/>
    </appender>
public class KafkaAppender extends AppenderBase<ILoggingEvent> {

    private static Logger logger = LoggerFactory.getLogger(KafkaAppender.class);

    private String topic = "***";



  
    private Producer<String, String> producer;



    @Override
    public void start() {
        super.start();
        if (producer == null) {
            Properties props = new Properties();
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "SCRAM-SHA-512");
            props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule\n" +
                    "required username=\"***\"" +
                    "password=\"****\";");
            props.put("bootstrap.servers", topic);
            //判断是否成功,我们指定了“1”将会阻塞消息
            props.put("acks", "1");
            props.put("retries", 3);
            props.put("batch.size", 262144);
            //延迟10s,10s内数据会缓存进行发送\
            props.put("linger.ms", 10);
            props.put("buffer.memory", 67108864);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("metric.reporters", "com.ctg.kafka.clients.reporter.KafkaClientMetricsReporter");
            props.put("client.id", ""***);
            producer = new KafkaProducer<String, String>(props);


        }

    }


    @Override
    protected void append(ILoggingEvent iLoggingEvent) {
        String msg = iLoggingEvent.getFormattedMessage();
        String message = "";
        InetAddress localHost = null;
        try {
            localHost = Inet4Address.getLocalHost();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        String hostIp = localHost.getHostAddress();
        String hostName = localHost.getHostName();
        Date date = new Date();
        SimpleDateFormat sdformat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");//24小时制
        String datetime = sdformat.format(date);
        JSONObject json = new JSONObject();
       
        json.put("podIP", hostIp);
        json.put("podName", hostName);
        message = json.toString();
//        System.out.println("向kafka推送日志开始:" + message);
        //key为null  2.4之前为轮询策略
        // 如果key值为null,并且使用了默认的分区器,Kafka会根据轮询(Random Robin)策略将消息均匀地分布到各个分区上。
        // 之后为粘性策略
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                topic, null, message);
        //同步发动消息-改-异步发送消息
        try {
            Future<RecordMetadata> result = producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        // 执行错误逻辑处理//否则就是成功喽
                        exception.printStackTrace();
                    }
                }
            });
//            System.out.println("分区:" + result.get().partition() + ",offset: " + result.get().offset());
        } catch (Exception e) {
            e.printStackTrace();
        }


        producer.flush();



    }



}

服务器安装logstash 


1. 查看一下路径
pwd
应该显示/app/logstash或者/data/logstash

2. 将tar包上传

3. 执行以下命令
tar -zxvf logstash-7.5.2.tar.gz(自己的版本号)

cd logstash-7.5.2

mkdir config/conf
mkdir config/certs
mkdir logs

cd config/conf
上传js-sysname.conf

input {
    kafka {
		topics_pattern  => "kafkatopic"
        consumer_threads => 4
        group_id => "***-consumer" # kafka 消费组
        type => "kafka"
        security_protocol => "SASL_PLAINTEXT"
        sasl_mechanism => "SCRAM-SHA-512"
        jaas_path => "/home/crmapp/logstash-7.5.2/config/certs/kafka_client_jaas.conf"
        bootstrap_servers => "*****"
        codec => "json"
    }
}
 
filter {
  ruby{
    code => "event.set('index_day',event.get('@timestamp').time.localtime('+08:00').strftime('%Y.%m.%d'))"
  }
 
}

output {
    
        elasticsearch {
       hosts => ["*****"]
       index => "***a-log-%{index_day}"
            user => "**"
            password => "这里写es的密码"
        }
    
}

cd ../certs
上传kafka_client_jaas.conf

KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="**"
password="****";
};

cd ..
vim pipelines.yml
将下面的加到"# Example of two pipelines:"这一行下面
 - pipeline.id:js-sysname
   pipeline.workers: 2
   path.config: "/app/logstash/logstash-7.5.2/config/conf/-js-sysname.conf"

cd /app/logstash/logstash-7.5.2/
nohup bin/logstash -r true --config.reload.automatic >> logs/logstash.log &

4. 查看日志
tail -100f logs/logstash.log
启动需要时间,如果没有erorr日志,没有提示连不上kafka或者elasricsearch即可文章来源地址https://www.toymoban.com/news/detail-788482.html

到了这里,关于日志=》kafka》ELK的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • java使用kafka-clients集成0.10.0.0版本kafka(一)

    因为某个功能需要对接的kafka是一个上古版本 0.10.0.0 ,公司项目又是springcloud项目,导致版本兼容性的问题很头大 1.kafka的版本号 下载的windows版kafka如:kafka_2.10-0.10.0.0 2.10标识编译kafka集群的scala版本号,kafka的服务端编码语言为scala 0.10.0.0标识kafka真正的版本号 kafka的版本号从

    2024年02月07日
    浏览(55)
  • 在Spring Boot微服务集成kafka-clients操作Kafka集群

    记录 :463 场景 :在Spring Boot微服务集成kafka-clients-3.0.0操作Kafka集群。使用kafka-clients的原生KafkaProducer操作Kafka集群生产者Producer。使用kafka-clients的原生KafkaConsumer操作Kafka集群的消费者Consumer。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka集群安装 :https://bl

    2024年02月09日
    浏览(51)
  • 在Spring Boot微服务集成Kafka客户端(kafka-clients)操作Kafka

    记录 :459 场景 :在Spring Boot微服务集成Kafka客户端kafka-clients-3.0.0操作Kafka。使用kafka-clients的原生KafkaProducer操作Kafka生产者Producer。使用kafka-clients的原生KafkaConsumer操作Kafka的消费者Consumer。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka安装 :https://blog.csdn.ne

    2024年02月12日
    浏览(51)
  • 使用kafka-clients的Java API操作Kafka集群的Topic

    记录 :464 场景 :在Spring Boot微服务集成Kafka客户端kafka-clients-3.0.0操作Kafka集群的Topic的创建和删除。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka集群安装 :https://blog.csdn.net/zhangbeizhen18/article/details/131156084 1.微服务中 配置Kafka信息 1.1在pom.xml添加依赖 pom.xml文

    2024年02月09日
    浏览(43)
  • 使用Kafka客户端(kafka-clients)的Java API操作Kafka的Topic

    记录 :460 场景 :在Spring Boot微服务集成Kafka客户端kafka-clients-3.0.0操作Kafka的Topic的创建和删除。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka安装 :https://blog.csdn.net/zhangbeizhen18/article/details/129071395 1.微服务中 配置Kafka信息 1.1在pom.xml添加依赖 pom.xml文件: 解析

    2024年02月09日
    浏览(69)
  • 使用flink的sql-client.sh,测试mysql-->kafka-->kafka-->mysql实时流

    目录 1. 环境介绍 2. mysql建表 3. flinksql建表 3.1 进入flinksql客户端  ​3.2 配置输出格式 ​3.3 flink建表 3.4 任务流配置 4. 测试 4.1 插入测试数据 4.2 查看结果表数据​ 4.3 新增测试数据 4.4 再次查看结果表数据 服务 版本 zookeeper 3.8.0 kafka 3.3.1 flink 1.13.5 mysql 5.7.34 jdk 1.8 scala 2.12 连接器

    2024年02月11日
    浏览(41)
  • Failed to construct kafka producer at org.apache.kafka.clients.producer.KafkaProducer

    报错信息如下: 在网上找了很久的解决方案,也没找到个所以然,可能是我能力不足没理解到,后来我尝试clean下项目,竟然报错了 提示我pom.xml中有错误,我看了看,唯一有可能的是新导入的一个依赖去掉了版本号,我加上版本号后又重新clean下,成功了,, 然后,我重启

    2024年02月05日
    浏览(38)
  • kafka消费报错, org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since

    问题: 在有大量消息需要消费时,消费端出现报错:org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which t

    2024年03月23日
    浏览(42)
  • Kafka指定分区消费及consumer-id,client-id相关概念解析

    xxxx系列(1)― xxxx系列(2)― xxxxx系列(3)― 提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档 在最近使用Kafka过程中,发现使用@KafkaListener指定分区消费时(指定了所有分区),如果服务是多节点,会出现重复消费的现象,即两个服务节点中的消费者均会消

    2024年02月13日
    浏览(50)
  • Kafka系列之:Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are []

    java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are [] at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access 000 ( K a f k

    2024年02月16日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包