Kafka Connect


Here we take MySQL - Kafka Connect - Oracle as the example, implementing a full-table upsert sync.

After starting ZooKeeper, Kafka, and the other components,

edit the kafka/config/connect-distributed.properties file:

##
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##

# This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended
# to be used with the examples, and some settings may differ from those used in a production system, especially
# the `bootstrap.servers` and those specifying replication factors.

# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=localhost:9092

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25

# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
# and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
config.storage.topic=connect-configs
config.storage.replication.factor=1

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# List of comma-separated URIs the REST API will listen on. The supported protocols are HTTP and HTTPS.
# Specify hostname as 0.0.0.0 to bind to all interfaces.
# Leave hostname empty to bind to default interface.
# Examples of legal listener lists: HTTP://myhost:8083,HTTPS://myhost:8084"
listeners=HTTP://collector:8083

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
# If not set, it uses the value for "listeners" if configured.
rest.advertised.host.name=my-connect-worker-host
rest.advertised.port=8083
# rest.advertised.listener=

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include 
# any combination of: 
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples: 
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/home/connect/confluentinc-kafka-connect-jdbc-10.7.4/lib/,/home/connect/debezium-connector-oracle

Note: make sure port 8083 is not already in use.

Start Connect:

./bin/connect-distributed.sh ./config/connect-distributed.properties

Note: this command occupies the terminal; if you don't want that, start it with nohup.
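For example, a background start plus a quick check that the worker is up (the log file name here is just an illustration):

nohup ./bin/connect-distributed.sh ./config/connect-distributed.properties > connect.log 2>&1 &

# once the worker is up, the REST API answers on port 8083
curl http://collector:8083/
# this lists the plugins loaded from plugin.path (the JDBC connector should appear here)
curl http://collector:8083/connector-plugins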

Create the mysql-source.json file:

{
  "name": "mysql-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://collector:3306/test?user=root&password=123456",
    "mode": "bulk",
    "table.whitelist": "student",
    "topic.prefix": "student-"
  }
}

Note: mode must be set to bulk here to get a full-table sync; incrementing does incremental sync.
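If you later want incremental rather than full sync, one option is to switch the mode over the REST API. The sketch below assumes the student table has an auto-increment id column; note that PUT /connectors/{name}/config takes the flat configuration map, without the name/config wrapper:

curl -i -X PUT -H "Content-Type: application/json" http://collector:8083/connectors/mysql-source/config -d '{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:mysql://collector:3306/test?user=root&password=123456",
  "mode": "incrementing",
  "incrementing.column.name": "id",
  "table.whitelist": "student",
  "topic.prefix": "student-"
}'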

Create the oracle-sink.json file:

{
  "name": "oracle-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:oracle:thin:@collector:1521:orcl",
    "db.hostname": "collector",
    "tasks.max": "1",
    "connection.user": "dbzuser",
    "connection.password": "dbz2023",
    "db.fetch.size": "1",
    "topics": "student-student",
    "multitenant": "false",
    "table.name.format": "t1",
    "dialect.name": "OracleDatabaseDialect",
    "auto.evolve": "true",
    "pk.mode": "record_value",
    "pk.fields": "id",
    "insert.mode": "upsert"
  }
}

Note: the topic used here, student-student, was created in advance. You can also skip creating it and let it be generated automatically, but whenever you reference the topic you must include the prefix.
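If you prefer to create the topic up front, a minimal sketch for this single-broker setup (replication factor 1, matching the worker config above):

./bin/kafka-topics.sh --create --topic student-student --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1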

You also need the corresponding MySQL and Oracle JDBC drivers; here mysql-connector-java 8.0.26 and ojdbc8-23.3.0.23.09 were used.

The connector classes are io.confluent.connect.jdbc.JdbcSourceConnector and io.confluent.connect.jdbc.JdbcSinkConnector.
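One way to make both drivers visible to the JDBC connector is to copy the jars into the connector's lib directory on plugin.path and restart the worker; the jar file names below are assumptions based on the versions mentioned above:

cp mysql-connector-java-8.0.26.jar /home/connect/confluentinc-kafka-connect-jdbc-10.7.4/lib/
cp ojdbc8-23.3.0.23.09.jar /home/connect/confluentinc-kafka-connect-jdbc-10.7.4/lib/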

Register the connectors with the REST API on port 8083 (curl requests):

 curl -i -X POST -H "Accept: application/json" -H "Content-Type: application/json"   http://collector:8083/connectors/   -d @/opt/installs/kafka/connector/mysql-source.json

curl -i -X POST -H "Accept: application/json" -H "Content-Type: application/json"   http://collector:8083/connectors/   -d @/opt/installs/kafka/connector/oracle-sink.json

List the current connectors:

curl http://collector:8083/connectors
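To check whether a specific connector and its tasks are actually RUNNING:

curl http://collector:8083/connectors/mysql-source/status
curl http://collector:8083/connectors/oracle-sink/status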


Testing

Switch to the oracle user and start Oracle:

[root@collector connector]# su oracle


[oracle@collector connector]$ lsnrctl start

[oracle@collector connector]$ sqlplus /nolog


SQL> conn /as sysdba

SQL> startup

Add a record to the MySQL source table.
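For example (a sketch; the student table's columns are assumed here, adjust them to your schema):

mysql -uroot -p123456 test -e "INSERT INTO student (id, name) VALUES (1, 'tom');"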


Then check the Oracle target table.
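For example, from sqlplus as the sink user (t1 is the table.name.format target configured above; with mode=bulk the row should show up there after the next poll):

[oracle@collector connector]$ sqlplus dbzuser/dbz2023

SQL> SELECT * FROM t1;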


curl operations (REST API reference)

REST API - Description
GET / - Kafka Connect version information
GET /connectors - list the currently active connectors (by name)
POST /connectors - create a new connector from the given configuration
GET /connectors/{name} - information about the specified connector
GET /connectors/{name}/config - configuration of the specified connector
PUT /connectors/{name}/config - update the configuration of the specified connector
GET /connectors/{name}/status - status of the specified connector
POST /connectors/{name}/restart - restart the specified connector
PUT /connectors/{name}/pause - pause the specified connector
GET /connectors/{name}/tasks - tasks currently running for the specified connector
POST /connectors/{name}/tasks - update the task configuration
GET /connectors/{name}/tasks/{taskId}/status - status of the specified task in the specified connector
POST /connectors/{name}/tasks/{taskId}/restart - restart the specified task in the specified connector
DELETE /connectors/{name}/ - delete the specified connector
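A few of these in curl form, against the connectors registered above:

curl -X PUT http://collector:8083/connectors/mysql-source/pause
curl -X POST http://collector:8083/connectors/mysql-source/restart
curl -X DELETE http://collector:8083/connectors/mysql-source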

Reference: Kafka——Kafka Connect详解 (CSDN blog)
