【源码编译】Apache SeaTunnel-Web 适配最新2.3.4版本教程

这篇具有很好参考价值的文章主要介绍了【源码编译】Apache SeaTunnel-Web 适配最新2.3.4版本教程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Apache SeaTunnel新版本已经发布,感兴趣的小伙伴可以看之前版本发布的文章

seatunnel web database type,大数据

本文主要给大家介绍为使用2.3.4版本的新特性,需要对Apache SeaTunnel-Web依赖的版本进行升级,而SeaTunnel2.3.4版本部分API跟之前版本不兼容,所以需要对 SeaTunnel-Web的源码进行修改适配。

源码修改编译

克隆SeaYunnel-Web源码到本地

  git  clone https://github.com/apache/seatunnel-web.git

在idea中打开项目

升级Pom中的SeaTunnel版本到2.3.4并重新导入依赖

  <seatunnel-framework.version>2.3.3</seatunnel-framework.version>
  改为
  <seatunnel-framework.version>2.3.4</seatunnel-framework.version>

因为大部分用户使用SeaTunnel Web都是基于SeaTunnel-2.3.3 版本做的适配,而最新发布的SeaTunnel2.3.4 部分API发生了改动导致直接升级的过程中会出现API不兼容的问题,所以本篇文章重点来了:我们需要对调用SeaTunnel API的SeaTunnel Web源码部分进行修改,修改完之后,我们就能完全适配2.3.4最新版本。

社区推出了2.3.X及Web系列专属的社群,感兴趣的小伙伴可以加社区小助手进群。

org.apache.dolphinscheduler.api.dto.seatunnel.bean.engine.EngineDataType

public static class SeaTunnelDataTypeConvertor
        implements DataTypeConvertor<SeaTunnelDataType<?>> {

    @Override
    public SeaTunnelDataType<?> toSeaTunnelType(String engineDataType) {
        return DATA_TYPE_MAP.get(engineDataType.toLowerCase(Locale.ROOT)).getRawType();
    }

    @Override
    public SeaTunnelDataType<?> toSeaTunnelType(
            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)
            throws DataTypeConvertException {
        return seaTunnelDataType;
    }

    @Override
    public SeaTunnelDataType<?> toConnectorType(
            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)
            throws DataTypeConvertException {
        return seaTunnelDataType;
    }

    @Override
    public String getIdentity() {
        return "EngineDataTypeConvertor";
    }
}
// 改为
public static class SeaTunnelDataTypeConvertor
            implements DataTypeConvertor<SeaTunnelDataType<?>> {

        @Override
        public SeaTunnelDataType<?> toSeaTunnelType(String s, String s1) {
            return DATA_TYPE_MAP.get(s.toLowerCase(Locale.ROOT)).getRawType();
        }

        @Override
        public SeaTunnelDataType<?> toSeaTunnelType(
                String s, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {
            return seaTunnelDataType;
        }

        @Override
        public SeaTunnelDataType<?> toConnectorType(
                String s, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {
            return seaTunnelDataType;
        }

        @Override
        public String getIdentity() {
            return "EngineDataTypeConvertor";
        }
    }

org.apache.seatunnel.app.service.impl.TableSchemaServiceImpl

public TableSchemaServiceImpl() throws IOException {
    Common.setStarter(true);
    Set<PluginIdentifier> pluginIdentifiers =
            SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();
    ArrayList<PluginIdentifier> pluginIdentifiersList = new ArrayList<>();
    pluginIdentifiersList.addAll(pluginIdentifiers);
    List<URL> pluginJarPaths =
            new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);
    //        Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
    if (!pluginJarPaths.isEmpty()) {
        //            List<URL> files = FileUtils.searchJarFiles(path);
        pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));
        factory =
                new DataTypeConvertorFactory(
                        new URLClassLoader(pluginJarPaths.toArray(new URL[0])));
    } else {
        factory = new DataTypeConvertorFactory();
    }
}
// 改为
    public TableSchemaServiceImpl() throws IOException {
        Common.setStarter(true);
        Set<PluginIdentifier> pluginIdentifiers =
                SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();
        ArrayList<PluginIdentifier> pluginIdentifiersList = new ArrayList<>();
        pluginIdentifiersList.addAll(pluginIdentifiers);
        List<URL> pluginJarPaths =
                new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);
        //        Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
        if (!pluginJarPaths.isEmpty()) {
            //            List<URL> files = FileUtils.searchJarFiles(path);
            pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));
            factory =
                    new DataTypeConvertorFactory(
                            new URLClassLoader(pluginJarPaths.toArray(new URL[0])));
        } else {
            factory = new DataTypeConvertorFactory();
        }
    }

SeaTunnelDataType<?> dataType = convertor.toSeaTunnelType(field.getType());
// 改为
SeaTunnelDataType<?> dataType =
                    convertor.toSeaTunnelType(field.getName(), field.getType());

org.apache.seatunnel.app.service.impl.JobExecutorServiceImpl.executeJobBySeaTunnel()

 public Long executeJobBySeaTunnel(Integer userId, String filePath, Long jobInstanceId) {
        Common.setDeployMode(DeployMode.CLIENT);
        JobConfig jobConfig = new JobConfig();
        jobConfig.setName(jobInstanceId + "_job");
        try {
            SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();
            SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
            ClientJobExecutionEnvironment jobExecutionEnv =
                    seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
                final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
            JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
            jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId()));
            jobInstanceDao.update(jobInstance);

            CompletableFuture.runAsync(
                    () -> {
                        waitJobFinish(
                                clientJobProxy,
                                userId,
                                jobInstanceId,
                                Long.toString(clientJobProxy.getJobId()),
                                seaTunnelClient);
                    });

        } catch (ExecutionException | InterruptedException e) {
            ExceptionUtils.getMessage(e);
            throw new RuntimeException(e);
        }
        return jobInstanceId;
    }

org.apache.seatunnel.app.service.impl.JobInstanceServiceImpl

else if (statusList.contains("CANCELLING")) {
            jobStatus = JobStatus.CANCELLING.name();
// 改为
else if (statusList.contains("CANCELING")) {
            jobStatus = JobStatus.CANCELING.name();

org.apache.seatunnel.app.service.impl.SchemaDerivationServiceImpl

TableFactoryContext context =
        new TableFactoryContext(
                Collections.singletonList(table),
                ReadonlyConfig.fromMap(config),
                Thread.currentThread().getContextClassLoader());
// 改为
TableTransformFactoryContext context =
                new TableTransformFactoryContext(
                        Collections.singletonList(table),
                        ReadonlyConfig.fromMap(config),
                        Thread.currentThread().getContextClassLoader());

org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy

public void restoreJob(
            @NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {
        SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
        JobConfig jobConfig = new JobConfig();
        jobConfig.setName(jobInstanceId + "_job");
        try {
            seaTunnelClient.restoreExecutionContext(filePath, jobConfig, jobEngineId).execute();
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
}
// 改为
public void restoreJob(
        @NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {
        SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
        JobConfig jobConfig = new JobConfig();
        jobConfig.setName(jobInstanceId + "_job");
        SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();
        try {
            seaTunnelClient
                .restoreExecutionContext(filePath, jobConfig, seaTunnelConfig, jobEngineId)
                .execute();
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

org.apache.seatunnel.app.thirdparty.framework.PluginDiscoveryUtil

public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(
        PluginType pluginType) throws IOException {
    Common.setStarter(true);
    if (!pluginType.equals(PluginType.SOURCE)) {
        throw new UnsupportedOperationException("ONLY support plugin type source");
    }
    Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
    List<Factory> factories;
    if (path.toFile().exists()) {
        List<URL> files = FileUtils.searchJarFiles(path);
        factories =
                FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])));
    } else {
        factories =
                FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());
    }
    Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();
    factories.forEach(
            plugin -> {
                if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {
                    TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;
                    PluginIdentifier info =
                            PluginIdentifier.of(
                                    "seatunnel",
                                    PluginType.SOURCE.getType(),
                                    plugin.factoryIdentifier());
                    featureMap.put(
                            info,
                            new ConnectorFeature(
                                    SupportColumnProjection.class.isAssignableFrom(
                                            tableSourceFactory.getSourceClass())));
                }
            });
    return featureMap;
}
// 改为

    public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(
            PluginType pluginType) {
        Common.setStarter(true);
        if (!pluginType.equals(PluginType.SOURCE)) {
            throw new UnsupportedOperationException("ONLY support plugin type source");
        }

        ArrayList<PluginIdentifier> pluginIdentifiers = new ArrayList<>();
        pluginIdentifiers.addAll(
                SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SOURCE).keySet());
        List<URL> pluginJarPaths =
                new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiers);

        List<Factory> factories;
        if (!pluginJarPaths.isEmpty()) {
            factories =
                    FactoryUtil.discoverFactories(
                            new URLClassLoader(pluginJarPaths.toArray(new URL[0])));
        } else {
            factories =
                    FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());
        }
        Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();
        factories.forEach(
                plugin -> {
                    if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {
                        TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;
                        PluginIdentifier info =
                                PluginIdentifier.of(
                                        "seatunnel",
                                        PluginType.SOURCE.getType(),
                                        plugin.factoryIdentifier());
                        featureMap.put(
                                info,
                                new ConnectorFeature(
                                        SupportColumnProjection.class.isAssignableFrom(
                                                tableSourceFactory.getSourceClass())));
                    }
                });
        return featureMap;

代码格式化

mvn spotless:apply

编译打包

mvn clean package -DskipTests

至此,seatunnel web 适配 seatunnel2.3.4版本完成,对应的安装包会在 seatunnel-web-dist/target目录下生成

Linux部署测试

这里具体请参考之前社区其他老师发布的文章Apache SeaTunnel Web部署指南

重要的配置项

1、seatunnel-web数据库相关配置(application.yml) 
用来web服务中的数据持久化

2、SEATUNNEL_HOME(环境变量)
seatunnel-web调用seaunnel的插件获取的API,扫描connector相关的连接器

3、ST_WEB_HOME(环境变量)
seatunnel-web会加载seatunnel-web/datasource下的插件包,这里决定了seatunnel-web支持哪些数据源的任务定义

4、重要的配置文件:
connector-datasource-mapper.yaml 
该配置文件配置了支持的数据源类型以及该数据源支持的数据同步方式等信息(比如是否支持多表同步、是否支持cdc等)
hazelcast-client.yaml 
seatunnel-web服务通过seatunnel-api的方式与seatunnel集群进行交互,该配置文件配置了集群节点等相关信息

感谢大家的阅读,希望对各位兄弟有所帮助,如果有任何疑问,欢迎来社区找我交流!

本文由 白鲸开源科技 提供发布支持!文章来源地址https://www.toymoban.com/news/detail-851477.html

到了这里,关于【源码编译】Apache SeaTunnel-Web 适配最新2.3.4版本教程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【安装部署】Apache SeaTunnel 和 Web快速安装详解

    由于作者目前接触当前最新版本为2.3.4 但是官方提供的web版本未1.0.0,不兼容2.3.4,因此这里仍然使用2.3.3版本。 可以自定义兼容处理,官方提供了文档:https://mp.weixin.qq.com/s/Al1VmBoOKu2P02sBOTB6DQ 因为大部分用户使用SeaTunnel Web都是基于SeaTunnel-2.3.3 版本做的适配,而最新发布的Sea

    2024年04月16日
    浏览(34)
  • Seatunnel 2.1.3 源码打包、编译运行

    执行报错: [ERROR] Unknown lifecycle phase \\\".skip\\\". You must specify a valid lifecycle phase or a goal in the format plugin-prefix:goal or plugin-group-id:plugin-artifact-id[:plugin-vers ion]:goal. 解决: 1、 PowerShell 窗口下,执行带参数的需要’单引号’包起来才可以 命令改为: 2、不要使用PowerShell命令行模式, 进

    2024年02月12日
    浏览(27)
  • 源码编译安装Apache

    目录 ✨apache安装步骤 🍭挂载镜像 🍭解压并安装 🍭安装主程序 🍭优化链接及服务 🍭查看httpd模块 🍭查看mpm配置文件 🍭查看apache主页 🍭使用ab命令进行压力测试    🦐博客主页:大虾好吃吗的博客    🦐专栏地址:Linux从入门到精通   检查是否已经rpm安装httpd服务,已

    2024年02月08日
    浏览(46)
  • # Apache SeaTunnel 究竟是什么?

    作者 | Shawn Gordon 翻译 | Debra Chen 原文链接 | What the Heck is Apache SeaTunnel? 我在2023年初开始注意到Apache SeaTunnel的相关讨论,一直低调地关注着。该项目始于2017年,最初名为Waterdrop,在Apache DolphinScheduler的创建者的贡献下发展起来,后者支持SeaTunnel作为任务插件。 我最初对于SeaT

    2024年04月08日
    浏览(92)
  • Apache SeaTunnel 社区 3 月月报

    各位热爱 SeaTunnel 的小伙伴们,SeaTunnel 社区 3 月月报来啦!这里将记录 SeaTunnel 社区每个月的重要更新,并评选出月度之星,欢迎关注。 感谢以下小伙伴 3 月为 Apache SeaTunnel 做的精彩贡献(排名不分先后): @Carl-Zhou-CN,@ilsl1007,@loveyang1990,@dailai,@liugddx,@CheneyYin,@litiliu,@ShaunWuu,@

    2024年04月11日
    浏览(32)
  • http服务(Apache 2.4.57)源码编译及使用

    这里安装的是Apache 2.4.57版本 下载地址

    2024年02月11日
    浏览(26)
  • apache seatunnel支持hive jdbc

    上传hive jdbc包HiveJDBC42.jar到seatunel lib安装目录 原因是cloudera 实现了add batch方法 创建seatunnel任务文件mysql2hivejdbc.conf

    2024年02月06日
    浏览(25)
  • 海豚调度任务类型Apache SeaTunnel部署指南

    Apache DolphinScheduler已支持Apache SeaTunnel任务类型,本文介绍了SeaTunnel任务类型如何创建,任务参数,以及任务样例。 SeaTunnel 任务类型,用于创建并执行 SeaTunnel 类型任务。worker 执行该任务的时候,会通过 start-seatunnel-spark.sh 、 start-seatunnel-flink.sh 和 seatunnel.sh 命令解析 config 文件

    2024年04月08日
    浏览(36)
  • Apache Doris (六十四): Flink Doris Connector - (1)-源码编译

     🏡 个人主页:IT贫道-CSDN博客   🚩 私聊博主:私聊博主加WX好友,获取更多资料哦~  🔔 博主个人B栈地址:豹哥教你学编程的个人空间-豹哥教你学编程个人主页-哔哩哔哩视频 目录 1. Flink与Doris版本兼容

    2024年01月18日
    浏览(39)
  • 解读重要功能特性:新手入门 Apache SeaTunnel CDC

    点亮 ⭐️ Star · 照亮开源之路 https://github.com/apache/incubator-seatunnel 为什么说 CDC 是SeaTunnel平台中的一个重要功能特性?今天这篇文章跟大家分享一下 CDC 是什么?目前市面上的 CDC 工具现有的痛点有哪些?SeaTunnel面对这些痛点设计的架构目标是什么?另外包括社区的展望和目前

    2024年02月09日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包