【Flume】Flume实践之采集文件内容上传至HDFS

这篇具有很好参考价值的文章主要介绍了【Flume】Flume实践之采集文件内容上传至HDFS。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1. 需求

       使用Flume从文件夹中采集数据并上传到HDFS中。要完成这个任务就需要使用在采集数据时使用Spooling Directory Source组件;传输数据时为了保证数据没有丢失风险,使用File Channel组件;输出数据时使用HDFS Sink。

2. 配置

       Flume各个组件的参数很多,因此通常复制官网的各组件样例程序并参照参数表进行修改。

2.1 Source

       Source组件使用Spooling Directory Source,本次实践需要指定两个参数,分别是:

  • type参数指定组件类型;
  • spoolDir参数指定要采集数据的源文件夹。

       根据需求编写的Source配置如下:

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /data/log/studentDir

2.2 Channel

       Channel组件使用File Channel,本次实践需要指定三个参数,分别是:

  • type参数指定组件类型;
  • checkpointDir参数指定存放检查点文件的目录;
  • dataDirs 表示存放数据的目录。

       根据需求编写的Channel配置如下:

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/soft/apache-flume-1.9.0-bin/data/studentDir/checkpoint
a1.channels.c1.dataDirs = /data/soft/apache-flume-1.9.0-bin/data/studentDir/data

2.3 Sink

       Sink组件使用HDFS Sink,本次实践需要指定八个参数,分别是:

  • type参数指定组件类型;
  • path参数指定最终数据要上传到的HDFS目录地址;
  • filePrefix参数指定存放数据的文件前缀;
  • fileType参数指定存放文件的类型,共有三种:SequenceFile、DataStream和CompressedStream,其中SequenceFile在Hadoop的MapReduce任务解决小文件问题时使用过;
  • writeFormat参数指定写文件的格式,有两种:Text和Writable(默认),如果后期想使用Hive或者Impala操作这份数据的话,必须在生成数据之前设置为Text,Text表示是普通文本数据;
  • rollInterval参数指定Sink间隔多长将临时文件滚动成最终目标文件,以秒为单位,默认值为30s。如果设置成0,则表示不根据时间来滚动文件(滚动roll指的是,Sink将临时文件重命名成最终目标文件,并新打开一个临时文件来写入数据);
  • rollSize参数指定当临时文件达到多大时(单位:bytes),滚动成目标文件,默认值为1024。如果设置成0,则表示不根据临时文件大小来滚动文件;
  • rollCount参数指定当events数据达到该数量时候,将临时文件滚动成目标文件,默认值为10。如果设置成0,则表示不根据events数据来滚动文件。

       根据需求编写的Sink配置如下:

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigData01:9000/flume/studentDir
a1.sinks.k1.hdfs.filePrefix = stu-
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

2.4 完整的配置代码

       将上述三个组件的配置完成后,再将其连接起来(设置Source后接的Channel和Sink前接的Channel),完整代码如下:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /data/log/studentDir

# Describe the sink
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/soft/apache-flume-1.9.0-bin/data/studentDir/checkpoint
a1.channels.c1.dataDirs = /data/soft/apache-flume-1.9.0-bin/data/studentDir/data

# Use a channel
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigData01:9000/flume/studentDir
a1.sinks.k1.hdfs.filePrefix = stu-
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0


# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3. 实践

       在运行Flume之前应该先检查建立采集数据的文件夹和文件,且系统此时直接启动会报错提示找不到SequenceFile,虽然我们已经把fileType改为了DataStream,但是Flume默认还是会加载这个类。这个问题的解决思路有两种:

  • 第一种解决思路:把SequenceFile相关的jar包都拷贝到Flume的lib目录下。但后续还会报错缺少HDFS相关的jar包,再拷贝对应的jar包即可。
  • 第二种解决思路:把这个节点设置为Hadoop集群的一个客户端节点,即在这个节点上配置Hadoop的框架。

       解决了这个问题后,在a1.sinks.k1.hdfs.path路径的机器上开启Hadoop服务框架,即可启动Flume服务采集信息并上传,运行过程如下:文章来源地址https://www.toymoban.com/news/detail-511230.html

  • 运行Flume
bin/flume-ng agent --name a1 --conf conf --conf-file conf/file2hdfs.conf -Dflume.root.logger=INFO,console
  • 检查HDFS的上传结果
    可以看到文件后有一个.tmp的后缀,是因为当前该文件并未达到配置中设置的滚动的要求,后续的数据依旧要写入到该文件。若要去掉这个后缀那根据配置需要等待1h或者等文件达到128M,除去配置的条件外,我们也可以强制关闭这个Agent来达到目的。但需要注意一点,当我们再重启Agent后,并不会给这个文件再给加上.tmp后缀,每次停止之前都会把所有的文件解除占用状态,下次启动的时候如果有新数据,则会产生新的文件,这其实仅仅模拟了自动切文件的效果。
[root@bigData01 soft]# hdfs dfs -ls /flume/studentDir
Found 1 items
-rw-r--r--   1 root supergroup         65 2023-02-03 06:23 /flume/studentDir/stu-.1675376620835.tmp
[root@bigData01 soft]# hdfs dfs -cat /flume/studentDir/*
jack    18      male
jessic  20      female
tom     17      male
  • 检查数据源文件
    为了防止重复读取同一个文件,Flume会在读取过的文件加上COMPLETED后缀,再不关闭Flume的前提下再加入一个class2.dat文件,会发现该文件立即读取并写入HDFS文件中。
[root@bigData02 studentDir]# ll
total 4
-rw-r--r--. 1 root root 65 Feb  3 05:31 class1.dat.COMPLETED
[root@bigData02 studentDir]# vi class2.dat
[root@bigData02 studentDir]# ll
total 8
-rw-r--r--. 1 root root 65 Feb  3 05:31 class1.dat.COMPLETED
-rw-r--r--. 1 root root 15 Feb  3 06:26 class2.dat.COMPLETED
-------------------------------------------------------------------
[root@bigData01 soft]# hdfs dfs -cat /flume/studentDir/*
jack    18      male
jessic  20      female
tom     17      male
Chaoql	23	male

到了这里,关于【Flume】Flume实践之采集文件内容上传至HDFS的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • wx.chooseMessageFile小程序文件上传,小程序开发工具上可正常使用,手机上上传显示无内容

    wx.chooseMessageFile 小程序文件上传时,小程序开发工具上可正常使用,手机上上传显示无内容 在手机上小程序是没有直接上传文件这个功能的,只能在微信聊天记录里面选; type 值是 file 时是选不到图片视频的,但是小程序开发工具上可以,真机上选不到 小程序官方文档 修改

    2024年02月11日
    浏览(36)
  • MinIO框架安装使用+实现上传需求

    前置知识:对象存储 MinIO对象存储系统是为海量数据存储、人工智能、大数据分析而设计,基于 Apache License v2.0 开源协议的对象存储系统,它完全兼容 Amazon S3 接口,单个对象最大可达5TB,适合存储海量图片、视频、日志文件、备份数据和容器/虚拟机镜像等。 通常在企业中我

    2024年02月10日
    浏览(35)
  • python采集数据保存csv, 文件内容乱码了怎么解决?

    如果你的 Python 程序采集到的数据在保存成 CSV 格式的文件时出现了乱码,那么可尝试以下解决方法: 1. 在打开 CSV 文件时指定编码方式 你可以使用 Python 中的 open() 函数打开 CSV 文件,并在 open() 函数中指定文件编码方式为 CSV 文件原始编码方式。如果 CSV 文件原始编码方式为

    2024年02月16日
    浏览(47)
  • 老大加需求:做一个支持超大文件 HTTP 断点续传的上传服务,我懵逼了~

    作者: 大飞飞鱼 来源: blog.csdn.net/ababab12345/article/details/80490621 最近由于笔者所在的研发集团产品需要,需要支持高性能的大文件(大都数是4GB以上)的http上传,并且要求支持http断点续传。笔者在以前的博客如何实现支持大文件的高性能HTTP文件上传服务器已经介绍了实现大文

    2024年02月08日
    浏览(44)
  • Flume 数据采集

    1 . 2 . 1  集群 进程查看 脚本 (1)在/home/bigdata_admin/bin目录下创建脚本xcall.sh [bigdata_admin@hadoop102  bin]$ vim xcall.sh (2)在脚本中编写如下内容 (3)修改脚本执行权限 [bigdata_admin@hadoop102 bin ]$ chmod 777 xcall.sh (4)启动脚本 [bigdata_admin@hadoop102 bin ]$ xcall.sh jps 1 . 2.2 H adoop 安装 1)安

    2024年02月11日
    浏览(45)
  • 【数据采集与预处理】流数据采集工具Flume

    目录 一、Flume简介 (一)Flume定义 (二)Flume作用 二、Flume组成架构 三、Flume安装配置 (一)下载Flume (二)解压安装包 (三)配置环境变量 (四)查看Flume版本信息 四、Flume的运行 (一)Telnet准备工作 (二)使用Avro数据源测试Flume (三)使用netcat数据源测试Flume 五、F

    2024年01月21日
    浏览(103)
  • Flume采集端口数据kafka消费

    1.flume单独搭建 2.Flume采集端口数据kafka消费

    2024年02月06日
    浏览(49)
  • IO流读取上传文件的内容

     

    2024年01月21日
    浏览(63)
  • Flume采集数据到Kafka操作详解

    目录 一、创建一个Kafka主题 二、配置Flume 三、开启Flume 四、开启Kafka消费者 五、复制文件到Flume监控的source目录下 六、查看Flume是否能够成功采集 七、采集后查看Kafka消费者主题 八、采集数据错误解决办法 1.Ctrl+C关闭flume 2.删除出错的topic并重新创建 3.删除对应Flume文件中指定

    2024年02月09日
    浏览(62)
  • Hadoop高手之路8-Flume日志采集

    在大数据系统的开发中,数据收集工作无疑是开发者首要解决的一个难题,但由于生产数据的源头丰富多样,其中包含网站日志数据、后台监控数据、用户浏览网页数据等,数据工程师要想将它们分门别类的采集到HDFS系统中,就可以使用Apache Flume(数据采集)系统。 1. Flum

    2024年02月05日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包