SeaTunnel扩展Transform插件,自定义转换插件

这篇具有很好参考价值的文章主要介绍了SeaTunnel扩展Transform插件,自定义转换插件。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

代码结构

在seatunnel-transforms-v2中新建数据包名,新建XXXTransform,XXXTransformConfig,XXXTransformFactory三个类

SeaTunnel扩展Transform插件,自定义转换插件,java,大数据,算法,SeaTunnel

自定义转换插件功能说明

这是个适配KafkaSource的转换插件,接收到的原文格式为:

{"path":"xxx.log.gz","code":"011","cont":"{\"ID\":\"1\",\"NAME\":\"zhangsan\",\"TABLE\":\"USER\",\"create_time\":\"20230904\"}","timestamp":"20230823160246"}

需要转换为只保留cont里面的数据

{"create_time":"20230904","NAME":"zhangsan","TABLE":"USER","ID":"999"}

任务配置文件

env {

  # You can set engine configuration here STREAMING BATCH

  execution.parallelism = 1

  job.mode = "STREAMING"



  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"

}



source {

  # This is a example source plugin **only for test and demonstrate the feature source plugin**

   Kafka {

            bootstrap.servers = "xxxxx:9092"

            topic = "test_in2"

            consumer.group = "167321237613

            format="text"

            result_table_name="kafka"

        }

}



transform {

    ExtractFromCJ {

    source_table_name="kafka"

    result_table_name="kafka1"

    schema = {

        fields {

                NAME = "string"

                TABLE = "string"

                create_time = "string"

                ID="string"

            }

        }

    }

}



sink {

  kafka {

      source_table_name="kafka1"

      topic = "test_out2"

      bootstrap.servers = "xxxx:9092"

      kafka.request.timeout.ms = 60000

      semantics = EXACTLY_ONCE

  }

}

代码说明

XXXConfig代码,这个类主要用来保存transform的配置项

package org.apache.seatunnel.transform.extract;



import lombok.Getter;

import lombok.Setter;

import org.apache.seatunnel.api.configuration.Option;

import org.apache.seatunnel.api.configuration.Options;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;



import java.io.Serializable;

import java.util.Map;



@Getter

@Setter

public class ExtractFromCJTransformConfig implements Serializable {



    public static final Option<Map<String, String>> SCHEMA =

            Options.key("schema.fields")

                    .mapType()

                    .noDefaultValue()

                    .withDescription(

                            "Specify the field mapping relationship between input and output");



    private Map<String, String> fieldColumns;

    public static ExtractFromCJTransformConfig of(ReadonlyConfig config) {

        ExtractFromCJTransformConfig extractFromCJTransformConfig = new ExtractFromCJTransformConfig();

        Map<String, String> fieldColumns = config.get(SCHEMA);

        extractFromCJTransformConfig.setFieldColumns(fieldColumns);

        return extractFromCJTransformConfig;

    }

}

XXXTransformFactory说明,工厂类,主要用来初始化具体的转换类

package org.apache.seatunnel.transform.extract;



import com.google.auto.service.AutoService;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import org.apache.seatunnel.api.configuration.util.OptionRule;

import org.apache.seatunnel.api.table.catalog.CatalogTable;

import org.apache.seatunnel.api.table.connector.TableTransform;

import org.apache.seatunnel.api.table.factory.Factory;

import org.apache.seatunnel.api.table.factory.TableFactoryContext;

import org.apache.seatunnel.api.table.factory.TableTransformFactory;



@AutoService(Factory.class)

public class ExtractFromCJTransformFactory implements TableTransformFactory {

    @Override

    public String factoryIdentifier() {

        return  "ExtractFromCJ";

    }



    @Override

    public OptionRule optionRule() {

        return OptionRule.builder().optional(ExtractFromCJTransformConfig.SCHEMA).build();

    }



    @Override

    public TableTransform createTransform(TableFactoryContext context) {

        CatalogTable catalogTable = context.getCatalogTable();

        ReadonlyConfig options = context.getOptions();

        ExtractFromCJTransformConfig extractFromCJTransformConfig =

                ExtractFromCJTransformConfig.of(options);

        return () -> new ExtractFromCJTransform(extractFromCJTransformConfig, catalogTable);

    }

}

XXXXTransform,具体的转换类,主要用于对source数据的处理,还有数据结构类型的保存

package org.apache.seatunnel.transform.extract;



import cn.hutool.core.collection.CollUtil;

import cn.hutool.json.JSONObject;

import cn.hutool.json.JSONUtil;

import com.google.auto.service.AutoService;

import lombok.NoArgsConstructor;

import lombok.NonNull;

import lombok.extern.slf4j.Slf4j;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import org.apache.seatunnel.api.configuration.util.ConfigValidator;

import org.apache.seatunnel.api.table.catalog.CatalogTable;

import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;

import org.apache.seatunnel.api.table.catalog.Column;

import org.apache.seatunnel.api.table.catalog.ConstraintKey;

import org.apache.seatunnel.api.table.catalog.PhysicalColumn;

import org.apache.seatunnel.api.table.catalog.PrimaryKey;

import org.apache.seatunnel.api.table.catalog.TableIdentifier;

import org.apache.seatunnel.api.table.catalog.TableSchema;

import org.apache.seatunnel.api.table.type.SeaTunnelDataType;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import org.apache.seatunnel.api.transform.SeaTunnelTransform;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;



import java.util.ArrayList;

import java.util.List;

import java.util.stream.Collectors;



@AutoService(SeaTunnelTransform.class)

@NoArgsConstructor

@Slf4j

public class ExtractFromCJTransform extends AbstractCatalogSupportTransform {



    private ExtractFromCJTransformConfig config;

    protected SeaTunnelRowType inputRowType;

    @Override

    public String getPluginName() {

        return "ExtractFromCJ";

    }



    public ExtractFromCJTransform(

            @NonNull ExtractFromCJTransformConfig config, @NonNull CatalogTable catalogTable) {

        super(catalogTable);

        this.config = config;

    }

    @Override

    protected void setConfig(Config pluginConfig) {

        ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))

                .validate(new ExtractFromCJTransformFactory().optionRule());

        this.config = ExtractFromCJTransformConfig.of(ReadonlyConfig.fromConfig(pluginConfig));

    }



    @Override

    protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {

        return inputRowType;

    }



    @Override

    protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {

        Object content = inputRow.getFields()[0];

        String data = content.toString();

        Object[] outputDataArray = new Object[0];

        if (JSONUtil.isJson(data)) {

            JSONObject cont = JSONUtil.parseObj(data).getJSONObject("cont");

            if (!cont.isEmpty()) {

                if (!CollUtil.isEmpty(this.config.getFieldColumns())) {

                    outputDataArray = new Object[this.config.getFieldColumns().size()];

                    int t = 0;

                    for (String key : this.config.getFieldColumns().keySet()) {

                        String value = cont.getStr(key);

                        outputDataArray[t] = value;

                        t++;

                    }

                } else {

                    outputDataArray = new Object[1];

                    outputDataArray[0] = JSONUtil.toJsonStr(cont);

                }

            }

        }

        SeaTunnelRow outputRow = new SeaTunnelRow(outputDataArray);

        outputRow.setRowKind(inputRow.getRowKind());

        outputRow.setTableId(inputRow.getTableId());

        return outputRow;

    }



    @Override

    protected TableSchema transformTableSchema() {

        List<Column> inputColumns = inputCatalogTable.getTableSchema().getColumns();

        List<ConstraintKey> outputConstraintKeys =

                inputCatalogTable.getTableSchema().getConstraintKeys().stream()

                        .map(ConstraintKey::copy)

                        .collect(Collectors.toList());

        PrimaryKey copiedPrimaryKey =

                inputCatalogTable.getTableSchema().getPrimaryKey() == null

                        ? null

                        : inputCatalogTable.getTableSchema().getPrimaryKey().copy();



        if (CollUtil.isEmpty(this.config.getFieldColumns())) {

            return TableSchema.builder()

                    .primaryKey(copiedPrimaryKey)

                    .columns(inputColumns)

                    .constraintKey(outputConstraintKeys)

                    .build();

        } else {

            List<Column> transformColumns = new ArrayList<>();

            for (String key : this.config.getFieldColumns().keySet()) {

                SeaTunnelDataType<?> dataType = CatalogTableUtil.parseDataType(this.config.getFieldColumns().get(key));

                transformColumns.add(PhysicalColumn.of(key, dataType, 0, true, null, null));

            }

            return TableSchema.builder()

                    .primaryKey(copiedPrimaryKey)

                    .columns(transformColumns)

                    .constraintKey(outputConstraintKeys)

                    .build();

        }

    }



    @Override

    protected TableIdentifier transformTableIdentifier() {

        return inputCatalogTable.getTableId().copy();

    }

}

文中的转换实现的是AbstractCatalogSupportTransform类,Seatunel还提供SingleFieldOutputTransform和MultipleFieldOutputTransform,分别对应单字段和多字段的数据处理,具体扩展可根据需求来实现对应的类

执行结果

来源消息

SeaTunnel扩展Transform插件,自定义转换插件,java,大数据,算法,SeaTunnel

结果消息

SeaTunnel扩展Transform插件,自定义转换插件,java,大数据,算法,SeaTunnel

以上就是对转换插件的扩展分享,有需求的小伙伴可以参考,也欢迎大家一起评论沟通~文章来源地址https://www.toymoban.com/news/detail-701387.html

到了这里,关于SeaTunnel扩展Transform插件,自定义转换插件的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • H5 + C3基础(六)(2D转换transform 位移 & 旋转 & 缩放)

    所谓2D转换,就是在二维坐标系内进行各种操作,包括平移,转动,缩放等等; 平移 语法: transform:translate(x, y); transform:translateX(x); transform:translateY(y); x和y都是相对于屏幕左上角而言,左上为负值,右下为正值;与传统数学坐标系注意区分 要实现盒子的平移,目前有多种方式

    2024年02月11日
    浏览(35)
  • 高性能、高扩展、高稳定:解读 EasyMR 大数据组件自定义可扩展能力

    随着互联网技术的不断发展以及大数据时代的兴起,企业对于数据分析和洞察的需求日益增长。大多数企业都积累了大量的数据,需要从这些数据中快速灵活地提取有价值的信息,以便为用户提供更好的服务或者帮助企业做出更明智的决策。 然而在不同的数据场景中,企业往

    2024年02月16日
    浏览(35)
  • Unity UI的transform,recttransform,position的相互转换

    学习中遇到一些坑,记录一下,因为RectTransform坐标转换第一次遇到真的头疼 1,首先要理解RectTransform是Transform的子类,而所有的UI组件,在代码中获取的transform.position实际上都是rectTransform.anchoredPosition,也就是说,新建一个空物体,加入Image、Text等组件后,这个物体只有在

    2024年01月20日
    浏览(67)
  • 【C++】STL 算法 ⑤ ( 二元函数对象 | std::transform 算法简介 | 为 std::transform 算法传入一元函数对象进行转换操作 )

    \\\" 二元函数对象 \\\" 指的是 一个实例类 中 , 重载了 \\\" 函数调用操作符 () \\\" 函数 operator() , 并且该函数 接受 2 个参数 ; 如果 \\\" 重载 函数调用操作符 () 函数 \\\" 只接收一个参数 , 那么这个函数对象就是 一元函数对象 ; 下面的结构体类 函数对象 , 就是一个二元函数对象 , 其作用是将

    2024年01月18日
    浏览(62)
  • IDEA插件系列(6):GsonFormatPlus插件——JSON字符串转换Java实体类

    GsonFormatPlus JSON字符串与Java实体类的相互转换。 第一种安装方式是使用IDEA下载插件进行安装 第二种安装方式是使用离线插件进行安装 插件下载地址:https://plugins.jetbrains.com/plugin/7654-gsonformat/ 3. 使用方法 先在项目的默认包目录下创建一个空的类 右键-生成(Alt+insert) 选中G

    2024年02月12日
    浏览(51)
  • SHP格式建筑数据转换为SketchUp模型插件分享

    之前有给大家讲解过《如何使用3ds Max制作三维地形》,将GIS数据和传统的三维建模软件进行了结合,在很长一段时间内,一直有人问如何将水经微图中下载的建筑数据转换为SketchUp模型,这里给大家找到了一种解决方案,可以通过插件进行转换,这里给大家讲解一下详细的操

    2024年02月12日
    浏览(47)
  • 使用Java开发Jmeter自定义取样器(Sampler)插件

    Jmeter提供默认界面(AbstractJavaSamplerClient)和自定义界面的(AbstractSamplerGui)两种自定义取样器的插件开发方式,对于复杂的压测任务,可以通过自定义取样器的方式来实现。 本文通过压测SpringBoot的http接口演示两个自定义扩展类的实现方式: maven项目工程,pom.xml http接口示例

    2024年02月11日
    浏览(42)
  • Python 变量的定义和数据类型的转换

    变量的定义 基本语法: 变量名 = 值 变量名是给对象贴一个用于访问的标签,给对象绑定名字的过程也称为赋值,赋值符号 “=” 变量名自定义,要满足标识符命名规则。 Python中, 不需要事先声明变量名及其类型 ,直接赋值即可创建各种类型的对象变量。 变量在第一次赋值

    2024年02月07日
    浏览(63)
  • 二、GoLang输出HelloWorld、变量定义、数据类型的转换

    go语言中,想要输出内容到控制台,package必须是main,包括方法名也必须是main, go语言输出的语法是 fmt 库。 Go语言的基本类型有: boolean:布尔类型 true / false string :字符串类型 数值型: int8:有符号8位整型(-128到127)长度 int16:有符号16位整型(-32768到32767)长度 int32:有

    2024年02月10日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包