When Kafka Connect writes to a relational database such as MySQL, the JdbcSinkConnector is used, and the records in Kafka must carry a schema; without one the connector cannot map fields to table columns and the write fails.
Only two kinds of data can be written:
1. Data whose schema is converted on the way into Kafka by the Avro, Protobuf, or JSON Schema converter, backed by the Confluent Schema Registry
2. JSON data that carries its schema inline, for example:
{ "schema": { "type": "struct", "optional": false, "version": 1, "fields": [ { "field": "ID", "type": "string", "optional": true }, { "field": "Artist", "type": "string", "optional": true }, { "field": "Song", "type": "string", "optional": true } ] }, "payload": { "ID": 1, "Artist": "Rick Astley", "Song": "Never Gonna Give You Up" } }
The first approach works as follows:
First, configure and start the Schema Registry server; the configuration file (schema-registry.properties) and the start command are shown below:
```properties
listeners=http://0.0.0.0:18081
kafkastore.bootstrap.servers=192.168.83.98:9092
kafkastore.topic=_schemas
kafkastore.timeout.ms=5000
kafkastore.topic.replication.factor=1

# Uncomment and fill in if the connection to Kafka uses SSL/TLS
# kafkastore.ssl.truststore.location=/path/to/truststore.jks
# kafkastore.ssl.truststore.password=truststore_password

# Uncomment and fill in if authentication is enabled
# kafkastore.sasl.mechanism=PLAIN
# kafkastore.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
#   username="schema_registry" \
#   password="schema_registry_password";

# Optional settings
# debug=true
# host.name=<Schema Registry hostname>
# compatibility=BACKWARD, FORWARD, FULL, NONE
# master.eligibility=true
# authentication.method=NONE, BASIC, DIGEST, SSL, SASL_PLAIN, SASL_SCRAM_256, SASL_SCRAM_512
```
Start command:
```bash
./confluent-6.0.1/bin/schema-registry-start -daemon schema-registry.properties
```
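Once it is up, you can confirm the registry is reachable through its REST API; a quick check, assuming it is run against the listener configured above:

```bash
# List registered subjects; a fresh registry returns an empty array []
curl http://localhost:18081/subjects
```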
When ingesting the data with the JdbcSourceConnector, handle values with the AvroConverter and point it at the Schema Registry server:
{ "name": "KTFH_O_ORG4", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "topics": "xlg_test_kafka3", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://192.168.80.231:18081", "connection.url": "jdbc:mysql://192.168.83.22:13319/hebei_air_529_temp?user=root&password=Zx123456@shining11&useSSL=false", "table.whitelist" : "xlg_test", "mode":"bulk", "topic.prefix":"mysql-" } }
On the sink side, likewise use the AvroConverter and specify the Schema Registry server:
{ "name": "KTFH_O_ORG3", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "auto.create": "true", "topics": "mysql-xlg_test", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://192.168.80.231:18081", "connection.url": "jdbc:mysql://192.168.83.22:13319/hebei_air_529_temp", "connection.user": "root", "connection.password": "Zx123456@shining11", "insert.mode": "INSERT", "table.name.format" : "xlg_test2" } }
That concludes this look at schema handling for the Kafka Connect JdbcSinkConnector.