Kafka——Kafka Connect详解

这篇具有很好参考价值的文章主要介绍了Kafka——Kafka Connect详解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Kafka Connect

1、概要介绍

Kafka Connect是一个高伸缩性、高可靠性的数据集成工具,用于在Apache Kafka与其他系统间进行数据搬运以及执行ETL操作,比如Kafka Connect能够将文件系统中某些文件的内容全部灌入Kafka topic中或者是把Kafka topic中的消息导出到外部的数据库系统,如图所示。

kafka sinkconnector,kafka,kafka,分布式,Connect

如图所示,Kafka Connect主要由source connector和sink connector组成。事实上,几乎大部分的ETL框架都是由这两大类逻辑组件组成的,如Apache Flume、Kettle等。source connector负责把输入数据从外部系统中导入到Kafka中,而sink connector则负责把输出数据
导出到其他外部系统。

根据Kafka Connect官网的介绍,目前其主要的设计特点如下。

  • 通用性:依托底层的Kafka核心系统封装了connector接口,方便开发、部署和管理。
  • 兼具分布式(distributed)和单体式(standalone)两种模式:既可以以standalone单进程的方式运行,也可以扩展到多台机器成为分布式ETL系统。
  • REST接口:提供常见的REST API方便管理和操作,只适用于分布式模式。
  • 自动位移管理:connector自动管理位移,无须开发人员干预,降低开发成本。
  • 集成性:方便与流/批处理系统对接。

显然,一个ETL框架或connector系统是否好用的主要标志之一就是,看source connector和sink connector的种类是否丰富。默认提供的connector越多,我们就能集成越多的外部系统,免去了用户自行开发的成本。

2、standalone Connect

在standalone模式下所有的操作都是在一个进程中完成的。这种模式非常适合运行在测试或功能验证环境,抑或是必须是单线程才能完成的场景(比如收集日志文件)。由于是单进程,standalone模式无法充分利用Kafka天然提供的负载均衡和高容错等特性。

2.1、数据抽取与加载示例

下面我们在一个单节点的Kafka集群上运行standalone模式的Kafka Connect,把输入文件foo.txt中的数据通过Kafka传输到输出文件bar.txt中。首先我们制作配置文件。Kafka Connectstandalone模式下通常有3类配置文件:connect配置文件,若干source connector配置文件和若干sink connector配置文件。由于本例分别启动一个source connector读取foo.txt和一个sink connector写入bar.txt,故source和sink配置文件都只有一个,所以总共有如下3个配置文件。

  • connect-standalone.properties:connect standalone模式下的配置文件。
  • connect--file-source.properties:file source connector配置文件。
  • connect-file-sink.properties:file sink connector配置文件。

首先来编辑connect-standalone.properties文件。实际上,Kafka已经在config目录下为我们提供了一个该文件的模板。我们直接使用该模板并修改对应的字段即可,如下:

# connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
  • bootstrap.servers:指定Connect要连接的Kafka集群主机名和端口号。本例使用localhost::9092。
  • key/value.converter:设置Kafka消息key/value的格式转化类,本例使用JsonConverter,即把每条Kafka消息转化成一个JSON格式。
  • key/value.converter.schemas.enable:设置是否需要把数据看成纯JSON字符串或者JSON格式的对象。本例设置为tue,即把数据转换成JSON对象。
  • offset.storage.file.filename:connector会定期地将状态写入底层存储中。该参数设定了状态要被写入的底层存储文件的路径。本例使用/tmp/connect.offsets保存connector的
    状态。

下面编辑connect-file-source.properties,它在Kafka的config目录下也有一份模板,本例直接在该模板的基础上进行修改:

# connect-file-source.properties
name=test-file-source
connector.class=FileStreamSource
tasks.max=1
file=foo.txt
topic=connect-file-test
  • name:设置该file source connector的名称。
  • connector.class:设置source connector类的全限定名。有时候设置为类名也是可以的,Kafka Connect可以在classpath中自动搜寻该类并加载。
  • tasks.max:每个connector下会创建若干个任务(task)执行connector逻辑以期望增加并行度,但对于从单个文件读/写数据这样的操作,任意时刻只能有一个ask访问文件,故这里设置最大任务数为1。
  • file:输入文件全路径名。本例为foo.txt,即表示该文件位于Kafka目录下。实际使用时最好使用绝对路径。
  • topic:设置source connector把数据导入到Kafka的哪个topic,若该topic之前不存在,则source connector会自动创建。最好提前手工创建出该topic。.本例使用connect-file-test.

最后,我们编辑connect-file-sink.properties。同理,直接修改位于config目录下的connect-file-sink.properties模板文件:

# connect-file-sink.properties
name=test-file-sink
connector.class=FileStreamSink
tasks.max=1
file=bar.txt
topics=connect-file-test
  • name:设置该sink connector名称。
  • connector.class:设置sink connector类的全限定名。有时候设置为类名也是可以的,Kafka Connect可以在classpath中自动搜寻该类并加载。
  • tasks.max:依然设置为l,原理与source connector中配置设置相同。
  • file:输出文件全路径名。本例为bar.txt,即表示该文件位于Kafka目录下。实际使用时最好使用绝对路径。
  • topic:设置sink connector导出Kafka中的哪个topic的数据。

启动Kafka Connect的standalone模式:

./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-source.properties ../config/connect-file-sink.properties

启动之后,应该可以看到控制台不断地打印“Couldn’t find file foo.txt for FileStreamSourceTask,sleeping to wait for it to be created”之类的日志。这是正常的,因为我们尚未创建输入文件foo.txt。
kafka sinkconnector,kafka,kafka,分布式,Connect

下面我们在Kafka的目录下创建foo.txt并写入一些文本行:

echo 'hello' >> ./foo.txt
echo 'kafka connect test exaple' >> ./foo.txt
echo 'this is a file connector test.' >> ./foo.txt

如果一切正常,可以看到在当前目录下生成一个名为bar.txt的文件:

hello
kafka connect test exaple
this is a file connector test.

kafka sinkconnector,kafka,kafka,分布式,Connect

可见,foo.txt文件的内容已经成功地被file connector通过Kafka搬运到bar.txt文件中了。

为了验证数据的确是通过Kafka topic进行转移的,我们读取一下topic(connect-file-test)的数据,如:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-file-test --from-beginning


{"schema":{"type":"string","optional":false},"payload":"hello"}
{"schema":{"type":"string","optional":false},"payload":"kafka connect test exaple"}
{"schema":{"type":"string","optional":false},"payload":"this is a file connector test."}

kafka sinkconnector,kafka,kafka,分布式,Connect

2.2、数据抽取、转换与加载示例

上面的例子只涉及ETL中的E和L,即数据抽取(extract.)与加载(load)。作为一个ETL框架,Kafka Connect也支持相当程度的数据转换操作。下面演示在将文件数据导出到目
标文件之前为每条消息增加一个IP字段。如果要插入P静态字段,我们必须修改source connector的配置文件,增加以下这些行:

transforms=WrapMap,InsertHost
transforms.WrapMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.WrapMap.field=line
transforms.InsertHost.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertHost.static.field=ip
transforms.InsertHost.static.value=com.connector.machinel

然后重启kafka Connect,然后写入foo.txt文件:

./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-source.properties ../config/connect-file-sink.properties
echo 'this is a transformation test' >> ./foo.txt

然后查看bar.txt:

hello
kafka connect test exaple
this is a file connector test.
Struct{line=this is a transformation test,ip=com.connector.machinel}

kafka sinkconnector,kafka,kafka,分布式,Connect

显然,新增的数据被封装成一个结构体(Struct),并增加了ip字段。这就是上面WrapMap和InsertHost的作用。

3、distributed Connect

和standalone模式不同,distributed Connect天然地结合了Kafka提供的负载均衡和故障转移功能,能够自动地在多节点机器上平衡负载。用户可以增减机器来实现整体系统的高伸缩性。用户需要执行下列命令来启动distributed模式的Connect,假设我们依然使用Kafka config目录下的配置文件模板:

bin/connect-distributed.sh config/connect-distributed.properties

和standalone模式不同的是,在distributed模式中我们不需要指定source和sink的配置文件。distributed模式中的connector只能通过REST API来创建和管理。

3.1、示例

依然以FileStreamSourceConnector/FileStreamSinkConnector为例来演示如何在distributed模式下运行Kafka Connect。上述命令启动成功后,我们可以执行以下命令来获取当
前所有connector:

curl http://localhost:8083/connectors
[]

kafka sinkconnector,kafka,kafka,分布式,Connect

值得注意的是,distributed模式下默认的REST端口是8083,用户可以修改connect-distributed.properties文件中的rest.port属性来变更这一端口。如上可见,当前集群中没有创建任何的connector。

下面分别创建file source connector和file sink connector,命令如下:

curl -i -X POST -H "Content-type:application/json" -H "Accept:application/json" -d '{"name":"test-file-source","config":{"connector.class":"FileStreamSource","tasks.max":"1","topic":"connect-file-test","file":"foo.txt"}}' http://localhost:8083/connectors

kafka sinkconnector,kafka,kafka,分布式,Connect

curl -i -X POST -H "Content-type:application/json" -H "Accept:application/json" -d '{"name":"test-file-sink","config":{"connector.class":"FileStreamSink","tasks.max":"1","topics":"connect-file-test","file":"bar.txt"}}' http://localhost:8083/connectors

kafka sinkconnector,kafka,kafka,分布式,Connect

本例中使用curl工具给Kafka Connect发送POST请求。当前REST API只支持application/json作为请求(request)和响应(response)的内容类型(content type),因此在发送POST请求时必须显式指定HTTP的Accept头部为application/json,以设置response的content type。另外,我们还需要设置Content-Type头部信息为application/json,以指定request
的content type。在上面命令中我们只是把standalone模式下配置文件中的所有属性封装成JSON字符串传递给curl工具。注意,connector的name字段和其他字段是分开的,即其他字段首先要被封装到config下,然后和name一起做成JSON串。

下面再次获取当前所有connector以检查之前的两个connector是否已被创建出来:
kafka sinkconnector,kafka,kafka,分布式,Connect

这次我们可以看到两个connector都已经被创建出来了。REST API还提供了/connectors//{name}/config,允许用户查询某个connector的具体配置信息,我们使用这个endpoint来查询file sink connector的信息:

kafka sinkconnector,kafka,kafka,分布式,Connect

同时使用GET/connectors//{name}/status查询两connector的运行状态:
kafka sinkconnector,kafka,kafka,分布式,Connect

kafka sinkconnector,kafka,kafka,分布式,Connect

目前两个connector都正常工作。下面开始写入输入文件foo.txt:

echo 'one' >> ./foo.txt
echo 'two' >> ./foo.txt
echo 'three' >> ./foo.txt

查看bar.txt:
kafka sinkconnector,kafka,kafka,分布式,Connect

做完这些之后,我们删除这两个connector把系统还原回初始状态。若要删除connector,可以使用REST API–DELETE/connectors/.{name},如下:

curl -i -X DELETE http://localhost:8083/connectors/test-file-source

curl -i -X DELETE http://localhost:8083/connectors/test-file-sink

3.2、REST API

我们可以通过Kafka Connect提供的基于REST风格的API接口来管理连接器,默认端口号为8083,可以通过Worker进程的配置文件中的rest,port参数来修改端口号。Kafka ConnectREST API接口如表所示。

REST API 描述
GET / 查看Kafka集群版本信息
GET /connectors 查看当前活跃的连接器列表,显示连接器的名字
POST /connectors 根据指定配置,创建一个新的连接器
GET /connectors/{name} 查看指定连接器的信息
GET /connectors/{name}/config 查看指定连接器的配置信息
PUT /connectors/{name}/config 修改指定连接器的配置信息
GET /connectors/{name}/status 查看指定连接器的状态
POST /connectors/{name}/restart 重启指定的连接器
PUT /connectors/{name}/pause 暂停指定的连接器
GET /connectors/{name}/tasks 查看指定连接器正在运行的Task
POST /connectors/{name}/tasks 修改Task的配置
GET /connectors/{name}/tasks/{taskId}/status 查看指定连接器中指定Task的状态
POST /connectors/{name}/tasks/{tasked}/restart 重启指定连接器中指定的Task
DELETE /connectors/{name}/ 删除指定的连接器

3.3、其它连接器类

connector.class用来设置连接器类的全限定名称,有时候设置为类名也是可以的,Kafka Connect会在classpath中自动搜索这个类并加载。Kafka中默认只提供了与文件相关的连接器,如果要实现与其他数据存储系统相连接,那么可以参考文件连接器的具体实现来自定义一套连接器,或者搜寻开源的实现,比如Confluent公司提供的
一些产品:

  • kafka-connect-elasticsearch(https://github.com/confluentinc/kafka-connect-elasticsearch);
  • kafka-connect-jdbc (https://github.com/confluentinc/kafka-connect-jdbc);
  • kafka-connect-hdfs (https://github.com/confluentinc/kafka-connect-hdfs);
  • kafka-connect-storage-cloud (https://github.com/confluentinc/kafka-connect-storage-cloud).

4、示例MySQL数据同步到Redis

4.1、准备连接器

下载连接器

MySQL连接器:https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc

Redis连接器:https://www.confluent.io/hub/jcustenborder/kafka-connect-redis

安装插件

在kafka目录下新建connect文件夹:

cd /usr/local/kafka_2.12-3.1.0
mkdir connect

将下载的插件移动到connect文件夹中:

cp confluentinc-kafka-connect-jdbc-10.7.4 /usr/local/kafka_2.12-3.1.0/confluentinc-kafka-connect-jdbc-10.7.4

cp jcustenborder-kafka-connect-redis-0.0.4 /usr/local/kafka_2.12-3.1.0/jcustenborder-kafka-connect-redis-0.0.4

下载mysql对应的驱动,放到confluentinc-kafka-connect-jdbc-10.7.4/lib目录下

mv mysql-connector-java-8.0.20.jar /usr/local/kafka_2.12-3.1.0/confluentinc-kafka-connect-jdbc-10.7.4/lib/mysql-connector-java-8.0.20.jar

修改distributed配置:

vim /usr/local/kafka_2.12-3.1.0/config/connect-distributed.properties

指定插件位置:

plugin.path=../connect

启动Connect,查看插件是否加载成功:

./connect-distributed.sh ../config/connect-distributed.properties

kafka sinkconnector,kafka,kafka,分布式,Connect

4.2、准备MySQL

创建表及数据

CREATE TABLE `login` (
	`id` bigint(20) NOT NULL AUTO_INCREMENT PRIMARY KEY,
  	`username` varchar(30) DEFAULT NULL,
	`login_time` datetime
);

INSERT INTO `login` VALUES(1, 'aaa', NOW());
INSERT INTO `login` VALUES(2, 'bbb', NOW());
INSERT INTO `login` VALUES(3, 'ccc', NOW());
INSERT INTO `login` VALUES(4, 'ddd', NOW());
INSERT INTO `login` VALUES(5, 'eee', NOW());
INSERT INTO `login` VALUES(6, 'fff', NOW());
INSERT INTO `login` VALUES(7, 'ggg', NOW());
INSERT INTO `login` VALUES(8, 'hhh', NOW());
INSERT INTO `login` VALUES(9, 'iii', NOW());
INSERT INTO `login` VALUES(10, 'jjj', NOW());

创建连接器,新建source.json

{
	"name": "example-source",
	"config": {
		"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
		"tasks.max": "1",
		"connection.url": "jdbc:mysql://127.0.0.1:3306/kafka_db",
		"connection.user": "root",
		"connection.password": "123456",
		"table.whitelist": "login",
        "mode":"incrementing",
        "incrementing.column.name":"id"
	}
}

向Worker发送请求,创建连接器:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @source.json

确认数据是否写入kafka

./kafka-topics.sh --bootstrap-server localhost:9092 --list

kafka sinkconnector,kafka,kafka,分布式,Connect

__consumer_offsets: 记录所有Kafka Consumer Group的Offset
connect-configs: 存储连接器的配置,对应Connect 配置文件中config.storage.topic
connect-offsets: 存储Source 的Offset,对应Connect 配置文件中offset.storage.topic
connect-status: 连接器与Task的状态,对应Connect 配置文件中status.storage.topic
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic login --from-beginning

kafka sinkconnector,kafka,kafka,分布式,Connect

4.3、准备redis

创建sink.json

{
	"name": "example-sink",
	"config": {
		"connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
        "topics": "login",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "tasks.max": "1",
	    "redis.client.mode": "Standalone",
	    "redis.database": "1",
	    "redis.hosts": "localhost:6379",
	    "redis.password": "123456"
	}
}

启动:文章来源地址https://www.toymoban.com/news/detail-717117.html

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @sink.json

到了这里,关于Kafka——Kafka Connect详解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式 - 消息队列Kafka:Kafka生产者架构和配置参数

    生产者发送消息流程参考图1: 先从创建一个ProducerRecord对象开始,其中需要包含目标主题和要发送的内容。另外,还可以指定键、分区、时间戳或标头。在发送ProducerRecord对象时,生产者需要先把键和值对象序列化成字节数组,这样才能在网络上传输。 接下来,如果没有显式

    2024年02月13日
    浏览(46)
  • 分布式 - 消息队列Kafka:Kafka消费者的分区分配策略

    Kafka 消费者负载均衡策略? Kafka 消费者分区分配策略? 1. 环境准备 创建主题 test 有5个分区,准备 3 个消费者并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。 ① 创建主题 test,该主题有5个分区,2个副本: ② 创建3个消费者CustomConsu

    2024年02月13日
    浏览(43)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(42)
  • 分布式 - 消息队列Kafka:Kafka 消费者消息消费与参数配置

    01. 创建消费者 在读取消息之前,需要先创建一个KafkaConsumer对象。创建KafkaConsumer对象与创建KafkaProducer对象非常相似——把想要传给消费者的属性放在Properties对象里。 为简单起见,这里只提供4个必要的属性:bootstrap.servers、key.deserializer 和 value.deserializer。 ① bootstrap.servers 指

    2024年02月12日
    浏览(43)
  • 分布式 - 消息队列Kafka:Kafka 消费者消费位移的提交方式

    最简单的提交方式是让消费者自动提交偏移量,自动提交 offset 的相关参数: enable.auto.commit:是否开启自动提交 offset 功能,默认为 true; auto.commit.interval.ms:自动提交 offset 的时间间隔,默认为5秒; 如果 enable.auto.commit 被设置为true,那么每过5秒,消费者就会自动提交 poll() 返

    2024年02月12日
    浏览(44)
  • 分布式 - 消息队列Kafka:Kafka消费者和消费者组

    1. Kafka 消费者是什么? 消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者

    2024年02月13日
    浏览(42)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的分区策略

    01. Kafka 分区的作用 分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的

    2024年02月13日
    浏览(51)
  • 分布式 - 消息队列Kafka:Kafka消费者分区再均衡(Rebalance)

    01. Kafka 消费者分区再均衡是什么? 消费者群组里的消费者共享主题分区的所有权。当一个新消费者加入群组时,它将开始读取一部分原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它将离开群组,原本由它读取的分区将由群组里的其他消费者读取。 分区

    2024年02月12日
    浏览(37)
  • 分布式应用之Zookeeper和Kafka

    1.定义 2.特点 3.数据结构 4.选举机制 第一次选举 非第一次选举 5.部署 1.概念 中间件是一种独立的系统软件或服务程序,分布式应用软件借助这种软件在不同的技术之间共享资源。 2.消息队列型 3.Web应用型(代理服务器) 1.为什么需要MQ 2.消息队列作用 3.消息队列模式 ①点对

    2024年02月15日
    浏览(45)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的3种方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包