TDengine Kafka Connector将 Kafka 中指定 topic 的数据(批量或实时)同步到 TDengine

这篇具有很好参考价值的文章主要介绍了TDengine Kafka Connector将 Kafka 中指定 topic 的数据(批量或实时)同步到 TDengine。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

教程放在这里:TDengine Java Connector,官方文档已经写的很清晰了,不再赘述。
TDengine Kafka Connector将 Kafka 中指定 topic 的数据(批量或实时)同步到 TDengine,数据库,tdengine,kafka,大数据

这里记录一下踩坑:

1.报错
java.lang.UnsatisfiedLinkError: no taos in java.library.path
	at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1860)
	at java.lang.Runtime.loadLibrary0(Runtime.java:843)
	at java.lang.System.loadLibrary(System.java:1136)
	at com.taosdata.jdbc.TSDBJNIConnector.<clinit>(TSDBJNIConnector.java:30)
	at com.taosdata.jdbc.TSDBDriver.connect(TSDBDriver.java:159)
	at java.sql.DriverManager.getConnection(DriverManager.java:664)
	at java.sql.DriverManager.getConnection(DriverManager.java:208)
	at com.taosdata.kafka.connect.db.TSDBConnectionProvider.getConnection(TSDBConnectionProvider.java:35)
	at com.taosdata.kafka.connect.db.CacheProcessor.getConnection(CacheProcessor.java:40)
	at com.taosdata.kafka.connect.db.CacheProcessor.execute(CacheProcessor.java:66)
	at com.taosdata.kafka.connect.db.CacheProcessor.initDB(CacheProcessor.java:55)
	at com.taosdata.kafka.connect.db.CacheProcessor.setDbName(CacheProcessor.java:33)
	...

这是由于没有按照taos客户端,连机器内部调用了taos客户端程序去连接TDengine服务,只要安装TDengine Client即可。安装TDengine Client教程

2.报错
Caused by: java.lang.NoClassDefFoundError: Could not initialize class com.taosdata.jdbc.TSDBJNIConnector
	at com.taosdata.jdbc.TSDBDriver.connect(TSDBDriver.java:159)
	at java.sql.DriverManager.getConnection(DriverManager.java:664)
	at java.sql.DriverManager.getConnection(DriverManager.java:208)
	at com.taosdata.kafka.connect.db.TSDBConnectionProvider.getConnection(TSDBConnectionProvider.java:35)
	at com.taosdata.kafka.connect.db.CacheProcessor.getConnection(CacheProcessor.java:40)
	at com.taosdata.kafka.connect.db.CacheProcessor.execute(CacheProcessor.java:66)
	at com.taosdata.kafka.connect.db.CacheProcessor.initDB(CacheProcessor.java:55)
	at com.taosdata.kafka.connect.db.CacheProcessor.setDbName(CacheProcessor.java:33)
	at com.taosdata.kafka.connect.sink.TDengineSinkTask.bulkWriteBatch(TDengineSinkTask.java:94)
	at com.taosdata.kafka.connect.sink.TDengineSinkTask.put(TDengineSinkTask.java:85)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
	... 11 more

这个报错是因为我部署TDengine服务的时候用的是docker的方式部署,用原生连接的方式连接TDengine,很多依赖库本地访问不了导致的报错。我们可以改用TestFul的方式去连接。

下面是官方教程中创建 Sink Connector 实例用到的配置文件

{
  "name": "TDengineSinkConnector",
  "config": {
    "connection.database": "power",
    "connection.password": "taosdata",
    "connection.url": "jdbc:TAOS://127.0.0.1:6030",
    "connection.user": "root",
    "connector.class": "com.taosdata.kafka.connect.sink.TDengineSinkConnector",
    "data.precision": "ns",
    "db.schemaless": "line",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "tasks.max": "1",
    "topics": "meters",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "name": "TDengineSinkConnector",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dead_letter_topic",
    "errors.deadletterqueue.topic.replication.factor": "1",    
  },
  "tasks": [],
  "type": "sink"
}

阅读源码后,我们将其中的"connection.url": "jdbc:TAOS://127.0.0.1:6030"修改为"connection.url": "jdbc:TAOS-RS://127.0.0.1:6041?user=root&password=taosdata"

解释一下:文章来源地址https://www.toymoban.com/news/detail-758713.html

  • jdbc:TAOS:代表使用原生连接,jdbc:TAOS-RS代表使用REST连接。
  • 加上?user=root&password=taosdata是因为我发现改成REST连接后,connection.user配置项不生效,只好在url上直接拼接了。

到了这里,关于TDengine Kafka Connector将 Kafka 中指定 topic 的数据(批量或实时)同步到 TDengine的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Debezium vs OGG vs Tapdata:如何实时同步 Oracle 数据到 Kafka 消息队列?

    随着信息时代的蓬勃发展,企业对实时数据处理的需求逐渐成为推动业务创新和发展的重要驱动力。在这个快速变化的环境中,许多企业选择将 Oracle 数据库同步到 Kafka,以满足日益增长的实时数据处理需求。本文将深入探讨这一趋势的背后原因,并通过一个真实的客户案例

    2024年04月10日
    浏览(40)
  • Enterprise:使用 MySQL connector 同步 MySQL 数据到 Elasticsearch

    Elastic MySQL 连接器是 MySQL 数据源的连接器。它可以帮我们把 MySQL 里的数据同步到 Elasticsearch 中去。在今天的文章里,我来详细地描述如何一步一步地实现。 在下面的展示中,我将使用 Elastic Stack 8.8.2 来进行展示。 无缝集成:将 Elasticsearch 连接到 MongoDB Enterprise:使用 MySQL c

    2024年02月16日
    浏览(34)
  • 第3.4章:StarRocks数据导入--Flink Connector与CDC秒级数据同步

    Flink作为当前流行的流式计算框架,在对接StarRocks时,若直接使用JDBC的方式“流式”写入数据,对StarRocks是不友好的,StarRocks作为一款MVCC的数据库,其导入的核心思想还是“攒微批+降频率”。为此,StarRocks单独开发了flink-connector-starrocks,其内部实现仍是通过对数据缓存攒批

    2023年04月15日
    浏览(61)
  • 生态短讯 | Tapdata 与 TDengine 完成产品兼容性互认证,打造物联网实时数据生态

    近月,深圳钛铂数据有限公司(以下简称钛铂数据)自主研发的 实时数据平台 ( Tapdata Live Data Platform )与北京涛思数据科技有限公司(以下简称涛思数据)自主研发的大数据平台 TDengine ,已经完成了产品兼容性互认证。 经双方团队共同严格测试,Tapdata Live Data Platform 与

    2024年04月25日
    浏览(28)
  • Kafka如何彻底删除topic及数据

    我的kafka是CDH安装的默认目录  /opt/cloudera/parcels/KAFKA-4.1.0-1.4.1.0.p0.4 1、停止生产和消费程序。否则topic的offset信息会一直在broker更新。调用kafka delete命令则无法删除该topic。取消自动创建topic,设置 auto.create.topics.enable = false。 2、server.properties 设置 delete.topic.enable=true,否则调用

    2023年04月19日
    浏览(28)
  • 【ArcGIS遇上Python】ArcGIS Python批量筛选多个shp中指定字段值的图斑(以土地利用数据为例)

    以土地利用数据为例,提取多个shp数据中的旱地。 原始土地利用数据: 属性表: 提取的旱地:(以图层名称+地类名称命名)

    2024年01月17日
    浏览(37)
  • 怎样查看kafka写数据送到topic是否成功

    要查看 Kafka 写数据是否成功送到主题(topic),可以通过以下几种方法来进行确认: Kafka 生产者确认机制 :Kafka 提供了生产者的确认机制,您可以在创建生产者时设置 acks 属性来控制确认级别。常见的确认级别包括 0、1 和 all。当设置为 1 或 all 时,生产者会等待主题的 le

    2024年01月18日
    浏览(26)
  • Kafka - 获取 Topic 生产者发布数据命令

    从头开始获取 20 条数据(等价于时间升序) ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic your-topic --from-beginning --max-messages 20 获取最新 20 条数据(等价于时间降序)去掉 --from-beginning 即可(默认) ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic your-topic --max-me

    2024年02月14日
    浏览(28)
  • 大数据之使用Flink消费Kafka中topic为ods_mall_log的数据,根据不同的表前缀区分在存入Kafka的topic当中

    前言 题目: 一、读题分析 二、处理过程   1.数据处理部分: 2.HBaseSink(未经测试,不能证明其正确性,仅供参考!) 三、重难点分析 总结  什么是HBase? 本题来源于全国职业技能大赛之大数据技术赛项赛题 - 电商数据处理 - 实时数据处理 注:由于设备问题,代码执行结果

    2024年02月03日
    浏览(34)
  • MySQL FlinkCDC 通过Kafka实时同步到ClickHouse(自定义Debezium格式支持增加删除修改)

    MySQL FlinkCDC 通过Kafka实时同步到ClickHouse(自定义Debezium格式支持增加删除修改) 把MySQL多库多表的数据通过FlinkCDC DataStream的方式实时同步到同一个Kafka的Topic中,然后下游再写Flink SQL拆分把数据写入到ClickHouse,FlinkCDC DataStream通过自定义Debezium格式的序列化器,除了增加,还能进行

    2024年02月15日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包