在构建数仓时,经常会用到flume接收日志数据,通常涉及到的组件为kafka,hdfs等。下面以一个flume接收指定topic数据,并存入hdfs的案例,大致了解下flume相关使用规则。
版本:1.9
Source
Kafka Source就是一个Apache Kafka消费者,它从Kafka的topic中读取消息。 如果运行了多个Kafka Source,则可以把它们配置到同一个消费者组,以便每个source都读取一组唯一的topic分区。
目前支持Kafka 0.10.1.0以上版本,最高已经在Kafka 2.0.1版本上完成了测试,这已经是Flume 1.9发行时候的最高的Kafka版本了。
属性名 |
默认值 |
解释 |
---|---|---|
channels |
– |
与Source绑定的channel,多个用空格分开 |
type |
– |
组件类型,这个是: |
kafka.bootstrap.servers |
– |
Source使用的Kafka集群实例列表 |
kafka.consumer.group.id |
flume |
消费组的唯一标识符。如果有多个source或者Agent设定了相同的ID,表示它们是同一个消费者组 |
kafka.topics |
– |
将要读取消息的目标 Kafka topic 列表,多个用逗号分隔 |
kafka.topics.regex |
– |
会被Kafka Source订阅的 topic 集合的正则表达式。这个参数比 kafka.topics 拥有更高的优先级,如果这两个参数同时存在,则会覆盖kafka.topics的配置。 |
batchSize |
1000 |
一批写入 channel 的最大消息数 |
batchDurationMillis |
1000 |
一个批次写入 channel 之前的最大等待时间(毫秒)。达到等待时间或者数量达到 batchSize 都会触发写操作。 |
backoffSleepIncrement |
1000 |
当Kafka topic 显示为空时触发的初始和增量等待时间(毫秒)。等待时间可以避免对Kafka topic的频繁ping操作。默认的1秒钟对于获取数据比较合适, 但是对于使用拦截器时想达到更低的延迟可能就需要配置更低一些。 |
maxBackoffSleep |
5000 |
Kafka topic 显示为空时触发的最长等待时间(毫秒)。默认的5秒钟对于获取数据比较合适,但是对于使用拦截器时想达到更低的延迟可能就需要配置更低一些。 |
useFlumeEventFormat |
false |
默认情况下,从 Kafka topic 里面读取到的内容直接以字节数组的形式赋值给Event。如果设置为true,会以Flume Avro二进制格式进行读取。与Kafka Sink上的同名参数或者 Kafka channel 的parseAsFlumeEvent参数相关联,这样以对象的形式处理能使生成端发送过来的Event header信息得以保留。 |
setTopicHeader |
true |
当设置为 |
topicHeader |
topic |
如果 setTopicHeader 设置为 |
kafka.consumer.security.protocol |
PLAINTEXT |
设置使用哪种安全协议写入Kafka。可选值: |
more consumer security props |
如果使用了SASL_PLAINTEXT、SASL_SSL 或 SSL 等安全协议,参考 Kafka security 来为消费者增加安全相关的参数配置 |
|
Other Kafka Consumer Properties |
– |
其他一些 Kafka 消费者配置参数。任何 Kafka 支持的消费者参数都可以使用。唯一的要求是使用“kafka.consumer.”这个前缀来配置参数,比如: |
必需的参数已用 粗体 标明。
已经弃用的一些属性:
属性名 |
默认值 |
解释 |
---|---|---|
topic |
– |
改用 kafka.topics |
groupId |
flume |
改用 kafka.consumer.group.id |
zookeeperConnect |
– |
自0.9.x起不再受kafka消费者客户端的支持。以后使用kafka.bootstrap.servers与kafka集群建立连接 |
migrateZookeeperOffsets |
true |
如果找不到Kafka存储的偏移量,去Zookeeper中查找偏移量并将它们提交给 Kafka 。 它应该设置为true以支持从旧版本的FlumeKafka客户端无缝迁移。 迁移后,可以将其设置为false,但通常不需要这样做。 如果在Zookeeper未找到偏移量,则可通过kafka.consumer.auto.offset.reset配置如何处理偏移量。可以从 Kafka documentation 查看更多详细信息。 |
Channel
此处选择memory channel,内存 channel 是把 Event 队列存储到内存上,队列的最大数量就是 capacity 的设定值。它非常适合对吞吐量有较高要求的场景,但也是有代价的,当发生故障的时候会丢失当时内存中的所有 Event。 必需的参数已用 粗体 标明。
属性 |
默认值 |
解释 |
---|---|---|
type |
– |
组件类型,这个是: |
capacity |
100 |
内存中存储 Event 的最大数 |
transactionCapacity |
100 |
source 或者 sink 每个事务中存取 Event 的操作数量(不能比 capacity 大) |
keep-alive |
3 |
添加或删除一个 Event 的超时时间(秒) |
byteCapacityBufferPercentage |
20 |
指定 Event header 所占空间大小与 channel 中所有 Event 的总大小之间的百分比 |
byteCapacity |
Channel 中最大允许存储所有 Event 的总字节数(bytes)。默认情况下会使用JVM可用内存的80%作为最大可用内存(就是JVM启动参数里面配置的-Xmx的值)。 计算总字节时只计算 Event 的主体,这也是提供 byteCapacityBufferPercentage 配置参数的原因。注意,当你在一个 Agent 里面有多个内存 channel 的时候, 而且碰巧这些 channel 存储相同的物理 Event(例如:这些 channel 通过复制机制( 复制选择器 )接收同一个 source 中的 Event), 这时候这些 Event 占用的空间是累加的,并不会只计算一次。如果这个值设置为0(不限制),就会达到200G左右的内部硬件限制。 |
Sink
HDFS Sink ,这个Sink将Event写入Hadoop分布式文件系统(也就是HDFS)。 目前支持创建文本和序列文件。 它支持两种文件类型的压缩。 可以根据写入的时间、文件大小或Event数量定期滚动文件(关闭当前文件并创建新文件)。 它还可以根据Event自带的时间戳或系统时间等属性对数据进行分区。 存储文件的HDFS目录路径可以使用格式转义符,会由HDFS Sink进行动态地替换,以生成用于存储Event的目录或文件名。 使用此Sink需要安装hadoop, 以便Flume可以使用Hadoop的客户端与HDFS集群进行通信。 注意, 需要使用支持sync() 调用的Hadoop版本 。
属性名 |
默认值 |
解释 |
---|---|---|
channel |
– |
与 Sink 连接的 channel |
type |
– |
组件类型,这个是: |
hdfs.path |
– |
HDFS目录路径(例如:hdfs://namenode/flume/webdata/) |
hdfs.filePrefix |
FlumeData |
Flume在HDFS文件夹下创建新文件的固定前缀 |
hdfs.fileSuffix |
– |
Flume在HDFS文件夹下创建新文件的后缀(比如:.avro,注意这个“.”不会自动添加,需要显式配置) |
hdfs.inUsePrefix |
– |
Flume正在写入的临时文件前缀,默认没有 |
hdfs.inUseSuffix |
.tmp |
Flume正在写入的临时文件后缀 |
hdfs.emptyInUseSuffix |
false |
如果设置为 |
hdfs.rollInterval |
30 |
当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒 |
hdfs.rollSize |
1024 |
当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节 |
hdfs.rollCount |
10 |
当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件) |
hdfs.idleTimeout |
0 |
关闭非活动文件的超时时间(0表示禁用自动关闭文件),单位:秒 |
hdfs.batchSize |
100 |
向 HDFS 写入内容时每次批量操作的 Event 数量 |
hdfs.codeC |
– |
压缩算法。可选值: |
hdfs.fileType |
SequenceFile |
文件格式,目前支持: |
hdfs.maxOpenFiles |
5000 |
允许打开的最大文件数,如果超过这个数量,最先打开的文件会被关闭 |
hdfs.minBlockReplicas |
– |
指定每个HDFS块的最小副本数。 如果未指定,则使用 classpath 中 Hadoop 的默认配置。 |
hdfs.writeFormat |
Writable |
文件写入格式。可选值: |
hdfs.threadsPoolSize |
10 |
每个HDFS Sink实例操作HDFS IO时开启的线程数(open、write 等) |
hdfs.rollTimerPoolSize |
1 |
每个HDFS Sink实例调度定时文件滚动的线程数 |
hdfs.kerberosPrincipal |
– |
用于安全访问 HDFS 的 Kerberos 用户主体 |
hdfs.kerberosKeytab |
– |
用于安全访问 HDFS 的 Kerberos keytab 文件 |
hdfs.proxyUser |
代理名 |
|
hdfs.round |
false |
是否应将时间戳向下舍入(如果为true,则影响除 |
hdfs.roundValue |
1 |
向下舍入(小于当前时间)的这个值的最高倍(单位取决于下面的 hdfs.roundUnit ) 例子:假设当前时间戳是18:32:01,hdfs.roundUnit = |
hdfs.roundUnit |
second |
向下舍入的单位,可选值: |
hdfs.timeZone |
Local Time |
解析存储目录路径时候所使用的时区名,例如:America/Los_Angeles、Asia/Shanghai |
hdfs.useLocalTimeStamp |
false |
使用日期时间转义符时是否使用本地时间戳(而不是使用 Event header 中自带的时间戳) |
hdfs.closeTries |
0 |
开始尝试关闭文件时最大的重命名文件的尝试次数(因为打开的文件通常都有个.tmp的后缀,写入结束关闭文件时要重命名把后缀去掉)。 如果设置为1,Sink在重命名失败(可能是因为 NameNode 或 DataNode 发生错误)后不会重试,这样就导致了这个文件会一直保持为打开状态,并且带着.tmp的后缀; 如果设置为0,Sink会一直尝试重命名文件直到成功为止; 关闭文件操作失败时这个文件可能仍然是打开状态,这种情况数据还是完整的不会丢失,只有在Flume重启后文件才会关闭。 |
hdfs.retryInterval |
180 |
连续尝试关闭文件的时间间隔(秒)。 每次关闭操作都会调用多次 RPC 往返于 Namenode ,因此将此设置得太低会导致 Namenode 上产生大量负载。 如果设置为0或更小,则如果第一次尝试失败,将不会再尝试关闭文件,并且可能导致文件保持打开状态或扩展名为“.tmp”。 |
serializer |
TEXT |
Event 转为文件使用的序列化器。其他可选值有: |
serializer.* |
根据上面 serializer 配置的类型来根据需要添加序列化器的参数 |
完整案例如下:文章来源:https://www.toymoban.com/news/detail-817383.html
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hmcs030:9092,hmcs031:9092,hmcs032:9092
a1.sources.r1.kafka.topics= hmcs_network_enterprise_climb
a1.sources.r1.kafka.consumer.group.id = hmcs_network_enterprise_climb_group
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hmcs.interceptor.DecodeInterceptor$Builder
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.parseAsFlumeEvent = false
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://ns1/flume/enterprise/networkEnterData/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = climbNetworkEnter-
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 300
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType=DataStream
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
启动命令如下:文章来源地址https://www.toymoban.com/news/detail-817383.html
nohup /usr/local/flume/bin/flume-ng agent -c /usr/local/flume/conf/ -f /usr/local/flume/job/kafka_memory_hdfs.conf -n a1 -Dflume.root.logger=info,console >/usr/local/flume/logs/kafka_memory_hdfs.log 2>&1 &
到了这里,关于flume案例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!