- Write the Logstash configuration file for the data transformation, export-csv.yml. The processing done in the filter section depends on the data stored in ES and on the ClickHouse column types that the exported raw data has to match.

    input {
      elasticsearch {
        hosts => "localhost:9200"
        index => "test"
      }
    }

    # ETL the data in the filter so that it meets ClickHouse's requirements
    filter {
      # Convert the long millisecond value into the required time format
      ruby {
        code => "event.set('timestamp',Time.at((event.get('timestamp').to_i)/1000).strftime('%Y-%m-%d %H:%M:%S'))"
      }
      # Convert these fields to types ClickHouse accepts; which fields need converting depends on
      # the data in ES, the exported raw data format and the corresponding ClickHouse column types
      mutate {
        convert => {
          "dstType" => "integer"
          "downStreamOct" => "integer"
          "totalOct" => "integer"
          "upStreamOct" => "integer"
          "srcType" => "integer"
        }
      }
    }

    output {
      csv {
        fields => ["srcRegion","dstType","downStreamOct","totalOct","upStreamOct","srcType","timestamp"]
        path => "/tmp/test.csv"
      }
    }
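Before running a full export, the pipeline configuration can be sanity-checked with Logstash's config test mode. The sketch below assumes the file above is saved as export-csv.yml next to the same Logstash 6.8.6 install used in the next step:

    # Parse the pipeline configuration and exit without starting the pipeline
    ./logstash-6.8.6/bin/logstash -f export-csv.yml --config.test_and_exit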
- Start Logstash, using -f to specify the configuration file instead of Logstash's default configuration file path:

    ./logstash-6.8.6/bin/logstash -f export-csv.yml
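Once Logstash finishes, it is worth spot-checking the exported file before loading it into ClickHouse. A minimal check, assuming the output path /tmp/test.csv from the configuration above:

    # Count the exported rows and look at the first few records
    wc -l /tmp/test.csv
    head -n 5 /tmp/test.csv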
- Create the table in ClickHouse. Each column type has to be chosen according to the field type in ES and the format of the data exported from ES, because the column type determines what format of data can be written into that ClickHouse column.
    CREATE TABLE traffic.traffic_monitor
    (
        -- column types
        `srcRegion` String,
        `dstType` UInt8,
        `downStreamOct` UInt64,
        `totalOct` UInt64,
        `upStreamOct` UInt64,
        `srcType` UInt8,
        `timestamp` DateTime
    )
    -- storage engine, partition key and sort key
    ENGINE = MergeTree()
    PARTITION BY toYYYYMMDD(timestamp)
    ORDER BY (timestamp)
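To confirm the table was created with the intended column types, the schema can be inspected from clickhouse-client, for example:

    # Show the column names and types of the target table
    clickhouse-client --query "DESCRIBE TABLE traffic.traffic_monitor"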
- Import the data into ClickHouse from the CSV file
If the first line of the CSV contains the column names:

    clickhouse-client --query "INSERT INTO traffic.traffic_monitor FORMAT CSVWithNames" < test.csv

If the first line of the CSV does not contain the column names:

    clickhouse-client --query "INSERT INTO traffic.traffic_monitor FORMAT CSV" < test.csv
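After the import, a quick row count and a sample query help confirm that the CSV columns lined up with the table definition (the field order from the csv output plugin and the DateTime format both have to match). A minimal check:

    # Verify the number of imported rows and inspect a few of them
    clickhouse-client --query "SELECT count() FROM traffic.traffic_monitor"
    clickhouse-client --query "SELECT * FROM traffic.traffic_monitor ORDER BY timestamp LIMIT 5"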