Debezium系列之:基于debezium将mysql数据库数据更改流式传输到 Elasticsearch和PostgreSQL数据库

这篇具有很好参考价值的文章主要介绍了Debezium系列之:基于debezium将mysql数据库数据更改流式传输到 Elasticsearch和PostgreSQL数据库。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、背景

  • 基于 Debezium 的端到端数据流用例,将数据流式传输到 Elasticsearch 服务器,以利用其出色的功能对我们的数据进行全文搜索。
  • 同时把数据流式传输到 PostgreSQL 数据库,通过 SQL 查询语言来优化对数据的访问。

二、技术路线

下面的图表显示了数据如何流经我们的分布式系统。首先,Debezium MySQL 连接器不断捕获 MySQL 数据库中的更改,并将每个表的更改发送到单独的 Kafka 主题。然后,Confluence JDBC 接收器连接器不断读取这些主题并将事件写入 PostgreSQL 数据库。同时,Confluence Elasticsearch 连接器不断读取这些相同的主题并将事件写入 Elasticsearch。

Debezium系列之:基于debezium将mysql数据库数据更改流式传输到 Elasticsearch和PostgreSQL数据库,日常分享专栏,Debezium系列,mysql数据库,数据更改,流式传输,Elasticsearch,Postgresql
我们将把这些组件部署到几个不同的进程中。在此示例中,我们将所有三个连接器部署到单个 Kafka Connect 实例,该实例将代表所有连接器向 Kafka 写入和读取(在生产中,可能需要将连接器分开以实现更好的性能)。

Debezium系列之:基于debezium将mysql数据库数据更改流式传输到 Elasticsearch和PostgreSQL数据库,日常分享专栏,Debezium系列,mysql数据库,数据更改,流式传输,Elasticsearch,Postgresql

三、配置

我们将使用此 Docker Compose 文件来快速部署演示。该部署由以下 Docker 映像组成:

  • Apache ZooKeeper

  • Apache Kafka

  • 一个丰富的 Kafka Connect / Debezium 图像,有一些变化:

    • PostgreSQL JDBC 驱动程序放置在 /kafka/libs 目录中
    • Confluence JDBC 连接器放置在 /kafka/connect/kafka-connect-jdbc 目录中
  • MySQL

  • PostgreSQL

  • Elasticsearch

Debezium 源连接器以及 JDBC 和 Elasticsearch 连接器的消息格式不同,因为它们是单独开发的,并且各自关注的目标略有不同。 Debezium 发出更复杂的事件结构,以便捕获所有可用信息。特别是,更改事件包含已更改记录的旧状态和新状态。另一方面,两个接收器连接器都期望一条简单的消息,该消息仅表示要写入的记录状态。

Debezium 的 UnwrapFromEnvelope 单消息转换 (SMT) 将复杂的更改事件结构折叠为两个接收器连接器所期望的相同的基于行的格式,并有效地充当上述两种格式之间的消息转换器。

四、从mysql同步数据到Elasticsearch和PostgreSQL数据库

当所有组件启动后,我们将注册 Elasticsearch Sink 连接器写入 Elasticsearch 实例。我们希望在源以及 PostgreSQL 和 Elasticsearch 中使用相同的密钥(主 id):

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
    -d @es-sink.json

我们正在使用此注册请求:

{
  {
    "name": "elastic-sink",
    "config": {
      "connector.class":
          "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "tasks.max": "1",
      "topics": "customers",
      "connection.url": "http://elastic:9200",
      "transforms": "unwrap,key",
      "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",        (1)
      "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",(2)
      "transforms.key.field": "id",                                                 (2)
      "key.ignore": "false",                                                        (3)
      "type.name": "customer"                                                       (4)
    }
  }
}

该请求配置这些选项:

  • 1.从 Debezium 的更改数据消息中仅提取新行的状态
  • 2.从密钥结构中提取 id 字段,然后将相同的密钥用于源和两个目标。这是为了解决 Elasticsearch 连接器仅支持数字类型和字符串作为键的事实。如果我们不提取 ID,则由于密钥类型未知,消息将被连接器过滤掉。
  • 3.使用事件中的密钥而不是生成合成密钥
  • 4.事件将在 Elasticsearch 中注册的类型

接下来我们将注册 JDBC Sink 连接器写入 PostgreSQL 数据库:

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
    -d @jdbc-sink.json

最后,必须设置源连接器:

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
    -d @source.json

让我们检查一下数据库和搜索服务器是否同步。客户表的所有行都应该在源数据库 (MySQL) 以及目标数据库 (Postgres) 和 Elasticsearch 中找到:

docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email
-----------+------+------------+-----------------------
 Thomas    | 1001 | Sally      | sally.thomas@acme.com
 Bailey    | 1002 | George     | gbailey@foobar.com
 Walker    | 1003 | Edward     | ed@walker.com
 Kretchmar | 1004 | Anne       | annek@noanswer.org
curl 'http://localhost:9200/customers/_search?pretty'
{
  "took" : 42,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 4,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1001",
        "_score" : 1.0,
        "_source" : {
          "id" : 1001,
          "first_name" : "Sally",
          "last_name" : "Thomas",
          "email" : "sally.thomas@acme.com"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1004",
        "_score" : 1.0,
        "_source" : {
          "id" : 1004,
          "first_name" : "Anne",
          "last_name" : "Kretchmar",
          "email" : "annek@noanswer.org"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1002",
        "_score" : 1.0,
        "_source" : {
          "id" : 1002,
          "first_name" : "George",
          "last_name" : "Bailey",
          "email" : "gbailey@foobar.com"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1003",
        "_score" : 1.0,
        "_source" : {
          "id" : 1003,
          "first_name" : "Edward",
          "last_name" : "Walker",
          "email" : "ed@walker.com"
        }
      }
    ]
  }
}

在连接器仍在运行的情况下,我们可以向 MySQL 数据库添加一个新行,然后检查它是否已复制到 PostgreSQL 数据库和 Elasticsearch 中:

docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory'

mysql> insert into customers values(default, 'John', 'Doe', 'john.doe@example.com');
Query OK, 1 row affected (0.02 sec)
docker-compose exec -postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email
-----------+------+------------+-----------------------
...
Doe        | 1005 | John       | john.doe@example.com
(5 rows)

curl 'http://localhost:9200/customers/_search?pretty'
...
{
  "_index" : "customers",
  "_type" : "customer",
  "_id" : "1005",
  "_score" : 1.0,
  "_source" : {
    "id" : 1005,
    "first_name" : "John",
    "last_name" : "Doe",
    "email" : "john.doe@example.com"
  }
}
...

五、总结

我们设置了一个复杂的流数据管道来将 MySQL 数据库与另一个数据库以及 Elasticsearch 实例同步。我们设法在所有系统中保留相同的标识符,这使我们能够将整个系统的记录关联起来。

将数据更改从主数据库近乎实时地传播到 Elasticsearch 等搜索引擎可以实现许多有趣的用例。除了全文搜索的不同应用之外,我们还可以考虑使用 Kibana 创建仪表板和各种可视化效果,以进一步深入了解数据。文章来源地址https://www.toymoban.com/news/detail-544016.html

到了这里,关于Debezium系列之:基于debezium将mysql数据库数据更改流式传输到 Elasticsearch和PostgreSQL数据库的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Debezium同步Mysql数据到Kafka

    Kafka:3.3.2 mysql-connector:1.8.1 (0)前提是安装好mysql,开启binlog (1)下载kafka (2)下载mysql-connector插件 (3)编辑配置文件 (4)启动kafka自带的zk (5)启动kafka (6)启动connect (7)调用api 注意:当成功调用api,创建此连接器后会有如下主题产生:dbhistory.inventory、mysql1、

    2024年02月10日
    浏览(35)
  • Debezium系列之:监控 Debezium

    Debezium JMX相关的技术博客: Debezium系列之:安装jmx导出器监控debezium指标 Debezium系列之:为Debezium集群JMX页面增加监控,JMX页面出现异常时发送飞书告警,确保任务能够获取debezium集群指标 Debezium系列之:深入解读Debezium重要的jmx指标 Debezium系列之:mysql JMX metrics指标详细解读

    2024年02月11日
    浏览(38)
  • Debezium系列之:更新数据,在数据的headers中添加数据发生变化的字段

    在保持debezium数据格式不变的情况下,更新数据,把数据发生变化的字段和数据没有发生变化的字段存放到headers中

    2024年02月15日
    浏览(35)
  • 从 MySQL 到 DolphinDB,Debezium + Kafka 数据同步实战

    Debezium 是一个开源的分布式平台,用于实时捕获和发布数据库更改事件。它可以将关系型数据库(如 MySQL、PostgreSQL、Oracle 等)的变更事件转化为可观察的流数据,以供其他应用程序实时消费和处理。 本文中我们将采用 Debezium 与 Kafka 组合的方式来实现从 MySQL 到 DolphinDB 的数

    2024年02月02日
    浏览(35)
  • Debezium系列之:在 Kubernetes 上部署 Debezium

    K8s相关知识可以阅读博主以下几篇技术博客: K8s系列之:搭建高可用K8s v1.23.5集群详细步骤,3个master节点,3个Node节点 K8s系列之:Pod的基本用法 k8s系列之:kubectl子命令详解一 k8s系列之:kubectl子命令详解二 更多K8s知识点详见博主K8s系列文章 更多Debezium内容请阅读博主Debezi

    2024年02月11日
    浏览(36)
  • Debezium日常分享系列之:Debezium and TimescaleDB

    TimescaleDB 是一个开源数据库,旨在使 SQL 对于时间序列数据具有可扩展性。它是作为 PostgreSQL 数据库的扩展实现的。这一事实促使我们重新使用标准 Debezium PostgreSQL 连接器,并将 TimescaleDB 支持实现为单个消息转换 (SMT)。 TimescaleDB 提供了三个基本构建块/概念: Hypertables Contin

    2024年01月17日
    浏览(36)
  • Debezium日常分享系列之:在 OpenShift 上部署 Debezium

    此过程用于在 Red Hat 的 OpenShift 容器平台上设置 Debezium 连接器。要在 OpenShift 上进行开发或测试,您可以使用 CodeRady 容器。 为了使容器与集群上的其他工作负载分开,请为 Debezium 创建一个专用项目。在本文档的其余部分中,将使用 debezium-example 命名空间: 对于 Debezium 部署,

    2024年02月16日
    浏览(30)
  • Debezium日常分享系列之:向 Debezium 连接器发送信号

    Debezium 信号机制提供了一种修改连接器行为或触发一次性操作(例如启动表的临时快照)的方法。要使用信号触发连接器执行指定操作,可以将连接器配置为使用以下一个或多个通道: 源信号通道:可以发出 SQL 命令将信号消息添加到专门的信令数据集合中。在源数据库上创

    2024年02月03日
    浏览(33)
  • Debezium日常分享系列之:Debezium 信号发送和通知 - 第 1 部分

    本系列文章将介绍 Debezium 提供的信号和通知功能,并讨论与平台交互的可用渠道。在本系列的后续部分中,我们将更深入地研究自定义信令通道并探索其他主题,例如 JMX 信令和通知。 在当今互连的软件应用程序和系统中,与其他产品无缝集成对于构建强大而高效的解决方案

    2024年02月16日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包