当MySQL数据到一定的数量级,而且索引不能实现时,查询就会变得非常缓慢,所以使用ElasticSearch来查询数据。本篇博客介绍使用Logstash同步MySQL数据到ElasticSearch,再进行查询。
测试环境
- Windows系统
- MySQL 5.7
- Logstash 7.0.1
- ElasticSearch 7.0.1
- Kibana 7.0.1
ELK工具下载可访问:https://www.elastic.co/cn/downloads/
ELK同步环境搭建
ElasticSearch、Kibana启动
将下载的ElasticSearch、Kibana解压,并依次启动,Windows目录下,ElasticSearch启动可点击bin/elasticsearch.bat
,Kibana启动可点击kibana.bat
。
Logstash配置启动
核心是Logstash的配置。
1、解压Logstash
2、将MySQL的JDBC的连接包放入lib包下
3、在bin目录下新建配置文件-logstash_sync_mysql.conf,需要注意该配置文件需要UTF-8 无BOM格式,不然会报错。
4、编写配置文件
input {
jdbc {
# 索引类型
type => "product"
# 驱动包位置
jdbc_driver_library => "D:\ELk_SYNC_MYSQL\logstash-7.0.1\lib\mysql\mysql-connector-java-5.1.43.jar"
# 驱动
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 数据库名称
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/clothingsale?useUnicode=true&characterEncoding=UTF-8&useSSL=true"
# 用户名
jdbc_user => "root"
# 密码
jdbc_password => "root"
# SQL文件
# statement_filepath => "filename.sql"
# SQL语言 SELECT * FROM product WHERE update_time > :last_sql_value
statement => "SELECT * from product"
# 设置时区
jdbc_default_timezone => "Asia/Shanghai"
# 是否分页
jdbc_paging_enabled => "true"
# 分页数量
jdbc_page_size => "500"
# 追踪字段
tracking_column => "update_time"
# 这里如果是用时间追踪比如:数据的更新时间或创建时间等和时间有关的这里一定不能是true
use_column_value => false
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
}
jdbc {
# 索引类型
type => "message"
# 驱动包位置
jdbc_driver_library => "D:\ELk_SYNC_MYSQL\logstash-7.0.1\lib\mysql\mysql-connector-java-5.1.43.jar"
# 驱动
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 数据库名称
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/clothingsale?useUnicode=true&characterEncoding=UTF-8&useSSL=true"
# 用户名
jdbc_user => "root"
# 密码
jdbc_password => "root"
# SQL文件
# statement_filepath => "filename.sql"
# SQL语言 SELECT * FROM product WHERE update_time > :last_sql_value
statement => "SELECT * from message"
# 设置时区
jdbc_default_timezone => "Asia/Shanghai"
# 是否分页
jdbc_paging_enabled => "true"
# 分页数量
jdbc_page_size => "500"
# 追踪字段
tracking_column => "update_time"
# 这里如果是用时间追踪比如:数据的更新时间或创建时间等和时间有关的这里一定不能是true
use_column_value => false
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
}
}
# 修改@timestamp默认时间
filter {
ruby {
code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
}
ruby {
code => "event.set('@timestamp',event.get('timestamp'))"
}
mutate {
remove_field => ["timestamp"]
}
}
output {
# 目前使用的elasticsearch7.x,所以一个index只能存储一种type,所以以下的index需要不一样
if [type]=="product" {
elasticsearch {
hosts => "127.0.0.1:9200"
# 索引名称 相当于数据库名称
index => "cloproduct"
# 类型名称 相当于数据库中的数据表
document_type => "product"
document_id => "%{id}"
}
}
if [type]=="message" {
elasticsearch {
hosts => "127.0.0.1:9200"
# 索引名称 相当于数据库名称
index => "clomessage"
# 类型名称 相当于数据库中的数据表
document_type => "message"
document_id => "%{id}"
}
}
}
上述是多表同步,每行都有注释,意思比较明了,就是input中多表使用jdbc隔开,然后output中用type区分。
5、启动
logstash -f logstash_sync_mysql.conf >> C:\Users\Panlf\Desktop\log.txt
这样还能看到实时日志产生,方便查看错误和进程。文章来源:https://www.toymoban.com/news/detail-788651.html
注意
上述即可实现MySQL的数据同步,但是存在问题 - 时区问题,MySQL是时间比ElasticSearch晚8个小时,我试了各种方式还是不能解决把时间调整过来。目前可以在取数据的时候,进行时间调整,应该问题不是很大。文章来源地址https://www.toymoban.com/news/detail-788651.html
到了这里,关于Logstash同步MySQL数据到ElasticSearch的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!