Kafka Connect JdbcSinkConnector的schema处理

这篇具有很好参考价值的文章主要介绍了Kafka Connect JdbcSinkConnector的schema处理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

kafka connect当写入到Mysql这类的关系型数据库时,使用JdbcSinkConnector,且kafka中的数据需要具备schemas,否则是无法写入的。

只有两种数据可以写入:

1.使用Confluent Schema Registry 在写入kafka时,就用Avro、Protobuf 或 JSON Schema的converter进行schema的转换

2.带着schema的Json数据

{
  "schema": {
    "type": "struct", "optional": false, "version": 1, "fields": [
      { "field": "ID", "type": "string", "optional": true },
      { "field": "Artist", "type": "string", "optional": true },
      { "field": "Song", "type": "string", "optional": true }
    ] },
  "payload": {
    "ID": 1,
    "Artist": "Rick Astley",
    "Song": "Never Gonna Give You Up"
  }
}

第一种方式的操作如下:

首先需要启动schemaRegistry,并启动schemaRegistry服务器,配置和启动命令如下:

listeners=http://0.0.0.0:18081

kafkastore.bootstrap.servers=192.168.83.98:9092
kafkastore.topic=_schemas
kafkastore.timeout.ms=5000
kafkastore.topic.replication.factor=1

# 如果使用 SSL/TLS 加密连接,请取消注释并提供相应的配置
# kafkastore.ssl.truststore.location=/path/to/truststore.jks
# kafkastore.ssl.truststore.password=truststore_password

# 如果启用了身份验证,请取消注释并提供相应的配置
# kafkastore.sasl.mechanism=PLAIN
# kafkastore.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
#   username="schema_registry" \
#   password="schema_registry_password";

# 可选的配置项
# debug=true
# host.name=<Schema Registry 主机名>
# compatibility=BACKWARD, FORWARD, FULL, NONE
# master.eligibility=true
# authentication.method=NONE, BASIC, DIGEST, SSL, SASL_PLAIN, SASL_SCRAM_256, SASL_SCRAM_512

启动命令:

./confluent-6.0.1/bin/schema-registry-start -daemon schema-registry.properties

使用JdbcSourceConnector接入数据时,就使用AvroConverter进行处理,并指定schema.registry的服务器

{
     "name": "KTFH_O_ORG4",
     "config": {
         "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
         "tasks.max": "1",
         "topics": "xlg_test_kafka3",
         "value.converter": "io.confluent.connect.avro.AvroConverter",
         "value.converter.schema.registry.url": "http://192.168.80.231:18081",
         "connection.url": "jdbc:mysql://192.168.83.22:13319/hebei_air_529_temp?user=root&password=Zx123456@shining11&useSSL=false",
         "table.whitelist" : "xlg_test",
         "mode":"bulk",
         "topic.prefix":"mysql-"
      }
}

在sink的时候,也指定AvroConverter并指定schema.registry服务器文章来源地址https://www.toymoban.com/news/detail-531796.html

{
     "name": "KTFH_O_ORG3",
     "config": {
         "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
         "tasks.max": "1",
         "auto.create": "true",
         "topics": "mysql-xlg_test",
         "value.converter": "io.confluent.connect.avro.AvroConverter",
         "value.converter.schema.registry.url": "http://192.168.80.231:18081",
         "connection.url": "jdbc:mysql://192.168.83.22:13319/hebei_air_529_temp",
         "connection.user": "root",
         "connection.password": "Zx123456@shining11",
         "insert.mode": "INSERT",
         "table.name.format" : "xlg_test2"
      }
}

到了这里,关于Kafka Connect JdbcSinkConnector的schema处理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Apache Kafka - 构建数据管道 Kafka Connect

    Kafka Connect 是一个工具,它可以帮助我们将数据从一个地方传输到另一个地方。比如说,你有一个网站,你想要将用户的数据传输到另一个地方进行分析,那么你可以使用 Kafka Connect 来完成这个任务。 Kafka Connect 的使用非常简单。它有两个主要的概念 :source 和 sink 。Source 是

    2024年02月15日
    浏览(45)
  • Kafka connect

    这里以 mysql - kafka connect - oracle 实现upsert 全量同步为例: 启动zookeeper 、 kafka 等组件后 编写kafka/config/connect-distributed.properties文件 注:要确保8083端口没被占用 启动 connect  ./bin/connect-distributed.sh ./config/connect-distributed.properties 注: 这里窗口会被占用,不想被占用,用 nohup  启动

    2024年02月03日
    浏览(29)
  • Kafka Connect详解及应用实践

    Kafka Connect是一个用于数据导入和导出的工具。 它能够把多种数据源(如MySQL,HDFS等)与Kafka之间进行连接,实现数据在不同系统之间的交互以及数据的流动。 Kafka Connect有以下几个优势: 扩展性:Kafka Connect支持自定义Connector,用户可以通过编写自己的Connector来实现与更多数

    2024年02月07日
    浏览(30)
  • Splunk Connect for Kafka – Connecting Apache Kafka with Splunk

    1: 背景: 1: splunk 有时要去拉取kafka 上的数据: 下面要用的有用的插件:Splunk Connect for Kafka 先说一下这个Splunk connect for kafka 是什么: Spunk Connect for Kafka is a “sink connector” built on the Kafka Connect framework for exporting data from Kafka topics into Splunk. With a focus on speed and reliability, include

    2024年02月05日
    浏览(40)
  • 实时数据分析实践之Kafka Connect

    作者:禅与计算机程序设计艺术 Kafka Connect是一个开源项目,它可以让你连接到Kafka集群,并从外部系统导入或导出数据到Kafka集群中的主题。它支持很多种不同的源(如关系数据库、文件系统、IoT设备等)和目标(如Kafka主题、Elasticsearch集群、Hive表等),而且内置了许多有用的

    2024年02月06日
    浏览(49)
  • 【Kafka】记录一次基于connect-mirror-maker做的Kafka集群迁移完整过程

    一个测试环境的kafka集群,Topic有360+,Partition有2000+,部署在虚拟机上,由于多方面原因,要求迁移至k8s容器内(全量迁移),正好可以拿来练一下手。本文主要记录对MM1和MM2的实际操作过程,以及使用过程中遇到的问题及解决方案。 source集群:kafka-2.6.0、2个broker、虚拟机

    2024年02月11日
    浏览(53)
  • 在CDP平台上安全的使用Kafka Connect

    在这篇文章中,将演示如何将 Kafka Connect 集成到 Cloudera 数据平台 (CDP) 中,从而允许用户在 Streams Messaging Manager 中管理和监控他们的连接器,同时还涉及安全功能,例如基于角色的访问控制和敏感信息处理。如果您是将数据移入或移出 Kafka 的开发人员、管理员或安全专家,那

    2023年04月09日
    浏览(25)
  • Kafka系列之:基于Apache Kafka Connect实现端到端topic数据字段级加密的详细方法

    与其他通信工具一样,加密在 Apache Kafka 中很有价值,可以保护数据。 希望通过与 Apache Kafka Connect 集成来加密数据来实现这一目标。 Kafka 可以利用多种安全功能,从身份验证和授权到基于 TLS 的数据进出 Kafka 主题的线上流量加密。尽管这些措施可以保护传输中的数据,但它

    2024年02月13日
    浏览(45)
  • Kafka Connect JNDI注入漏洞复现(CVE-2023-25194)

    2.3.0 = Apache Kafka = 3.3.2 我是通过vulhub下载的环境,下载后直接启动即可。 打开页面,漏洞点在Load data功能页中。 在Load data功能页中的Streaming功能中。 在该功能中,将payload填写到Consumer properties属性值中即可。 在payload提交前,请先在vps机器中开启ldap服务。 使用低版本的jdk中

    2024年02月16日
    浏览(46)
  • Strimzi从入门到精通系列之三:部署Kafka Connect

    Kafka Connect 是一个用于在 Apache Kafka 和其他系统之间传输数据的工具。例如,Kafka Connect 可能会将 Kafka 与外部数据库或存储和消息传递系统集成。 在Strimzi中,Kafka Connect以分布式方式部署。 Kafka Connect 也可以在独立模式下工作,但 Strimzi 不支持。 使用连接器的概念,Kafka Conn

    2024年02月13日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包