Kafka集群间同步数据方案-Flume

这篇具有很好参考价值的文章主要介绍了Kafka集群间同步数据方案-Flume。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Apache Flume 是一个分布式、高可靠、高可用的用来收集、聚合、转移不同来源的大量日志数据到中央数据仓库的工具。

系统要求

  1. Java运行环境 - Java 1.8或更高版本

体系结构

Event是Flume定义的一个数据流传输的最小单元。Agent就是一个Flume的实例,本质是一个JVM进程,该JVM进程控制Event数据流从外部日志生产者那里传输到目的地(或者是下一个Agent)。

学习Flume必须明白这几个概念,Event英文直译是事件,但是在Flume里表示数据传输的一个最小单位(被Flume收集的一条条日志又或者一个个的二进制文件,不管你在外面叫什么,进入Flume之后它就叫event)。参照下图可以看得出Agent就是Flume的一个部署实例, 一个完整的Agent中包含了必须的三个组件Source、Channel和Sink,Source是指数据的来源和方式,Channel是一个数据的缓冲池,Sink定义了数据输出的方式和目的地。

kafka数据同步到另外一个kafka,运维,1024程序员节,flume,kafka

Source消耗由外部(如Web服务器)传递给它的Event。外部以Flume Source识别的格式向Flume发送Event。例如,Avro Source 可接收从Avro客户端(或其他FlumeSink)接收Avro Event。用 Thrift Source 也可以实现类似的流程,接收的Event数据可以是任何语言编写的只要符合Thrift协议即可。

kafka集群间同步方案

flume同步kafka的本质是flume起一个java进程,监控并消费待同步的kafka集群的topic,并写入到需要同步到的kafka集群的kafka,所以在配置时,Source为Kafka Source,Sink为Kfka Sink。

kafka数据同步到另外一个kafka,运维,1024程序员节,flume,kafka

Kafka Source就是一个Apache Kafka消费者,它从Kafka的topic中读取消息。 如果运行了多个Kafka Source,则可以把它们配置到同一个消费者组,以便每个source都读取一组唯一的topic分区。目前支持Kafka 0.10.1.0以上版本。Kafka Sink 可以把数据发送到 Kafka topic上。

同步案例

1.源集群和目标集群(从源集群同步数据到目标集群)

集群 地址 版本
源集群 10.19.162.107(其中一个节点) 2.13-3.0.2
目标集群 110.19.162.101(其中一个节点) 2.13-2.7.0

在10.19.162.107和110.19.162.101上分别新建测试topic:test-flume-logs1

# 新建topic
./bin/kafka-topics.sh --create --bootstrap-server 10.19.162.107:9092 --replication-factor 2 --partitions 1 --topic test-flume-logs1

kafka数据同步到另外一个kafka,运维,1024程序员节,flume,kafka

这里使用python脚本往10.19.162.107的topic:test-flume-logs1写入100000条测点数据,python脚本如下:

import os

from pykafka import KafkaClient

dirPath = 'E:\\log\\测点数据'
fileList = os.listdir(dirPath)

client = KafkaClient(hosts="10.19.162.107:9092,10.19.162.108:9092,10.19.162.109:9092")

topic = client.topics['test-flume-logs1']

print(len(fileList))

f2 = open('E:\\log\\all.txt',mode="w",encoding='utf-8')


for j in range(10):
    with topic.get_sync_producer() as producer:
        for i in range(len(fileList)):
            fileName = dirPath + "\\" + fileList[i]
            f = open(fileName,encoding='utf-8')

            line = f.readline()
            # 以下是文件读取伪代码
            while line:
                line = line + '\n'
                f2.write(line)
                # 写入kafka
                producer.produce(line.encode())
                # print(line)
                line = f.readline()
            f.close()

f2.close()

写入后查看10.19.162.107的test-flume-logs1中消息的数量:

kafka数据同步到另外一个kafka,运维,1024程序员节,flume,kafka

 在如下位置新建flume同步的配置文件kafka-to-kafka.conf

kafka数据同步到另外一个kafka,运维,1024程序员节,flume,kafka

配置文件内容如下:

#kafka相关source和sink参数可以参考如下网址
# 中文翻译版:https://flume.liyifeng.org/#kafka-sink
# 英文原版: https://flume.apache.org/releases/content/1.10.1/FlumeUserGuide.html#kafka-sink
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1


# Describe/configure the source
a2.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a2.sources.r1.batchSize = 1000
a2.sources.r1.batchDurationMillis = 2000
# 此处hadoop01,hadoop02,hadoop03在host文件中已配置,ip分别为10.19.162.107,10.19.162.108,10.19.162.109
a2.sources.r1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a2.sources.r1.kafka.topics = test-flume-logs1
a2.sources.r1.kafka.consumer.group.id = testgruoup
a2.sources.r1.migrateZookeeperOffsets = false
a2.sources.r1.kafka.consumer.auto.offset.reset = earliest

## Source 拦截器,如果不指定拦截器,flume有bug,sink会使用最上方配置的kafka配置
a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type = static
a2.sources.r1.interceptors.i1.key = topic
a2.sources.r1.interceptors.i1.preserveExisting = false
a2.sources.r1.interceptors.i1.value = test-flume-logs1

# Describe the sink
a2.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a2.sinks.k1.kafka.topic = test-flume-logs1
a2.sinks.k1.kafka.bootstrap.servers = 10.19.162.101:9092,10.19.162.102:9092,10.19.162.105:9092
a2.sinks.k1.kafka.flumeBatchSize = 20
a2.sinks.k1.kafka.producer.acks = 1
a2.sinks.k1.kafka.producer.linger.ms = 1
a2.sinks.ki.kafka.producer.compression.type = snappy

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 2000
a2.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

写入前,查看10.19.162.101的test-flume-logs1中消息的数量:

 bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.19.162.107:9092 --topic test-flume-logs1  --time -1

kafka数据同步到另外一个kafka,运维,1024程序员节,flume,kafka

使用如下指令开启flume:

# 其中9001位flume的状态监控端口
bin/flume-ng agent --conf conf --conf-file conf/kafka-to-kafka.conf --name a2 -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=9001

执行完之后查看10.19.162.101的test-flume-logs1中消息的数量,可以看到已经全部导入:

 kafka数据同步到另外一个kafka,运维,1024程序员节,flume,kafka

访问http://10.19.162.107:9001/metrics可以查看flume运行状态:

kafka数据同步到另外一个kafka,运维,1024程序员节,flume,kafka

格式化后如下:

{
	"SINK.k1": {
		"ConnectionCreatedCount": "0",
		"BatchCompleteCount": "0",
		"BatchEmptyCount": "26",
		"EventDrainAttemptCount": "100000", # event数量,也就是从源kafka的topic中读取的数量
		"StartTime": "1666580989920",
		"BatchUnderflowCount": "0",
		"ConnectionFailedCount": "0",
		"ConnectionClosedCount": "0",
		"Type": "SINK",
		"RollbackCount": "0",
		"EventDrainSuccessCount": "100000",	# 成功写入的数量
		"KafkaEventSendTimer": "9861",
		"StopTime": "0"
	},
	"CHANNEL.c1": {
		"ChannelCapacity": "2000",
		"ChannelFillPercentage": "0.0",
		"Type": "CHANNEL",
		"ChannelSize": "0",
		"EventTakeSuccessCount": "100000",
		"EventTakeAttemptCount": "100027",
		"StartTime": "1666580989317",
		"EventPutAttemptCount": "100000",
		"EventPutSuccessCount": "100000",
		"StopTime": "0"
	},
	"SOURCE.r1": {
		"KafkaEventGetTimer": "4470",
		"AppendBatchAcceptedCount": "0",
		"EventAcceptedCount": "100000",
		"AppendReceivedCount": "0",
		"StartTime": "1666580989944",
		"AppendBatchReceivedCount": "0",
		"KafkaCommitTimer": "257",
		"EventReceivedCount": "100000",
		"Type": "SOURCE",
		"AppendAcceptedCount": "0",
		"OpenConnectionCount": "0",
		"KafkaEmptyCount": "30",
		"StopTime": "0"
	}
}

指标项说明

source监控项

objectName
(会随实际情况而变化)
指标项 说明
org.apache.flume.source:type=r1 OpenConnectionCount 目前与客户端或sink保持连接的总数量
org.apache.flume.source:type=r1 AppendBatchAcceptedCount 成功提交到channel的批次的总数量
org.apache.flume.source:type=r1 AppendBatchReceivedCount 接收到事件批次的总数量
org.apache.flume.source:type=r1 AppendAcceptedCount 逐条录入的次数
org.apache.flume.source:type=r1 AppendReceivedCount 每批只有一个事件的事件总数量
org.apache.flume.source:type=r1 EventAcceptedCount 成功写出到channel的事件总数量
org.apache.flume.source:type=r1 EventReceivedCount 目前为止source已经接收到的事件总数量
org.apache.flume.source:type=r1 StartTime source启动时的毫秒值时间
org.apache.flume.source:type=r1 StopTime source停止时的毫秒值时间,为0表示一直在运行

channel监控项

objectName
(会随实际情况而变化)
指标项 说明
org.apache.flume.channel:type=c1 EventPutAttemptCount Source尝试写入Channe的事件总次数
org.apache.flume.channel:type=c1 EventPutSuccessCount 成功写入channel且提交的事件总次数
org.apache.flume.channel:type=c1 EventTakeAttemptCount sink尝试从channel拉取事件的总次数。
org.apache.flume.channel:type=c1 EventTakeSuccessCount sink成功从channel读取事件的总数量
org.apache.flume.channel:type=c1 ChannelSize 目前channel中事件的总数量
org.apache.flume.channel:type=c1 ChannelCapacity channel的容量
org.apache.flume.channel:type=c1 ChannelFillPercentage channel已填入的百分比
org.apache.flume.channel:type=c1 StartTime channel启动时的毫秒值时间
org.apache.flume.channel:type=c1 StopTime channel停止时的毫秒值时间,为0表示一直在运行

sink监控项

objectName
(会随实际情况而变化)
指标项 说明
org.apache.flume.sink:type=k1 ConnectionCreatedCount 创建的连接数量
org.apache.flume.sink:type=k1 ConnectionClosedCount 关闭的连接数量
org.apache.flume.sink:type=k1 ConnectionFailedCount 由于错误关闭的连接数量
org.apache.flume.sink:type=k1 BatchEmptyCount 批量处理event的个数为0的数量-表示source写入数据的速度比sink处理数据的速度慢
org.apache.flume.sink:type=k1 BatchUnderflowCount 批量处理event的个数小于批处理大小的数量
org.apache.flume.sink:type=k1 BatchCompleteCount 批量处理event的个数等于批处理大小的数量
org.apache.flume.sink:type=k1 EventDrainAttemptCount sink尝试写出到存储的事件总数量
org.apache.flume.sink:type=k1 EventDrainSuccessCount sink成功写出到存储的事件总数量
org.apache.flume.sink:type=k1 StartTime channel启动时的毫秒值时间
org.apache.flume.sink:type=k1 StopTime channel停止时的毫秒值时间,为0表示一直在运行

更多flume监控可以参考:

Flume 1.9用户手册中文版 — 可能是目前翻译最完整的版本了文章来源地址https://www.toymoban.com/news/detail-668397.html

到了这里,关于Kafka集群间同步数据方案-Flume的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink CDC 整库 / 多表同步至 Kafka 方案(附源码)

    本文讨论的方案将是本博客以往介绍的所有关于 CDC 数据同步方案中最贴合实际生产需要的,因为以往介绍的开箱即用方案往往都是一张表对应一个 Kafka Topic,在数据库和数据表数量很大的情况下,这种模式是不实用的,主要问题在于: 每张表都对应一个单独的 Flink 作业,占

    2024年04月22日
    浏览(40)
  • 3、Kafka 线上集群部署方案怎么做?

    对比 Linux、Mac、Window,Linux 系统显然要更加适合部署 Kafka。主要有下面这三个方面,Linux 的表现更胜一筹。 主流的 I/O 模型通常有 5 种类型:阻塞式 I/O、非阻塞式 I/O、I/O 多路复用、信号驱动 I/O 和异步 I/O。 通常情况下我们认为后一种模型会比前一种模型要高级。 相关实现

    2024年02月03日
    浏览(32)
  • Kafka跨集群备份解决方案MirrorMaker

    一般情况下,我们会使用一套 Kafka 集群来完成业务,但有些场景确实会需要多套 Kafka 集群同时工作,比如为了便于实现灾难恢复,你可以在两个机房分别部署单独的 Kafka 集群。如果其中一个机房出现故障,你就能很容易地把流量打到另一个正常运转的机房下。再比如,你想

    2024年02月11日
    浏览(30)
  • Clickhouse Engine kafka 将kafka数据同步clickhouse

    根据官方给出的kafka引擎文档,做一个实践记录。 官方地址:https://clickhouse.tech/docs/zh/engines/table-engines/integrations/kafka/ 1、特性介绍 clickhouse支持kafka的表双向同步,其中提供的为Kafka引擎。 其大致情况为如下情况:Kafka主题中存在对应的数据格式,Clickhouse创建一个Kafka引擎表(

    2024年01月16日
    浏览(44)
  • Kafka数据同步原理详解

    Kafka是一种分布式的消息队列系统,它具有高吞吐量、可扩展性和分布式特性等优势。在Kafka中,数据按照主题进行分区,每个主题都有一组分区。每个分区都有自己的生产者和消费者,生产者负责向分区中写入消息,消费者负责从分区中读取消息。因此,Kafka的数据同步主要

    2024年02月08日
    浏览(34)
  • Kafka实时数据同步

    目录 1 概述 2 捕获Oracle数据到Kafka 2.1 数据捕获设置 2.2 数据发布设置 2.3 捕获到发布数据流映射 2.4 查看任务执行日志 3 订阅Kafka数据到ClickHouse 3.1 数据订阅设置 3.2 数据加载设置 3.3 订阅到加载数据流映射 3.4 查看任务执行日志  4 校验数据一致性 BeeDI 支持实时捕获业务系统变

    2024年02月07日
    浏览(41)
  • 十万字图文详解mysql、redis、kafka、elasticsearch(ES)多源异构不同种类数据库集成、数据共享、数据同步、不同中间件技术实现与方案,如何构建数据仓库、数据湖、数仓一体化?

    数据库大数据量、高并发、高可用解决方案,十万字图文详解mysql、redis、kafka、elasticsearch(ES)多源异构不同种类数据库集成、数据共享、数据同步、不同中间件技术实现与方案,如何构建数据仓库、数据湖、数仓一体化?Delta Lake、Apache Hudi和Apache Iceberg数仓一体化技术架构

    2024年02月07日
    浏览(51)
  • kafka入门(十):副本数据同步

    副本 副本(Replica),指的是分布式系统对数据和服务提供的一种冗余方式。 Kafka通过多副本机制实现故障自动转移,在Kafka集群中某个broker节点失效的情况下仍然保证服务可用。 失效副本 在ISR集合之外,也就是处于同步失效或功能失效(比如副本处于非存活状态)的副本统

    2024年01月22日
    浏览(44)
  • kafka的安装,用于数据库同步数据

    因业务需求,需要查询其他部门的数据库数据,不方便直连数据库,所以要定时将他们的数据同步到我们的环境中,技术选型选中了kafka+CDC Kafka是Apache旗下的一款分布式流媒体平台,Kafka是一种高吞吐量、持久性、分布式的发布订阅的消息队列系统。 它最初由LinkedIn(领英)公司

    2024年02月20日
    浏览(43)
  • Canal+Kafka实现Mysql数据同步

    canal [kə\\\'næl] ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费 canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。 canal是应阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的。

    2024年02月12日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包