flume案例

这篇具有很好参考价值的文章主要介绍了flume案例。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在构建数仓时,经常会用到flume接收日志数据,通常涉及到的组件为kafka,hdfs等。下面以一个flume接收指定topic数据,并存入hdfs的案例,大致了解下flume相关使用规则。

版本:1.9

Source

Kafka Source就是一个Apache Kafka消费者,它从Kafka的topic中读取消息。 如果运行了多个Kafka Source,则可以把它们配置到同一个消费者组,以便每个source都读取一组唯一的topic分区。

目前支持Kafka 0.10.1.0以上版本,最高已经在Kafka 2.0.1版本上完成了测试,这已经是Flume 1.9发行时候的最高的Kafka版本了。

属性名

默认值

解释

channels

与Source绑定的channel,多个用空格分开

type

组件类型,这个是: org.apache.flume.source.kafka.KafkaSource

kafka.bootstrap.servers

Source使用的Kafka集群实例列表

kafka.consumer.group.id

flume

消费组的唯一标识符。如果有多个source或者Agent设定了相同的ID,表示它们是同一个消费者组

kafka.topics

将要读取消息的目标 Kafka topic 列表,多个用逗号分隔

kafka.topics.regex

会被Kafka Source订阅的 topic 集合的正则表达式。这个参数比 kafka.topics 拥有更高的优先级,如果这两个参数同时存在,则会覆盖kafka.topics的配置。

batchSize

1000

一批写入 channel 的最大消息数

batchDurationMillis

1000

一个批次写入 channel 之前的最大等待时间(毫秒)。达到等待时间或者数量达到 batchSize 都会触发写操作。

backoffSleepIncrement

1000

当Kafka topic 显示为空时触发的初始和增量等待时间(毫秒)。等待时间可以避免对Kafka topic的频繁ping操作。默认的1秒钟对于获取数据比较合适, 但是对于使用拦截器时想达到更低的延迟可能就需要配置更低一些。

maxBackoffSleep

5000

Kafka topic 显示为空时触发的最长等待时间(毫秒)。默认的5秒钟对于获取数据比较合适,但是对于使用拦截器时想达到更低的延迟可能就需要配置更低一些。

useFlumeEventFormat

false

默认情况下,从 Kafka topic 里面读取到的内容直接以字节数组的形式赋值给Event。如果设置为true,会以Flume Avro二进制格式进行读取。与Kafka Sink上的同名参数或者 Kafka channel 的parseAsFlumeEvent参数相关联,这样以对象的形式处理能使生成端发送过来的Event header信息得以保留。

setTopicHeader

true

当设置为 true 时,会把存储Event的topic名字存储到header中,使用的key就是下面的 topicHeader 的值。

topicHeader

topic

如果 setTopicHeader 设置为 true ,则定义用于存储接收消息的 topic 使用header key。注意如果与 Kafka Sink 的 topicHeader 参数一起使用的时候要小心,避免又循环将消息又发送回 topic。

kafka.consumer.security.protocol

PLAINTEXT

设置使用哪种安全协议写入Kafka。可选值:SASL_PLAINTEXTSASL_SSLSSL ,有关安全设置的其他信息,请参见下文。

more consumer security props

如果使用了SASL_PLAINTEXT、SASL_SSL 或 SSL 等安全协议,参考 Kafka security 来为消费者增加安全相关的参数配置

Other Kafka Consumer Properties

其他一些 Kafka 消费者配置参数。任何 Kafka 支持的消费者参数都可以使用。唯一的要求是使用“kafka.consumer.”这个前缀来配置参数,比如: kafka.consumer.auto.offset.reset

必需的参数已用 粗体 标明。

已经弃用的一些属性:

属性名

默认值

解释

topic

改用 kafka.topics

groupId

flume

改用 kafka.consumer.group.id

zookeeperConnect

自0.9.x起不再受kafka消费者客户端的支持。以后使用kafka.bootstrap.servers与kafka集群建立连接

migrateZookeeperOffsets

true

如果找不到Kafka存储的偏移量,去Zookeeper中查找偏移量并将它们提交给 Kafka 。 它应该设置为true以支持从旧版本的FlumeKafka客户端无缝迁移。 迁移后,可以将其设置为false,但通常不需要这样做。 如果在Zookeeper未找到偏移量,则可通过kafka.consumer.auto.offset.reset配置如何处理偏移量。可以从 Kafka documentation 查看更多详细信息。

 Channel

此处选择memory channel,内存 channel 是把 Event 队列存储到内存上,队列的最大数量就是 capacity 的设定值。它非常适合对吞吐量有较高要求的场景,但也是有代价的,当发生故障的时候会丢失当时内存中的所有 Event。 必需的参数已用 粗体 标明。

属性

默认值

解释

type

组件类型,这个是: memory

capacity

100

内存中存储 Event 的最大数

transactionCapacity

100

source 或者 sink 每个事务中存取 Event 的操作数量(不能比 capacity 大)

keep-alive

3

添加或删除一个 Event 的超时时间(秒)

byteCapacityBufferPercentage

20

指定 Event header 所占空间大小与 channel 中所有 Event 的总大小之间的百分比

byteCapacity

Channel 中最大允许存储所有 Event 的总字节数(bytes)。默认情况下会使用JVM可用内存的80%作为最大可用内存(就是JVM启动参数里面配置的-Xmx的值)。 计算总字节时只计算 Event 的主体,这也是提供 byteCapacityBufferPercentage 配置参数的原因。注意,当你在一个 Agent 里面有多个内存 channel 的时候, 而且碰巧这些 channel 存储相同的物理 Event(例如:这些 channel 通过复制机制( 复制选择器 )接收同一个 source 中的 Event), 这时候这些 Event 占用的空间是累加的,并不会只计算一次。如果这个值设置为0(不限制),就会达到200G左右的内部硬件限制。

Sink

HDFS Sink ,这个Sink将Event写入Hadoop分布式文件系统(也就是HDFS)。 目前支持创建文本和序列文件。 它支持两种文件类型的压缩。 可以根据写入的时间、文件大小或Event数量定期滚动文件(关闭当前文件并创建新文件)。 它还可以根据Event自带的时间戳或系统时间等属性对数据进行分区。 存储文件的HDFS目录路径可以使用格式转义符,会由HDFS Sink进行动态地替换,以生成用于存储Event的目录或文件名。 使用此Sink需要安装hadoop, 以便Flume可以使用Hadoop的客户端与HDFS集群进行通信。 注意, 需要使用支持sync() 调用的Hadoop版本

属性名

默认值

解释

channel

与 Sink 连接的 channel

type

组件类型,这个是: hdfs

hdfs.path

HDFS目录路径(例如:hdfs://namenode/flume/webdata/)

hdfs.filePrefix

FlumeData

Flume在HDFS文件夹下创建新文件的固定前缀

hdfs.fileSuffix

Flume在HDFS文件夹下创建新文件的后缀(比如:.avro,注意这个“.”不会自动添加,需要显式配置)

hdfs.inUsePrefix

Flume正在写入的临时文件前缀,默认没有

hdfs.inUseSuffix

.tmp

Flume正在写入的临时文件后缀

hdfs.emptyInUseSuffix

false

如果设置为 false 上面的 hdfs.inUseSuffix 参数在写入文件时会生效,并且写入完成后会在目标文件上移除 hdfs.inUseSuffix 配置的后缀。如果设置为 true 则上面的 hdfs.inUseSuffix 参数会被忽略,写文件时不会带任何后缀

hdfs.rollInterval

30

当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒

hdfs.rollSize

1024

当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节

hdfs.rollCount

10

当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)

hdfs.idleTimeout

0

关闭非活动文件的超时时间(0表示禁用自动关闭文件),单位:秒

hdfs.batchSize

100

向 HDFS 写入内容时每次批量操作的 Event 数量

hdfs.codeC

压缩算法。可选值:gzipbzip2lzolzop` 、 ``snappy

hdfs.fileType

SequenceFile

文件格式,目前支持: SequenceFileDataStreamCompressedStream 。 1. DataStream 不会压缩文件,不需要设置hdfs.codeC 2. CompressedStream 必须设置hdfs.codeC参数

hdfs.maxOpenFiles

5000

允许打开的最大文件数,如果超过这个数量,最先打开的文件会被关闭

hdfs.minBlockReplicas

指定每个HDFS块的最小副本数。 如果未指定,则使用 classpath 中 Hadoop 的默认配置。

hdfs.writeFormat

Writable

文件写入格式。可选值: TextWritable 。在使用 Flume 创建数据文件之前设置为 Text,否则 Apache Impala(孵化)或 Apache Hive 无法读取这些文件。

hdfs.threadsPoolSize

10

每个HDFS Sink实例操作HDFS IO时开启的线程数(open、write 等)

hdfs.rollTimerPoolSize

1

每个HDFS Sink实例调度定时文件滚动的线程数

hdfs.kerberosPrincipal

用于安全访问 HDFS 的 Kerberos 用户主体

hdfs.kerberosKeytab

用于安全访问 HDFS 的 Kerberos keytab 文件

hdfs.proxyUser

代理名

hdfs.round

false

是否应将时间戳向下舍入(如果为true,则影响除 %t 之外的所有基于时间的转义符)

hdfs.roundValue

1

向下舍入(小于当前时间)的这个值的最高倍(单位取决于下面的 hdfs.roundUnit ) 例子:假设当前时间戳是18:32:01,hdfs.roundUnit = minute 如果roundValue=5,则时间戳会取为:18:30 如果roundValue=7,则时间戳会取为:18:28 如果roundValue=10,则时间戳会取为:18:30

hdfs.roundUnit

second

向下舍入的单位,可选值: secondminutehour

hdfs.timeZone

Local Time

解析存储目录路径时候所使用的时区名,例如:America/Los_Angeles、Asia/Shanghai

hdfs.useLocalTimeStamp

false

使用日期时间转义符时是否使用本地时间戳(而不是使用 Event header 中自带的时间戳)

hdfs.closeTries

0

开始尝试关闭文件时最大的重命名文件的尝试次数(因为打开的文件通常都有个.tmp的后缀,写入结束关闭文件时要重命名把后缀去掉)。

如果设置为1,Sink在重命名失败(可能是因为 NameNode 或 DataNode 发生错误)后不会重试,这样就导致了这个文件会一直保持为打开状态,并且带着.tmp的后缀;

如果设置为0,Sink会一直尝试重命名文件直到成功为止;

关闭文件操作失败时这个文件可能仍然是打开状态,这种情况数据还是完整的不会丢失,只有在Flume重启后文件才会关闭。

hdfs.retryInterval

180

连续尝试关闭文件的时间间隔(秒)。 每次关闭操作都会调用多次 RPC 往返于 Namenode ,因此将此设置得太低会导致 Namenode 上产生大量负载。 如果设置为0或更小,则如果第一次尝试失败,将不会再尝试关闭文件,并且可能导致文件保持打开状态或扩展名为“.tmp”。

serializer

TEXT

Event 转为文件使用的序列化器。其他可选值有: avro_event 或其他 EventSerializer.Builderinterface 接口的实现类的全限定类名。

serializer.*

根据上面 serializer 配置的类型来根据需要添加序列化器的参数

完整案例如下:

a1.sources=r1
a1.channels=c1
a1.sinks=k1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hmcs030:9092,hmcs031:9092,hmcs032:9092
a1.sources.r1.kafka.topics= hmcs_network_enterprise_climb
a1.sources.r1.kafka.consumer.group.id = hmcs_network_enterprise_climb_group
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hmcs.interceptor.DecodeInterceptor$Builder

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.parseAsFlumeEvent = false

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://ns1/flume/enterprise/networkEnterData/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = climbNetworkEnter-
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 300
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType=DataStream

a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

启动命令如下:文章来源地址https://www.toymoban.com/news/detail-817383.html

nohup /usr/local/flume/bin/flume-ng agent -c /usr/local/flume/conf/ -f /usr/local/flume/job/kafka_memory_hdfs.conf -n a1 -Dflume.root.logger=info,console >/usr/local/flume/logs/kafka_memory_hdfs.log 2>&1 &

到了这里,关于flume案例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 影响ETL数据传输性能的9大因素及主流ETL应对策略

    现在很多企业在选择 ETL 工具时都特别关注 ETL 的数据传输性能,而有很多开源 ETL 工具都说自已是性能如何如何快,而事实上数据传输性能是不是这些工具说的那样快呢?   数据传输性能受制于哪些因素呢?企业在自身数据库性能受制的情况下一味的想用 ETL 工具来提升性能

    2024年01月23日
    浏览(38)
  • 大数据扫盲(2): 数据分析BI与ETL的紧密关系——ETL是成功BI的先决条件

    着业务的发展每个企业都将产生越来越多的数据,然后这些数据本身并不能直接带来洞察力并产生业务价值。为了释放数据的潜力,数据分析BI(商业智能)成为了现代企业不可或缺的一部分。然而,在数据分析的背后,有一个至关重要且常常被忽视的步骤——ETL(Extract, T

    2024年02月12日
    浏览(39)
  • ETL详解--数据仓库技术

      一、ETL简介 ETL ,是英文Extract-Transform-Load的缩写,用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程,是数据仓库的生命线。它是一种数据处理过程,用于从不同的数据源中提取数据、对数据进行转换和清洗,并将处理后的数据加

    2024年02月02日
    浏览(37)
  • 大数据开发之电商数仓(hadoop、flume、hive、hdfs、zookeeper、kafka)

    1.1.1 数据仓库概念 1、数据仓库概念: 为企业制定决策,提供数据支持的集合。通过对数据仓库中数据的分析,可以帮助企业,改进业务流程、控制成本,提高产品质量。 数据仓库并不是数据的最终目的地,而是为数据最终的目的地做好准备,这些准备包括对数据的:清洗、

    2024年01月22日
    浏览(60)
  • 大数据ETL工具Kettle

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 提示:这里可以添加本文要记录的大概内容: 提示:以下是本篇文章正文内容,下面案例可供参考 ETL是将业务系统的数据经过抽取、清洗转换之后加载到数据仓库的过程,目的是将企业中的分散、零乱

    2024年02月10日
    浏览(47)
  • ETL简介:数据集成与应用

    在当今大数据时代,组织和企业需要处理和分析庞大的数据量。ETL(Extract, Transform, Load)是一种重要的数据集成和处理方法,它在数据管理和决策支持中起着关键作用。本文将介绍ETL的基本概念、作用和关键组成部分,以帮助读者了解ETL的重要性和应用领域。 ETL是指数据提取

    2024年02月12日
    浏览(34)
  • ETL的数据挖掘方式

    ETL的基本概念 数据抽取(Extraction):从不同源头系统中获取所需数据的步骤。比如从mysql中拿取数据就是一种简单的抽取动作,从API接口拿取数据也是。 数据转换(Transformation):清洗、整合和转化原始数据以适应目标存储或分析系统的阶段。从mysql中拿到数据之后对数据进

    2024年03月12日
    浏览(40)
  • 数据仓库的ELT/ETL

    ETL 和 ELT 有很多共同点,从本质上讲,每种集成方法都可以将数据从源端抽取到数据仓库中,两者的区别在于数据在哪里进行转换。 ETL – 抽取、转换、加载 从不同的数据源抽取信息,将其转换为根据业务定义的格式,然后将其加载到其他数据库或数据仓库中。另一种 ETL 集

    2024年04月16日
    浏览(42)
  • ETL数据集成和数据仓库的关键步骤

    在当今数据驱动的世界中,ETL(提取、转换和加载)过程在构建可靠和高效的数据仓库中扮演着关键角色。ETL数据集成和数据仓库的关键步骤对于数据质量和决策支持至关重要。本文将介绍ETL数据集成和数据仓库构建的关键步骤,以帮助读者了解构建一个可靠数据仓库所需的

    2024年02月12日
    浏览(98)
  • 使用Python进行ETL数据处理

    💂 个人网站:【海拥】【摸鱼游戏】【神级源码资源网】 🤟 前端学习课程:👉【28个案例趣学前端】【400个JS面试题】 💅 想寻找共同学习交流、摸鱼划水的小伙伴,请点击【摸鱼学习交流群】 ETL(Extract, Transform, Load)是一种广泛应用于数据处理和数据仓库建设的方法论,

    2024年02月01日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包