Springboot整合ETL引擎Kettle的使用

这篇具有很好参考价值的文章主要介绍了Springboot整合ETL引擎Kettle的使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

简介

ETL是英文Extract-Transform-Load的缩写,用来描述将数据从源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程,它能够对各种分布的、异构的源数据(如关系数据)进行抽取,按照预先设计的规则将不完整数据、重复数据以及错误数据等“脏"数据内容进行清洗,得到符合要求的“干净”数据,并加载到数据仓库中进行存储,这些“干净”数据就成为了数据分析、数据挖掘的基石。

kettle是一个开源ETL工具。kettle提供了基于java的图形化界面,使用很方便。kettle提供了基于 JAVA的脚步编写功能,可以灵活地自定义ETL过程,使自行定制、批量处理等成为可能,这才是一个程序员需要做的工作,而不仅是象使用word一样操作 kettle用户界面。

springboot kettle,etl,数据挖掘,数据仓库

环境集成:

参考:java集成kettle教程(附示例代码)_kettle java_成伟平2022的博客-CSDN博客

代码:

pom.xml添加:

<!--mysql数据库链接驱动以及连接池-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.2.11</version>
        </dependency>
<!-- kettle 工具本地jar包加载 -->
        <dependency>
            <groupId>pentaho-kettle</groupId>
            <artifactId>kettle-core</artifactId>
            <version>8.2.0.7-719</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/lib/kettle-core-8.2.0.7-719.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>pentaho-kettle</groupId>
            <artifactId>kettle-engine</artifactId>
            <version>8.2.0.7-719</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/lib/kettle-engine-8.2.0.7-719.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>pentaho-kettle</groupId>
            <artifactId>metastore</artifactId>
            <version>8.2.0.7-719</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/lib/metastore-8.2.0.7-719.jar</systemPath>
        </dependency>
        <!--kettle需要用到的其它依赖-->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-vfs2</artifactId>
            <version>2.2</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>17.0</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.2</version>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.10</version>
        </dependency>
        <dependency>
            <groupId>com.jcraft</groupId>
            <artifactId>jsch</artifactId>
            <version>0.1.54</version>
        </dependency>
        <dependency>
            <groupId>net.sourceforge.jexcelapi</groupId>
            <artifactId>jxl</artifactId>
            <version>2.6.12</version>
        </dependency>
@RestController
@RequestMapping("${application.admin-path}/etl-kettl")
//@Api(tags = "ETL-Kettle的demo接口")
public class KettleDemoContrllor {
	@Resource
	KettleService kettleService;

	@GetMapping("/execKtr")
	//@ApiOperation("执行ktr文件")
	private Object runKtr(String filename) throws Exception {
		return R.buildOkData(kettleService.runTaskKtr(filename,null).toString());
	}

	@GetMapping("/execKjb")
	//@ApiOperation("执行kjb文件")
	private Object runKjb(String filename) throws Exception {
		return R.buildOkData(kettleService.runTaskKjb(filename, null).toString());
	}
}
public interface KettleService {
    /**
     * 开始执行ETL任务(ktr文件)
     *
     * @param taskFileName 执行的任务文件名(ktr)
     * @param params 执行任务输入的参数
     * @return 运行结果
     * @throws Exception 没有找到配置文件,Kettle的运行异常不会抛出
     */
    Object runTaskKtr(String taskFileName, Map<String, String> params) throws Exception;
    /**
     * 开始执行ETL任务(kjb文件)
     *
     * @param taskFileName 执行的任务文件名(kjb)
     * @param params 执行任务输入的参数
     * @return 运行结果
     * @throws Exception 没有找到配置文件,Kettle的运行异常不会抛出
     */
    Object runTaskKjb(String taskFileName, Map<String, String> params) throws Exception;
}
@Service
public class KettleServiceImpl implements KettleService {

    @Value("${kettle.script.path}")
    private String kettleScriptPath;

    private static final Logger logger = LoggerFactory.getLogger("kettle-service-log");

    private final List<KtrMeta> KTR_METAS = new ArrayList<>();
    private final List<KjbMeta> KJB_METAS = new ArrayList<>();

    private List<String> getFiles(String path, String subName) {
        List<String> files = new ArrayList<>();
        File file = new File(path);
        File[] tempList = file.listFiles();
        if (tempList == null){
            return files;
        }
        for (File value : tempList) {
            if (value.isFile()) {
                if (Objects.equals(value.toString().substring(value.toString().length() - 3), subName)) {
                    files.add(value.getName());
                }
            }
        }
        return files;
    }

    //采用单列模式,项目启动时加载环境,加载所有的转换配置、任务配置,后续执行就会快一点
    //@PostConstruct
    public void init() throws KettleException {
        logger.info("----------------------开始初始化ETL配置------------------------");
        KettleEnvironment.init();
        List<String> ktrFiles = getFiles(kettleScriptPath, "ktr");
        List<String> kjbFiles = getFiles(kettleScriptPath, "kjb");
        logger.info("需要加载的转换为:" + ktrFiles.toString());
        logger.info("需要加载的任务为:" + kjbFiles.toString());
        logger.info("----------------------开始加载ETL配置--------------------------");
        for (String ktrFile : ktrFiles) {
            KtrMeta ktrMeta = new KtrMeta();
            ktrMeta.setName(ktrFile);
            ktrMeta.setTransMeta(new TransMeta(kettleScriptPath + ktrFile));
            KTR_METAS.add(ktrMeta);
            logger.info("成功加载转换配置:" + ktrFile);
        }
        for (String kjbFile : kjbFiles) {
            KjbMeta kjbMeta = new KjbMeta();
            kjbMeta.setName(kjbFile);
            kjbMeta.setJobMeta(new JobMeta(kettleScriptPath + kjbFile, null));
            KJB_METAS.add(kjbMeta);
            logger.info("成功加载任务配置:" + kjbFile);
        }
        logger.info("----------------------全部ETL配置加载完毕-----------------------");
    }


    @Override
    public Object runTaskKtr(String ktrFileName, Map<String, String> params) {
        logger.info("开始执行转换:" + ktrFileName);
        TransMeta transMeta = null;
        for (KtrMeta ktrMeta : KTR_METAS) {
            if(Objects.equals(ktrFileName,ktrMeta.getName())){
                transMeta = ktrMeta.getTransMeta();
                break;
            }
        }
        //如果在缓存的列表里面没找到需要自信的配置,尝试手动加载
        try {
            if (transMeta == null) {
                logger.warn("资源池没有找到配置文件:" + ktrFileName+"  尝试二次加载!");
                KettleEnvironment.init();
                transMeta = new TransMeta(kettleScriptPath + File.separator + ktrFileName);
                if(transMeta==null) throw new RuntimeException("未找到需要执行的转换配置文件:");
            }
            Trans trans = new Trans(transMeta);
            if (params != null) {
                for (Map.Entry<String, String> entry : params.entrySet()) {
                    trans.setParameterValue(entry.getKey(), entry.getValue());
                }
            }
            //trans.prepareExecution(null);
            //trans.startThreads(); //启用新的线程加载
            trans.execute(null);
            trans.waitUntilFinished();
            return trans.getResult();
        }catch (Exception e)
        {
            e.printStackTrace();
            return e.getMessage();
        }

    }

    @Override
    public Object runTaskKjb(String objFileName, Map<String, String> params) throws Exception {
        logger.info("开始执行任务:" + objFileName);
        JobMeta jobMeta = null;
        for (KjbMeta kjbMeta : KJB_METAS) {
            if(Objects.equals(objFileName,kjbMeta.getName())){
                jobMeta = kjbMeta.getJobMeta();
            }
        }
        try {
            if (jobMeta == null) {
                logger.warn("资源池没有找到配置文件:" + objFileName+"  尝试二次加载!");
                KettleEnvironment.init();
                jobMeta = new JobMeta(kettleScriptPath + File.separator + objFileName,null);
                if(jobMeta==null) throw new RuntimeException("未找到需要执行的任务配置文件:"+objFileName);
            }
            Job job = new Job(null, jobMeta);
            if (params != null) {
                for (Map.Entry<String, String> entry : params.entrySet()) {
                    job.setParameterValue(entry.getKey(), entry.getValue());
                }
            }
            job.start();
            job.waitUntilFinished();
            return job.getResult();
        }catch (Exception e)
        {
            e.printStackTrace();
            return e.getMessage();
        }

    }
}
@Data
public class KtrMeta {
	private TransMeta transMeta;
	private String name;
}
@Data
public class KjbMeta {
	private JobMeta jobMeta;
	private String name;
}

总结:

集成后感觉没什么必要集成到项目里面去。关键还是需要学会工具的使用,以便进行数据收集与治理。

参考:1_ETL和Kettle概述_哔哩哔哩_bilibili

下载: kettle工具下载文章来源地址https://www.toymoban.com/news/detail-629334.html

到了这里,关于Springboot整合ETL引擎Kettle的使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kettle从入门到精通 第五十三课 ETL之kettle MQTT/RabbitMQ consumer实战

    1、上一节课我们学习了MQTT producer 生产者步骤,MQTT consumer消费者步骤。该步骤可以从支持MRQTT协议的中间件获取数据,该步骤和kafka consumer 一样可以处理实时数据交互,如下图所示:  2、双击步骤打开MQTT consumer 配置窗口,如下图所示: Step name: 自定义步骤名称。 Transformat

    2024年04月28日
    浏览(51)
  • 你还在用Kettle吗?试试这款ETL工具

    当今时代,数字化转型已经成为企业发展的必由之路。数字化转型不仅可以提高企业的效率和生产力,还可以提高企业的竞争力和市场份额。在数字化转型的过程中,数据集成是至关重要的一步,可以帮助企业在数字化转型中实现更高效和可靠的数据服务。 在国内没有更好的

    2024年02月09日
    浏览(51)
  • 大数据ETL工具对比(Sqoop, DataX, Kettle)

    前言 在实习过程中,遇到了数据库迁移项目,对于数据仓库,大数据集成类应用,通常会采用 ETL 工具辅助完成,公司和客户使用的比较多的是 Sqoop , DataX 和 Kettle 这三种工具。简单的对这三种ETL工具进行一次梳理。 ETL工具,需要完成对源端数据的抽取(exat), 交互转换(

    2024年02月11日
    浏览(58)
  • 关于Kettle ETL java脚本编写遇到的一些问题记录

    使用方法**logBasic()**参数必须是字符串 这部分内容会在ETL的日志窗口显示 1.获取上个节点传输的数据 可以直接在左侧双击获取 2.全局参数获取 在启动运行的变量设置参数 在java代码中获取方式 3.获取当前节点参数 在当前窗口下方有个 参数 Tab页,在这里设置 在java代码中获取

    2024年02月12日
    浏览(41)
  • Kettle Local引擎使用记录(一)(基于Kettle web版数据集成开源工具data-integration源码)

    在前面对 data-integration 做了一些简单了解,从部署到应用,今天尝试把后端运行作业代码拎出来,去真正运行一下,只有实操之后才会有更深刻的认识,有些看着简单的功能,实操过程中会遇到很多问题,这个时候你的想法也会发生改变,所以很多时候为什么开发人员痛恨做

    2024年02月02日
    浏览(46)
  • 【数据预处理】基于Kettle的字符串数据清洗、Kettle的字段清洗、Kettle的使用参照表集成数据

    🚀 本文选自专栏:AI领域专栏 从基础到实践,深入了解算法、案例和最新趋势。无论你是初学者还是经验丰富的数据科学家,通过案例和项目实践,掌握核心概念和实用技能。每篇案例都包含代码实例,详细讲解供大家学习。 📌📌📌本专栏

    2024年02月03日
    浏览(56)
  • springboot整合ES索引引擎

    1.创建springboot工程并导入相关的依赖 2.3.12.RELEASE 2.创建一个配置类,返回 RestHighLevelClient 修改一下版本 创建路径config包下的ESconfig类 3.测试 在Test中测试 结果: 判断索引是否存在 结果: 删除索引 结果: 在索引中添加文档 在entity包创建celebrity实体类 结果: 根据id查询文档内容 结

    2024年02月11日
    浏览(41)
  • Springboot整合Flowable流程引擎

    Flowable是一个开源的工作流引擎,它基于Activiti引擎进行发展,Flowable主要用于为业务流程管理(BPM)和工作流的设计、操作、监控提供支持。 这类表在Flowable中主要提供存储通用类型数据的功能,如流程名称,创建时间等。如下是通用表的主要成员: act_ge_bytearray:存储二进

    2024年02月05日
    浏览(39)
  • 【Springboot】SpringBoot基础知识及整合Thymeleaf模板引擎

    🌕博客x主页:己不由心王道长🌕! 🌎文章说明:spring🌎 ✅系列专栏:spring 🌴本篇内容:对SpringBoot进行一个入门学习及对Thymeleaf模板引擎进行整合(对所需知识点进行选择阅读呀~)🌴 ☕️每日一语:在人生的道路上,即使一切都失去了,只要一息尚存,你就没有丝毫理

    2023年04月23日
    浏览(48)
  • SpringBoot整合模板引擎Thymeleaf(4)

    本文原创作者:谷哥的小弟 作者博客地址:http://blog.csdn.net/lfdfhl 在之前的教程中,我们介绍了Thymeleaf的基础知识。在此,以案例形式详细介绍Thymeleaf的基本使用。 要点概述: 1、在static下创建css文件夹用于存放css文件 2、在static下创建img文件夹用于存放图片文件 请在pom.xml文

    2024年02月10日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包