Flume采集数据到Kafka操作详解

这篇具有很好参考价值的文章主要介绍了Flume采集数据到Kafka操作详解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、创建一个Kafka主题

二、配置Flume

三、开启Flume

四、开启Kafka消费者

五、复制文件到Flume监控的source目录下

六、查看Flume是否能够成功采集

七、采集后查看Kafka消费者主题

八、采集数据错误解决办法

1.Ctrl+C关闭flume

2.删除出错的topic并重新创建

3.删除对应Flume文件中指定目录下的内容

4. 重新开启Flume

5.重新复制文件到Flume监控的目录下

6.采集完成后查看kafka-events的行数


一、创建一个Kafka主题

kafka-topics.sh --create --zookeeper lxm147:2181 --topic events --partitions 1 --replication-factor 1

二、配置Flume

vim /opt/soft/flume190/conf/events/events.conf

events.sources=eventsSource
events.channels=eventsChannel
events.sinks=eventsSink

events.sources.eventsSource.type=spooldir
events.sources.eventsSource.spoolDir=/opt/flumelogfile/events
events.sources.eventsSource.deserializer=LINE
events.sources.eventsSource.deserializer.maxLineLength=320000
events.sources.eventsSource.includePattern=events_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
events.sources.eventsSource.interceptors=head_filter
events.sources.eventsSource.interceptors.head_filter.type=regex_filter
events.sources.eventsSource.interceptors.head_filter.regex=^event_id*
events.sources.eventsSource.interceptors.head_filter.excludeEvents=true

events.channels.eventsChannel.type=file
events.channels.eventsChannel.checkpointDir=/opt/flumelogfile/checkpoint/events

events.channels.eventsChannel.dataDirs=/opt/flumelogfile/data/events

events.sinks.eventsSink.type=org.apache.flume.sink.kafka.KafkaSink
events.sinks.eventsSink.batchSize=640
events.sinks.eventsSink.brokerList=LINE

events.sinks.eventsSink.brokerList=lxm147:9092
events.sinks.eventsSink.topic=events

events.sources.eventsSource.channels=eventsChannel
events.sinks.eventsSink.channel=eventsChannel

三、开启Flume

[root@lxm147 flume190]# ./bin/flume-ng agent --name events --conf ./conf/ --conf-file ./conf/events/events.conf  -Dflume.root.logger=INFO,console

四、开启Kafka消费者

kafka-console-consumer.sh --bootstrap-server lxm147:9092 --topic events

五、复制文件到Flume监控的source目录下

[root@lxm147 eventdata]# cp ./events.csv /opt/flumelogfile/events/events_2023-04-01.csv
[root@lxm147 eventdata]# wc -l events.csv # 查看events.csv文件的行数
3137973 events.csv

六、查看Flume是否能够成功采集

2023-04-21 12:15:34,040 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:384)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2023-04-21 12:15:34,041 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:497)] Preparing to move file /opt/flumelogfile/events/events_2023-04-01.csv to /opt/flumelogfile/events/events_2023-04-01.csv.COMPLETED
2023-04-21 12:16:03,096 (Log-BackgroundWorker-eventsChannel) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint(EventQueueBackingStoreFile.java:230)] Start checkpoint for /opt/flumelogfile/checkpoint/events/checkpoint, elements to sync = 32673
2023-04-21 12:16:03,102 (Log-BackgroundWorker-eventsChannel) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:255)] Updating checkpoint metadata: logWriteOrderID: 1682056665561, queueSize: 0, queueHead: 137243
2023-04-21 12:16:03,110 (Log-BackgroundWorker-eventsChannel) [INFO - org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:1065)] Updated checkpoint for file: /opt/flumelogfile/data/events/log-1 position: 1447136656 logWriteOrderID: 1682056665561

七、采集后查看Kafka消费者主题

[root@lxm147 config]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list lxm147:9092 --topic events
events:0:3137972

采集成功!

八、采集数据错误解决办法

例如原数据是2000行,采集到kafka某个topic下只有1500行,如果重新运行flume会导致数据采集重复,可以删除topic重新进行采集。

1.Ctrl+C关闭flume

2.删除出错的topic并重新创建

kafka-topics.sh --delete --zookeeper lxm147:2181 --topic events

kafka-topics.sh --create --zookeeper lxm147:2181 --topic events --partitions 1 --replication-factor 1

3.删除对应Flume文件中指定目录下的内容

# 删除flume监控的目录下传输过的文件
rm -rf /opt/flumelogfile/uf/*

# 删除检查点监控的目录下的文件
rm -rf /opt/flumelogfile/checkpoint/uf/*

# 删除Flume对应的日志文件
rm -rf /opt/flumelogfile/data/uf/*

4. 重新开启Flume

[root@lxm147 flume190]# ./bin/flume-ng agent --name events --conf ./conf/ --conf-file ./conf/events/events.conf -Dflume.root.logger=INFO,console

5.重新复制文件到Flume监控的目录下

[root@lxm147 eventdata]# cp ./events.csv /opt/flumelogfile/events/events_2023-04-01.csv

6.采集完成后查看kafka-events的行数

[root@lxm147 config]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list lxm147:9092 --topic events
events:0:3137972

完成!!!文章来源地址https://www.toymoban.com/news/detail-493138.html

到了这里,关于Flume采集数据到Kafka操作详解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据之使用Flume监听端口采集数据流到Kafka

    前言 题目: 一、读题分析 二、处理过程   1.先在Kafka中创建符合题意的Kafka的topic  创建符合题意的Kafka的topic 2.写出Flume所需要的配置文件 3.启动脚本然后启动Flume监听端口数据并传到Kafka 启动flume指令 启动脚本,观察Flume和Kafka的变化 三、重难点分析 总结          本题

    2024年02月08日
    浏览(59)
  • 一百七十二、Flume——Flume采集Kafka数据写入HDFS中(亲测有效、附截图)

    作为日志采集工具Flume,它在项目中最常见的就是采集Kafka中的数据然后写入HDFS或者HBase中,这里就是用flume采集Kafka的数据导入HDFS中 kafka_2.13-3.0.0.tgz hadoop-3.1.3.tar.gz apache-flume-1.9.0-bin.tar.gz # cd  /home/hurys/dc_env/flume190/conf # vi  evaluation.properties ### Name agent, source, channels and sink ali

    2024年02月09日
    浏览(49)
  • 日志采集传输框架之 Flume,将监听端口数据发送至Kafka

    1、简介                 Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传 输的系统。Flume 基于流式架构,主要有以下几个部分组成。  主要组件介绍: 1)、 Flume Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的。Agent 主

    2024年01月22日
    浏览(58)
  • (二十八)大数据实战——Flume数据采集之kafka数据生产与消费集成案例

    本节内容我们主要介绍一下flume数据采集和kafka消息中间键的整合。通过flume监听nc端口的数据,将数据发送到kafka消息的first主题中,然后在通过flume消费kafka中的主题消息,将消费到的消息打印到控制台上。集成使用flume作为kafka的生产者和消费者。关于nc工具、flume以及kafka的

    2024年02月09日
    浏览(53)
  • Flume多路复用模式把接收数据注入kafka 的同时,将数据备份到HDFS目录

    启动hadoop、在hdfs中创建需要访问的目录 配置Hadoop的核心配置文件 core-site.xml :设置Hadoop的核心配置参数,例如NameNode的地址、数据块大小、副本数量等。示例配置如下: hdfs-site.xml :设置HDFS(Hadoop分布式文件系统)的参数,例如数据块复制因子、NameNode的存储路径等。示例配

    2024年02月16日
    浏览(45)
  • Flume实战篇-采集Kafka到hdfs

    记录Flume采集kafka数据到Hdfs。 主要是自定义下Flume读取event头部的时间。 将打好的包放入/opt/module/flume/lib文件夹下 [root@ lib]$ ls | grep interceptor flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar 配置下flume的jvm    上面配置消费的形式是earliest,如果重新启动以后,他会从最新的位置开

    2024年02月06日
    浏览(54)
  • Flume监听多个文件目录,并根据文件名称不同,输出到kafka不同topic中

    https://blog.csdn.net/qinqinde123/article/details/128130131 flume监听到有新文件出现的时候,会将文件内容推送到kakfa的topic中,但是如果文件夹中有不同类型的文件,直接推送到kafka的同一个topic中,如果根据内容无法区分不同类型的文件,那就需要根据文件名称来区分。flume本身根据配置

    2023年04月08日
    浏览(66)
  • Flume 数据采集

    1 . 2 . 1  集群 进程查看 脚本 (1)在/home/bigdata_admin/bin目录下创建脚本xcall.sh [bigdata_admin@hadoop102  bin]$ vim xcall.sh (2)在脚本中编写如下内容 (3)修改脚本执行权限 [bigdata_admin@hadoop102 bin ]$ chmod 777 xcall.sh (4)启动脚本 [bigdata_admin@hadoop102 bin ]$ xcall.sh jps 1 . 2.2 H adoop 安装 1)安

    2024年02月11日
    浏览(45)
  • 【数据采集与预处理】流数据采集工具Flume

    目录 一、Flume简介 (一)Flume定义 (二)Flume作用 二、Flume组成架构 三、Flume安装配置 (一)下载Flume (二)解压安装包 (三)配置环境变量 (四)查看Flume版本信息 四、Flume的运行 (一)Telnet准备工作 (二)使用Avro数据源测试Flume (三)使用netcat数据源测试Flume 五、F

    2024年01月21日
    浏览(103)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包