用seatunnel替代logstash,把数据从kafka抽取到ES

这篇具有很好参考价值的文章主要介绍了用seatunnel替代logstash,把数据从kafka抽取到ES。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

seatunnel(2.1.3)调用spark-sql(2.4)、flink-sql(1.14)对结构化数据进行处理;能够通过配置,在一个任务里调度多个source和sink

一、为spark structured streaming任务添加对ES7的支持

在seatunnel源码里升级elasticsearch-spark组件,添加spark-catalyst的依赖后,重新打包

    <properties>
        <elasticsearch7.client.version>7.16.3</elasticsearch7.client.version>
        <elasticsearch-spark.version>7.16.3</elasticsearch-spark.version>
    </properties>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
                <version>${spark.version}</version>
                <scope>${spark.scope}</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

二、配置任务

1、配置env

指定spark任务启动参数

env {
  spark.app.name = "seatunnel"
  spark.yarn.am.memory = "2g"
  spark.executor.memory = "3g"
  spark.executor.instances = 3
  spark.streaming.batchDuration = 2
}

2、配置source

定义临时动态表,kafka consumer官方配置参数都添加在consumer.XXX

source {
    kafkaStream {        
        topics = "monitor-log"
        consumer.bootstrap.servers = "10.0.0.1:9092,10.0.0.2:9092"
        consumer.group.id = "seatunnel"
        consumer.auto.offset.reset = "latest"
        consumer.heartbeat.interval.ms = 2000
        consumer.session.timeout.ms = 6000
        consumer.reconnect.backoff.ms = 2000
        consumer.request.timeout.ms = 30000
        consumer.fetch.min.bytes = 10485760
        consumer.fetch.max.wait.ms = 1000
        consumer.max.partition.fetch.bytes = 20971520
        consumer.max.poll.interval.ms = 30000
        consumer.max.poll.records = 1000
        # 指定临时动态表的名字
        result_table_name = "monitor_log_kafka"
    }
}

3、配置transform

把数据从一个临时表转换到另一个临时表

transform {
    json {
        source_table_name = "monitor_log_kafka"
        # kafka动态数据表里有topic和raw_message两个字段
        source_field = "raw_message"

        # 文件放在plugins/json/files/schemas目录下
        # 如果采集到的数据,和样例数据里的字段格式不一致,则转化为样例数据里的格式
        schema_file = "monitor-log.json"

        result_table_name = "monitor_log"
        # target_field为root,会舍弃原有的raw_message、topic字段;
        # 要保留raw_message,需指定一个非root的target_field,来存储json反序列化结果
        target_field = "__root__"
    }
    sql {
        # sql里可直接引用前面定义的各个临时表
        sql = "SELECT * FROM monitor_log WHERE app is not null AND type = 'svc_link'"
        result_table_name = "svc_link"
    }
    sql {
        sql = "SELECT * FROM monitor_log WHERE app is not null AND type <> 'svc_link'"
        result_table_name = "app_log"
    }
}

4、配置sink

把数据从临时表存入目标数据库文章来源地址https://www.toymoban.com/news/detail-542421.html

sink {
    elasticsearch {
        source_table_name = "svc_link"

        hosts = ["10.0.1.1:9200", "10.0.1.2:9200"]
        index = "wzp_svc_link-${now}"
        index_time_format = "yyyy.MM.dd"
        es.net.ssl = false
        es.net.http.auth.user = "elastic"
        es.net.http.auth.pass = "elastic"
        es.batch.size.bytes = "100mb"
        es.batch.size.entries = 10000
        es.batch.write.refresh = false
        es.write.rest.error.handlers = "log" 
        es.write.rest.error.handler.log.logger.name = "BulkErrors"
        es.write.rest.error.handler.log.logger.level = "WARN"
    }
    elasticsearch {
        source_table_name = "app_log"
        # 存到另一个ES库或index
    }
}

三、启动任务

export JAVA_HOME=/etc/alternatives/jre
export HADOOP_HOME=/apps/svr/hadoop-2.9.2
export SPARK_HOME=/apps/svr/spark-2.4.8-bin-hadoop2.7

cd /apps/svr/apache-seatunnel-incubating-2.1.3
./bin/start-seatunnel-spark.sh --master yarn --deploy-mode cluster --config ./config/wzp.spark.streaming.conf

到了这里,关于用seatunnel替代logstash,把数据从kafka抽取到ES的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SeaTunnel本地运行以及kafka发送到redis说明

    Seatunnel2.3.1源码 Idea中的目录结构 通过maven进行代码编译 编译命令 mvn  clean package -pl seatunnel-dist -am -Dmaven.test.skip=true 编译单个模块命令 mvn  clean package -pl seatunnel-examples/seatunnel-engine-examples -am -Dmaven.test.skip=true -T 1C 编译完通过 SeaTunnelEngineExample 类来运行 这样就运行成功啦 附上

    2024年02月15日
    浏览(51)
  • 使用 Apache SeaTunnel 实现 Kafka Source 解析复杂Json 案例

    版本说明: SeaTunnel:apache-seatunnel-2.3.2-SNAPHOT 引擎说明: Flink:1.16.2 Zeta:官方自带 近些时间,我们正好接手一个数据集成项目,数据上游方是给我们投递到Kafka,我们一开始的技术选型是SpringBoot+Flink对上游数据进行加工处理(下文简称:方案一),由于测试不到位,后来到

    2024年02月17日
    浏览(34)
  • Logstash输入Kafka输出Es配置

    Logstash是一个开源的数据收集引擎,具有实时管道功能。它可以从各种数据源中动态地统一和标准化数据,并将其发送到你选择的目的地。 Logstash 的早期目标主要是用于收集日志,但现在的功能已经远远超出这个范围。任何事件类型都可以通过Logstash进行分析,通过输入、过

    2024年02月03日
    浏览(45)
  • ELK集群 日志中心集群、kafka、logstash

    ES:用来日志存储 Logstash:用来日志的搜集,进行日志格式转换并且传送给别人(转发) Kibana:主要用于日志的展示和分析 kafka Filebeat:搜集文件数据 es-1 本地解析 vi /etc/hosts scp /etc/hosts es-2:/etc/hosts scp /etc/hosts es-3:/etc/hosts  yum -y install wget 安装配置jdk wget 8u191 scp -3 tar xf jdk-8u19

    2024年02月07日
    浏览(39)
  • 利用logstash将graylog日志传输到kafka中

    在System-outputs,选择GELF Output,填写如下内容,其它选项默认 在要输出的Stream中,选择Manage Outputs 选择GELF Output,右边选择刚才创建好的test。 下载logstash,最新版就可以。 上传到服务器,编写test.conf配置文件,内容如下 运行logstash,输入以下命令 前提:安装好kafka集群, 创建

    2024年02月13日
    浏览(33)
  • 使用Filebeat+Kafka+Logstash+Elasticsearch构建日志分析系统

            随着时间的积累,日志数据会越来越多,当您需要查看并分析庞杂的日志数据时,可通过Filebeat+Kafka+Logstash+Elasticsearch采集日志数据到Elasticsearch中,并通过Kibana进行可视化展示与分析。本文介绍具体的实现方法。 Kafka是一种分布式、高吞吐、可扩展的消息队列服务,

    2024年02月04日
    浏览(47)
  • EFLK日志平台(filebeat-->kafka-->logstash-->es-->kiabana)

    ELK平台是一套完整的日志集中处理解决方案,将 ElasticSearch、Logstash 和 Kiabana 三个开源工具配合使用, 完成更强大的用户对日志的查询、排序、统计需求。 1. es安装(单节点部署) 前提 安装 2.es web 查看 浏览器访问 http://esIP:9200 或者本地访问curl ‘http://localhost:9200/?pretty’ 安

    2024年02月10日
    浏览(38)
  • 利用logstash/filebeat/插件,将graylog日志传输到kafka中

    在System-outputs,选择GELF Output,填写如下内容,其它选项默认 在要输出的Stream中,选择Manage Outputs 选择GELF Output,右边选择刚才创建好的test。 下载logstash,最新版就可以。 上传到服务器,编写test.conf配置文件,内容如下 运行logstash,输入以下命令 前提:安装好kafka集群, 创建

    2024年02月13日
    浏览(43)
  • filebeat+kafka+logstash+elasticsearch+kibana实现日志收集解决方案

    前言:我们使用nginx来模拟产生日志的服务,通过filebeat收集,交给kafka进行消息队列,再用logstash消费kafka集群中的数据,交给elasticsearch+kibana监控 服务器环境: 192.168.2.1:elasticsearch 192.168.2.2:filebeat+nginx 192.168.2.3:kafka 192.168.2.4:logstash elasticseatch+filebeat+kafka+logsstash(6.60)清

    2024年02月03日
    浏览(58)
  • Elasticsearch:使用 Logstash 构建从 Kafka 到 Elasticsearch 的管道 - Nodejs

    在我之前的文章 “Elastic:使用 Kafka 部署 Elastic Stack”,我构建了从 Beats = Kafka = Logstash = Elasticsearch 的管道。在今天的文章中,我将描述从 Nodejs = Kafka = Logstash = Elasticsearch 这样的一个数据流。在之前的文章 “Elastic:Data pipeline:使用 Kafka => Logstash => Elasticsearch” 中,我也

    2023年04月08日
    浏览(80)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包