Kafka系列之:连接器客户端配置覆盖策略

这篇具有很好参考价值的文章主要介绍了Kafka系列之:连接器客户端配置覆盖策略。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、背景

KAFKA引入了每个源连接器和接收器连接器从工作线程属性继承其客户端配置的功能。在工作线程属性中,任何具有“生产者”或“消费者”前缀的配置。分别应用于所有源连接器和接收器连接器。虽然最初的提案允许覆盖源连接器和接收器连接器,但它在允许连接器的不同配置方面仍然受到限制。通常,连接用户希望能够执行以下操作:-

  • 对于每个连接器使用不同的主体,以便它们可以在细粒度级别控制 ACL
  • 能够优化每个连接器的生产者和消费者配置,以便根据其性能特征设置连接器

KIP-296:客户端配置的连接器级别可配置性旨在通过允许覆盖所有配置来解决此问题。但 KIP 不提供连接操作员控制连接器可以覆盖的内容的能力。如果没有这种能力,连接器和工作人员之间将不会有清晰的分隔线,因为连接器本身现在可以假设覆盖可用。但从操作角度来看,最好执行以下规定。

  • 能够控制可以覆盖的配置键。例如管理员可能永远不希望代理端点被覆盖
  • 能够控制被覆盖的配置的允许值。这有助于管理员定义其集群的边界并有效地管理多租户集群。管理员可能永远不希望“send.buffer.bytes”超过 512 kb
  • 能够根据连接器类型、客户端类型(管理员、生产者、消费者)等控制上述内容。

基于上述上下文,该提案允许管理员围绕可以覆盖的内容定义/实施策略,与“CreateTopicPolicy”非常相似,“CreateTopicPolicy”允许和控制在主题级别指定的配置。

二、公共接口

在较高层面上,该提案旨在引入类似于 Core Kafka 中可用的 CreateTopicPolicy 的可配置策略,用于连接器客户端配置覆盖。更具体地说,我们将引入一个新的工作配置,该配置将允许管理员配置连接器客户端配置覆盖的策略。

新配置

Connector.client.config.override.policy - 这将是连接 API 中引入的新接口 ConnectorClientConfigOverridePolicy 的实现。默认值为“None”,不允许任何覆盖。由于用户已经使用建议的前缀进行配置的可能性非常小,因此向后兼容性通常不是问题。在极少数情况下,用户在现有配置中拥有这些配置,他们必须删除配置才能使其再次工作。

可以使用以下前缀在连接器配置中指定覆盖

  • Producer.override. - 用于 SinkConnector 上下文中的源连接器的生产者和 DLQ 生产者
  • consumer.override. - 用于接收器连接器
  • admin.override. - 用于在 Sink Connector 中创建 DLQ 主题(KIP 还允许使用 admin 前缀在工作线程中指定 DLQ 设置,以便与生产者和消费者保持一致)

管理员可以指定 ConnectorClientConfigOverridePolicy 实现的完全限定类名或别名(别名被计算为接口名称“ConnectorClientConfigOverridePolicy”的前缀,这正是大多数现有连接插件计算其别名的方式)。

新接口将被视为新的连接插件,并将通过插件路径机制加载。这些插件将通过类似于 RestExtension 和 ConfigProvider 的服务加载器机制来发现。新接口的结构及其请求描述如下:-

import org.apache.kafka.common.config.ConfigValue;
 
/**
 * <p>An interface for enforcing a policy on overriding of client configs via the connector configs.
 *
 * <p>Common use cases are ability to provide principal per connector, <code>sasl.jaas.config</code>
 * and/or enforcing that the producer/consumer configurations for optimizations are within acceptable ranges.
 */
public interface ConnectorClientConfigOverridePolicy extends Configurable, AutoCloseable {
 
 
    /**
     * Worker will invoke this while constructing the producer for the SourceConnectors,  DLQ for SinkConnectors and the consumer for the
     * SinkConnectors to validate if all of the overridden client configurations are allowed per the
     * policy implementation. This would also be invoked during the validate of connector configs via the Rest API.
     *
     * If there are any policy violations, the connector will not be started.
     *
     * @param connectorClientConfigRequest an instance of {@code ConnectorClientConfigRequest} that provides the configs to overridden and
     *                                     its context; never {@code null}
     * @return List of Config, each Config should indicate if they are allowed via {@link ConfigValue#errorMessages}
     */
    List<ConfigValue> validate(ConnectorClientConfigRequest connectorClientConfigRequest);
}
public class ConnectorClientConfigRequest {
 
    private Map<String, Object> clientProps;
    private ClientType  clientType;
    private String connectorName;
    private ConnectorType connectorType;
    private Class<? extends Connector> connectorClass;
 
    public ConnectorClientConfigRequest(
        String connectorName,
        ConnectorType connectorType,
        Class<? extends Connector> connectorClass,
        Map<String, Object> clientProps,
        ClientType clientType) {
        this.clientProps = clientProps;
        this.clientType = clientType;
        this.connectorName = connectorName;
        this.connectorType = connectorType;
        this.connectorClass = connectorClass;
    }
 
    /**
     * <pre>
     * Provides Config with prefix {@code producer.override.} for {@link ConnectorType#SOURCE}.
     * Provides Config with prefix {@code consumer.override.} for {@link ConnectorType#SINK}.
     * Provides Config with prefix {@code producer.override.} for {@link ConnectorType#SINK} for DLQ.
     * Provides Config with prefix {@code admin.override.} for {@link ConnectorType#SINK} for DLQ.
     * </pre>
     *
     * @return The client properties specified in the Connector Config with prefix {@code producer.override.} ,
     * {@code consumer.override.} and {@code admin.override.}. The configs returned don't include these prefixes.
     */
    public Map<String, Object> clientProps() {
        return clientProps;
    }
 
    /**
     * <pre>
     * {@link ClientType#PRODUCER} for {@link ConnectorType#SOURCE}
     * {@link ClientType#CONSUMER} for {@link ConnectorType#SINK}
     * {@link ClientType#PRODUCER} for DLQ in {@link ConnectorType#SINK}
     * {@link ClientType#ADMIN} for DLQ  Topic Creation in {@link ConnectorType#SINK}
     * </pre>
     *
     * @return enumeration specifying the client type that is being overriden by the worker; never null.
     */
    public ClientType clientType() {
        return clientType;
    }
 
    /**
     * Name of the connector specified in the connector config.
     *
     * @return name of the connector; never null.
     */
    public String connectorName() {
        return connectorName;
    }
 
    /**
     * Type of the Connector.
     *
     * @return enumeration specifying the type of the connector {@link ConnectorType#SINK} or {@link ConnectorType#SOURCE}.
     */
    public ConnectorType connectorType() {
        return connectorType;
    }
 
    /**
     * The class of the Connector.
     *
     * @return the class of the Connector being created; never null
     */
    public Class<? extends Connector> connectorClass() {
        return connectorClass;
    }
 
    public enum ClientType {
        PRODUCER, CONSUMER, ADMIN;
    }
}

KIP 引入了 ConnectorClientConfigOverridePolicy 的以下实现,如下表所示

类名 别名 描述
AllConnectorClientConfigOverridePolicy All 允许覆盖生产者、消费者和管理员前缀的所有配置。
NoneConnectorClientConfigOverridePolicy None 不允许任何配置覆盖。这将是默认策略。
PrincipalConnectorClientConfigOverridePolicy Principal 允许覆盖生产者、消费者和管理员前缀的“security.protocol”、“sasl.jaas.config”和“sasl.mechanism”。能够为每个连接器使用不同的主体。

由于用户可以指定任何这些策略,因此连接器本身不应依赖于这些配置的可用性。这些覆盖纯粹是从操作角度使用的。

当用户尝试创建连接器或验证连接器时,将强制执行策略本身。当任何 ConfigValue 有错误消息时

  • 验证期间,响应将包含错误,并且不符合策略的特定配置也会包含响应中包含的错误消息
  • 在创建/更新连接器期间,连接器将无法启动

三、推荐的改动

如上一节所述,设计将包括引入新的工作配置和定义覆盖策略的接口。

工作人员将在创建连接器流程期间应用该策略,如下所示。被覆盖的配置将在没有策略前缀的情况下传递:-

  • 为 WorkerSourceTask 构建生产者 - 使用“ Producer.override ”调用所有配置的验证。 prefix , ClientType=Producer, ConnectorType=Source 并覆盖(如果没有违反策略)

  • 为 DLQ 主题构建 DeadLetterQueueReporter 的管理客户端和生产者

    • 使用“ Producer.override ”调用所有配置的验证。 prefix , ClientType=Producer, ConnectorType=Sink 并覆盖(如果没有违反策略)
    • 使用“admin.override”调用所有配置的验证。 prefix , ClientType=Admin, ConnectorType=Sink 并覆盖(如果没有违反策略)
  • 为 WorkerSinkTask 构建 Consumer - 使用“consumer.override”调用所有配置的验证。 prefix , ClientType=Consumer, ConnectorType=Sink 并覆盖(如果没有违反策略)

在 validate() 流程中,herder(AbstractHerder) 将按如下所示对所有覆盖应用该策略。被覆盖的配置将在没有前缀的情况下传递:-

  • 如果它是源连接器,请在每个带有“生产者”的连接器配置上应用策略。添加前缀并更新 ConfigInfos 结果(验证 API 的响应)
  • 如果是水槽连接器,将策略应用到每个带有“consumer”的连接器配置上。添加前缀并更新 ConfigInfos 结果(验证 API 的响应)
  • 使用“admin”在每个连接器配置上应用该策略。启用 DLQ 时添加前缀并更新 ConfigInfos 结果(验证 API 的响应)

四、兼容性、弃用和迁移计划

有人拥有带有建议前缀的连接器的可能性非常小,因此向后兼容性并不是真正的问题。在极少数情况下,如果用户具有带有这些前缀的配置,他们必须删除配置或更改策略才能使其正常工作。文章来源地址https://www.toymoban.com/news/detail-503916.html

到了这里,关于Kafka系列之:连接器客户端配置覆盖策略的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【flink sql】kafka连接器

    Kafka 连接器提供从 Kafka topic 中消费和写入数据的能力。 前面已经介绍了flink sql创建表的语法及说明:【flink sql】创建表 这篇博客聊聊怎么通过flink sql连接kafka 以下的连接器元数据可以在表定义中通过元数据列的形式获取。 R/W 列定义了一个元数据是可读的(R)还是可写的(

    2024年02月08日
    浏览(51)
  • 成集云 | 抖店连接器客户静默下单催付数据同步钉钉 | 解决方案

    源系统 成集云 目标系统 随着各品牌全渠道铺货,主播在平台上直播时客户下了订单后不能及时付款,第一时间客户收不到提醒,不仅造成了客户付款率下降,更大量消耗了企业的人力成本和经济。而成集云与钉钉深度合作,企业可以通过成集云-抖店连接器将电商平台的数据

    2024年02月11日
    浏览(86)
  • Semantic Kernel 入门系列:?Connector连接器

    当我们使用Native Function的时候,除了处理一些基本的逻辑操作之外,更多的还是需要进行外部数据源和服务的对接,要么是获取相关的数据,要么是保存输出结果。这一过程在Semantic Kernel中可以被归类为Connector。 Connector更像是一种设计模式,并不像Function和Memory 一样有强制和

    2023年04月15日
    浏览(45)
  • Flink系列之:Elasticsearch SQL 连接器

    Sink: Batch Sink: Streaming Append Upsert Mode Elasticsearch 连接器允许将数据写入到 Elasticsearch 引擎的索引中。本文档描述运行 SQL 查询时如何设置 Elasticsearch 连接器。 连接器可以工作在 upsert 模式,使用 DDL 中定义的主键与外部系统交换 UPDATE/DELETE 消息。 如果 DDL 中没有定义主键,那么

    2024年02月04日
    浏览(56)
  • Flink系列之:JDBC SQL 连接器

    Scan Source: Bounded Lookup Source: Sync Mode Sink: Batch Sink: Streaming Append Upsert Mode JDBC 连接器允许使用 JDBC 驱动向任意类型的关系型数据库读取或者写入数据。本文档描述了针对关系型数据库如何通过建立 JDBC 连接器来执行 SQL 查询。 如果在 DDL 中定义了主键,JDBC sink 将以 upsert 模式与外

    2024年02月02日
    浏览(48)
  • Debezium日常分享系列之:向 Debezium 连接器发送信号

    Debezium 信号机制提供了一种修改连接器行为或触发一次性操作(例如启动表的临时快照)的方法。要使用信号触发连接器执行指定操作,可以将连接器配置为使用以下一个或多个通道: 源信号通道:可以发出 SQL 命令将信号消息添加到专门的信令数据集合中。在源数据库上创

    2024年02月03日
    浏览(44)
  • Kafka系列 - 生产者客户端架构以及3个重要参数

    整个生产者客户端由两个县城协调运行,这两个线程分别为主线程和Sender线程(发送线程)。 主线程中由KafkaProducer创建消息,然后通过可能的拦截器,序列化器和分区器之后缓存到 消息累加器(RecordAccumulator) 。Sender线程负责从RecordAccumulator中获取消息并将其发送到kafka中。

    2024年02月04日
    浏览(47)
  • 【Kafka源码走读】Admin接口的客户端与服务端的连接流程

    注:本文对应的kafka的源码的版本是trunk分支。写这篇文章的主要目的是当作自己阅读源码之后的笔记,写的有点凌乱,还望大佬们海涵,多谢! 最近在写一个Web版的kafka客户端工具,然后查看Kafka官网,发现想要与Server端建立连接,只需要执行 方法即可,但其内部是如何工作

    2024年02月16日
    浏览(46)
  • Debezium日常分享系列之:使用 Debezium 连接器实现密钥外部化

    隐藏数据库的账号和密码 当 Debezium 连接器部署到 Kafka Connect 实例时,有时需要对 Connect API 的其他用户隐藏数据库凭据。 让我们回顾一下 MySQL Debezium connector的连接器注册请求: 用户名和密码以纯字符串形式传递给 API。更糟糕的是,任何有权访问 Kafka Connect 集群及其 REST AP

    2024年02月16日
    浏览(44)
  • Flink系列之:Flink CDC深入了解MySQL CDC连接器

    增量快照读取是一种读取表快照的新机制。与旧的快照机制相比,增量快照具有许多优点,包括: (1)在快照读取期间,Source 支持并发读取 (2)在快照读取期间,Source 支持进行 chunk 粒度的 checkpoint (3)在快照读取之前,Source 不需要数据库锁权限。 如果希望 source 并行运

    2024年02月02日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包