SeaTunnel (2.1.3) drives Spark SQL (2.4) and Flink SQL (1.14) to process structured data; through configuration alone, a single job can schedule multiple sources and sinks.
I. Add ES7 support to the Spark Structured Streaming job
In the SeaTunnel source tree, upgrade the elasticsearch-spark component, add a dependency on spark-catalyst, then repackage:
<properties>
  <elasticsearch7.client.version>7.16.3</elasticsearch7.client.version>
  <elasticsearch-spark.version>7.16.3</elasticsearch-spark.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>${spark.scope}</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
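With the pom changes in place, rebuild the distribution. A minimal sketch, assuming a full rebuild from the repository root (a module-scoped build would also work):

mvn clean package -DskipTests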
II. Configure the job
1. Configure env
Specify the Spark launch parameters; spark.streaming.batchDuration is the micro-batch interval in seconds:
env {
  spark.app.name = "seatunnel"
  spark.yarn.am.memory = "2g"
  spark.executor.memory = "3g"
  spark.executor.instances = 3
  spark.streaming.batchDuration = 2
}
2. Configure source
Define the temporary dynamic table; any official Kafka consumer configuration parameter can be passed through with the consumer.XXX prefix:
source {
  kafkaStream {
    topics = "monitor-log"
    consumer.bootstrap.servers = "10.0.0.1:9092,10.0.0.2:9092"
    consumer.group.id = "seatunnel"
    consumer.auto.offset.reset = "latest"
    consumer.heartbeat.interval.ms = 2000
    consumer.session.timeout.ms = 6000
    consumer.reconnect.backoff.ms = 2000
    consumer.request.timeout.ms = 30000
    consumer.fetch.min.bytes = 10485760
    consumer.fetch.max.wait.ms = 1000
    consumer.max.partition.fetch.bytes = 20971520
    consumer.max.poll.interval.ms = 30000
    consumer.max.poll.records = 1000
    # name of the temporary dynamic table
    result_table_name = "monitor_log_kafka"
  }
}
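Each Kafka record lands in this table as two columns, topic and raw_message. A hypothetical row, with a payload invented for illustration (only the app and type fields are referenced later in this article):

topic       = "monitor-log"
raw_message = "{\"app\":\"order-service\",\"type\":\"svc_link\",\"message\":\"GET /api/orders 200 12ms\"}"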
3. Configure transform
Transform data from one temporary table into another:
transform {
  json {
    source_table_name = "monitor_log_kafka"
    # the Kafka dynamic table has two fields: topic and raw_message
    source_field = "raw_message"
    # the schema file lives under plugins/json/files/schemas (see the sketch after this block)
    # if collected data does not match the field layout of the sample data, it is converted to the sample's layout
    schema_file = "monitor-log.json"
    result_table_name = "monitor_log"
    # with target_field set to root, the original raw_message and topic fields are dropped;
    # to keep raw_message, set a non-root target_field to hold the deserialized JSON
    target_field = "__root__"
  }
  sql {
    # the SQL can reference any temporary table defined earlier
    sql = "SELECT * FROM monitor_log WHERE app IS NOT NULL AND type = 'svc_link'"
    result_table_name = "svc_link"
  }
  sql {
    sql = "SELECT * FROM monitor_log WHERE app IS NOT NULL AND type <> 'svc_link'"
    result_table_name = "app_log"
  }
}
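The schema file holds one representative record from which the field layout is taken. A minimal sketch of plugins/json/files/schemas/monitor-log.json, assuming the hypothetical payload shown above (only app and type are actually required by the SQL filters):

{
  "app": "order-service",
  "type": "svc_link",
  "message": "GET /api/orders 200 12ms"
}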
4. Configure sink
Write data from the temporary tables into the target store:
sink {
  elasticsearch {
    source_table_name = "svc_link"
    hosts = ["10.0.1.1:9200", "10.0.1.2:9200"]
    index = "wzp_svc_link-${now}"
    index_time_format = "yyyy.MM.dd"
    es.net.ssl = false
    es.net.http.auth.user = "elastic"
    es.net.http.auth.pass = "elastic"
    es.batch.size.bytes = "100mb"
    es.batch.size.entries = 10000
    es.batch.write.refresh = false
    es.write.rest.error.handlers = "log"
    es.write.rest.error.handler.log.logger.name = "BulkErrors"
    es.write.rest.error.handler.log.logger.level = "WARN"
  }
  elasticsearch {
    source_table_name = "app_log"
    # write to a different ES cluster or index (sketched below)
  }
}
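A hypothetical completion of that second sink; the host address and index name here are invented for illustration:

elasticsearch {
  source_table_name = "app_log"
  hosts = ["10.0.2.1:9200"]        # hypothetical second cluster
  index = "wzp_app_log-${now}"     # hypothetical index name
  index_time_format = "yyyy.MM.dd"
}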
III. Launch the job
export JAVA_HOME=/etc/alternatives/jre
export HADOOP_HOME=/apps/svr/hadoop-2.9.2
export SPARK_HOME=/apps/svr/spark-2.4.8-bin-hadoop2.7
cd /apps/svr/apache-seatunnel-incubating-2.1.3
./bin/start-seatunnel-spark.sh --master yarn --deploy-mode cluster --config ./config/wzp.spark.streaming.conf
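For a quick sanity check before submitting to YARN, the script also accepts the standard Spark master and deploy-mode values, e.g. a local client-mode run (illustrative):

./bin/start-seatunnel-spark.sh --master "local[2]" --deploy-mode client --config ./config/wzp.spark.streaming.conf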