Debezium日常分享系列之:向 Debezium 连接器发送信号

这篇具有很好参考价值的文章主要介绍了Debezium日常分享系列之:向 Debezium 连接器发送信号。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、概述

Debezium 信号机制提供了一种修改连接器行为或触发一次性操作(例如启动表的临时快照)的方法。要使用信号触发连接器执行指定操作,可以将连接器配置为使用以下一个或多个通道:

  • 源信号通道:可以发出 SQL 命令将信号消息添加到专门的信令数据集合中。在源数据库上创建的信令数据集合专门用于与 Debezium 进行通信。
  • Kafka信号通道;将信号消息提交到可配置的 Kafka 主题。
  • Jmx信号通道:通过 JMX 信号操作提交信号。
  • 文件信号通道:可以使用文件来发送信号。
  • Custom:将信号提交到实施的自定义通道。当 Debezium 检测到新的日志记录或临时快照记录添加到通道时,它会读取信号并启动请求的操作。

信号传输可与以下 Debezium 连接器一起使用:

  • Db2
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server

可以通过设置 signal.enabled.channels 配置属性来指定启用哪个通道。该属性列出了已启用的通道的名称。默认情况下,Debezium 提供以下渠道:source 和 kafka。源通道默认启用,因为增量快照信号需要它。

二、激活源信号通道

默认情况下,Debezium 源信令通道已启用。

必须为要使用它的每个连接器显式配置信令。

程序:

  • 在源数据库上,创建信令数据收集表,用于向连接器发送信号。
  • 对于实现本机变更数据捕获 (CDC) 机制的源数据库(例如 Db2 或 SQL Server),为信令表启用 CDC。
  • 将信令数据集合的名称添加到 Debezium 连接器配置中。在连接器配置中,添加属性 signal.data.collection,并将其值设置为您在步骤 1 中创建的信令数据集合的完全限定名称。

例如,signal.data.collection = inventory.debezium_signals。

信令集合的完全限定名称的格式取决于连接器。

以下示例显示了每个连接器使用的命名格式:

  • Db2:.
  • MongoDB:.
  • MySQL:.
  • Oracle:..
  • PostgreSQL:.
  • SQL Server:..

三、信令数据集合的结构

信令数据集合或信令表存储您发送到连接器以触发指定操作的信号。信令表的结构必须符合以下标准格式。

  • 包含三个字段(列)。
  • 字段按特定顺序排列,如表 1 所示。

表 1. 信令数据集合所需的结构

字段 类型 描述
id(required) string 标识信号实例的任意唯一字符串。为提交到信令表的每个信号分配一个 ID。通常,ID 是 UUID 字符串。可以使用信号实例进行日志记录、调试或重复数据删除。当信号触发 Debezium 执行增量快照时,它会生成带有任意 id 字符串的信号消息。生成的消息包含的 id 字符串与提交信号中的 id 字符串无关。
type(required) string 指定要发送的信号类型。可以将某些信号类型与任何可提供信号传输的连接器一起使用,而其他信号类型仅可用于特定的连接器。
data(optional) string 指定要传递给信号操作的 JSON 格式的参数。每种信号类型都需要一组特定的数据。

数据集合中的字段名称是任意的。上表提供了建议的名称。如果使用不同的命名约定,请确保每个字段中的值与预期内容一致。

四、创建信令数据集合

可以通过向源数据库提交标准 SQL DDL 查询来创建信令表。

先决条件:

  • 有足够的访问权限在源数据库上创建表。

程序:

  • 向源数据库提交SQL查询,创建符合所需结构的表,如下例所示:
CREATE TABLE <tableName> (id VARCHAR(<varcharValue>) PRIMARY KEY, type VARCHAR(<varcharValue>) NOT NULL, data VARCHAR(<varcharValue>) NULL);

注意:

分配给 id 变量的 VARCHAR 参数的空间量必须足以容纳发送到信令表的信号 ID 字符串的大小。如果 ID 的大小超出可用空间,连接器将无法处理信号。

以下示例显示了创建三列 debezium_signal 表的 CREATE TABLE 命令:

CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);

五、激活kafka信号通道

可以通过将 Kafka 信令通道添加到 signal.enabled.channels 配置属性,然后将接收信号的主题名称添加到 signal.kafka.topic 属性来启用 Kafka 信令通道。启用信令通道后,将创建 Kafka 消费者来消费发送到配置的信号主题的信号。

可供消费者使用的附加配置:

  • Db2 connector Kafka signal configuration properties
  • MongoDB connector Kafka signal configuration properties
  • MySQL connector Kafka signal configuration properties
  • Oracle connector Kafka signal configuration properties
  • PostgreSQL connector Kafka signal configuration properties
  • SQL Server connector Kafka signal configuration properties

注意:

  • 要使用 Kafka 信令触发大多数连接器的临时增量快照,必须首先在连接器配置中启用源信令通道。
  • 源通道实现了水印机制,以对可能由增量快照捕获并在流恢复后再次捕获的事件进行重复数据删除。
  • 使用信令通道触发启用GTID的只读MySQL数据库的增量快照时,不需要启用源通道。

六、数据格式

Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。

该值是具有类型和数据字段的 JSON 对象。

当信号类型设置为执行快照时,数据字段必须包括下表中列出的字段:

表 2. 执行快照数据字段

字段 默认值
type incremental 要运行的快照的类型。目前 Debezium 支持增量和阻塞类型。
data-collections N/A 一组以逗号分隔的正则表达式,与要包含在快照中的数据集合的完全限定名称相匹配。使用与 signal.data.collection 配置选项所需的格式相同的格式指定名称。
additional-condition N/A 一个可选字符串,指定连接器评估的条件,以指定要包含在快照中的记录子集。注意:此属性已弃用,应由附加条件属性替换。
additional-conditions N/A 一个可选数组,指定连接器评估的一组附加条件,以确定要包含在快照中的记录子集。每个附加条件都是一个对象,指定过滤临时快照捕获的数据的条件。您可以为每个附加条件设置以下属性:数据采集:过滤器应用到的 {data-collection} 的完全限定名称。您可以对每个{data-collection}应用不同的过滤器。过滤:指定数据库记录中必须存在的列值,快照才能包含该列值,例如“color=‘blue’”。快照进程根据过滤器值评估 {data-collection} 中的记录,并仅捕获包含匹配值的记录。分配给过滤器属性的具体值取决于临时快照的类型:对于增量快照,可以指定一个搜索条件片段,例如“color=‘blue’”,快照会将其附加到查询的条件子句中。对于阻塞快照,可以指定完整的 SELECT 语句,例如您可以在 snapshot.select.statement.overrides 属性中设置的语句。

以下示例显示了典型的执行快照 Kafka 消息:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

七、激活JMX信号通道

可以通过将 jmx 添加到连接器配置中的 signal.enabled.channels 属性来启用 JMX 信号,然后启用 JMX MBean 服务器来公开信号 Bean。

程序

  • 使用首选的 JMX 客户端(例如 JConsole 或 JDK Mission Control)连接到 MBean 服务器。

  • 搜索 Mbean debezium.<连接器类型>.management.signals.<服务器>。 Mbean 公开接受以下输入参数的信号操作:

    • p0:信号的 ID。
    • p1:信号的类型,例如执行快照。
    • p2:包含有关指定信号类型的附加信息的 JSON 数据字段。
  • 通过提供输入参数的值来发送执行快照信号。

在 JSON 数据字段中,包含下表中列出的信息:

表 2. 执行快照数据字段

字段 默认值
type incremental 要运行的快照的类型。目前 Debezium 支持增量和阻塞类型。
data-collections N/A 一组以逗号分隔的正则表达式,与要包含在快照中的数据集合的完全限定名称相匹配。使用与 signal.data.collection 配置选项所需的格式相同的格式指定名称。
additional-condition N/A 一个可选字符串,指定连接器评估的条件,以指定要包含在快照中的记录子集。注意:此属性已弃用,应由附加条件属性替换。
additional-conditions N/A 一个可选数组,指定连接器评估的一组附加条件,以确定要包含在快照中的记录子集。每个附加条件都是一个对象,指定过滤临时快照捕获的数据的条件。您可以为每个附加条件设置以下属性:数据采集:过滤器应用到的 {data-collection} 的完全限定名称。您可以对每个{data-collection}应用不同的过滤器。过滤:指定数据库记录中必须存在的列值,快照才能包含该列值,例如“color=‘blue’”。快照进程根据过滤器值评估 {data-collection} 中的记录,并仅捕获包含匹配值的记录。分配给过滤器属性的具体值取决于临时快照的类型:对于增量快照,可以指定一个搜索条件片段,例如“color=‘blue’”,快照会将其附加到查询的条件子句中。对于阻塞快照,可以指定完整的 SELECT 语句,例如可以在 snapshot.select.statement.overrides 属性中设置的语句。

下图显示了如何使用 JConsole 发送信号的示例:

Debezium日常分享系列之:向 Debezium 连接器发送信号,日常分享专栏,Debezium日常分享系列,向Debezium连接器,发送信号

八、自定义信令通道

信令机制被设计为可扩展的。可以根据需要实施通道,以最适合您环境的方式向 Debezium 发送信号。

添加信令通道涉及几个步骤:

  • 为通道创建一个Java项目来实现通道,并添加Debezium Core作为依赖项。
  • 部署自定义信令通道。
  • 通过修改连接器配置,使连接器能够使用自定义信令通道。

提供自定义信令通道

自定义信号通道是实现 io.debezium.pipeline.signal.channels.SignalChannelReader 服务提供者接口 (SPI) 的 Java 类。例如:

public interface SignalChannelReader {

    String name(); 

    void init(CommonConnectorConfig connectorConfig); 

    List<SignalRecord> read(); 

    void close(); 
}
  • 读者姓名。要使 Debezium 能够使用通道,请在连接器的 signal.enabled.channels 属性中指定此名称。
  • 初始化通道所需的特定配置、变量或连接。
  • 从通道读取信号。 SignalProcessor 类调用此方法来检索要处理的信号。
  • 关闭所有分配的资源。 Debezium 在连接器停止时调用此方法。

九、Debezium 核心模块依赖项

自定义信令通道 Java 项目具有对 Debezium 核心模块的编译依赖项。必须将这些编译依赖项包含在项目的 pom.xml 文件中,如以下示例所示:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-core</artifactId>
    <version>${version.debezium}</version> 
</dependency>
  • ${version.debezium} 表示 Debezium 连接器的版本。
  • 在 META-INF/services/io.debezium.pipeline.signal.channels.SignalChannelReader 中声明实现

十、部署自定义信令通道

先决条件

  • 有一个自定义信令通道 Java 程序。

程序

  • 要将自定义信号通道与 Debezium 连接器结合使用,请将 Java 项目导出到 JAR 文件,然后将该文件复制到包含要与其一起使用的每个 Debezium 连接器的 JAR 文件的目录。
  • 例如,在典型部署中,Debezium 连接器文件存储在 Kafka Connect 目录 (/kafka/connect) 的子目录中,每个连接器 JAR 位于其自己的子目录中 (/kafka/connect/debezium-connector-db2、/kafka /connect/debezium-connector-mysql 等)。

注意:

  • 要将自定义信号通道与多个连接器一起使用,必须将自定义信号通道 JAR 文件的副本放置在每个连接器的子目录中。

配置连接器以使用自定义信号通道

  • 将自定义信令通道的名称添加到 signal.enabled.channels 配置属性中。

十一、信号动作

可以使用信令来发起以下操作:

  • 将消息添加到日志中。
  • 触发临时增量快照。
  • 停止执行临时快照。
  • 暂停增量快照。
  • 恢复增量快照。
  • 触发临时阻塞快照。
  • 自定义动作。

有些信号并不与所有连接器兼容。

十二、记录信号

可以通过创建具有日志信号类型的信令表条目来请求连接器将条目添加到日志中。处理信号后,连接器将指定的消息打印到日志中。或者,可以配置信号,以便生成的消息包含流坐标。

表 4. 用于添加日志消息的信令记录示例

字段 描述
id 924e3ff8-2245-43ca-ba77-2af9af02fa07
type log 信号的动作类型。
data {“message”: “Signal message at offset {}”} message 参数指定要打印到日志的字符串。
如果您向消息添加占位符 ({}),它将被替换为流坐标。

十三、即席快照信号

可以通过创建具有执行快照信号类型的信号来请求连接器启动临时快照。处理信号后,连接器运行请求的快照操作。

与连接器首次启动后运行的初始快照不同,临时快照是在连接器已经开始从数据库传输更改事件之后在运行时期间发生的。可以随时启动临时快照。

临时快照可用于以下 Debezium 连接器:

  • Db2
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server

表 5. 临时快照信号记录示例

字段
id d139b9b7-7777-4547-917d-e1775ea61d41
type execute-snapshot
data {“data-collections”: [“public.MyFirstTable”, “public.MySecondTable”]}

表 6. 即席快照信号消息示例

test_connector {“type”:“execute-snapshot”,“data”: {“data-collections”: [“public.MyFirstTable”], “type”: “INCREMENTAL”, “additional-conditions”:[{“data-collection”: “public.MyFirstTable”, “filter”:“color=‘blue’ AND brand=‘MyBrand’”]}}

其他资源

  • Db2 连接器增量快照
  • MongoDB 连接器增量快照
  • MySQL 连接器增量快照
  • Oracle 连接器增量快照
  • PostgreSQL 连接器增量快照
  • SQL Server 连接器增量快照

十四、特别快照停止信号

可以通过创建具有停止快照信号类型的信号表条目来请求连接器停止正在进行的临时快照。处理完信号后,连接器将停止当前正在进行的快照操作。

表 7. 停止临时快照信号记录示例

字段
id d139b9b7-7777-4547-917d-e1775ea61d41
type stop-snapshot
data {“type”:“INCREMENTAL”, “data-collections”: [“public.MyFirstTable”]}

必须指定信号的类型。数据收集字段是可选的。将数据收集字段留空以请求连接器停止当前快照中的所有活动。如果希望继续执行增量快照,但希望从快照中排除特定集合,请提供要排除的集合或正则表达式的名称的逗号分隔列表。连接器处理信号后,增量快照将继续,但它会排除指定的集合中的数据。

十五、增量快照

增量快照是一种特定类型的临时快照。在增量快照中,连接器捕获您指定的表的基线状态,类似于初始快照。但是,与初始快照不同,增量快照以块的形式捕获表,而不是一次捕获所有表。连接器使用水印方法来跟踪快照的进度。

通过以块的形式而不是在单个整体操作中捕获指定表的初始状态,增量快照比初始快照过程具有以下优势:

  • 当连接器捕获指定表的基线状态时,来自事务日志的近实时事件流将继续不间断。
  • 如果增量快照过程中断,可以从停止点恢复。
  • 可以随时启动增量快照。

十六、增量快照暂停信号

可以通过创建具有暂停快照信号类型的信号表条目来请求连接器暂停正在进行的增量快照。处理完信号后,连接器将停止暂停当前正在进行的快照操作。因此,无法指定数据收集,因为快照处理将暂停在处理信号时的位置。

表 8. 暂停增量快照信号记录示例

字段
id d139b9b7-7777-4547-917d-e1775ea61d41
type pause-snapshot

必须指定信号的类型。数据字段被忽略。

十七、增量快照恢复信号

可以通过创建具有恢复快照信号类型的信号表条目来请求连接器恢复暂停的增量快照。处理信号后,连接器将恢复之前暂停的快照操作。

表 9. 恢复增量快照信号记录示例

字段
id d139b9b7-7777-4547-917d-e1775ea61d41
type resume-snapshot

十八、阻止快照信号

可以通过创建具有执行快照信号类型和具有值阻塞的 data.type 的信号来请求连接器启动临时阻塞快照。处理信号后,连接器运行请求的快照操作。

与连接器首次启动后运行的初始快照不同,临时阻塞快照在连接器停止从数据库传输更改事件后在运行时发生。您可以随时启动临时阻止快照。

表 10. 阻塞快照信号记录示例

字段
id d139b9b7-7777-4547-917d-e1775ea61d41
type execute-snapshot
data {“type”: “blocking”, “data-collections”: [“schema1.table1”, “schema1.table2”], “additional-conditions”: [{“data-collection”: “schema1.table1”, “filter”: “SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC”}, {“data-collection”: “schema1.table2”, “filter”: “SELECT * FROM [schema1].[table2] WHERE column2 > 0”}]}

表 11. 阻塞快照信号消息示例

test_connector {“type”:“execute-snapshot”,“data”: {“type”: “blocking”}

十九、应用案例

  • Debezium系列之:实现增量快照incremental技术的详细步骤
  • Debezium系列之:基于数据库信号表和Kafka信号Topic两种技术方案实现增量快照incremental技术的详细步骤
  • Debezium系列之:深入理解临时阻塞快照

更多Debezium实战应用可以参考博主Debezium专栏:文章来源地址https://www.toymoban.com/news/detail-771907.html

  • Debezium专栏,Debezium实战应用详细总结

到了这里,关于Debezium日常分享系列之:向 Debezium 连接器发送信号的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Semantic Kernel 入门系列:?Connector连接器

    当我们使用Native Function的时候,除了处理一些基本的逻辑操作之外,更多的还是需要进行外部数据源和服务的对接,要么是获取相关的数据,要么是保存输出结果。这一过程在Semantic Kernel中可以被归类为Connector。 Connector更像是一种设计模式,并不像Function和Memory 一样有强制和

    2023年04月15日
    浏览(46)
  • Flink系列之:Elasticsearch SQL 连接器

    Sink: Batch Sink: Streaming Append Upsert Mode Elasticsearch 连接器允许将数据写入到 Elasticsearch 引擎的索引中。本文档描述运行 SQL 查询时如何设置 Elasticsearch 连接器。 连接器可以工作在 upsert 模式,使用 DDL 中定义的主键与外部系统交换 UPDATE/DELETE 消息。 如果 DDL 中没有定义主键,那么

    2024年02月04日
    浏览(56)
  • Flink系列之:JDBC SQL 连接器

    Scan Source: Bounded Lookup Source: Sync Mode Sink: Batch Sink: Streaming Append Upsert Mode JDBC 连接器允许使用 JDBC 驱动向任意类型的关系型数据库读取或者写入数据。本文档描述了针对关系型数据库如何通过建立 JDBC 连接器来执行 SQL 查询。 如果在 DDL 中定义了主键,JDBC sink 将以 upsert 模式与外

    2024年02月02日
    浏览(48)
  • Kafka系列之:连接器客户端配置覆盖策略

    KAFKA引入了每个源连接器和接收器连接器从工作线程属性继承其客户端配置的功能。在工作线程属性中,任何具有“生产者”或“消费者”前缀的配置。分别应用于所有源连接器和接收器连接器。虽然最初的提案允许覆盖源连接器和接收器连接器,但它在允许连接器的不同配

    2024年02月11日
    浏览(58)
  • Flink系列之:Upsert Kafka SQL 连接器

    Scan Source: Unbounded 、 Sink: Streaming Upsert Mode Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。 作为 source,upsert-kafka 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的 value 被解释为同一 key 的最后一

    2024年01月16日
    浏览(44)
  • Flink系列之:Apache Kafka SQL 连接器

    Scan Source: Unbounded Sink: Streaming Append Mode Kafka 连接器提供从 Kafka topic 中消费和写入数据的能力。 以下示例展示了如何创建 Kafka 表: 以下的连接器元数据可以在表定义中通过元数据列的形式获取。 R/W 列定义了一个元数据是可读的(R)还是可写的(W)。 只读列必须声明为 VI

    2024年02月01日
    浏览(47)
  • Flink系列之:Flink CDC深入了解MySQL CDC连接器

    增量快照读取是一种读取表快照的新机制。与旧的快照机制相比,增量快照具有许多优点,包括: (1)在快照读取期间,Source 支持并发读取 (2)在快照读取期间,Source 支持进行 chunk 粒度的 checkpoint (3)在快照读取之前,Source 不需要数据库锁权限。 如果希望 source 并行运

    2024年02月02日
    浏览(49)
  • Kafka系列之:对源连接器的的Exactly-Once支持

    幂等生产者: 写入 Kafka 的应用程序重复记录的一个常见来源是自动生产者重试,其中生产者会在某些情况下将批次重新发送到 Kafka,即使该批次已由代理提交。KIP-98 允许用户将其生产者配置为幂等地执行这些重试,这样下游应用程序只能看到自动重新发送的生产者批次中任

    2024年02月11日
    浏览(45)
  • Debezium日常分享系列之:Debezium and TimescaleDB

    TimescaleDB 是一个开源数据库,旨在使 SQL 对于时间序列数据具有可扩展性。它是作为 PostgreSQL 数据库的扩展实现的。这一事实促使我们重新使用标准 Debezium PostgreSQL 连接器,并将 TimescaleDB 支持实现为单个消息转换 (SMT)。 TimescaleDB 提供了三个基本构建块/概念: Hypertables Contin

    2024年01月17日
    浏览(50)
  • Debezium日常分享系列之:在 OpenShift 上部署 Debezium

    此过程用于在 Red Hat 的 OpenShift 容器平台上设置 Debezium 连接器。要在 OpenShift 上进行开发或测试,您可以使用 CodeRady 容器。 为了使容器与集群上的其他工作负载分开,请为 Debezium 创建一个专用项目。在本文档的其余部分中,将使用 debezium-example 命名空间: 对于 Debezium 部署,

    2024年02月16日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包