[ELasticSearch]-Logstash的使用

这篇具有很好参考价值的文章主要介绍了[ELasticSearch]-Logstash的使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

[ELasticSearch]-Logstash的使用

森格 | 2023年2月

介绍:Logstash在Elastic Stack中担当着ELK的工作,在本文主要阐述Logstash的处理流程以及一些应用案例。



一、Logstash介绍

1.1 What is Logstash?

Logstash是一款强大的ETL(Extract、Transform、Load)工具。我们可以把它想象成具有数据传输能力的管道,数据信息从输入端(Input)传输到输出端(Output),用户可根据自己的需要在管道中间加上过滤网(Filter)。

1.2 版本兼容性

通常来说,对于Elastic的相关产品,版本最好一一对应。

在个人使用Logstash做es的索引迁移过程中,跳板机使用版本为7.4.1,es版本为6.8.x、7.4.x、7.7.x,迁移过程未见问题。

1.3 处理流程

可以把Logstash实例看成一个正在运行的Logstash进程,因为Logstash利用了jvm,建议在单独的机器上运行。而对于其中的管道可以看成插件的合集,一个Logstash可以运行多个管道,且多个管道之间彼此独立。

elasticsearch logstash,Elasticsearch,elasticsearch,java,搜索引擎

对于Logstash我们主要需要配置三个插件:

Input Plugins:提取或接收数据。

Filter Plugins:应用转换并丰富数据。

Output Plugins:将已处理的数据加载给目标。

除此之外还有Codec plugins(编码器插件),作用于input和output plugin中 ,更改事件的数据表示,最常见的为json格式。

想要了解更多插件支持参考官网:Input Plugins、Filter Plugins、Output Plugins、Codec plugins。

# 查看已安装的插件
cd /logstash
./bin/logstash-plugin list

二、Logstash应用

2.1 ElasticSearch索引迁移

2.1.1 logstash-input-elasticsearch

input {
 elasticsearch {
    hosts => [
            "xxx.xxx.xxx.xxx:9200"
        ]
    user => "xxx"
    password => "xxx"
    index => "xxx"
    query => '{
            "query": {
                "match_all": {}
            }
        }'
    size => 500
    scroll => "5m"
    docinfo => true
    codec => json
    }
}

  • index:要匹配的索引,多个索引用, 隔开。
  • size:每次滚动返回的最大匹配记录数。默认值1000。
  • scroll:此参数用来控制滚动请求的保活时间(单位为秒),并启动滚动过程,默认值1m。
  • docinfo:在事件中会包含es文档的信息,例如索引、类型、id等。默认值flase。
  • codec:在数据输入之前对其进行解码。默认为json。

2.1.2 logstash-filter-mutate

filter {
 mutate {
     remove_field => [
            "@timestamp",
            "@version"
        ]
    }
}

Logstash的输出数据会加上版本和时间戳信息。所以我们要过滤掉这些信息。

过滤前:

elasticsearch logstash,Elasticsearch,elasticsearch,java,搜索引擎

过滤后:

elasticsearch logstash,Elasticsearch,elasticsearch,java,搜索引擎

2.1.3 logstash-output-elasticsearch

output {
     elasticsearch {
         hosts => [
            "xxx.xxx.xxx.xxx:9200"
        ]
        user => "xxx"
        password => "xxx"
        action => "update"       //有则更新,无则插入
        doc_as_upsert => true
        document_type => "_doc"	 //文档类型
        document_id => "%{[@metadata][_id]}"	//文档id,取元数据docinfo信息中的id
        index => "%{[@metadata][_index]}"		//文档索引名,取元数据docinfo信息中的索引名
    }
}

2.2 同步MySQL数据

2.2.1 logstash-input-jdbc

  1. 下载jar包(mysql-connector)
# 下载
wget https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-j-8.0.31.tar.gz
tar -zxf mysql-connector-j-8.0.31.tar.gz
# 复制jar包到/xx/logstash/logstash-core/lib/jars/目录
cp /mysql-connector-j-8.0.31/mysql-connector-j-8.0.31.jar /xx/logstash/logstash-core/lib/jars/

这里值得提的是,将连接的jar包放置于Logstash的logstash-core/lib/jars/目录下,就不用再配置中设置jdbc_driver_library参数。

  1. input 配置
input {
	jdbc {
       jdbc_connection_string => "jdbc:mysql://数据库地址:端口/数据库名?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false"
       jdbc_user => "xxxxxx"
       jdbc_password => "xxxxxx"
       jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
       jdbc_validate_connection => true
       jdbc_paging_enabled => true
       jdbc_page_size => 50000
       lowercase_column_names => false
       statement => "SELECT * FROM xxx"
       schedule => "*/1 * * * *"
    }    
}
  • jdbc_connection_string:指定编码格式,禁用SSL协议,设定自动重连。
  • jdbc_validate_connection:连接前做验证。
  • statement:要执行的查询语句。
  • lowercase_column_names:将列名转换为小写。
  • schedule:定时执行,类似于crontab。

2.2.2 logstash-filter-mutate

同2.1.2,这里不做赘述。

2.2.3 logstash-output-elasticsearch

output {
     elasticsearch {
         hosts => [
            "xxx.xxx.xxx.xxx:9200"
        ]
        user => "xxx"
        password => "xxx"
        action => "update"       //有则更新,无则插入
        doc_as_upsert => true
        document_type => "_doc"	 //文档类型
		index => "mysql_data"	 //索引名,需要自己填写
        document_id => "%{id}"	 //文档id,可以根据mysql表中主键id匹配
}
	//标准输出,实际运行中可以取消
     stdout{
        codec => json_lines
        }
}

2.3 实战

TODO:迁移mysql某张表的数据到es实例1,再将es实例1的mysql索引迁移至es实例2。

MySQL数据:

elasticsearch logstash,Elasticsearch,elasticsearch,java,搜索引擎

将要生成的es索引名为mysql_data_otter

es实例1迁移前索引列表:

elasticsearch logstash,Elasticsearch,elasticsearch,java,搜索引擎

es实例2迁移前索引列表:

elasticsearch logstash,Elasticsearch,elasticsearch,java,搜索引擎

logstash_mysql.conf、logstahs_es.conf 配置文件按照上文内容修改即可。

执行命令:

./bin/logstash -f ./conf/xxx.conf

运行成功信息如下:

[2023-02-21T09:32:18,661][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9601}
[2023-02-21T09:32:20,326][INFO ][logstash.runner          ] Logstash shut down.

es实例1迁移后索引列表:

elasticsearch logstash,Elasticsearch,elasticsearch,java,搜索引擎

es实例2迁移后索引列表:

elasticsearch logstash,Elasticsearch,elasticsearch,java,搜索引擎

三、性能调优

3.1 标准输出stdout

在output中我们除了自己做调试使用它,在实际运行过程中避免使用,会影响使用性能。

3.2 内存 JVM

在conf/jvm.options文件中,最好设置为物理内存的50%以上,如果堆大小太低,CPU利用率可能会不必要地增加,从而导致JVM不断进行垃圾回收。且Xms和Xmx堆分配大小设置为相同的值,以防止在运行时调整堆大小。

3.3 管道线程

pipeline.workers:并行执行筛选器和管道输出阶段的工作人员数量,官方建议为CPU核数。

pipeline.batch.size:单个工作线程在尝试执行其筛选器和输出之前从输入中收集的最大事件数,在内存允许的情况下,可以合理增加批处理大小。文章来源地址https://www.toymoban.com/news/detail-772708.html

到了这里,关于[ELasticSearch]-Logstash的使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 使用Filebeat+Kafka+Logstash+Elasticsearch构建日志分析系统

            随着时间的积累,日志数据会越来越多,当您需要查看并分析庞杂的日志数据时,可通过Filebeat+Kafka+Logstash+Elasticsearch采集日志数据到Elasticsearch中,并通过Kibana进行可视化展示与分析。本文介绍具体的实现方法。 Kafka是一种分布式、高吞吐、可扩展的消息队列服务,

    2024年02月04日
    浏览(36)
  • 使用logstash将项目的日志存储到Elasticsearch中(详细!新手避坑点!)

    1.环境准备 我这里默认你们这个logstash和Elasticsearch已经安装好了。 我使用的是logstash版本7.4.0 Elasticsearch版本7.4.0。 使用其他版本 注意版本一定要一致,版本不一致大概率会出现logstash启动报Failed to install template.异常 ,已经踩坑了。 logstash的配置文件: 接下来 以配置文件启

    2024年02月04日
    浏览(79)
  • docker部署elasticsearch:8.6.2, kibana,logstash 版本以及kibana的使用

    How to Run Elasticsearch 8 on Docker for Local Development Docker下elasticsearch8部署、扩容、基本操作实战(含kibana) 导入es数据 docker安装elasticsearch和head插件 docker安装elasticsearch和head插件,连接失败问题解决 需要输入token 生成token报错 ERROR: [xpack.security.enrollment.enabled] must be set to true to create an

    2024年02月16日
    浏览(71)
  • 使用Logstash同步mysql数据到Elasticsearch(亲自踩坑)_将mysql中的数据导入es搜索引擎利用logstash(1)

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新大数据全套学习资料》,

    2024年04月28日
    浏览(39)
  • 使用Logstash和JDBC将MySQL的数据导入到Elasticsearch(ES)的过程

    使用Logstash和JDBC将MySQL的数据导入到Elasticsearch(ES)的过程包含多个步骤。请注意,首先你需要准备好的JDBC驱动,Logstash实例,Elasticsearch实例,以及你希望导入的MySQL数据。 安装Logstash JDBC Input Plugin :Logstash包含大量插件,其中一个就是JDBC Input Plugin,可以用于从JDBC兼容的数据库

    2024年02月15日
    浏览(34)
  • Logstash:迁移数据到 Elasticsearch

    在生产环境中,不使用 Apache Kafka 等流平台进行数据迁移并不是一个好的做法。 在这篇文章中,我们将详细探讨 Apache Kafka 和 Logstash 的关系。 但首先让我们简单了解一下 Apache Kafka 的含义。 Apache Kafka 是分布式流平台,擅长实时数据集成和消息传递。 Kafka 架构不复杂且直接。

    2024年02月02日
    浏览(33)
  • 如何在 Ubuntu 14.04 上使用 Rsyslog、Logstash 和 Elasticsearch 实现日志集中管理

    Elastic 的一篇文章 理解组织生成的数百万条日志行可能是一个艰巨的挑战。一方面,这些日志行提供了对应用程序性能、服务器性能指标和安全性的视图。另一方面,日志管理和分析可能非常耗时,这可能会阻碍对这些日益必要的服务的采用。 开源软件,如 rsyslog、Elasticse

    2024年04月11日
    浏览(33)
  • ELK(ElasticSearch, Logstash, Kibana)

    ELK简介 ELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana , 它们都是开源软件。新增了一个FileBeat,它是一个轻量级的日志收集处理工具(Agent),Filebeat占用资源少,适合于在各个服务器上搜集日志后传输给Logstash,官方也推荐此工具。 Elasticsearch是个开源分布式搜

    2023年04月09日
    浏览(40)
  • 使用 Docker Compose V2 快速搭建日志分析平台 ELK (Elasticsearch、Logstash 和 Kibana)

    ELK 是指 Elasticsearch、Logstash 和 Kibana 这三个开源软件的组合。 Elasticsearch 是一个分布式的搜索和分析引擎,用于日志的存储,搜索,分析,查询。 Logstash 是一个数据收集、转换和传输工具,用于收集过滤和转换数据,然后将其发送到 Elasticsearch 或其他目标存储中。 Kibana 是一个数

    2024年01月20日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包