Flume 简介及基本使用

这篇具有很好参考价值的文章主要介绍了Flume 简介及基本使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.Flume简介

Apache Flume 是一个分布式,高可用的数据收集系统。它可以从不同的数据源收集数据,经过聚合后发送到存储系统中,通常用于日志数据的收集。Flume 分为 NG 和 OG (1.0 之前) 两个版本,NG 在 OG 的基础上进行了完全的重构,是目前使用最为广泛的版本。下面的介绍均以 NG 为基础。

2. Flume架构和基本概念

下图为 Flume 的基本架构图:

Flume 简介及基本使用,大数据ETL实战,flume,大数据,etl

2.1 基本架构

外部数据源以特定格式向 Flume 发送 `events` (事件),当 `source` 接收到 `events` 时,它将其存储到一个或多个 `channel`,`channe` 会一直保存 `events` 直到它被 `sink` 所消费。`sink` 的主要功能从 `channel` 中读取 `events`,并将其存入外部存储系统或转发到下一个 `source`,成功后再从 `channel` 中移除 `events`。

2.2 基本概念

1. Event

`Event` 是 Flume NG 数据传输的基本单元。类似于 JMS 和消息系统中的消息。一个 `Event` 由标题和正文组成:前者是键/值映射,后者是任意字节数组。

2. Source 

数据收集组件,从外部数据源收集数据,并存储到 Channel 中。

3. Channel

`Channel` 是源和接收器之间的管道,用于临时存储数据。可以是内存或持久化的文件系统:

+ `Memory Channel` : 使用内存,优点是速度快,但数据可能会丢失 (如突然宕机);

+ `File Channel` : 使用持久化的文件系统,优点是能保证数据不丢失,但是速度慢。

4. Sink

`Sink` 的主要功能从 `Channel` 中读取 `Event`,并将其存入外部存储系统或将其转发到下一个 `Source`,成功后再从 `Channel` 中移除 `Event`。

5. Agent

是一个独立的 (JVM) 进程,包含 `Source`、 `Channel`、 `Sink` 等组件。

2.3 组件种类

Flume 中的每一个组件都提供了丰富的类型,适用于不同场景:

- Source 类型 :内置了几十种类型,如 `Avro Source`,`Thrift Source`,`Kafka Source`,`JMS Source`;

- Sink 类型 :`HDFS Sink`,`Hive Sink`,`HBaseSinks`,`Avro Sink` 等;

- Channel 类型 :`Memory Channel`,`JDBC Channel`,`Kafka Channel`,`File Channel` 等。

对于 Flume 的使用,除非有特别的需求,否则通过组合内置的各种类型的 Source,Sink 和 Channel 就能满足大多数的需求。

3. Flume架构模式

Flume 支持多种架构模式,分别介绍如下

3.1 multi-agent flow

Flume 简介及基本使用,大数据ETL实战,flume,大数据,etl

Flume 支持跨越多个 Agent 的数据传递,这要求前一个 Agent 的 Sink 和下一个 Agent 的 Source 都必须是 `Avro` 类型,Sink 指向 Source 所在主机名 (或 IP 地址) 和端口(详细配置见下文案例三)。

3.2 Consolidation

Flume 简介及基本使用,大数据ETL实战,flume,大数据,etl

日志收集中常常存在大量的客户端(比如分布式 web 服务),Flume 支持使用多个 Agent 分别收集日志,然后通过一个或者多个 Agent 聚合后再存储到文件系统中。

3.3 Multiplexing the flow

Flume 简介及基本使用,大数据ETL实战,flume,大数据,etl

Flume 支持从一个 Source 向多个 Channel,也就是向多个 Sink 传递事件,这个操作称之为 `Fan Out`(扇出)。默认情况下 `Fan Out` 是向所有的 Channel 复制 `Event`,即所有 Channel 收到的数据都是相同的。同时 Flume 也支持在 `Source` 上自定义一个复用选择器 (multiplexing selector) 来实现自定义的路由规则。

4.Flume配置格式

Flume 配置通常需要以下两个步骤:

1. 分别定义好 Agent 的 Sources,Sinks,Channels,然后将 Sources 和 Sinks 与通道进行绑定。需要注意的是一个 Source 可以配置多个 Channel,但一个 Sink 只能配置一个 Channel。基本格式如下:

<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>

set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...

set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>

2. 分别定义 Source,Sink,Channel 的具体属性。基本格式如下:

<Agent>.sources.<Source>.<someProperty> = <someValue>

properties for channels
<Agent>.channel.<Channel>.<someProperty> = <someValue>

properties for sinks
<Agent>.sources.<Sink>.<someProperty> = <someValue>

5. Flume使用案例

介绍几个 Flume 的使用案例:

+ 案例一:使用 Flume 监听文件内容变动,将新增加的内容输出到控制台。

+ 案例二:使用 Flume 监听指定目录,将目录下新增加的文件存储到 HDFS。

+ 案例三:使用 Avro 将本服务器收集到的日志数据发送到另外一台服务器。

5.1 案例一

需求: 监听文件内容变动,将新增加的内容输出到控制台。

实现: 主要使用 `Exec Source` 配合 `tail` 命令实现。

1. 配置

新建配置文件 `exec-memory-logger.properties`,其内容如下:

#指定agentsources,sinks,channels
a1.sources = s1  
a1.sinks = k1  
a1.channels = c1  
   
#配置sources属性
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c

#将sourceschannels进行绑定
a1.sources.s1.channels = c1
   
#配置sink 
a1.sinks.k1.type = logger

#将sinkschannels进行绑定  
a1.sinks.k1.channel = c1  
   
#配置channel类型
a1.channels.c1.type = memory

2. 启动

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/exec-memory-logger.properties \
--name a1 \
-Dflume.root.logger=INFO,console

3. 测试

向文件中追加数据:

Flume 简介及基本使用,大数据ETL实战,flume,大数据,etl

控制台的显示:

Flume 简介及基本使用,大数据ETL实战,flume,大数据,etl

5.2 案例二

需求: 监听指定目录,将目录下新增加的文件存储到 HDFS。

实现:使用 `Spooling Directory Source` 和 `HDFS Sink`。

1. 配置

#指定agentsources,sinks,channels
a1.sources = s1  
a1.sinks = k1  
a1.channels = c1  
   
#配置sources属性
a1.sources.s1.type =spooldir  
a1.sources.s1.spoolDir =/tmp/logs
a1.sources.s1.basenameHeader = true
a1.sources.s1.basenameHeaderKey = fileName 
#将sourceschannels进行绑定  
a1.sources.s1.channels =c1 

   
#配置sink 
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/
a1.sinks.k1.hdfs.filePrefix = %{fileName}
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream  
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#将sinkschannels进行绑定  
a1.sinks.k1.channel = c1
   
#配置channel类型
a1.channels.c1.type = memory

2. 启动

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/spooling-memory-hdfs.properties \
--name a1 -Dflume.root.logger=INFO,console

3. 测试

拷贝任意文件到监听目录下,可以从日志看到文件上传到 HDFS 的路径:

cp log.txt logs/

Flume 简介及基本使用,大数据ETL实战,flume,大数据,etl

查看上传到 HDFS 上的文件内容与本地是否一致:

hdfs dfs -cat /flume/events/19-04-09/13/log.txt.1554788567801

Flume 简介及基本使用,大数据ETL实战,flume,大数据,etl

5.3 案例三

需求: 将本服务器收集到的数据发送到另外一台服务器。

实现:使用 `avro sources` 和 `avro Sink` 实现。

1. 配置日志收集Flume

新建配置 `netcat-memory-avro.properties`,监听文件内容变化,然后将新的文件内容通过 `avro sink` 发送到 hadoop001 这台服务器的 8888 端口:

#指定agentsources,sinks,channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1

#配置sources属性
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c
a1.sources.s1.channels = c1

#配置sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop001
a1.sinks.k1.port = 8888
a1.sinks.k1.batch-size = 1
a1.sinks.k1.channel = c1

#配置channel类型
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

2. 配置日志聚合Flume

使用 `avro source` 监听 hadoop001 服务器的 8888 端口,将获取到内容输出到控制台:

#指定agentsources,sinks,channels
a2.sources = s2
a2.sinks = k2
a2.channels = c2

#配置sources属性
a2.sources.s2.type = avro
a2.sources.s2.bind = hadoop001
a2.sources.s2.port = 8888

#将sourceschannels进行绑定
a2.sources.s2.channels = c2

#配置sink
a2.sinks.k2.type = logger

#将sinkschannels进行绑定
a2.sinks.k2.channel = c2

#配置channel类型
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

3. 启动

启动日志聚集 Flume:

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/avro-memory-logger.properties \
--name a2 -Dflume.root.logger=INFO,console

在启动日志收集 Flume:

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/netcat-memory-avro.properties \
--name a1 -Dflume.root.logger=INFO,console

这里建议按以上顺序启动,原因是 `avro.source` 会先与端口进行绑定,这样 `avro sink` 连接时才不会报无法连接的异常。但是即使不按顺序启动也是没关系的,`sink` 会一直重试,直至建立好连接。

Flume 简介及基本使用,大数据ETL实战,flume,大数据,etl

4.测试

向文件 `tmp/log.txt` 中追加内容:

Flume 简介及基本使用,大数据ETL实战,flume,大数据,etl

可以看到已经从 8888 端口监听到内容,并成功输出到控制台:

Flume 简介及基本使用,大数据ETL实战,flume,大数据,etl文章来源地址https://www.toymoban.com/news/detail-723834.html

到了这里,关于Flume 简介及基本使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • ETL简介:数据集成与应用

    在当今大数据时代,组织和企业需要处理和分析庞大的数据量。ETL(Extract, Transform, Load)是一种重要的数据集成和处理方法,它在数据管理和决策支持中起着关键作用。本文将介绍ETL的基本概念、作用和关键组成部分,以帮助读者了解ETL的重要性和应用领域。 ETL是指数据提取

    2024年02月12日
    浏览(35)
  • 大数据:Trino简介及ETL场景的解决方案

    Presto 在 Facebook 的诞生最开始是为了填补当时 Facebook 内部实时查询和 ETL 处理之间的空白。Presto 的核心目标就是提供交互式查询,也就是我们常说的 Ad-Hoc Query,很多公司都使用它作为 OLAP 计算引擎。但是随着近年来业务场景越来越复杂,除了交互式查询场景,很多公司也需要

    2024年02月08日
    浏览(39)
  • ETL工具 - Kettle 介绍及基本使用

    在介绍 Kettle 前先了解下什么是 ETL , ETL 是 Extract-Transform-Load 的缩写,即数据 抽取、转换、装载 的过程,对于企业或行业应用来说,经常会遇到各种异构数据的处理、转换、迁移等操作,这些操作有可能会面对大量的数据和繁琐的步骤,因此就萌生了一些 ETL 处理工具,市面

    2024年02月07日
    浏览(65)
  • 一、大数据技术之Flume(简介)

    1.1 Flume定义 Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。 1.2 Flume基础架构 Flume组成架构如下图所示。 1.2.1 Agent Agent是一个JVM进程,它以事件的形式将数据从源头送至目的地。 Agent主要有3个部分组成

    2024年02月16日
    浏览(40)
  • 大数据学习(六)ETL开发详解及实战

    一、ETL含义 ETL,是英文Extract-Transform-Load的缩写,用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程。 二、ETL工具 离线: sqoop、DataX、Kettle 实时:StreamSets 三、数据同步方式 数据同步方式分为:直连同步、数据文件同步、日志解析

    2024年02月14日
    浏览(37)
  • flume基本安装与使用

    解压flume包 到/usr/local/src/目录下  配置flume环境变量 修改flume中flume-env.sh文件,先将修改名称为flume-env.sh,或者直接新建一个 其中配置jdk flume,安装成功 查看版本 flume的基本使用 使用flume传输数据至hdfs中 查询dfs中有那些文件 创建一个flume文件夹 创建一个job目录用于存放flum

    2023年04月11日
    浏览(37)
  • 【Flume 01】Flume简介、部署、组件

    Flume是一个高可用、高可靠、分布式的海量日志采集、聚合和传输的系统 主要特性: 它有一个简单、灵活的基于流的数据流结构(使用Event封装) 具有负载均衡机制和故障转移机制 一个简单可扩展的数据模型(Source、Channel、Sink) Source组件: 从外界采集各种类型数据,将数据

    2024年02月16日
    浏览(79)
  • (二十三)大数据实战——Flume数据采集之采集数据聚合案例实战

    本节内容我们主要介绍一下Flume数据采集过程中,如何把多个数据采集点的数据聚合到一个地方供分析使用。我们使用hadoop101服务器采集nc数据,hadoop102采集文件数据,将hadoop101和hadoop102服务器采集的数据聚合到hadoop103服务器输出到控制台。其整体架构如下: ①在hadoop101服务

    2024年02月09日
    浏览(51)
  • (十九)大数据实战——Flume数据采集框架安装部署

    本节内容我们主要介绍一下大数据数据采集框架flume的安装部署,Flume 是一款流行的开源分布式系统,用于高效地采集、汇总和传输大规模数据。它主要用于处理大量产生的日志数据和事件流。Flume 支持从各种数据源(如日志文件、消息队列、数据库等)实时采集数据,并将

    2024年02月10日
    浏览(45)
  • (二十二)大数据实战——Flume数据采集之故障转移案例实战

    本节内容我们完成Flume数据采集的故障转移案例,使用三台服务器,一台服务器负责采集nc数据,通过使用failover模式的Sink处理器完成监控数据的故障转移,使用Avro的方式完成flume之间采集数据的传输。整体架构如下: ①在hadoop101服务器的/opt/module/apache-flume-1.9.0/job目录下创建

    2024年02月09日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包