logstash同步数据从kafka到es集群

这篇具有很好参考价值的文章主要介绍了logstash同步数据从kafka到es集群。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景:需求是这样的,原始文件是txt文件(每天300个文件),最终想要的结果是每天将txt中的数据加载到es中,开始的想法是通过logstash加载数据到es中,但是对logstash不太熟悉,不知道怎么讲程序弄成读取一个txt文件到es中以后,就将这个txt原始文件备份并且删除掉,然后就想到了通过一个中间件来做,Python读取成功一个txt文件,并且加载到kafka中以后,就将这个txt文件备份然后删除掉原始文件。

第一步:向kafka中添加数据,我用Python连接kafka集群,向其中加载数据
# -*- coding: utf-8 -*-

import json
import json
import msgpack
from loguru import logger
from kafka import KafkaProducer
from kafka.errors import KafkaError

def kfk_produce_1():
    """
        发送 json 格式数据
    :return:
    """
    producer = KafkaProducer(
        bootstrap_servers='192.168.85.109:9092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    #logstash-topic-one
    #producer.send('python_test_topic', {'key': 'value'})
    producer.send('logstash-topic-one', {'name': 'value'})

kfk_produce_1()
执行完的结果,来界面工具上看,显示这样,说明数据已经加载进来了

logstash配置es集群,kafka,elasticsearch,分布式,大数据

第二步:配置logstash,将kafka中的数据加载到es集群中
编写的logstash.conf配置如下;
input{
      kafka{
        bootstrap_servers => "192.168.85.109:9092"
        client_id => "consumer_id"
        group_id => "consumer_group"
        auto_offset_reset => "latest"
        consumer_threads => 1
        decorate_events => true
        topics => ["logstash-topic-one","logstash-topic-two"]
      }
}
output {

  if [@metadata][kafka][topic] == "logstash-topic-one" {
        elasticsearch {
          hosts => "http://192.168.85.109:9200"
          index => "kafka-one-data"
          timeout => 300
        }
    }

  if [@metadata][kafka][topic] == "logstash-topic-two" {
        elasticsearch {
          hosts => "http://192.168.85.109:9200"
          index => "kafka-two-data"
          timeout => 300
        }
    }
  stdout {}
}
第三步:执行logstash,通过kibana查看数据是否在es集群中,展示如下,则说明配置是正确的

logstash配置es集群,kafka,elasticsearch,分布式,大数据

logstash配置es集群,kafka,elasticsearch,分布式,大数据

问题1:现在发现,name字段是在message下面,如果是多个字段的话,不方便查询,想着怎么讲字段从message中弄出来,修改的配置如下,增加一段这样的代码就OK了

type => "json"
        codec => json {
            charset => "UTF-8"
        }
完整的配置文件logstash.conf代码如下;
input{
      kafka{
        bootstrap_servers => "192.168.85.109:9092"
        client_id => "consumer_id"
        group_id => "consumer_group"
        auto_offset_reset => "latest"
        consumer_threads => 1
        decorate_events => true
        topics => ["logstash-topic-one","logstash-topic-two"]
        type => "json"
        codec => json {
            charset => "UTF-8"
        }
      }
}
output {

  if [@metadata][kafka][topic] == "logstash-topic-one" {
        elasticsearch {
          hosts => "http://192.168.85.109:9200"
          index => "kafka-one-data"
          timeout => 300
        }
    }

  if [@metadata][kafka][topic] == "logstash-topic-two" {
        elasticsearch {
          hosts => "http://192.168.85.109:9200"
          index => "kafka-two-data"
          timeout => 300
        }
    }
  stdout {}
}
然后我又造了一个多字段的场景如下;

logstash配置es集群,kafka,elasticsearch,分布式,大数据

我先去logstash中查看日志如下,字段已经分离出来了
{
          "name" => "value",
      "@version" => "1",
          "type" => "json",
    "@timestamp" => 2023-05-17T06:13:48.825Z
}
{
      "@version" => "1",
          "type" => "json",
    "@timestamp" => 2023-05-17T06:20:57.729Z,
          "name" => "令狐冲",
           "age" => "30",
        "height" => "180cm"
}
去kibana中去查询,显示如下,测试成功喽,😄

logstash配置es集群,kafka,elasticsearch,分布式,大数据
logstash配置es集群,kafka,elasticsearch,分布式,大数据

问题2:在查询结果中发现,有些字段是没有用的,看看怎么去掉?

在配置文件中增加一个过滤器就可以解决了
filter { mutate {
                 remove_field => ["@version","@timestamp","type"] # 删除字段
                 }
 }
然后再去kibana中去查看,就发现这会儿的字段格式非常好看了,😄

logstash配置es集群,kafka,elasticsearch,分布式,大数据文章来源地址https://www.toymoban.com/news/detail-554698.html

文档后续再继续完善,有好的建议或者问题可以留言交流,😄

到了这里,关于logstash同步数据从kafka到es集群的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • elasticsearch(ES)分布式搜索引擎04——(数据聚合,自动补全,数据同步,ES集群)

    **聚合(aggregations)**可以让我们极其方便的实现对数据的统计、分析、运算。例如: 什么品牌的手机最受欢迎? 这些手机的平均价格、最高价格、最低价格? 这些手机每月的销售情况如何? 实现这些统计功能的比数据库的sql要方便的多,而且查询速度非常快,可以实现近

    2024年02月08日
    浏览(51)
  • k8s部署elk+filebeat+logstash+kafka集群(一)ES集群+kibana部署

    前言: 这次是在部署后很久才想起来整理了下文档,如有遗漏见谅,期间也遇到过很多坑有些目前还没头绪希望有大佬让我学习下 一、环境准备 k8s-master01 3.127.10.209 k8s-master02 3.127.10.95 k8s-master03 3.127.10.66 k8s-node01 3.127.10.233 k8s-node02 3.127.33.173 harbor 3.127.33.174 1、k8s各节点部署nf

    2023年04月23日
    浏览(40)
  • Logstash输入Kafka输出Es配置

    Logstash是一个开源的数据收集引擎,具有实时管道功能。它可以从各种数据源中动态地统一和标准化数据,并将其发送到你选择的目的地。 Logstash 的早期目标主要是用于收集日志,但现在的功能已经远远超出这个范围。任何事件类型都可以通过Logstash进行分析,通过输入、过

    2024年02月03日
    浏览(44)
  • Logstash同步Mysql数据至ES

    官方文档 注意版本要和自己的es版本一致 下载地址:logstash 上传至服务器并进行解压。 1、通过官网下载mysql连接jar包 下载地址:mysql 连接jar包 根据自己mysql版本和系统进行选择 ​  ​   ​   ​   ​ 2、在IDEA中复制msyql连接jar包 ​   ​ 将jar包移动至/logstash/logstash-core/lib

    2024年02月10日
    浏览(37)
  • mysql同步数据到es之logstash

    1.使用 logstash 如果是历史数据同步我们可以用logstash,最快同步频率每分钟一次,如果对时效性要求高,慎用 2.使用 canal 实时同步,本文章未演示 logstash 特性: 无需开发,仅需安装配置logstash即可; 凡是SQL可以实现的logstash均可以实现(本就是通过sql查询数据) 支持每次全量同步或

    2023年04月08日
    浏览(45)
  • Logstash从mysql同步数据到es

    Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。 Logstash 是一个功能强大的工具,可与各种部署集成。 它提供了大量插件,可帮助你解析,丰富,转换和缓冲来自各种来源的数据。 如果你的数

    2024年02月08日
    浏览(95)
  • logstash同步mysql数据到es(三、es模板问题)

     相关问题汇总: logstash同步mysql数据到es(一、es模板问题,请求返回400) logstash同步mysql数据到es(二、jdbc_driver_library问题)_(please check user and group permissions for the p-CSDN博客 logstash同步mysql数据到es(三、es模板问题)-CSDN博客 使用docker实现logstash同步mysql到es-CSDN博客 [INFO ] 2023-12-11 09

    2024年01月17日
    浏览(79)
  • Docker部署Logstash同步Mysql数据到ES

    页面访问 ip:9200端口,出现下面页面部署成功 成功日志

    2024年04月13日
    浏览(41)
  • 【ELK企业级日志分析系统】部署Filebeat+Kafka+Logstash+Elasticsearch+Kibana集群详解(EFLFK)

    参见安装与部署ELK详解 参见安装与部署EFLK详解 参见安装与部署Zookeeper集群详解 1.1.1 为什么需要消息队列(MQ) MQ(Message Queue)主要原因是由于 在高并发环境下,同步请求来不及处理,请求往往会发生阻塞 。比如大量的并发请求,访问数据库,导致行锁表锁,最后请求线程会

    2024年02月16日
    浏览(49)
  • Logstash同步MySQL数据到ElasticSearch

    当MySQL数据到一定的数量级,而且索引不能实现时,查询就会变得非常缓慢,所以使用ElasticSearch来查询数据。本篇博客介绍使用Logstash同步MySQL数据到ElasticSearch,再进行查询。 测试环境 Windows系统 MySQL 5.7 Logstash 7.0.1 ElasticSearch 7.0.1 Kibana 7.0.1 ELK工具下载可访问:https://www.elastic

    2024年02月01日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包