Seatunnel-2.3.0源码解析

这篇具有很好参考价值的文章主要介绍了Seatunnel-2.3.0源码解析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、概述

SeaTunnel是一个简单易用的数据集成框架,在企业中,由于开发时间或开发部门不通用,往往有多个异构的、运行在不同的软硬件平台上的信息系统同时运行。数据集成是把不同来源、格式、特点性质的数据在逻辑上或物理上有机地集中,从而为企业提供全面的数据共享。SeaTunnel支持海量数据的实时同步。它每天可以稳定高效地同步数百亿数据。

SeaTunnel适用于以下场景

SeaTunnel的特点

  • 海量数据的同步
  • 海量数据的集成
  • 海量数据的ETL
  • 海量数据聚合
  • 多源数据处理
  • 基于配置的低代码开发,易用性高,方便维护。
  • 支持实时流式传输
  • 离线多源数据分析
  • 高性能、海量数据处理能力
  • 模块化的插件架构,易于扩展
  • 支持用SQL进行数据操作和数据聚合
  • 支持Spark structured streaming
  • 支持Spark 2.x

二、工作原理

       以官方案例为例, 通过使用bin/start-seatunnel-flink.sh脚本来提交Flink任务,脚本内容如下:

set -eu
# resolve links - $0 may be a softlink
PRG="$0"

while [ -h "$PRG" ] ; do
  # shellcheck disable=SC2006
  ls=`ls -ld "$PRG"`
  # shellcheck disable=SC2006
  link=`expr "$ls" : '.*-> \(.*\)$'`
  if expr "$link" : '/.*' > /dev/null; then
    PRG="$link"
  else
    # shellcheck disable=SC2006
    PRG=`dirname "$PRG"`/"$link"
  fi
done

PRG_DIR=`dirname "$PRG"`
APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
CONF_DIR=${APP_DIR}/config
APP_JAR=${APP_DIR}/starter/seatunnel-flink-13-starter.jar
APP_MAIN="org.apache.seatunnel.core.starter.flink.FlinkStarter"

if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
    . "${CONF_DIR}/seatunnel-env.sh"
fi

if [ $# == 0 ]
then
    args="-h"
else
    args=$@
fi

set +u
# Log4j2 Config
if [ -e "${CONF_DIR}/log4j2.properties" ]; then
  JAVA_OPTS="${JAVA_OPTS} -Dlog4j2.configurationFile=${CONF_DIR}/log4j2.properties"
  JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.path=${APP_DIR}/logs"
  JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.file_name=seatunnel-flink-starter"
fi

CLASS_PATH=${APP_DIR}/starter/logging/*:${APP_JAR}

CMD=$(java ${JAVA_OPTS} -cp ${CLASS_PATH} ${APP_MAIN} ${args}) && EXIT_CODE=$? || EXIT_CODE=$?
if [ ${EXIT_CODE} -eq 234 ]; then
    # print usage
    echo "${CMD}"
    exit 0
elif [ ${EXIT_CODE} -eq 0 ]; then
    echo "Execute SeaTunnel Flink Job: $(echo "${CMD}" | tail -n 1)"
    eval $(echo "${CMD}" | tail -n 1)
else
    echo "${CMD}"
    exit ${EXIT_CODE}
fi                                                                       

         其中,比较重要的两条命令为:

APP_MAIN="org.apache.seatunnel.core.starter.flink.FlinkStarter" 

CMD=$(java ${JAVA_OPTS} -cp ${CLASS_PATH} ${APP_MAIN} ${args}) && EXIT_CODE=$? || EXIT_CODE=$?

         SeaTunnel通过脚本去执行了seatunnel-core-flink.jar并且入口类为org.apache.seatunnel.core.flink.FlinkStarter,我们接下来移步源码来看这个FlinkStarter类。

public class FlinkStarter implements Starter {

    private static final String APP_NAME = SeatunnelFlink.class.getName();
    public static final String APP_JAR_NAME = "seatunnel-flink-starter.jar";
    /**
     * SeaTunnel parameters, used by SeaTunnel application. e.g. `-c config.conf`
     */
    private final FlinkCommandArgs flinkCommandArgs;
    /**
     * SeaTunnel flink job jar.
     */
    private final String appJar;

    FlinkStarter(String[] args) {
        this.flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), StarterConstant.SHELL_NAME, true);
        // set the deployment mode, used to get the job jar path.
        Common.setDeployMode(flinkCommandArgs.getDeployMode());
        Common.setStarter(true);
        this.appJar = Common.appStarterDir().resolve(APP_JAR_NAME).toString();
    }

    @SuppressWarnings("checkstyle:RegexpSingleline")
    public static void main(String[] args) {
        FlinkStarter flinkStarter = new FlinkStarter(args);
        System.out.println(String.join(" ", flinkStarter.buildCommands()));
    }
    @Override
    public List<String> buildCommands() {
        List<String> command = new ArrayList<>();
        command.add("${FLINK_HOME}/bin/flink");
        command.add(flinkCommandArgs.getRunMode().getMode());
        command.addAll(flinkCommandArgs.getOriginalParameters());
        command.add("-c");
        command.add(APP_NAME);
        command.add(appJar);
        command.add("--config");
        command.add(flinkCommandArgs.getConfigFile());
        if (flinkCommandArgs.isCheckConfig()) {
            command.add("--check");
        }
        //set job name
        command.add("-Dpipeline.name=" + flinkCommandArgs.getJobName());
        // set System properties
        flinkCommandArgs.getVariables().stream()
                .filter(Objects::nonNull)
                .map(String::trim)
                .forEach(variable -> command.add("-D" + variable));
        return command;
    }
}

      在FlinkStarter类中,通过buildCommands()构建Flink提交任务命令。至此,可以大概推断出来SeaTunnel的执行逻辑,即将自己封装为一个Jar包提交Flink执行,在执行时,SeaTunnel根据用户编写的conf来装填对应的Source、Transform、Sink插件,最终将拼好任务后提交给Flink构建StreamGraph和JobGraph。

       FlinkStarter类中,将要执行的jar的main设置为SeatunnelFlink.class,该类主要完成flinkcommand构建和运行。

public class SeatunnelFlink {

    public static void main(String[] args) throws CommandException {
        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), StarterConstant.SHELL_NAME, true);
        Command<FlinkCommandArgs> flinkCommand = new FlinkCommandBuilder()
            .buildCommand(flinkCommandArgs);
        Seatunnel.run(flinkCommand);
    }
}

        CommandLineUtils类中实现了所有实现AbstractCommandArgs接口的对象的参数解析

public static <T extends AbstractCommandArgs> T parse(String[] args, T obj, String programName, boolean acceptUnknownOptions) {
        JCommander jCommander = JCommander.newBuilder()
                .programName(programName)
                .addObject(obj)
                .acceptUnknownOptions(acceptUnknownOptions)
                .build();
        try {
            jCommander.parse(args);
            // The args is not belongs to SeaTunnel, add into engine original parameters
            obj.setOriginalParameters(jCommander.getUnknownOptions());
        } catch (ParameterException e) {
            System.err.println(e.getLocalizedMessage());
            exit(jCommander);
        }

        if (obj.isHelp()) {
            exit(jCommander);
        }
        return obj;
    }

        Seatunnel.run方法主要负责提交command执行,而command作为接口,包含七个实现类,其中flink相关的,包含 FlinkApiConfValidateCommand (校验配置参数是否规范)和FlinkApiTaskExecuteCommand (真正构建执行任务) 

    public static <T extends CommandArgs> void run(Command<T> command) throws CommandException {
        try {
            command.execute();
        } catch (ConfigRuntimeException e) {
            showConfigError(e);
            throw e;
        } catch (Exception e) {
            showFatalError(e);
            throw e;
        }
    }

Seatunnel-2.3.0源码解析,java,大数据

       FlinkApiConfValidateCommand类主要通过获取config配置信息,FlinkApiTaskExecuteCommand类主要负责任务执行。该类中通过构建FlinkExecution对象执行任务,FlinkExecution对象是TaskExecution接口的具体实现类。构造函数通过传入config,初始化对应jarpath、env,source、transform、sink。其中source、transform、sink对应为PluginExecuteProcessor 接口的对应实现类对象。 

public class FlinkApiTaskExecuteCommand implements Command<FlinkCommandArgs> {

    private final FlinkCommandArgs flinkCommandArgs;

    public FlinkApiTaskExecuteCommand(FlinkCommandArgs flinkCommandArgs) {
        this.flinkCommandArgs = flinkCommandArgs;
    }

    @Override
    public void execute() throws CommandExecuteException {
        Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
        checkConfigExist(configFile);
        Config config = new ConfigBuilder(configFile).getConfig();
        FlinkExecution seaTunnelTaskExecution = new FlinkExecution(config);
        try {
            seaTunnelTaskExecution.execute();
        } catch (Exception e) {
            throw new CommandExecuteException("Flink job executed failed", e);
        }
    }
}

       FlinkApiConfValidateCommand类主要通过获取config配置信息,然后构建FlinkExecution对象,FlinkExecution对象是TaskExecution接口的具体实现类。构造函数通过传入config,初始化对应jarpath、env,source、transform、sink。其中source、transform、sink对应为PluginExecuteProcessor 接口的对应实现类对象。

private final FlinkEnvironment flinkEnvironment;
private final PluginExecuteProcessor sourcePluginExecuteProcessor;
private final PluginExecuteProcessor transformPluginExecuteProcessor;
private final PluginExecuteProcessor sinkPluginExecuteProcessor;
private final List<URL> jarPaths;

public FlinkExecution(Config config) {
      try {
          jarPaths = new ArrayList<>(Collections.singletonList(
              new File(Common.appStarterDir().resolve(FlinkStarter.APP_JAR_NAME).toString()).toURI().toURL()));
      } catch (MalformedURLException e) {
          throw new SeaTunnelException("load flink starter error.", e);
      }
      registerPlugin(config.getConfig("env"));
      JobContext jobContext = new JobContext();
      jobContext.setJobMode(FlinkEnvironmentFactory.getJobMode(config));

      this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(jarPaths, config.getConfigList(Constants.SOURCE), jobContext);
      this.transformPluginExecuteProcessor = new TransformExecuteProcessor(jarPaths,
          TypesafeConfigUtils.getConfigList(config, Constants.TRANSFORM, Collections.emptyList()), jobContext);
      this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(jarPaths, config.getConfigList(Constants.SINK), jobContext);

      this.flinkEnvironment = new FlinkEnvironmentFactory(this.registerPlugin(config, jarPaths)).getEnvironment();

      this.sourcePluginExecuteProcessor.setFlinkEnvironment(flinkEnvironment);
      this.transformPluginExecuteProcessor.setFlinkEnvironment(flinkEnvironment);
      this.sinkPluginExecuteProcessor.setFlinkEnvironment(flinkEnvironment);
}

        FlinkExecution对象构建完成后,执行execute方法,分别对sourcePluginExecuteProcessor、transformPluginExecuteProcessor、sinkPluginExecuteProcessor执行execute方法。其中,FlinkEnvironment和SparkEnvironment一样,是对Flink和Spark中执行上下文的封装。

@Override
public void execute() throws TaskExecuteException {
    List<DataStream<Row>> dataStreams = new ArrayList<>();
    dataStreams = sourcePluginExecuteProcessor.execute(dataStreams);
    dataStreams = transformPluginExecuteProcessor.execute(dataStreams);
    sinkPluginExecuteProcessor.execute(dataStreams);

    log.info("Flink Execution Plan:{}", flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
    try {
          flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName());
    } catch (Exception e) {
        throw new TaskExecuteException("Execute Flink job error", e);
    }
}

       sourcePluginExecuteProcessor中execute方法通过addsource获取带有Schema的DataStreamSource<Row>,最终返回DataStream<Row>的列表。

    @Override
    public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) {
        StreamExecutionEnvironment executionEnvironment = flinkEnvironment.getStreamExecutionEnvironment();
        List<DataStream<Row>> sources = new ArrayList<>();
        for (int i = 0; i < plugins.size(); i++) {
            //插件方式,动态引入数据源
            SeaTunnelSource internalSource = plugins.get(i);
            BaseSeaTunnelSourceFunction sourceFunction;
            //检查数据源是否支持协同
            if (internalSource instanceof SupportCoordinate) {
                sourceFunction = new SeaTunnelCoordinatedSource(internalSource);
            } else {
                sourceFunction = new SeaTunnelParallelSource(internalSource);
            }
            DataStreamSource<Row> sourceStream = addSource(executionEnvironment,
                sourceFunction,
                "SeaTunnel " + internalSource.getClass().getSimpleName(),
                internalSource.getBoundedness() == org.apache.seatunnel.api.source.Boundedness.BOUNDED);
            Config pluginConfig = pluginConfigs.get(i);
            if (pluginConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
                int parallelism = pluginConfig.getInt(SourceCommonOptions.PARALLELISM.key());
                sourceStream.setParallelism(parallelism);
            }
            // 将处理后的数据注册为表
            registerResultTable(pluginConfig, sourceStream);
            sources.add(sourceStream);
        }
        return sources;
    }

    private DataStreamSource<Row> addSource(
        final StreamExecutionEnvironment streamEnv,
        final BaseSeaTunnelSourceFunction function,
        final String sourceName,
        boolean bounded) {
        checkNotNull(function);
        checkNotNull(sourceName);
        checkNotNull(bounded);

        TypeInformation<Row> resolvedTypeInfo = function.getProducedType();
        boolean isParallel = function instanceof ParallelSourceFunction;
        streamEnv.clean(function);

        final StreamSource<Row, ?> sourceOperator = new StreamSource<>(function);
        return new DataStreamSource<>(streamEnv, resolvedTypeInfo, sourceOperator, isParallel, sourceName, bounded ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED);
    }

       transformPluginExecuteProcessor中execute方法首先判断transform插件是否为空,为空则代表无transform操作,根据transform插件依次对数据进行转换,最后将结果写入result返回。

@Override
    public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) throws TaskExecuteException {
        if (plugins.isEmpty()) {
            return upstreamDataStreams;
        }
        DataStream<Row> input = upstreamDataStreams.get(0);
        List<DataStream<Row>> result = new ArrayList<>();
        for (int i = 0; i < plugins.size(); i++) {
            try {
                FlinkStreamTransform transform = plugins.get(i);
                Config pluginConfig = pluginConfigs.get(i);
                DataStream<Row> stream = fromSourceTable(pluginConfig).orElse(input);
                input = transform.processStream(flinkEnvironment, stream);
                registerResultTable(pluginConfig, input);
                transform.registerFunction(flinkEnvironment);
                result.add(input);
            } catch (Exception e) {
                throw new TaskExecuteException(
                    String.format("SeaTunnel transform task: %s execute error", plugins.get(i).getPluginName()), e);
            }
        }
        return result;
    }

       SinkExecuteProcessor中execute方法同理,最终通过stream.sinkTO将结果写入对于sink端,同时根据sink端配置信息,设置是否并行。 

@Override
    public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) throws TaskExecuteException {
        DataStream<Row> input = upstreamDataStreams.get(0);
        for (int i = 0; i < plugins.size(); i++) {
            Config sinkConfig = pluginConfigs.get(i);
            SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink = plugins.get(i);
            DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
            seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
            DataStreamSink<Row> dataStreamSink = stream.sinkTo(new FlinkSink<>(seaTunnelSink)).name(seaTunnelSink.getPluginName());
            if (sinkConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
                int parallelism = sinkConfig.getInt(SourceCommonOptions.PARALLELISM.key());
                dataStreamSink.setParallelism(parallelism);
            }
        }
        // the sink is the last stream
        return null;
}

三、源码补充

Flink DataSource

       在sourcePluginExecuteProcessor的execute方法中,创建了一个名为sourcefunction的BaseSeaTunnelFunction对象,该类继承至抽象类RichSourceFunction,并实现了CheckpointListener、CheckpointedFunction、ResultTypeQueryable<Row>三个接口。其中RichSourceFunction抽象类又继承至抽象类AbstractRichFunction,并实现SourcFunction接口。AbstractRichFunction的父类是抽象类RichFunction,该类是用户自定义function的基类   。     

        sourcePluginExecuteProcessor的execute中通过addSource方法将一个StreamFunction封装为StreamSource,并最终返回一个DataStreamSource对象,DataStreamSource表示DataStream的起点,是SingleOutputStreamOperator的子类,超级父类为DataStream,所有的数据源都会被包装为DataStreamSource。通过addSource方法添加具有自定义类型信息的数据源到计算任务中。用户自定义数据源则通过实现SourceFunction接口,该接口供两个方法:

run 执行方法,实现读取数据的实际操作
cancel 取消函数,用于取消/关闭连接使用

       SourceFunction定义的数据为非并行的,实现ParallelSourceFunction接口或继承RichParallelSourceFunction来定义并行的source。SeaTunnel中SeaTunnelParallelSource实现ParallelSourceFunction接口提供并行数据源。

seatunnel Source

Seatunnel-2.3.0源码解析,java,大数据

 seatunnel ExecuteProcessor

 Seatunnel-2.3.0源码解析,java,大数据

 文章来源地址https://www.toymoban.com/news/detail-521775.html

到了这里,关于Seatunnel-2.3.0源码解析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Seatunnel 2.1.3 源码打包、编译运行

    执行报错: [ERROR] Unknown lifecycle phase \\\".skip\\\". You must specify a valid lifecycle phase or a goal in the format plugin-prefix:goal or plugin-group-id:plugin-artifact-id[:plugin-vers ion]:goal. 解决: 1、 PowerShell 窗口下,执行带参数的需要’单引号’包起来才可以 命令改为: 2、不要使用PowerShell命令行模式, 进

    2024年02月12日
    浏览(27)
  • Apache Seatunnel本地源码构建编译运行调试

    Apache Seatunnel本地源码构建编译运行调试   本文使用的是windows10-64位专业版的电脑,需要安装环境如下   jdk=1.8 - 64 位的jdk、   使用的是idea自带的maven,最好是安装一个方便源码编译构建,使用idea自带的maven无法执行mvnw,但是可以复制mvnw后面的在idea的maven中的run mave

    2024年01月16日
    浏览(25)
  • 使用 Apache SeaTunnel 实现 Kafka Source 解析复杂Json 案例

    版本说明: SeaTunnel:apache-seatunnel-2.3.2-SNAPHOT 引擎说明: Flink:1.16.2 Zeta:官方自带 近些时间,我们正好接手一个数据集成项目,数据上游方是给我们投递到Kafka,我们一开始的技术选型是SpringBoot+Flink对上游数据进行加工处理(下文简称:方案一),由于测试不到位,后来到

    2024年02月17日
    浏览(26)
  • 【源码编译】Apache SeaTunnel-Web 适配最新2.3.4版本教程

    Apache SeaTunnel新版本已经发布,感兴趣的小伙伴可以看之前版本发布的文章 本文主要给大家介绍为使用2.3.4版本的新特性,需要对Apache SeaTunnel-Web依赖的版本进行升级,而SeaTunnel2.3.4版本部分API跟之前 版本不兼容 ,所以需要对 SeaTunnel-Web的 源码进行修改 适配。 克隆SeaYunnel-We

    2024年04月14日
    浏览(25)
  • java源码-List源码解析

    Java中的List是一个接口,它定义了一组操作列表的方法。List接口的常见子类包括ArrayList、LinkedList和Vector等。 以下是Java中List接口及其常见方法的源码解析: 1. List接口定义 ``` public interface ListE extends CollectionE {     // 返回列表中元素的数量     int size();          // 返回列表

    2024年02月15日
    浏览(24)
  • Java源码-servlet源码解析

    Servlet是运行在Web服务器上的Java组件,用于处理客户端请求并生成响应。下面将介绍Servlet的源码解析。 Servlet接口源码解析 Servlet接口是所有Servlet类必须实现的接口。该接口定义了Servlet生命周期方法和服务方法。 init方法初始化Servlet,service方法处理请求并生成响应。destroy方

    2024年02月13日
    浏览(27)
  • 【Java 】从源码全面解析Java 线程池

    线程池技术在互联网技术使用如此广泛,几乎所有的后端技术面试官都要在线程池技术的使用和原理方面对小伙伴们进行 360° 的刁难。 作为一个在互联网公司面一次拿一次 Offer 的面霸,打败了无数竞争对手,每次都只能看到无数落寞的身影失望的离开,略感愧疚(请允许我

    2024年02月03日
    浏览(33)
  • 【JAVA】CyclicBarrier源码解析以及示例

    前言 在多线程编程中,同步工具是确保线程之间协同工作的重要组成部分。 CyclicBarrier (循环屏障)是Java中的一个强大的同步工具,它允许一组线程在达到某个共同点之前互相等待。 在本文中,我们将深入探讨 CyclicBarrier 的源码实现以及提供一些示例,以帮助您更好地理解

    2024年02月04日
    浏览(26)
  • 【大数据】什么是数据集成?(SeaTunnel 集成工具介绍)

    数据集成是指将来自不同数据源的数据整合到一起形成一个统一的数据集 。这个过程包括从不同的数据源中收集数据,对数据进行清洗、转换、重构和整合,以便能够在一个统一的数据仓库或数据湖中进行存储和管理。 数据集成可以帮助企业更好地理解和利用他们的数据,

    2024年02月08日
    浏览(32)
  • 从源码全面解析 Java SPI 的来龙去脉

    👏作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,阿里云专家博主 📕系列专栏:Java设计模式、Spring源码系列、Netty源码系列、Kafka源码系列、JUC源码系列、duubo源码系列 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦

    2024年02月12日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包