采用seatunnel提交Flink和Spark任务

这篇具有很好参考价值的文章主要介绍了采用seatunnel提交Flink和Spark任务。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、seatunnel简单介绍

seatunnel 是一个非常易用,高性能、支持实时流式和离线批处理的海量数据处理产品,架构于Apache Spark 和 Apache Flink之上。

seatunnel 让Spark和Flink的使用更简单,更高效。

注:当前版本用的是2.1.3版本  如果在github下载自己编译有问题 可在此地址下载编译好的文件seatunnel-2.1.3-bin包

特性

  • 简单易用,灵活配置,无需开发
  • 模块化和插件化,易于扩展
  • 支持利用SQL做数据处理和聚合

集成Spark和Flink官方教程

集成Spark教程

集成Flink教程​​​​​​

2、提交Spark任务

参考官方文档:https://interestinglab.github.io/seatunnel-docs/#/zh-cn/v2/spark/quick-start

案例:实现Hive表导出到ClickHouse

创建测试文件

vim  /usr/local/apache-seatunnel-incubating-2.1.3/config/hive-console.conf

ClickHouse建表

#配置Spark参数
spark {  spark.sql.catalogImplementation = "hive"
  spark.app.name = "hive2clickhouse"
  spark.executor.instances = 30
  spark.executor.cores = 1 
  spark.executor.memory = "2g"
  spark.ui.port = 13000
}


input {
    hive {
        pre_sql = "select id,name,create_time from table"
        table_name = "table_tmp"
    }
}


filter {
    convert {
        source_field = "data_source"
        new_type = "UInt8"
    }

    org.interestinglab.waterdrop.filter.Slice {
        source_table_name = "table_tmp"
        source_field = "id"
        slice_num = 2
        slice_code = 0
        result_table_name = "table_8123"
    }
    org.interestinglab.waterdrop.filter.Slice {
        source_table_name = "table_tmp"
        source_field = "id"
        slice_num = 2
        slice_code = 1
        result_table_name = "table_8124"
    }
}


output {
    clickhouse {
        source_table_name="table_8123"
        host = "ip1:8123"
        database = "db_name"
        username="username"
        password="xxxxx"
        table = "model_score_local"
        fields = ["id","name","create_time"]
            clickhouse.socket_timeout = 50000
            retry_codes = [209, 210]
            retry = 3
            bulk_size = 500000
    }
    clickhouse {
        source_table_name="table_8124"
        host = "ip2:8123"
        database = "db_name"
        username="username"
        password="xxxxx"
        table = "model_score_local"
        fields = ["id","name","create_time"]
            clickhouse.socket_timeout = 50000
            retry_codes = [209, 210]
            retry = 3
            bulk_size = 500000
    }
}

运行seatunnel将数据写入ClickHouse

/bin/start-seatunnel-spark.sh --master local --deploy-mode client --config/hive-console.conf

3、提交Flink任务

参考文档:Document

案例1:Kafka到Kafka的数据同步

业务场景:对test_csv主题中的数据进行过滤,仅保留年龄在18岁以上的记录

env {
  # You can set flink configuration here
  execution.parallelism = 1
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://hadoop102:9092/checkpoint"
}

# 在source所属的块中配置数据源
source {
    KafkaTableStream {
        consumer.bootstrap.servers = "node1:9092"
        consumer.group.id = "seatunnel-learn"
        topics = test_csv
        result_table_name = test
        format.type = csv
        schema = "[{\"field\":\"name\",\"type\":\"string\"},{\"field\":\"age\", \"type\": \"int\"}]"
        format.field-delimiter = ";"
        format.allow-comments = "true"
        format.ignore-parse-errors = "true"
    }
}
# 在transform的块中声明转换插件
transform {

  sql {
    sql = "select name,age from test  where age > '"${age}"'"
  }
}
# 在sink块中声明要输出到哪
sink {
   kafkaTable {
    topics = "test_sink"
    producer.bootstrap.servers = "node1:9092"
        }
}

启动Flink任务

bin/start-seatunnel-flink.sh --config config/kafka_kafka.conf -i age=18

案例2:Kafka 输出到Doris进行指标统计

业务场景:使用回话日志统计用户的总观看视频数,用户最常会话市场,用户最小会话时长,用户最后一次会话时间

Doris初始化操作

create database test_db;
CREATE TABLE `example_user_video` (
  `user_id` largeint(40) NOT NULL COMMENT "用户id",
  `city` varchar(20) NOT NULL COMMENT "用户所在城市",
  `age` smallint(6) NULL COMMENT "用户年龄",
  `video_sum` bigint(20) SUM NULL DEFAULT "0" COMMENT "总观看视频数",
  `max_duration_time` int(11) MAX NULL DEFAULT "0" COMMENT "用户最长会话时长",
  `min_duration_time` int(11) MIN NULL DEFAULT "999999999" COMMENT "用户最小会话时长",
  `last_session_date` datetime REPLACE NULL DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次会话时间"
) ENGINE=OLAP
AGGREGATE KEY(`user_id`, `city`, `age`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`user_id`) BUCKETS 16
;   

seatunnel任务配置:

env {
	execution.parallelism = 1
}

source {
    KafkaTableStream {
	    consumer.bootstrap.servers = "node1:9092"
	    consumer.group.id = "seatunnel5"
	    topics = test
	    result_table_name = test
	    format.type = json
	    schema = "{\"session_id\":\"string\",\"video_count\":\"int\",\"duration_time\":\"long\",\"user_id\":\"string\",\"user_age\":\"int\",\"city\":\"string\",\"session_start_time\":\"datetime\",\"session_end_time\":\"datetime\"}"
	    format.ignore-parse-errors = "true"
	}
}

transform{
	sql {
		sql = "select user_id,city,user_age as age,video_count as video_sum,duration_time as max_duration_time,duration_time as min_duration_time,session_end_time as last_session_date from test"
		result_table_name = test2
	}
}

sink{
	DorisSink {
		source_table_name = test2
    	fenodes = "node1:8030"
    	database = test_db
    	table = example_user_video
    	user = atguigu
    	password = 123321
    	batch_size = 50
    	doris.column_separator="\t"
    	doris.columns="user_id,city,age,video_sum,max_duration_time,min_duration_time,last_session_date"
	}
}

启动flink任务文章来源地址https://www.toymoban.com/news/detail-605652.html

bin/start-seatunnel-flink.sh --config config/kafka_doris.conf

到了这里,关于采用seatunnel提交Flink和Spark任务的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据_面试_ETL组件常见问题_spark&flink

    问题列表 回答 spark与flink的主要区别 flink cdc如何确保幂等与一致性 Flink SQL CDC 实践以及一致性分析-阿里云开发者社区 spark 3.0 AQE动态优化 hbase memorystore blockcache sparksql如何调优 通过webui定位那个表以及jobid,jobid找对应的执行计划 hdfs的常见的压缩算法 hbase的数据倾斜 spark数据处

    2024年02月16日
    浏览(45)
  • 实时大数据流处理技术:Spark Streaming与Flink的深度对比

    引言 在当前的大数据时代,企业和组织越来越多地依赖于实时数据流处理技术来洞察和响应业务事件。实时数据流处理不仅能够加快数据分析的速度,还能提高决策的效率和准确性。Apache Spark Streaming和Apache Flink是目前两个主要的实时数据流处理框架,它们各自拥有独特的特

    2024年03月10日
    浏览(62)
  • 【Flink】详解Flink任务提交流程

    通常我们会使用 bin/flink run -t yarn-per-job -c com.xxx.xxx.WordCount/WordCount.jar 方式启动任务;我们看一下 flink 文件中到底做了什么,以下是其部分源码 可以看到,第一步将相对地址转换成绝对地址;第二步获取 Flink 配置信息,这个信息放在 bin 目录下的 config. sh 中;第三步获取 JV

    2024年02月14日
    浏览(44)
  • 大数据系统常用组件理解(Hadoop/hive/kafka/Flink/Spark/Hbase/ES)

    一.Hadoop Hadoop是一个由Apache基金会所开发的分布式系统基础架构。 Hadoop 以一种可靠、高效、可伸缩的方式进行数据处理。 Hadoop的核心是yarn、HDFS和Mapreduce。yarn是资源管理系统,实现资源调度,yarn是Hadoop2.0中的资源管理系统,总体上是master/slave结构。对于yarn可以粗浅将其理解

    2024年02月20日
    浏览(46)
  • 数据存储和分布式计算的实际应用:如何使用Spark和Flink进行数据处理和分析

    作为一名人工智能专家,程序员和软件架构师,我经常涉及到数据处理和分析。在当前大数据和云计算的时代,分布式计算已经成为了一个重要的技术方向。Spark和Flink是当前比较流行的分布式计算框架,它们提供了强大的分布式计算和数据分析功能,为数据处理和分析提供了

    2024年02月16日
    浏览(62)
  • JAVA代码实现Spark任务的提交

    Spark的任务提交可以通过在Spark客户端上调用shell脚本将spark任务提交到yarn上执行。 在某些场景下,无法直接去调用shell脚本或者需要监控任务的执行结果情况。这样的话,尝试通过JAVA语言、SparkLauncher实现Spark任务的提交和执行结果的获取。 以下的例子以Spark On Yarn的模式来设

    2024年02月16日
    浏览(40)
  • 问题:Spark SQL 读不到 Flink 写入 Hudi 表的新数据,打开新 Session 才可见

    博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧

    2024年02月22日
    浏览(55)
  • Spark---Master启动及Submit任务提交

    Spark集群启动之后,首先调用$SPARK_HOME/sbin/start-all.sh,start-all.sh脚本中调用了“start-master.sh”脚本和“start-slaves.sh”脚本,在start-master.sh脚本中可以看到启动Master角色的主类:“org.apache.spark.deploy.master.Master”。在对应的start-slaves.sh脚本中又调用了start-slave.sh脚本,在star-slave.

    2024年01月20日
    浏览(47)
  • SparkLaunch提交Spark任务到Yarn集群

    通过Spark-submit 提交任务 通过Yarn REST Api提交Spark任务 通过Spark Client Api 的方式提交任务 通过SparkLaunch 自带API提交任务 基于Livy的方式提交任务,可参考我的另一篇文章 Apache Livy 安装部署使用示例 上面的几种方式提交任务各自有对应的优缺点,不再进行赘述,下面要介绍的是通

    2024年02月01日
    浏览(38)
  • 大数据流处理与实时分析:Spark Streaming和Flink Stream SQL的对比与选择

    作者:禅与计算机程序设计艺术

    2024年02月07日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包