Debezium日常分享系列之:Debezium and TimescaleDB

这篇具有很好参考价值的文章主要介绍了Debezium日常分享系列之:Debezium and TimescaleDB。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、TimescaleDB

TimescaleDB 是一个开源数据库,旨在使 SQL 对于时间序列数据具有可扩展性。它是作为 PostgreSQL 数据库的扩展实现的。这一事实促使我们重新使用标准 Debezium PostgreSQL 连接器,并将 TimescaleDB 支持实现为单个消息转换 (SMT)。

TimescaleDB 提供了三个基本构建块/概念:

  • Hypertables
  • Continuous aggregates
  • Compression

描述实例定义的元数据(目录)和原始数据通常存储在 _timescaledb_internal_schema 中。TimescaleDb SMT 连接到数据库并读取和处理元数据。然后,从数据库读取的原始消息会使用存储在 Kafka Connect 标头中的元数据进行丰富,从而创建物理数据和 TimescaleDB 逻辑结构之间的关系。

二、完整案例

Debezium 示例存储库包含基于 Docker Compose 的部署,该部署提供了完整的环境来演示 TimescaleDB 集成。

第一步,开始部署

$ docker-compose -f docker-compose-timescaledb.yaml up --build

该命令将启动 Debezium(Zookeeper、Kafka、Kafka Connect)和源 TimescaleDB 数据库。

启动的数据库已准备好以下数据库对象:

  • 将温度和湿度测量值表示为时间序列数据的超稳定条件;使用 DDL
CREATE TABLE conditions (time TIMESTAMPTZ NOT NULL, location TEXT NOT NULL, temperature DOUBLE PRECISION NULL, humidity DOUBLE PRECISION NULL); 

SELECT create_hypertable('conditions', 'time')
  • 测量数据的单一记录
INSERT INTO conditions VALUES(NOW(), 'Prague', 22.8, 53.3)

PostgreSQL 出版物用于将时间序列数据发布到复制槽中,因为演示使用 pgoutput 解码插件

CREATE PUBLICATION dbz_publication FOR ALL TABLES WITH (publish = 'insert, update')

下一步需要注册 Debezium PostgreSQL 连接器以捕获数据库中的更改

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-timescaledb.yaml

注册请求文件与常规文件不同,增加了这些行

{
    "name": "inventory-connector",
    "config": {
...
        "schema.include.list": "_timescaledb_internal",
        "transforms": "timescaledb",
        "transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb",
        "transforms.timescaledb.database.hostname": "timescaledb",
        "transforms.timescaledb.database.port": "5432",
        "transforms.timescaledb.database.user": "postgres",
        "transforms.timescaledb.database.password": "postgres",
        "transforms.timescaledb.database.dbname": "postgres"
    }
}

三、Hypertables

连接器将捕获内部 TimescaleDB 架构以及包含原始数据的物理表,并且将应用 TimescaleDb SMT 来丰富消息并根据逻辑名称将它们路由到正确命名的主题。 SMT 配置选项包含连接到数据库所需的信息。在这种情况下,条件超表将物理存储在 _timescaledb_internal._hyper_1_1_chunk 中,并且当由 SMT 处理时,它将重新路由到根据固定配置的前缀 timescaledb 和逻辑名称 public.conditions 命名的 timescaledb.public.conditions 主题符合超表名称。

让我们在表中添加更多测量值

 docker-compose -f docker-compose-timescaledb.yaml exec timescaledb env PGOPTIONS="--search_path=public" bash -c 'psql -U $POSTGRES_USER postgres'
postgres=# INSERT INTO conditions VALUES (now(), 'Prague', 30, 50);
postgres=# INSERT INTO conditions VALUES (now(), 'Brno', 35, 55);
postgres=# INSERT INTO conditions VALUES (now(), 'Prague', 40, 60);

并读取捕获的主题消息(在命令中启用打印密钥和标题)

docker-compose -f docker-compose-timescaledb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --property print.headers=true \
    --topic timescaledb.public.conditions

这些消息包含两个标头 debezium_timescaledb_chunk_table:_hyper_1_1_chunk、debezium_timescaledb_chunk_schema:_timescaledb_internal,它们描述了逻辑超表名称与从中捕获它们的物理源表之间的映射。

四、Continuous aggregates

连续聚合对存储在超表中的数据提供自动统计计算。聚合被定义为物化视图,由其自己的超表支持,而超表又由一组物理表支持。重新计算聚合后(手动或自动),新值将存储在超表中,可以从中捕获和流式传输这些值。连接器捕获物理表中的新值,SMT 通过将物理目标重新映射回聚合逻辑名称来再次解决路由问题。还添加了带有原始超表和物理表名称的 Kafka Connect 标头。

让我们创建一个名为conditions_summary的连续聚合,用于计算每个位置和时间间隔的平均、最低和最高温度

postgres=# CREATE MATERIALIZED VIEW conditions_summary WITH (timescaledb.continuous) AS
  SELECT
    location,
    time_bucket(INTERVAL '1 hour', time) AS bucket,
    AVG(temperature),
    MAX(temperature),
    MIN(temperature)
  FROM conditions
  GROUP BY location, bucket;

并阅读捕获的主题消息

docker-compose -f docker-compose-timescaledb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --property print.headers=true \
    --topic timescaledb.public.conditions_summary

这些消息包含两个标头 debezium_timescaledb_hypertable_table:_materialized_hypertable_2,debezium_timescaledb_hypertable_schema:_timescaledb_internal 公开哪个支持超表用于存储聚合,以及两个附加标头 debezium_timescaledb_chunk_table:_hyper_2_2_chunk,debezium_timescaledb_chunk_schema:_timescaledb_internal 公开存储聚合的物理表。

`__debezium_timescaledb_chunk_table:_hyper_1_1_chunk,__debezium_timescaledb_chunk_schema:_timescaledb_internal` that describes the mapping between the logical hypertable name and the physical source table from which they were captured.

如果添加新的测量并触发聚合重新计算,则更新的聚合将发送到主题

postgres=# INSERT INTO conditions VALUES (now(), 'Ostrava', 10, 50);
postgres=# CALL refresh_continuous_aggregate('conditions_summary', CURRENT_DATE, CURRENT_DATE + 1);

看起来像

{
   "schema":{
...
   },
   "payload":{
      "before":null,
      "after":{
         "location":"Ostrava",
         "bucket":"2024-01-09T13:00:00.000000Z",
         "avg":10.0,
         "max":10.0,
         "min":10.0
      },
      "source":{
         "version":"2.5.0.Final",
         "connector":"postgresql",
         "name":"dbserver1",
         "ts_ms":1704806938840,
         "snapshot":"false",
         "db":"postgres",
         "sequence":"[\"29727872\",\"29728440\"]",
         "schema":"public",
         "table":"conditions_summary",
         "txId":764,
         "lsn":29728440,
         "xmin":null
      },
      "op":"c",
      "ts_ms":1704806939163,
      "transaction":null
   }
}

因此,该主题包含针对两个不同位置计算的两条或多条消息。

五、Compression

TimescaleDB SMT 不会增强压缩数据块(物理表记录),而只是将其作为存储在超表中的副产品。压缩后的数据被捕获并存储在 Kafka 主题中。通常,带有压缩块的消息会被丢弃,并且不会被管道中的后续作业处理。

让我们为超表启用压缩并压缩它

postgres=# ALTER TABLE conditions SET (timescaledb.compress, timescaledb.compress_segment by = 'location');
postgres=# SELECT show_chunks('conditions');
              show_chunks
----------------------------------------
 _timescaledb_internal._hyper_1_1_chunk
(1 row)

postgres=# SELECT compress_chunk( '_timescaledb_internal._hyper_1_1_chunk');

消息写入 timescaledb._timescaledb_internal._compressed_hypertable_3。

停止服务

docker-compose -f docker-compose-timescaledb.yaml down

六、结论

在这篇文章中,我们演示了从 TimescaleDB 时间序列数据库捕获数据以及通过 TimescaleDb SMT 对其进行处理。我们已经展示了如何根据作为数据源的超表和连续聚合来路由和丰富消息。

深入了解Debezium请阅读博主专栏:文章来源地址https://www.toymoban.com/news/detail-798894.html

  • Debezium专栏

到了这里,关于Debezium日常分享系列之:Debezium and TimescaleDB的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Debezium日常分享系列之:使用 Debezium 连接器实现密钥外部化

    隐藏数据库的账号和密码 当 Debezium 连接器部署到 Kafka Connect 实例时,有时需要对 Connect API 的其他用户隐藏数据库凭据。 让我们回顾一下 MySQL Debezium connector的连接器注册请求: 用户名和密码以纯字符串形式传递给 API。更糟糕的是,任何有权访问 Kafka Connect 集群及其 REST AP

    2024年02月16日
    浏览(42)
  • Debezium日常分享系列之:流式传输Cassandra第二部分

    在本博客文章系列的前半部分中,解释了Cassandra 设计流数据管道的决策过程。在这篇文章中,我们将把管道分为三个部分,并更详细地讨论每个部分: Cassandra 到 Kafka 与 CDC 代理 Kafka 与 BigQuery 和 KCBQ 使用 BigQuery 视图进行转换 相关技术博客: Debezium日常分享系列之:流式传输

    2024年02月16日
    浏览(44)
  • Debezium系列之:监控 Debezium

    Debezium JMX相关的技术博客: Debezium系列之:安装jmx导出器监控debezium指标 Debezium系列之:为Debezium集群JMX页面增加监控,JMX页面出现异常时发送飞书告警,确保任务能够获取debezium集群指标 Debezium系列之:深入解读Debezium重要的jmx指标 Debezium系列之:mysql JMX metrics指标详细解读

    2024年02月11日
    浏览(50)
  • Debezium系列之:在 Kubernetes 上部署 Debezium

    K8s相关知识可以阅读博主以下几篇技术博客: K8s系列之:搭建高可用K8s v1.23.5集群详细步骤,3个master节点,3个Node节点 K8s系列之:Pod的基本用法 k8s系列之:kubectl子命令详解一 k8s系列之:kubectl子命令详解二 更多K8s知识点详见博主K8s系列文章 更多Debezium内容请阅读博主Debezi

    2024年02月11日
    浏览(61)
  • Debezium系列之:使用 Strimzi 将 Kafka 和 Debezium 迁移到 Kubernetes

    在本文中,将探讨在生产中实现debezium与K8s的结合: 在 Kubernetes 集群中安装和管理 Apache Kafka 集群。 在 Kubernetes 集群中部署 Debezium Kafka Connect。 Kubernetes 是一个开源容器编排器,本文使用 minikube 作为 Kubernetes 集群,但相同的步骤在任何其他实现中都应该有效。 启动集群 在终

    2024年02月15日
    浏览(45)
  • Debezium系列之:记录变更事件

    Debezium 数据更改事件具有复杂的结构,可提供丰富的信息。 但是,在某些情况下,在下游消费者可以处理 Debezium 更改事件消息之前,它需要有关原始数据库更改导致的字段级更改的其他信息。 为了使用有关数据库操作如何修改源数据库中的字段的详细信息来增强事件消息,

    2024年02月09日
    浏览(51)
  • Debezium系列之:prometheus采集debezium的jmx数据,grafana通过dashboard展示debezium的jmx数据

    需要采集debezium的jmx数据,并把重要的指标展示出来 采取的方案是prometheus采集debezium的jmx数据,通过grafana展示出来,可以快速查看某个连接器重要的指标信息

    2024年02月13日
    浏览(47)
  • Debezium系列之:详细整理Debezium和Kafka的Transforms类型和全部功能

    Kafka Connect 是一个在 Apache Kafka 与外部系统之间进行数据传输的框架,其主要作用是实现可靠的数据集成和流转。 Transforms 是 Kafka Connect 中用于对数据进行处理和转换的一个重要特性。 通过使用 Transforms,用户可以对 Kafka Connect 中传输的数据进行一些处理和转换,例如过滤、格

    2024年02月10日
    浏览(41)
  • Debezium系列之:详细介绍Debezium2.X版本导出Sqlserver数据库Debezium JMX指标的方法

    Debezium2.X版本sqlserver数据库JMX指标导出的方式与Debezium1.X版本不同 需要根据Debezium2.X版本sqlserver数据库jmx的格式,导出sqlserver数据库的JMX指标

    2024年02月01日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包