一百零八、Kettle采集Kafka数据到HDFS(踩坑,亲测有效)

这篇具有很好参考价值的文章主要介绍了一百零八、Kettle采集Kafka数据到HDFS(踩坑,亲测有效)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Kafka到HDFS,除了用Kafka API和flume之外,还可以用kettle,最大优点是不用写代码!

版本:Kettle版本:8.2、Hadoop版本:3.1.3

前提:    详情请看鄙人的一百零一、Kettle8.2.0连接Hive3.1.2(踩坑,亲测有效)

http://t.csdn.cn/mWfOChttp://t.csdn.cn/mWfOC

前提一、Hadoop系列配置文件已复制到kettle路径下     路径为:D:\java\kettle\pdi-ce-8.2.0.0-342\data-integration\plugins\pentaho-big-data-plugin\hadoop-configurations\hdp30

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

注意:以上的配置文件要与自己使用的服务器配置文件一致 

前提二、在D:\java\kettle\pdi-ce-8.2.0.0-342\data-integration\plugins\pentaho-big-data-plugin\hadoop-configurations\hdp30\lib文件夹下注意替换jar包

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

 前提三、在D:\java\kettle\pdi-ce-8.2.0.0-342\data-integration\lib增加MySQL驱动包,注意驱动包版本问题

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

 这些准备好之后,下面开始用kettle采集Kafka中的数据到hdfs!

第1步,在kettle中创建新的转换任务。

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

第2步,在Streaming中拖拽Kafka consumer控件,并且修改Kafka consumer控件配置信息

首先,Setup模块

Transformation:最好新建一个,用于返回流中的结果

Connection:选择direct ,右侧输入bootstrap-server     IP地址:端口号9092

Topics:选择kafka消费的topic

Consumer group:消费者组,随便填

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

其次,Batch模块(根据自己的实际情况修改参数

Duration: 处理批次间隔

Number-of-records: 处理批次条数

maximum concurrent batches:最大并发批次(8.2版本没有,9.3版本有)

message prefetch limit:消息预取限制,防止读取数据量太大,造成kettle挂掉(8.2版本没有,9.3版本有)

Offset-management: 偏移量管理

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

最后,Result fields模块

 kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

 其余,Fields和Options两个模块不需要修改。

 第3步,拖拽一个应用模块里面的写日志控件,方便查看Kafka中的字段

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

 第4步,拖拽一个转换模块里面的字段选择控件,只取Kafka日志中的Message字段

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

 第5步,是关键的一步!!! 拖拽输入模块里的JSON input插件

首先,在文件模块页面,勾选源定义在一个字段里,并从Message字段获取源。可以修改步骤名称

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

其次,在字段模块。如果Select fields查不到字段的话,别慌。手动输入即可,不过此处为第一级别的字段,二级字段不在此处。不需要很详细。另外,要注意字段的类型

这是第一个大坑。明明Kafka里有数据,JSON input控件却找不到字段,最后发现可以手动输入字段名。

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

第6步,由于还有二级字段需要解析,因此还要再拖拽一个JSON input字段

首先,拖拽输入模块里的JSON input插件

其次,在文件模块,勾选源定义在一个字段里,并从evaluationList字段获取源

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

最后,在字段页面,Select fields字段。如果找不到,手动输入即可。注意,字段类型

 kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

 第7步,为了方便查看目前已有的字段,并且修改一些字段的名称,再次拖拽转换模块里面的字段选择控件

首先,在选择和修改页面,获取选择的字段,并且把不用的字段删掉

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

其次,在元数据页面,修改需要修改的字段名称,注意字段类型以及格式的修改,特别是时间戳类型的字段,不能用string

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

 第8步,最后一个控件。由于我这边是从Kafka读取数据到HDFS,所以我这边是拖拽Big Data里的Hadoop file output控件

首先,拖拽Big Data里的Hadoop file output控件

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

其次,在文件页面。

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

 

1.选择Hadoop Cluster  如果没有,可以新建一个  (前提是开启了Hadoop)

1.1.点击New新建Hadoop Cluster 

cluster name:随便填

storage:选择HDFS

HDFS的Hostname、Port,注意与Hadoop配置文件core-site.xml里填的一模一样

Username和Password就填自己的服务器密码,一般用户名是root,密码就是自己设置的root用户密码

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

 注意:如果配置文件core-site.xml里hostname是hurys22,那么kettle里的Hostname也必须是hurys22,而不能是192.168.1.22这种,这是第二个大坑

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

 1.2 这部分填好即可,剩下的部分不用填。填好后测试一下

这4个好即可,其他不需要

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

 2.在Folder/File,选择hdfs的文件路径。可以先自己创建好,也可以让它自动生成。不过,自己要先创建好目标文件夹,因为她有个文件权限的问题。这是第三个大坑!!!

其实,经踩坑后发现,目标文件evaluation.csv,kettle任务可以自行创建,不过目标文件夹rtp,如果HDFS没有则必须提前手动创建好,然后给这个文件夹赋权即可。

2.1 创建hdfs目标文件夹

[root@hurys22 soft]# hdfs dfs -mkdir  -p  /rtp

2.2 文件赋权

[root@hurys22 ~]# hdfs dfs -chmod -R 777 /rtp/

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

注意:必须要修改文件夹权限,否则kettle会报错文件权限不够

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=Administrator, access=WRITE, inode="/rtp":root:supergroup:drwxr-xr-x

3.如果自己已经创建好目标文件,那么要勾选 启动时不创建文件

4.文件扩展名填csv  

5.一定要指定日期时间格式,一般选择年月日或者年月日时分秒。否则。你第二次导到HDFS的数据会覆盖第一次导出的数据,因为文件路径是一样的    这是第四个大坑!!!

6.一个小技巧,可以通过显示文件名查看即将生成的目标文件名,从而可以确认文件名是否正确

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

 

 7.在内容页面。选择分隔符,一般是,或者;     还有编码:UTF-8

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

 8.在字段页面,点击获取字段和最小宽度,检查所需字段以及字段类型,注意时间戳字段类型的格式

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

 第9步,在Hadoop file output控件修改好,保存kettle任务,点击运行

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

 同时,HDFS会在提前建好的目标文件夹下自动生成目标文件,当任务停止时,文件才会显示数据大小。

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

 不过,可以直接下载查看

第一步,点击Download

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

 第二步,在下载文件夹下可以看到刚才下载的文件evaluation.csv以及文件大小kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

第三步,用notepad++打开目标文件evaluation.csv查看数据

kettle消费kafka数据,Kafka,Kettle,hdfs,kafka,hadoop,kettle

截止这边,用kettle采集Kafka数据到HDFS中就全部结束了,坑也挺多的。

乐于奉献共享,帮我你我他!!!文章来源地址https://www.toymoban.com/news/detail-608040.html

到了这里,关于一百零八、Kettle采集Kafka数据到HDFS(踩坑,亲测有效)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 一百六十八、Kettle——用海豚调度器定时调度从Kafka到HDFS的任务脚本(持续更新追踪、持续完善)

    在实际项目中,从Kafka到HDFS的数据是每天自动生成一个文件,按日期区分。而且Kafka在不断生产数据,因此看看kettle是不是需要时刻运行?能不能按照每日自动生成数据文件? 为了测试实际项目中的海豚定时调度从Kafka到HDFS的kettle任务情况,特地提前跑一下海豚定时调度这个

    2024年02月10日
    浏览(49)
  • 一百五十二、Kettle——Kettle9.3.0本地连接Hive3.1.2(踩坑,亲测有效)

    由于先前使用的kettle8.2版本在Linux上安装后,创建共享资源库点击connect时页面为空,后来采用如下方法,在/opt/install/data-integration/ui/menubar.xul文件里添加如下代码 menuitem id=\\\"file-openZiyuanku\\\" label=\\\"openZiyuanku\\\" command=\\\"spoon.openRepository()\\\"/ 共享资源库创建后又遇到任务Save时为空的问题。

    2024年02月13日
    浏览(45)
  • Bilibili For Linux版本(二百零八)

    简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏: Audio工程师进阶系列 【 原创干货持续更新中…… 】🚀 优质专栏: 多媒体系统工程师系列 【 原创干货持续更新中…… 】🚀 人生格言: 人生从来没有捷径

    2024年01月23日
    浏览(42)
  • 一百五十二、Kettle——Kettle9.3.0本地连接Hive3.1.2(踩坑,亲测有效,附截图)

    由于先前使用的kettle8.2版本在Linux上安装后,创建共享资源库点击connect时页面为空,后来采用如下方法,在/opt/install/data-integration/ui/menubar.xul文件里添加如下代码 menuitem id=\\\"file-openZiyuanku\\\" label=\\\"openZiyuanku\\\" command=\\\"spoon.openRepository()\\\"/ 共享资源库创建后又遇到任务Save时为空的问题。

    2024年02月12日
    浏览(38)
  • 一百五十九、Kettle——Kettle9.2通过配置Hadoop clusters连接Hadoop3.1.3(踩坑亲测、附流程截图)

    由于kettle的任务需要用到Hadoop(HDFS),所以就要连接Hadoop服务。 之前使用的是kettle9.3,由于在kettle新官网以及博客百度等渠道实在找不到shims的驱动包,无奈换成了kettle9.2,kettle9.2的安装包里自带了shims的驱动包,不需要额外下载,挺好! kettle9.2.0安装包网盘链接,请看鄙人

    2024年02月12日
    浏览(63)
  • 大数据Flink(一百零二):SQL 聚合函数(Aggregate Function)

    文章目录 SQL 聚合函数(Aggregate Function) Python UDAF,即 Python AggregateFunction。Python UDAF 用来针对一组数据进行聚合运算,比如同一个 window 下的多条数据、或者同一个 key 下的多条数据等。针对同一组输入数据,Python AggregateFunction 产生一条输出数据。比如以下示例,定义了一个

    2024年02月08日
    浏览(42)
  • 第一百零五天学习记录:数据结构与算法基础:顺序表(王卓教学视频)

    注:笔记截图均来自王卓数据结构教学视频 线性表是具有相同特性的数据元素的一个有限序列 同一线性表中的元素必定具有相同特性,数据元素间的关系是线性关系。 稀疏多项式的运算 顺序存储结构存在的问题 1、存储空间分配不灵活 2、运算的空间复杂度高 引出链式存储

    2024年02月15日
    浏览(39)
  • 大数据Flink(一百零三):SQL 表值聚合函数(Table Aggregate Function)

    文章目录 SQL 表值聚合函数(Table Aggregate Function) Python UDTAF,即 Python TableAggregateFunction。Python UDTAF 用来针对一组数据进行聚合运算,比如同一个 window 下的多条数据、或者同一个 key 下的多条数据等,与 Python UDAF 不同的是,针对同一组输入数据,Python UDTAF 可以产生 0 条、1 条

    2024年02月07日
    浏览(41)
  • 用海豚调度器定时调度从Kafka到HDFS的kettle任务脚本

    在实际项目中,从Kafka到HDFS的数据是每天自动生成一个文件,按日期区分。而且Kafka在不断生产数据,因此看看kettle是不是需要时刻运行?能不能按照每日自动生成数据文件? 为了测试实际项目中的海豚定时调度从Kafka到HDFS的Kettle任务情况,特地提前跑一下海豚定时调度这个

    2024年04月15日
    浏览(37)
  • 第一百零六天学习记录:数据结构与算法基础:单链表(王卓教学视频)

    结点在存储器中的位置是任意的,即逻辑上相邻的数据元素在物理上不一定相邻 线性表的链式表示又称为非顺序映像或链式映像。 用一组物理位置任意的存储单元来存放线性表的数据元素。 这组存储单元既可以是连续的,也可以是不连续的,甚至是零散分布在内存中的任意

    2024年02月16日
    浏览(55)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包