基于Confluent Kafka部署Kafka Connect集群,Kafka Connect集群加载debezium插件

这篇具有很好参考价值的文章主要介绍了基于Confluent Kafka部署Kafka Connect集群,Kafka Connect集群加载debezium插件。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、下载Confluent Kafka

Confluent Kafka的下载地址:

  • https://www.confluent.io/download/

下载社区免费版本:

基于Confluent Kafka部署Kafka Connect集群,Kafka Connect集群加载debezium插件

二、配置文件connect-distributed.properties

核心参数如下所示:

  • /data/src/confluent-7.3.3/etc/schema-registry/connect-distributed.properties
bootstrap.servers=realtime-kafka-001:9092,realtime-kafka-003:9092,realtime-kafka-002:9092


group.id=datasight-confluent-test-debezium-cluster-status

key.converter=org.apache.kafka.connect.json.JsonConverter

value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

config.storage.topic=offline_confluent_test_debezium_cluster_connect_configs
offset.storage.topic=offline_confluent_test_debezium_cluster_connect_offsets
status.storage.topic=offline_confluent_test_debezium_cluster_connect_statuses


config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3

offset.storage.partitions=25
status.storage.partitions=5
config.storage.partitions=1

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true

#rest.host.name=0.0.0.0
#rest.port=8083

#rest.advertised.host.name=0.0.0.0
#rest.advertised.port=8083

plugin.path=/data/service/debezium/connectors2

三、启动脚本connect-distributed

  • /data/src/confluent-7.3.3/bin/connect-distributed

  • connect-distributed的脚本内容如下所示,可以不需要修改

  • 如果需要导出kafka connector的jmx,则需要设置jmx导出端口和jmx导出器,详细的部署方式可以参考博主下面这篇技术博客:

    • Debezium系列之:安装jmx导出器监控debezium指标
if [ $# -lt 1 ];
then
        echo "USAGE: $0 [-daemon] connect-distributed.properties"
        exit 1
fi

base_dir=$(dirname $0)

###
### Classpath additions for Confluent Platform releases (LSB-style layout)
###
#cd -P deals with symlink from /bin to /usr/bin
java_base_dir=$( cd -P "$base_dir/../share/java" && pwd )

# confluent-common: required by kafka-serde-tools
# kafka-serde-tools (e.g. Avro serializer): bundled with confluent-schema-registry package
for library in "confluent-security/connect" "kafka" "confluent-common" "kafka-serde-tools" "monitoring-interceptors"; do
  dir="$java_base_dir/$library"
  if [ -d "$dir" ]; then
    classpath_prefix="$CLASSPATH:"
    if [ "x$CLASSPATH" = "x" ]; then
      classpath_prefix=""
    fi
    CLASSPATH="$classpath_prefix$dir/*"
  fi
done

if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
  LOG4J_CONFIG_DIR_NORMAL_INSTALL="/etc/kafka"
  LOG4J_CONFIG_NORMAL_INSTALL="${LOG4J_CONFIG_DIR_NORMAL_INSTALL}/connect-log4j.properties"
  LOG4J_CONFIG_DIR_ZIP_INSTALL="$base_dir/../etc/kafka"
  LOG4J_CONFIG_ZIP_INSTALL="${LOG4J_CONFIG_DIR_ZIP_INSTALL}/connect-log4j.properties"
  if [ -e "$LOG4J_CONFIG_NORMAL_INSTALL" ]; then # Normal install layout
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_CONFIG_NORMAL_INSTALL} -Dlog4j.config.dir=${LOG4J_CONFIG_DIR_NORMAL_INSTALL}"
  elif [ -e "${LOG4J_CONFIG_ZIP_INSTALL}" ]; then # Simple zip file layout
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_CONFIG_ZIP_INSTALL} -Dlog4j.config.dir=${LOG4J_CONFIG_DIR_ZIP_INSTALL}"
  else # Fallback to normal default
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/connect-log4j.properties -Dlog4j.config.dir=$base_dir/../config"
  fi
fi
export KAFKA_LOG4J_OPTS

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  export KAFKA_HEAP_OPTS="-Xms256M -Xmx2G"
fi

EXTRA_ARGS=${EXTRA_ARGS-'-name connectDistributed'}

COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac

export CLASSPATH
exec $(dirname $0)/kafka-run-class $EXTRA_ARGS org.apache.kafka.connect.cli.ConnectDistributed "$@"

四、启动Kafka Connect集群

启动命令如下所示:

/data/src/confluent-7.3.3/bin/connect-distributed /data/src/confluent-7.3.3/etc/schema-registry/connect-distributed.properties

正常启动Kafka Connect集群完整输出如下所示:

[2023-06-21 16:43:01,249] INFO EnrichedConnectorConfig values: 
        config.action.reload = restart
        connector.class = io.debezium.connector.mysql.MySqlConnector
        errors.log.enable = false
        errors.log.include.messages = false
        errors.retry.delay.max.ms = 60000
        errors.retry.timeout = 0
        errors.tolerance = none
        exactly.once.support = requested
        header.converter = null
        key.converter = null
        name = mysql-dw-valuekey-test
        offsets.storage.topic = null
        predicates = []
        tasks.max = 1
        topic.creation.default.exclude = []
        topic.creation.default.include = [.*]
        topic.creation.default.partitions = 12
        topic.creation.default.replication.factor = 3
        topic.creation.groups = []
        transaction.boundary = poll
        transaction.boundary.interval.ms = null
        transforms = [unwrap, moveFieldsToHeader, moveHeadersToValue, Reroute]
        transforms.Reroute.key.enforce.uniqueness = true
        transforms.Reroute.key.field.regex = null
        transforms.Reroute.key.field.replacement = null
        transforms.Reroute.logical.table.cache.size = 16
        transforms.Reroute.negate = false
        transforms.Reroute.predicate = 
        transforms.Reroute.topic.regex = debezium-dw-encryption-test.dw.(.*)
        transforms.Reroute.topic.replacement = debezium-test-dw-encryption-all3
        transforms.Reroute.type = class io.debezium.transforms.ByLogicalTableRouter
        transforms.moveFieldsToHeader.fields = [cdc_code, product]
        transforms.moveFieldsToHeader.headers = [product_code, productname]
        transforms.moveFieldsToHeader.negate = false
        transforms.moveFieldsToHeader.operation = copy
        transforms.moveFieldsToHeader.predicate = 
        transforms.moveFieldsToHeader.type = class org.apache.kafka.connect.transforms.HeaderFrom$Value
        transforms.moveHeadersToValue.fields = [product_code2, productname2]
        transforms.moveHeadersToValue.headers = [product_code, productname]
        transforms.moveHeadersToValue.negate = false
        transforms.moveHeadersToValue.operation = copy
        transforms.moveHeadersToValue.predicate = 
        transforms.moveHeadersToValue.type = class io.debezium.transforms.HeaderToValue
        transforms.unwrap.add.fields = []
        transforms.unwrap.add.headers = []
        transforms.unwrap.delete.handling.mode = drop
        transforms.unwrap.drop.tombstones = true
        transforms.unwrap.negate = false
        transforms.unwrap.predicate = 
        transforms.unwrap.route.by.field = 
        transforms.unwrap.type = class io.debezium.transforms.ExtractNewRecordState
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:376)
[2023-06-21 16:43:01,253] INFO [mysql-dw-valuekey-test|task-0] Loading the custom topic naming strategy plugin: io.debezium.schema.DefaultTopicNamingStrategy (io.debezium.config.CommonConnectorConfig:849)
Jun 21, 2023 4:43:01 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method listLoggers in org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains empty path annotation.
WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.

[2023-06-21 16:43:01,482] INFO Started o.e.j.s.ServletContextHandler@2b80497f{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:921)
[2023-06-21 16:43:01,482] INFO REST resources initialized; server is started and ready to handle requests (org.apache.kafka.connect.runtime.rest.RestServer:324)
[2023-06-21 16:43:01,482] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:56)

五、加载debezium插件

  • 下载debezium插件到plugin.path=/data/service/debezium/connectors2设置的目录下
  • 然后重新启动Kafka Connect集群就能够成功加载debezium插件

重启Kafka Connect集群查看debezium插件是否加载成功,如下所示:成功加载到了debezium 插件

[{
"class":"io.debezium.connector.mysql.MySqlConnector",
"type":"source",
"version":"2.2.1.Final"},

{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"7.3.3-ce"},

{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"7.3.3-ce"},

{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"7.3.3-ce"}]

六、总结和延伸

总结:

  • 至此成功部署了具有一个节点的Kafka Connect集群,如果需要更多节点,需要在多台服务器上启动Kafka Connect,从而组成一个多节点的Kafka Connect集群

基于Kafka Connect加载debezium插件的更多的内容可以参考博主以下几篇技术博客或者Debezium 专栏:

  • Debezium系列之:安装部署debezium详细步骤,并把debezium服务托管到systemctl
  • Debezium系列之:打通Debezium2.0以上版本的使用技术
  • Debezium系列之:安装部署debezium2.0以上版本的详细步骤
  • Debezium系列之:实现接入上千Mysql、Sqlserver、MongoDB、Postgresql数据库的Debezium集群从Debezium1.X版本升级到Debezium2.X版本
  • Debezium系列之:安装jmx导出器监控debezium指标
  • Debezium系列之:Debezium UI部署详细步骤
  • Debezium 专栏地址

延伸:文章来源地址https://www.toymoban.com/news/detail-495000.html

  • 组成一个Kafka Connect集群后,需要启动多个connector进行Kafka Connect集群稳定性、可靠性测试。
  • 可以进一步部署Kafka Connect集群UI

到了这里,关于基于Confluent Kafka部署Kafka Connect集群,Kafka Connect集群加载debezium插件的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Confluent kafka 异常退出rd_tmpabuf_alloc0: rd kafka topic info_new_with_rack

     rd_tmpabuf_alloc0: rd kafka topic info_new_with_rack 根据网上的例子,做了一个测试程序。 C# 操作Kafka_c# kafka_Riven Chen的博客-CSDN博客 但是执行下面一行时,弹出上面的异常,闪退。 consumer.Subscribe(queueName) 解决方案: 把项目原来的anycpu,改成x64平台  

    2024年02月11日
    浏览(30)
  • Kafka(二)- Kafka集群部署

    例如在3台服务器上安装zookeeper和kafka hadoop102 hadoop103 hadoop104 zookeeper zookeeper zookeeper kafka kafka kafka (1)配置IP 需要保证 Linux 系统 ifcfg-ens33 文件中 IP 地址、虚拟网络编辑器地址 和 Windows系统中VMnet8网络IP地址相同 1.首先配置虚拟网络编辑器地址,点击编辑选择虚拟网络编辑器

    2023年04月09日
    浏览(22)
  • 【Kafka】docker部署Kafka集群

    目录 Kafka概述 Kafka集群docker部署流程 简述 环境准备 部署流程  参考文献           以下概述Kafka内的几个核心概念,可参考官方文档,有兴趣可读:kafka.apache.org Topic与日志         Topic 就是数据主题,是数据记录发布的地方,可以用来区分业务系统。Kafka 中的 Topics 总是

    2024年02月12日
    浏览(67)
  • Kafka集群部署 (KRaft模式集群)

    KRaft 模式是 Kafka 在 3.0 版本中引入的新模式。KRaft 模式使用了 Raft 共识算法来管理 Kafka 集群元数据。Raft 算法是一种分布式共识算法,具有高可用性、可扩展性和安全性等优势。 在 KRaft 模式下,Kafka 集群中的每个 Broker 都具有和 Zookeeper 类似的角色。每个 Broker 都参与管理

    2024年02月03日
    浏览(31)
  • 【Kafka】概述与集群部署

    定义 kafka是一种分布式的,基于发布/订阅的消息队列 (MessageQueue)。它可以处理消费者在网站中的所有动作流数据。 Kafka是一个开源的分布式事件流平台(Event StreamingPlatform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。(既想处理消息队列,又想处

    2024年02月04日
    浏览(58)
  • ansible部署kafka集群

    其中一台作为Ansible的母机并命名为ansible,另外三台云主机命名为node1、node2、node3,通过附件中的/ansible/ansible.tar.gz软件包在ansible节点安装Ansible服务;使用这一台母机,编写Ansible脚本(在/root目录下创建example目录作为Ansible工作目录,部署的入口文件命名为cscc_install.yaml),编

    2024年01月22日
    浏览(33)
  • Kafka 集群部署

    目录 1、环境准备 2、搭建ZooKeeper集群 配置文件 节点标记 环境变量 启动集群 数据同步测试 故障测试 3、搭建 Kafka 集群 配置文件 环境变量 配置其他机器 启动服务 4、集群测试 创建 Topic 显示 Topic 配置 创建 Producer 创建consumer 删除Topic 查看Zookeeper元数据 一个Broker就是一个ka

    2024年01月20日
    浏览(21)
  • Kafka集群部署

      Kafka是一个高吞吐量、基于ZooKeeper(ZooKeeper维护Kafka的broker信息)的分布式发布订阅信息系统,它可以处理消费者在网站中的所有动作(网页浏览,搜索和其他用户的行动)流数据。通常情况下,使用Kafka构建系统或应用程序之间的数据管道,用来转换或响应实时数据,使

    2024年02月14日
    浏览(25)
  • 分布式应用:Zookeeper 集群与kafka 集群部署

    目录 一、理论 1.Zookeeper   2.部署 Zookeeper 集群 3.消息队列  4.Kafka 5.部署 kafka 集群 6.Filebeat+Kafka+ELK 二、实验 1.Zookeeper 集群部署 2.kafka集群部署 3.Filebeat+Kafka+ELK 三、问题          1.解压文件异常 2.kafka集群建立失败 3.启动 filebeat报错 4.VIM报错 5. kibana无法匹配 四、总结

    2024年02月14日
    浏览(40)
  • Kafka入门介绍+集群部署+简单使用

    官网:https://kafka.apache.org/ 中文文档:https://kafka1x.apachecn.org/intro.html Kafka是一个开源的分布式流处理平台 主要有三个关键功能 发布订阅事件流(可以用作消息队列) 分布式持久化存储事件流(可以用作数据处理系统) 可以在事件发生时处理或回顾性的处理 整体架构图如下:

    2024年04月27日
    浏览(28)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包