pentaho(keetle)使用手册

这篇具有很好参考价值的文章主要介绍了pentaho(keetle)使用手册。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

pentaho使用

先展示一下用途和效果

1. 环境准备

1.1 pentaho是什么?

pentaho可读作“彭塔湖”,原名keetlekeetle被pentaho公司收购后改名而来。

pentaho是一款开源ETL工具,纯java编写的C/S模式的工具,可绿色免安装,开箱即用。支持Windows、macOS、Linux平台。

pentaho有2个核心设计,即转换作业

转换是一个包含输入、逻辑处理、输出的完整过程,即ETL。

作业是一个提供定时执行转换的机制,即定时服务调度。

pentaho官网下载链接:Pentaho Community Edition Download | Hitachi Vantara

pentaho由主要四部分组成

  • Spoon.bat/Spoon.sh :勺子,是一个图形化界面,可图形化操作转换和作业
  • Pan.bat/Pan.sh : 煎锅,可用命令行方式调用转换
  • Kitchen.bat/Kitchen.sh : 厨房,可用命令行方式调用作业
  • Carte.bat/Carte.sh : 菜单,是一个轻量级web容器,可建立专用、远程的ETL Server

1.2 pentaho安装

Windows

由于是纯java编写,依赖jdk环境。所以需要先配置jdk环境,这里省略。

从官网下载pentaho安装包后,直接解压。

MacOS

tar -zxvf 安装包路径 -C 目标路径

Linux

tar -zxvf 安装包路径 -C 目标路径

目录结构

重点目录以及执行文件说明

  • lib目录 : 这是依赖库目录,例如各个数据库的jdbc驱动,都放在此目录下
  • logs目录 :这是转换和作业运行的默认日志输出目录
  • simple-jndi目录 :这是各个数据库的JNDI连接信息的全局配置
  • Spoon.bat/Spoon.sh :勺子,是一个图形化界面,可图形化操作转换和作业
  • Pan.bat/Pan.sh : 煎锅,可用命令行方式调用转换
  • Kitchen.bat/Kitchen.sh : 厨房,可用命令行方式调用作业
  • Carte.bat/Carte.sh : 菜单,是一个轻量级web容器,可建立专用、远程的ETL Server

在window上运行就用.bat格式脚本,MacOS 或者 Linux 平台上使用.sh格式脚本


2. 开始使用

pentaho内置了丰富的数据处理组件,本章节主要对pentaho界面上各个功能组件作用进行说明。

2.1 启动图形化界面

Windows

运行 Spoon.bat 

MacOS

运行 Spoon.sh

Linux

运行 Spoon.sh

运行后会短暂没有任何反应,等待会议,就会出现界面

主对象树中有转换作业

  • 转换:所有的数据处理工作都在转换中完成
  • 作业:这是一个任务

开始数据处理工作前,必需新建一个转换,因为只有新建了之后,才能使用数据处理组件,此时的核心对象树是空的。

2.2 转换

在 “核心对象树 –> 转换 –> 右键 –> 新建” 或 在 “文件 –> 新建 –> 转换” ,新建一个转换,核心对象树就会出现各类组件。依靠这些组件组合使用,完成数据处理工作。

2.2.1 主对象树

一个转换就是一个数据处理工作流程。这里主要是转换的配置,例如数据源连接,运行配置等。

2.2.2 核心对象树

包含各类数据处理组件。

2.3 数据处理组件

这里对一些常用的组件进行说明

输入

输入组件,即各类数据源,例如数据库,json,xml等

输出

输出组件,将处理后的数据进行输出保存

转换

这是数据转换的核心,在这里完成数据处理

应用

包含一些数据处理外的操作,例如发送邮件,写日志等

流程

用于控制数据处理流程,例如开始,结束,终止等

脚本

当内置转换组件完成不了数据处理的逻辑时,即可使用脚本组件,用自定义代码的方式来完成处理逻辑

查询

用于一些查询请求,例如http请求,数据库查询某个表是否存在等

连接

可用于多表,单表处理完后,进行记录合并


2.4 作业组件

在“文件–>新建–>作业”创建一个作业。

主对象树包含作业运行配置,DB连接配置等

核心对象树包含作业的各类组件


通用

作业流程组件,有开始、转换、成功、空处理等

邮件

发送邮件

文件管理

文件操作,创建、删除等

条件

条件处理,例如判断某个文件是否存在

脚本

使用shell,js、sql等脚本处理复杂作业逻辑

应用

作业处理,例如终止作业、写日志等

文件传输

定时作业来上传、下载文件


2.5 使用

上面介绍了各个组件用途,现在来完成一个完整的数据处理工作流程。

启动应用


新建转换

在 “核心对象树 –> 转换 –> 右键 –> 新建” 或 在 “文件 –> 新建 –> 转换” ,新建一个转换


配置DB连接

主对象树中选择DB连接,右键新建

注意:连接数据库之前需要下载对应的jdbc驱动,例如连接pgsql则需要下载 postgresql-version.jar,r然后将驱动包放到安装目录下的\lib目录


这里以kingbase V8为例,因为这个踩了坑。经历如下:

内置的数据源里有KingbaseES,本以为可以直接用,结果发现连不上,报驱动错误。可能是因为内置的驱动版本跟数据库版本不一致,因为Kingbase V8的驱动不向前兼容。更新驱动后,依然不行。

然后发现,内置还有Generic database选项,这个是用来自定义连接内置数据源之外的数据库的。使用jdbc方式连接,需要一个连接串,驱动包名(前提是下载了对应的驱动包),用户名,密码。然而,这种方式依然不行……

后来一想,干脆用pgsql的方式来连接kingbase,没想到连接成功!



选择输入

因为数据源是数据库,所以这里从输入组件中选择表输入,将其拖入到右侧面板中


配置输入

双击“表输入”组件 或 右键选择 “编辑步骤”

点击获取SQL查询语句,会弹出界面选择数据表

选择一个数据表后,提示

选择“是”

这里会自动填充获取数据的sql,也可以在这里加上各种where条件,获取需要的数据

点击“确定”


配置输出

如果是表结构一致,则可使用

因为目标数据源也是数据库,所以这里选择表输出。从输出组件选择表输出,拖入转换视图中

然后进行步骤连接

方式一:按住shift键,鼠标左键点选“输入步骤”,会出现箭头,然后连接到“输出步骤”

方式二:鼠标左键框选输入和输出,然后右键,选择”新建节点连接“,选择”起始步骤“,”目标步骤“

点击“确定”

连接后如下:

双击“表输出”或右键选择“编辑步骤”

选择目标数据库中的数据表,然后点击”确定“

选择表输出,无法配置字段映射,所以前提是表结构一致才可使用。如果是异构表,需要字段映射的,则需要使用 插入/更新 组件

如果输入表和输出表结构不一致,即异构表,则需要使用插入/更新组件。从输出中选择插入/更新拖入转换视图中,然后进行步骤连接,进入输出配置

注意:一定要正确连接步骤,否则这步无法获取输入字段,输出字段

字段映射配置好后如下

点击“确定”

然后点击转换视图中的按钮,这个是运行

这个运行是运行一次,完成后就结束了。如果要定时运行,则需要作业

点击“启动” 会弹出界面 保存 当前转换

输入保存的文件名称,然后点击“Save”即可

每个步骤都显示绿色的箭头,说明没有错误,正确的执行完了转换。也可以在日志输出查看.

日志:完成处理 (I=1, O=1, R=1, W=1, U=0, E=0)中的 I=1 表示 输入 1 行,O=1表示 输出 1 行,R=1 表示 读取 1 行,W=1 表示 写入 1 行

然后看一下数据输出结果

源表

目标表

定时作业

如果需要定时执行同步过程,那么就需要引入作业。在“文件–>新建–>作业” 创建一个作业。

在“通用”中选择Start拖入作业视图中

然后选择转换拖入视图,并进行步骤连接。

双击“转换”或右键选择“编辑作业入口”

点击“确定”

然后选成功组件拖入视图,并连接步骤

双击视图中的Start组件或右键”编辑作业入口“,进行作业调度配置

点击运行视图中的按钮。一个定时作业即完成

定时作业调度期间,程序不能退出!程序退出,作业即停止

至此一个完整的数据处理作业完成了。

3. 案例

3.1 简单同步

本部分对简单同步进行说明。简单同步是指不涉及复杂计算、转换等同步工作。

3.1.1 单表

即一对一同步,A表数据同步到B表,A与B的字段数量、类型、名称可能都不一样,因此需要一些字段类型转换,这都很容易。

处理过程详见2.5章节


3.1.2 多表

即2个及以上的表往一个表同步,同样也需要字段映射、类型转换等操作。

外键关联

这种通过某个字段(外键)关联的表,处理思路是在获取数据时,通过sql联表查询,获取到全部需要的数据。然后用单表同步方式进行处理。

多表合并

如果是异构表的话,获取到每个数据源后,使用Multiway merge join多路合并组件处理

合并后的记录可作为一个单表,然后进行单表同步的处理

合并是笛卡尔积,即A表n条记录,B表n条记录,结果就是n x n条记录,字段是A、B表全部字段,这种方式不建议采用,会消耗更多内存资源。建议拆分成单表同步

如果是同构表的话,可拆分为多个单表同步处理。


3.2 复杂同步

本部分对涉及到数据计算、转换的同步工作进行说明。有些复杂操作,无法直接使用组件进行处理,需要用到Script组件。这里主要对如何使用脚本组件完成数据处理进行说明。

这里先展示一个实际案例

这个过程是多表同步到一个表、涉及到字段类型转换、补充字段和值、数据计算、增补数据。由于计算和增补数据使用内置组件无法完成,因此这里使用了java脚本组件,自定义代码进行数据处理。

这里对字段类型转换增加列给某列设置值java脚本进行说明。

字段类型转换

例如 数字类型 转为 字符串,字符串 转为 日期时间……

转换中选择字段选择组件

双击“字段选择”或右键选择“编辑步骤”

选择“元数据”,在“字段名称”列选择字段,然后在“类型”列选择目标类型


增加列并设置随机数

输入中找到生成随机数组件,拖入视图并连接步骤

双击生成随机数或右键选择“编辑步骤”

在“名称”列输入需要增加的字段名,类型选择生成随机数规则

点击“确定”后,运行转换,在preview data处可预览数据,可以看到增加了一列 uid 也有值


将列的值设置为常量

例如将上面随机数组件生成的值设置为常量1。在转换中选择将字段设置为常量组件,并连接步骤

双击“将字段设置为常量”或右键选择“编辑步骤”

在“字段”列选择需要设置的字段,这里选择上一步骤生成的“uid”字段,在“值替换”列输入值。

点击“确定”,运行转换,然后预览数据,可以看到uid的值被替换为1


java脚本

脚本有Java脚本、JavaScript脚本,SQL脚本等。这里使用Java脚本,脚本的目的是处理内置组件处理不了的逻辑。例如有10个地层,但是数据源中只记录了前9个地层,最后一个需要根据计算得到。

拖入Java脚本组件到转换视图中并连接步骤

双击“Java脚本”或右键选择“编辑步骤”

然后展开Code Snippits\Common use

选择Main拖入右侧编辑区,Main是整个脚本处理入口

其默认脚本结构如下

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
  if (first) {
    first = false;
	// 代码逻辑区域
      
    /* TODO: Your code here. (Using info fields)

    FieldHelper infoField = get(Fields.Info, "info_field_name");

    RowSet infoStream = findInfoRowSet("info_stream_tag");

    Object[] infoRow = null;

    int infoRowCount = 0;

    // Read all rows from info step before calling getRow() method, which returns first row from any
    // input rowset. As rowMeta for info and input steps varies getRow() can lead to errors.
    while((infoRow = getRowFrom(infoStream)) != null){

      // do something with info data
      infoRowCount++;
    }
    */
  }

  Object[] r = getRow();

  if (r == null) {
    setOutputDone();
    return false;
  }

  // It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
  // enough to handle any new fields you are creating in this step.
  r = createOutputRow(r, data.outputRowMeta.size());

  /* TODO: Your code here. (See Sample)

  // Get the value from an input field
  String foobar = get(Fields.In, "a_fieldname").getString(r);

  foobar += "bar";
    
  // Set a value in a new output field
  get(Fields.Out, "output_fieldname").setValue(r, foobar);

  */
  // Send the row on to the next step.
  putRow(data.outputRowMeta, r);

  return true;
}

TODO区域就是代码编辑区域,其它是默认脚本函数

点击”确定“,然后再次打开Java脚本,就能看到输入输出字段信息了

完整实现地层计算并补充最后一层的Java脚本代码逻辑如下

// 这里是需要用的 java API 所导入的包
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.lang.*;
import java.math.BigDecimal;
import java.util.*;

// 核心处理过程入口
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {

 if (first) {
    first = false;
	    logBasic("----------------------字段-------------------------");
        String projectCount = "project_count";
        String knumber = "knumber";
        String depth = "depth";
        String dep = "dep";
        String layerorder = "layerorder";
        String id = "id";
 
		logBasic("----------------------获取输入流-------------------------");
        // 输入数据流 input 是消息步骤中设置的标签名
        RowSet infoStream = findInfoRowSet("input"); 


 		Object[] infoRow = null;
    	int infoRowCount = 0;

		logBasic("----------------------遍历数据流,将其加载到map-------------------------");
        // 遍历数据流,将其加载到map,便于操作
        // 根据 项目索引+钻孔索引 分组
        Map<String, ArrayList<Object[]>> groups = new HashMap<String, ArrayList<Object[]>>();
        while((infoRow = getRowFrom(infoStream)) != null){
            // 获取字段值
        	String prjCode = get(TransformClassBase.Fields.In, projectCount).getString(infoRow);
            String drillCode = get(TransformClassBase.Fields.In, knumber).getString(infoRow);
            String groupKey = prjCode + drillCode;

            if (!groups.containsKey(groupKey)) {
                logBasic("----------------------创建分组-------------------------");
                groups.put(groupKey,new ArrayList<Object[]>());
            }
			logBasic("----------------------添加数据到分组-------------------------");
            ArrayList<Object[]> objects = (ArrayList<Object[]>)groups.get(groupKey);
            objects.add(infoRow);

            logBasic("----------------------添加数据到输出流-------------------------");
			// 将当前行拷贝一份
            Object[] row=infoRow;
            // 创建一个输出行
             row = createOutputRow(infoRow, data.outputRowMeta.size());
            //putRow(infoStream.getRowMeta(), row);
            // 将输出行添加到输出数据集
            putRow(data.outputRowMeta, row);

      		infoRowCount++;
    	}
           
        logBasic("----------------------分组完成,处理最后一条数据-------------------------");
        // 将最后一条数据拷贝一份,场地分层索引+1,层底深度dep 赋值为 钻孔深度 depth,然后将此行数数据添加
		Object[] keys = groups.keySet().toArray();
		for (int i = 0; i < keys.length; i++)  {
			 String s = keys[i].toString();
			 logBasic("----------------------当前分组-----------------------:"+ s);	
        	  ArrayList<Object[]> list = (ArrayList<Object[]>) groups.get(s);
		
			
              Object[] last = (Object[])list.get(list.size() - 1);         
              Object[] newLast=last;
            // 设置 layerorder 的值
           	  String layerorderVal = get(TransformClassBase.Fields.In, layerorder).getString(last);
			  BigDecimal v = new BigDecimal(layerorderVal);
              v = v.add(new BigDecimal(1));
              get(TransformClassBase.Fields.Out, layerorder).setValue(newLast, v);

            // 设置 dep 的值
              String layerDepVal = get(TransformClassBase.Fields.In, depth).getString(last);
         	  BigDecimal v2 = new BigDecimal(layerDepVal);
              get(TransformClassBase.Fields.Out, dep).setValue(newLast, v2);
            
			 // 设置id
              String idVal = UUID.randomUUID().toString();
              get(TransformClassBase.Fields.Out, id).setValue(newLast, idVal);
              
			  
			  logBasic("----------------------添加数据到输出流-------------------------");
			  newLast=createOutputRow(newLast, data.outputRowMeta.size());
              // 将新的一行数据添加到输出数据集
              putRow(data.outputRowMeta, newLast);

        }        
        



    /* TODO: Your code here. (Using info fields)

    FieldHelper infoField = get(Fields.Info, "info_field_name");

    RowSet infoStream = findInfoRowSet("info_stream_tag");

    Object[] infoRow = null;

    int infoRowCount = 0;

    // Read all rows from info step before calling getRow() method, which returns first row from any
    // input rowset. As rowMeta for info and input steps varies getRow() can lead to errors.
    while((infoRow = getRowFrom(infoStream)) != null){

      // do something with info data
      infoRowCount++;
    }
    */
  }


  Object[] r = getRow();
	 logBasic("----------------------getRow-----------------------:"+ r);	


  if (r == null) {
    setOutputDone();
    return false;
  }

  // It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
  // enough to handle any new fields you are creating in this step.
  r = createOutputRow(r, data.outputRowMeta.size());
 logBasic("----------------------createOutputRow-----------------------:"+ r);	

  /* TODO: Your code here. (See Sample)

  // Get the value from an input field
  String foobar = get(Fields.In, "a_fieldname").getString(r);

  foobar += "bar";
    
  // Set a value in a new output field
  get(Fields.Out, "output_fieldname").setValue(r, foobar);

  */
  // Send the row on to the next step.
  putRow(data.outputRowMeta, r);

  return true;
}

至此 Java脚本处理完成。

痛(坑)点总结:

1.脚本编辑区是个文本编辑框,不能像IDEA一样帮助写代码,只能通过日志进行输出验证逻辑

2.建议通用的不涉及pentaho的java代码操作,可以在IDEA中完成,然后拷贝到脚本编辑区。例如需要导入的包就是在IDEA中通过智能导入,然后拷贝的

验证一下数据,图中标记的行,就是根据前2行数据计算而来,然后进行补充的。在数据源中只记录了前2行数据。文章来源地址https://www.toymoban.com/news/detail-700096.html

到了这里,关于pentaho(keetle)使用手册的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • element表格tooltip内容换行展示(本人第一次写帖子效果图在最后如果是各位想要的效果请点个赞,写的不好的地方也可以指导一下万分感谢)

    前言 在使用element的table中咱们有可能会遇到后端返回的单个字段有多个内容这个时候想一个单元格展示换行其实很简单直接就是template加个块标签v-for就可以了如果是很多的话这样表格就不美观如下图,这个时候大家会想到是show-overflow-tooltip,但是如果说数据跟我的一样是时

    2024年02月15日
    浏览(41)
  • 使用了百度OCR,记录一下

    由于识别ocr有的频率不高,图片无保密性需求,也不想太大的库, 就决定还是用下api算了,试用了几家,决定用百度的ocr包,相对简单。 遇到的问题里面下列基本有提到:例如获取ID,KEY;例如安装库; 参考帖子:python+百度OCR的使用方法(踩坑+测试程序)_no module named \\\'ai

    2024年02月06日
    浏览(47)
  • ubuntu20搭建环境使用的一下指令

    1.更新源 2.更新软件 3.ifconfig使用 4.安装vim 5.安装python 安装gcc 安装这个gcc source .bashrc

    2024年02月12日
    浏览(35)
  • 【大数据模型】使用Claude浅试一下

    汝之观览,吾之幸也!本文主要聊聊Claude使用的流程,在最后对国内外做了一个简单问题的对比,希望国内的大数据模型更快的发展。 一、产品介绍 claude官网 Claude是一款由前OpenAI的研究员和工程师开发的新型聊天机器人,它可以执行各种对话和文本处理任务,同时保持高度

    2024年02月05日
    浏览(31)
  • 分享一下如何使用echarts绘制散点图

    今天我来分享一下如何使用echarts绘制散点图 首先,我们需要引入echarts库。可以通过以下代码在HTML文件中引入: 或者引入下载好的js文件: 然后,我们需要准备好数据。散点图需要至少两个数据集,分别表示x轴和y轴的坐标。我们可以使用JavaScript数组来存储数据。 接下来,

    2024年02月06日
    浏览(40)
  • 记录一下Unity使用过程中出现的问题

    1.(2022.3.16) 问题:Unity打开已存在的项目时,一直停留在Hold on... Importing assests界面。 原因及解决方案:Unity Hub中项目设置的默认位置带有中文,将其修改为不含中文的路径即可。 (更新) 重新打开又出现类似问题,一使用VS进行脚本编辑时再次出现加载框,尝试先打开V

    2024年02月08日
    浏览(51)
  • 科普一下Elasticsearch中BM25算法的使用

    首先还是先了解几个概念,Elasticsearch是一个开源的分布式搜索和分析引擎,它使用一系列算法来计算文档的相关性分数(relevance score)。这些算法用于确定查询与文档的匹配程度,以便按相关性对搜索结果进行排序。以下是Elasticsearch中常用的算分算法: 词频(Term Frequency,

    2024年02月16日
    浏览(36)
  • 记录一下使用vs2022 上传到gitee项目

    第一个红框的地址一会需要用到

    2024年02月16日
    浏览(62)
  • Git - VSCode 使用手册

    本文为在 VSCode 中使用 Git 的基本使用手册。 本文仅演示 VSCode 默认提供的 Git 源代码管理功能,进阶Git功能可安装额外的Git插件。 源代码管理 面板中,点击 克隆仓库 输入 仓库URL 仓库URL,示例 选择仓库存储地址 打开仓库或仓库内的项目 源代码管理 面板中,可以看到Git仓库

    2024年02月04日
    浏览(39)
  • Obsidian 入门使用手册

      Obsidian 是一款基于 Markdown 语法编辑的笔记软件。与传统的 Markdown 软件不同的是,Obsidian 可以创建双向链接,而其他的一些软件可能需要付费才能实现这一功能。在 Obsidian 中,所有的笔记都是以 Markdown 格式存储的,这使得它易于导入和导出到其他工具,同时也使得用户可

    2024年02月12日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包