Flink 之 Kafka连接器

这篇具有很好参考价值的文章主要介绍了Flink 之 Kafka连接器。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

依赖

Flink附带了一个通用的Kafka连接器,它试图跟踪Kafka客户端的最新版本。Kafka的客户端版本会在Flink不同版本间发生变化。现代Kafka客户端向后兼容broker 0.10.0版本及以后的版本。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.14.4</version>
</dependency>

Kafka Source

用法

Kafka Source 提供了一个构造器类来构建KafkaSource的实例。下面代码展示如何构建一个KafkaSource来消费主题“input-topic”最早偏移量的消息,使用“my-group”消费组并将消息的值反序列化为字符串。

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

构建Kafka Source需要以下属性:

  1. Bootstrap servers,通过setBootstrapServers配置(字符串)
  2. 要订阅的主题和分区
  3. 解析Kafka消息的序列化器

主题分区订阅

Kafka Source提供了3种主题分区订阅的方式

  1. 主题列表,从主题列表中的所有分区订阅消息。eg:

    KafkaSource.builder().setTopics("topic-a", "topic-b")

  2. 主题模式,订阅名称与提供正则表达式匹配的所有主题消息。eg:

    KafkaSource.builder().setTopicPattern("topic.*")

  3. 分区集合,订阅提供的分区集合的分区。eg:

    final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
            new TopicPartition("topic-a", 0),    // 主题"topic-a"的第0个分区
            new TopicPartition("topic-b", 5)));  // 主题"topic-b"的第5个分区
    KafkaSource.builder().setPartitions(partitionSet);
    

反序列化器

解析Kafka消息需要一个反序列化器。反序列化器(反序列化模式)可以通过setDeserializer(KafkaRecordDeserializationSchema)配置,其中KafkaRecordDeserializationSchema定义了如何反序列一个Kafka消费记录(ConsumerRecord)。

如果只需要Kafka ConsumerRecord的值,你可以在构建器中使用setValueOnlyDeserializer(DeserializationSchema),其中DeserializationSchema定义了如何反序列化K阿发消息值的二进制文件。

你也可以使用Kafka反序列化器来反序列化Kafka消息值。例如使用StringDeserializer将Kafka消息值序列化为字符串:

import org.apache.kafka.common.serialization.StringDeserializer;
KafkaSource.<String>builder()       .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));

起始偏移量

Kafka Source可以通过指定OffsetsInitializer从不同偏移量开始消费消息。内置的初始值包括:

  1. 从消费组的已提交偏移量开始,没有重置策略

    .setStartingOffsets(OffsetsInitializer.committedOffsets())

  2. 从提交的偏移量开始,如果提交的偏移量不存在,使用early作为重置策略

    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))

  3. 从第一个时间戳大于或等于时间戳(毫秒)的记录开始

    .setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))

  4. 从最早的偏移量开始

    .setStartingOffsets(OffsetsInitializer.earliest())

  5. 从最新的偏移量开始

    .setStartingOffsets(OffsetsInitializer.latest())

如果上面的内置初始化器不能满足你的需求,你可以实现自定义偏移量初始化器。

如果你没有指定初始化器,默认使用 OffsetsInitializer.earliest()

有界性

Kafka Source被设计成同时支持流和批处理运行模式。默认情况下,Kafka Source被设置为以流的方式运行,因此在Flink作业失败或取消之前从不停止。你可以使用setBounded(OffsetsInitializer)来指定停止偏移量,并设置Source以批量模式运行。当所有分区都达到停止偏移量时,Source将退出。

你也可以使用setUnbounded(OffsetsInitializer)来设置KafkaSource在流模式下运行,但仍然在停止偏移量处停止。

当所有分区达到指定的停止偏移量时,Source将会退出。

其他属性

除了上述描述的的属性,你可以通过使用setProperties(properties)和setProperty(String, String)为Kafka Source和Kafka Consumer设置任意的属性。

  1. client.id.prefix 定义了Kafka消费者客户端ID的前缀
  2. partition.discovery.interval.ms 定义了Kafka Source发现新分区的间隔毫秒数。
  3. register.consumer.metrics 指定是否在Flink度量组中注册KafkaConsumer的度量
  4. commit.offsets.on.checkpoint 指定是否在检查点上将消费偏移量提交给Kafka Broker

请注意,以下键将被构建器覆盖,即使它被设置了:

  1. key.deserializer通常设置为ByteArrayDeserializer
  2. value.deserializer 通常设置为 ByteArrayDeserializer
  3. auto.offset.reset.strategyOffsetsInitializer#getAutoOffsetResetStrategy()覆盖,用于初始偏移量
  4. setBounded(OffsetsInitializer)被调用时,partition.discovery.interval.ms被重写为-1

自动发现分区

为了在不重启Flink作业情况下处理主题向外扩展和主题创建等场景,Kafka Source可以配置为在提供主题分区订阅模式下定期发现新分区。如果要启动分区发现,请为partition.discovery.interval.ms 属性设置一个非负值:

KafkaSource.builder()
    .setProperty("partition.discovery.interval.ms", "10000") // 每10秒查看一次新分区

默认情况下,分区发现是被禁止的。你需要显示地设置分区发现间隔以启动此功能。

事件时间和水位线

默认情况下,记录将使用Kafka ConsumerRecord中嵌入的时间戳作为事件时间。 你可以定义自己的WatermarkStrategy(水位策略)从记录本身提取事件时间,并向下游发送水印:

env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")

空闲性

如果并行度高于分区的数量,Kafka Source不会进入空闲状态。你需要降低并行度,或者向水印策略添加空闲超时。如果在同一段时间内流的一个分区中没有任何记录流,那么该分区就被认为是“空闲的”,并且不会阻碍下游算子的水印进程。

消费者偏移量提交

Kafka Source在检查点完成时提交当前消费的偏移量,以确保Flink的检查点状态与Kafka Broker上提交的偏移量之间的一致性。

如果检查点没有启用,Kafka Source依赖Kafka消费者内部自动周期偏移量提交逻辑,通过Kafka Consumer属性中的enable.auto.commitauto.commit.interval.ms配置。

注意,Kafka Source不依赖提交的偏移量实现容错。提交偏移量只是为了暴露消费者和消费者组的进度以便监控。

监控

TODO

安全

为了启用包含加密和身份验证在内的 安全配置,你只需要将安全配置设置为Kafka Source的附加属性。

下面代码片段展示了如何配置Kafka Source使用PLAIN作为SASL机制,并提供JAAS配置:

KafkaSource.builder()
    .setProperty("security.protocol", "SASL_PLAINTEXT")
    .setProperty("sasl.mechanism", "PLAIN")
    .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");

对于一个更复杂的例子,使用SASL_SSL作为安全协议,使用SCRAM-SHA-256作为SASL机制:

KafkaSource.builder()
    .setProperty("security.protocol", "SASL_SSL")
    // SSL configurations
    // Configure the path of truststore (CA) provided by the server
    .setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
    .setProperty("ssl.truststore.password", "test1234")
    // Configure the path of keystore (private key) if client authentication is required
    .setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
    .setProperty("ssl.keystore.password", "test1234")
    // SASL configurations
    // Set SASL mechanism as SCRAM-SHA-256
    .setProperty("sasl.mechanism", "SCRAM-SHA-256")
    // Set JAAS configurations
    .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";");

请注意,如果你在作业JAR中重新定位Kafka客户端依赖,那么登陆模块的类路径可能会不同,所以你可能需要用Jar中模块实际类路径重写它。文章来源地址https://www.toymoban.com/news/detail-402099.html

到了这里,关于Flink 之 Kafka连接器的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink系列之:Elasticsearch SQL 连接器

    Sink: Batch Sink: Streaming Append Upsert Mode Elasticsearch 连接器允许将数据写入到 Elasticsearch 引擎的索引中。本文档描述运行 SQL 查询时如何设置 Elasticsearch 连接器。 连接器可以工作在 upsert 模式,使用 DDL 中定义的主键与外部系统交换 UPDATE/DELETE 消息。 如果 DDL 中没有定义主键,那么

    2024年02月04日
    浏览(56)
  • Flink系列之:JDBC SQL 连接器

    Scan Source: Bounded Lookup Source: Sync Mode Sink: Batch Sink: Streaming Append Upsert Mode JDBC 连接器允许使用 JDBC 驱动向任意类型的关系型数据库读取或者写入数据。本文档描述了针对关系型数据库如何通过建立 JDBC 连接器来执行 SQL 查询。 如果在 DDL 中定义了主键,JDBC sink 将以 upsert 模式与外

    2024年02月02日
    浏览(48)
  • Flink系列之:Flink CDC深入了解MySQL CDC连接器

    增量快照读取是一种读取表快照的新机制。与旧的快照机制相比,增量快照具有许多优点,包括: (1)在快照读取期间,Source 支持并发读取 (2)在快照读取期间,Source 支持进行 chunk 粒度的 checkpoint (3)在快照读取之前,Source 不需要数据库锁权限。 如果希望 source 并行运

    2024年02月02日
    浏览(49)
  • 流批一体计算引擎-7-[Flink]的DataStream连接器

    参考官方手册DataStream Connectors 一、预定义的Source和Sink 一些比较基本的Source和Sink已经内置在Flink里。 1、预定义data sources支持从文件、目录、socket,以及collections和iterators中读取数据。 2、预定义data sinks支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 sock

    2023年04月08日
    浏览(40)
  • Kafka系列之:连接器客户端配置覆盖策略

    KAFKA引入了每个源连接器和接收器连接器从工作线程属性继承其客户端配置的功能。在工作线程属性中,任何具有“生产者”或“消费者”前缀的配置。分别应用于所有源连接器和接收器连接器。虽然最初的提案允许覆盖源连接器和接收器连接器,但它在允许连接器的不同配

    2024年02月11日
    浏览(58)
  • Kafka系列之:对源连接器的的Exactly-Once支持

    幂等生产者: 写入 Kafka 的应用程序重复记录的一个常见来源是自动生产者重试,其中生产者会在某些情况下将批次重新发送到 Kafka,即使该批次已由代理提交。KIP-98 允许用户将其生产者配置为幂等地执行这些重试,这样下游应用程序只能看到自动重新发送的生产者批次中任

    2024年02月11日
    浏览(45)
  • 【Flink实战】Flink hint更灵活、更细粒度的设置Flink sql行为与简化hive连接器参数设置

    SQL 提示(SQL Hints)是和 SQL 语句一起使用来改变执行计划的。本章介绍如何使用 SQL 提示来实现各种干预。 SQL 提示一般可以用于以下: 增强 planner:没有完美的 planner, SQL 提示让用户更好地控制执行; 增加元数据(或者统计信息):如\\\"已扫描的表索引\\\"和\\\"一些混洗键(shu

    2024年04月25日
    浏览(39)
  • 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月11日
    浏览(52)
  • 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月12日
    浏览(57)
  • 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月10日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包