【Spring连载】使用Spring访问 Apache Kafka(十九)----Apache Kafka Streams支持

这篇具有很好参考价值的文章主要介绍了【Spring连载】使用Spring访问 Apache Kafka(十九)----Apache Kafka Streams支持。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


从1.1.4版本开始,Spring for Apache Kafka为Kafka Streams提供了非常好的支持。要从Spring应用程序中使用它,kafka-streams jar必须存在于类路径中。它是Spring for Apache Kafka项目的一个可选依赖项,不会自动下载。

一、基础Basics

参考Apache Kafka Streams文档建议使用该API的方法如下:

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.

StreamsBuilder builder = ...;  // when using the Kafka Streams DSL

// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;

KafkaStreams streams = new KafkaStreams(builder, config);

// Start the Kafka Streams instance
streams.start();

// Stop the Kafka Streams instance
streams.close();

因此,我们有两个主要组件:

  • StreamsBuilder:使用API构建KStream(或KTable)实例。
  • KafkaStreams:管理这些实例的生命周期。

被单个StreamsBuilder暴露给KafkaStreams实例的所有KStream实例都会同时启动和停止,即使它们具有不同的逻辑。换句话说,StreamsBuilder定义的所有流都与单个生命周期控件绑定。一旦一个KafkaStreams实例被streams.close()关闭,它就无法重新启动。因此,必须创建一个新的KafkaStreams实例来重新启动流处理。

二、Spring管理

为了从Spring应用程序上下文的角度简化使用Kafka Streams,并使用容器进行生命周期管理,Spring for Apache Kafka引入了StreamsBuilderFactoryBean。这是一个AbstractFactoryBean实现,用于将单例StreamsBuilder公开为bean。以下示例创建了这样一个bean:

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}

从版本2.2开始,流配置现在作为KafkaStreamsConfiguration对象而不是StreamsConfig提供。
StreamsBuilderFactoryBean还实现了SmartLifecycle来管理内部KafkaStreams实例的生命周期。与Kafka Streams API类似,在启动KafkaStreams之前,必须定义KStream实例。这也适用于Kafka Streams的Spring API。因此,当在StreamsBuilderFactoryBean上使用默认的autoStartup = true时,必须在刷新应用程序上下文之前在StreamsBuilder上声明KStream实例。例如,KStream可以是一个常规的bean定义,而Kafka Streams API的使用也没有任何影响。以下示例显示了如何执行此操作:

@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
    KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    // Fluent KStream API
    return stream;
}

如果你想手动控制生命周期(例如,根据某些条件停止和启动),可以使用工厂bean 加(&)前缀直接引用StreamsBuilderFactoryBean bean。由于StreamsBuilderFactoryBean使用其内部KafkaStreams实例,因此可以安全地停止并重新启动它。每个start()都会创建一个新的KafkaStreams。如果你想单独控制KStream实例的生命周期,也可以考虑使用不同的StreamsBuilderFactoryBean实例。
你还可以在StreamsBuilderFactoryBean上指定“KafkaStreams.StateListener”、“Thread.UncoughtException Handler”和“StateRestoreListener”选项,这些选项被委托给内部KafkaStream实例。此外,除了在StreamsBuilderFactoryBean上间接设置这些选项外,从2.1.5版本开始,你还可以使用KafkaStreamsCustomizer回调接口来配置内部KafkaStreams实例。请注意,KafkaStreamsCustomizer会覆盖StreamsBuilderFactoryBean提供的选项。如果你需要直接执行一些KafkaStreams操作,你可以使用StreamsBuilderFactoryBean.getKafkaStreams()访问该内部KafkaStreams实例。你可以按类型自动装配StreamsBuilderFactoryBean bean,但应确保在bean定义中使用完整类型,如下例所示:

@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

或者,如果使用接口bean定义,可以为按名称注入添加@Qualifier。下面的示例展示了如何这样做:

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

工厂bean有一个infrastructureCustomizer属性,类型为KafkaStreamsInfrastructureCustomizer;这允许在创建流之前自定义Topology 和 StreamsBuilder(例如添加状态存储)。

public interface KafkaStreamsInfrastructureCustomizer {

	void configureBuilder(StreamsBuilder builder);

	void configureTopology(Topology topology);

}

默认情况下,框架提供了无任何操作实现,以避免在其中一个方法不需要的情况下必须同时实现这两种方法。
框架提供了CompositeKafkaStreamsInfrastructureCustomizer,用于需要应用多个自定义程序。

三、KafkaStreams Micrometer支持

你可以配置一个KafkaStreamsMicrometerListener来自动注册由factory bean管理的KafkaStreams 对象的micrometer meters:

streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
        Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));

四、Streams JSON 序列化和反序列化

为了在读取或写入JSON格式的主题或状态存储时序列化和反序列化数据,Spring For Apache Kafka提供了一个使用JSON的JsonSerde实现,将其委托给序列化、反序列化和消息转换中描述的JsonSerializer和JsonDeserializer。JsonSerde实现通过其构造函数(目标类型或ObjectMapper)提供相同的配置选项。在下面的例子中,我们使用JsonSerde来序列化和反序列化Kafka流的Cat payload(JsonSerde 可以以类似的方式用于任何需要实例的地方):

stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");

当以编程方式构造用于生产者/消费者工厂的序列化器/反序列化器时,您可以使用fluent API,这简化了配置。

stream.through(new JsonSerde<>(MyKeyType.class)
        .forKeys()
        .noTypeInfo(),
    new JsonSerde<>(MyValueType.class)
        .noTypeInfo(),
    "myTypes");

五、使用Kafka流分支器KafkaStreamBrancher

KafkaStreamBrancher类引入了一种更方便的方法来在KStream之上构建条件分支。先看下面这个不使用KafkaStreamBrancher的例子:

KStream<String, String>[] branches = builder.stream("source").branch(
      (key, value) -> value.contains("A"),
      (key, value) -> value.contains("B"),
      (key, value) -> true
     );
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");

以下例子使用了KafkaStreamBrancher:

new KafkaStreamBrancher<String, String>()
   .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
   .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
   //default branch should not necessarily be defined in the end of the chain!
   .defaultBranch(ks -> ks.to("C"))
   .onTopOf(builder.stream("source"));
   //onTopOf method returns the provided stream so we can continue with method chaining

六、配置Configuration

要配置Kafka Streams环境,StreamsBuilderFactoryBean需要一个KafkaStreamsConfiguration实例。有关所有可能的选项,请参阅Apache Kafka文档。
流配置现在作为KafkaStreamsConfiguration 对象提供,而不是作为StreamsConfig提供。
为了避免在大多数情况下使用样板代码,特别是在开发微服务时,Spring for Apache Kafka提供了@EnableKafkaStreams注释,你应该将其放在@Configuration类中。你只需要声明一个名为defaultKafkaStreamsConfig的KafkaStreamsConfiguration bean。在应用程序上下文中自动声明一个名为defaultKafkaStreamsBuilder的StreamsBuilderFactoryBean bean。你也可以声明和使用任何额外的StreamsBuilderFactoryBean bean。您可以通过提供一个实现StreamsBuilderFactoryBeanConfigurer的bean来执行该bean的额外定制。如果有多个这样的bean,则将根据它们的Ordered.order属性应用它们。
默认情况下,当工厂bean停止时,会调用KafkaStreams.cleanUp()方法。从2.1.2版本开始,工厂bean有额外的构造函数,使用CleanupConfig对象,该对象具有属性可以控制在start()或stop()期间调用cleanUp()方法,或者两者都不调用。从2.7版本开始,默认情况是从不清理本地状态。

七、Header Enricher

3.0版本增加了ContextualProcessor的子类HeaderEnricherProcessor;提供与弃用的实现Transformer接口的HeaderEnricher相同的功能。这可以用于在流处理中添加头;报头值是SpEL表达式;表达式求值的根对象具有3个属性:

  • record ——org.apache.kafka.streams.processor.api.Record(key, value, timestamp, headers)
  • key——当前记录的键
  • value——当前记录的值
  • context——ProcessorContext,允许访问当前记录元数据
    表达式必须返回byte[]或String(将使用UTF-8将其转换为byte[])。
    要在流中使用enricher:
.process(() -> new HeaderEnricherProcessor(expressions))

processor不改变键或值;它只是添加标头。
你需要为每条记录创建一个新实例。

.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))

下面是一个简单的例子,添加一个文字头和一个变量:

Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String> enricher = new HeaderEnricher<>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
        .process(() -> supplier)
        .to(OUTPUT);

八、MessagingProcessor

3.0版本增加了ContextualProcessor的子类MessagingProcessor;提供与已弃用的MessagingTransformer相同的功能,后者实现了已弃用的Transformer接口。这允许Kafka Streams topology 与Spring Messaging组件交互,例如Spring Integration flow。转换器需要MessagingFunction的实现。

@FunctionalInterface
public interface MessagingFunction {

	Message<?> exchange(Message<?> message);

}

Spring Integration使用其GatewayProxyFactoryBean自动提供实现。它还需要一个MessagingMessageConverter来将键、值和元数据(包括头)转换为Spring Messaging Message<?>。请参阅以下示例[从KStream调用Spring Integration Flow]:

@Bean
public KStream<byte[], byte[]> kStream(StreamsBuilder kStreamBuilder,
        MessagingTransformer<byte[], byte[], byte[]> transformer)  transformer) {
    KStream<byte[], byte[]> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    stream.mapValues((ValueMapper<byte[], byte[]>) String::toUpperCase)
            ...
            .transform(() -> transformer)
            .to(streamingTopic2);

    stream.print(Printed.toSysOut());

    return stream;
}

@Bean
@DependsOn("flow")
public MessagingTransformer<byte[], byte[], String> transformer(
        MessagingFunction function) {

    MessagingMessageConverter converter = new MessagingMessageConverter();
    converter.setHeaderMapper(new SimpleKafkaHeaderMapper("*"));
    return new MessagingTransformer<>(function, converter);
}

@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from(MessagingFunction.class)
        ...
        .get();
}

九、从反序列化异常中恢复Recovery from Deserialization Exceptions

版本2.3引入了RecoveringDeserializationExceptionHandler,它可以在发生反序列化异常时采取一些操作。请参阅有关DeserializationExceptionHandler的Kafka文档,RecoveringDeserializationExceptionHandler就是其中的一个实现。RecoveringDeserializationExceptionHandler是使用ConsumerRecordRecoverer实现配置的。该框架提供了DeadLetterPublishingRecoverer,它将失败的记录发送到死信主题。有关此恢复器的详细信息,请参阅发布死信记录。
要配置恢复器,请将以下属性添加到流配置中:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            RecoveringDeserializationExceptionHandler.class);
    props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
    ...
    return new KafkaStreamsConfiguration(props);
}

@Bean
public DeadLetterPublishingRecoverer recoverer() {
    return new DeadLetterPublishingRecoverer(kafkaTemplate(),
            (record, ex) -> new TopicPartition("recovererDLQ", -1));
}

当然,recoverer() bean可以是你自己的ConsumerRecordRecoverer实现。

十、Kafka Streams示例

下面的例子结合了本文所涉及的所有主题:文章来源地址https://www.toymoban.com/news/detail-831228.html

@Configuration
@EnableKafka
@EnableKafkaStreams
public static class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public StreamsBuilderFactoryBeanConfigurer configurer() {
        return fb -> fb.setStateListener((newState, oldState) -> {
            System.out.println("State transition from " + oldState + " to " + newState);
        });
    }

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
        KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
        stream
                .mapValues((ValueMapper<String, String>) String::toUpperCase)
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMillis(1000)))
                .reduce((String value1, String value2) -> value1 + value2,
                		Named.as("windowStore"))
                .toStream()
                .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
                .filter((i, s) -> s.length() > 40)
                .to("streamingTopic2");

        stream.print(Printed.toSysOut());

        return stream;
    }

}

到了这里,关于【Spring连载】使用Spring访问 Apache Kafka(十九)----Apache Kafka Streams支持的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kerberos安全认证-连载12-Kafka Kerberos安全配置及访问

    目录 1. Kafka配置Kerberos 2. 客户端操作Kafka ​​​​​​​3. Java API操作Kafka 4. StructuredStreaming操作Kafka 5. Flink 操作Kafka 技术连载系列,前面内容请参考前面连载11内容:​​​​​​​​​​​​​​Kerberos安全认证-连载11-HBase Kerberos安全配置及访问_IT贫道的博客-CSDN博客 Kafk

    2024年02月12日
    浏览(54)
  • Spring for Apache Kafka Deep Dive – Part 2: Apache Kafka and Spring Cloud Stream

    On the heels of part 1 in this blog series, Spring for Apache Kafka – Part 1: Error Handling, Message Conversion and Transaction Support, here in part 2 we’ll focus on another project that enhances the developer experience when building streaming applications on Kafka: Spring Cloud Stream. We will cover the following in this post: Overview of Spring Clo

    2024年02月19日
    浏览(44)
  • 第八篇——Kafka Streams源码解读

    作者:禅与计算机程序设计艺术 Kafka Streams是一个开源分布式流处理平台,它可以让你轻松处理实时数据流。通过Kafka Streams API可以轻松创建、部署和运行复杂的实时流处理应用程序。虽然Kafka Stream提供了许多高级功能,但其底层原理却十分简单易懂,在学习之余,我们还是需

    2024年02月07日
    浏览(43)
  • Spring for Apache Kafka概述和简单入门

    Spring for Apache Kafka 的高级概述以及底层概念和可运行的示例代码。 注意:进行工作开始之前至少要有一个 Apache Kafka 环境 使用 Spring Boot 使用 Spring Boot 时,省略版本,Boot 将自动引入与您的 Boot 版本兼容的正确版本 使用 Spring 使用Spring 时必须要申明使用的版本。 Apache Kafka 客户

    2024年02月09日
    浏览(38)
  • 通过源代码修改使 Apache Hudi 支持 Kerberos 访问 Hive 的功能

    本文档主要用于阐释如何基于 Hudi 0.10.0 添加支持 Kerberos 认证权限的功能。 主要贡献: 针对正在使用的 Hudi 源代码进行 Kerberos-support 功能扩展,总修改规模囊括了 12 个文件约 20 处代码共计 约 200 行代码; 对 Hudi 0.10.0 的源代码进行了在保持所有自定义特性的基础上,支持了

    2024年02月14日
    浏览(42)
  • Spring Boot与Apache Kafka实现高吞吐量消息处理:解决大规模数据处理问题

    现代数据量越来越庞大对数据处理的效率提出了更高的要求。Apache Kafka是目前流行的分布式消息队列之一。Spring Boot是现代Java应用程序快速开发的首选框架。综合使用Spring Boot和Apache Kafka可以实现高吞吐量消息处理。 Apache Kafka采用分布式发布-订阅模式具有高度的可扩展性和可

    2024年02月05日
    浏览(52)
  • Apache Flink连载(十四):Flink 本地模式开启WebUI

    🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客  🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。  🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频 目录

    2024年02月03日
    浏览(44)
  • [Kafka集群] 配置支持Brokers内部SSL认证\外部客户端支持SASL_SSL认证并集成spring-cloud-starter-bus-kafka

    目录 Kafka 集群配置 准备 配置流程 Jaas(Java Authentication and Authorization Service )文件 zookeeper 配置文件 SSL自签名 启动zookeeper集群 启动kafka集群  spring-cloud-starter-bus-kafka 集成 下载统一版本Kafka服务包至三台不同的服务器上 文章使用版本为  kafka_2.13-3.5.0.tgz 下载地址 jdk版本 为 Ado

    2024年02月04日
    浏览(56)
  • 【Spring Cloud】如何把Feign默认的HTTP客户端URLConnection更换成支持连接池的Apache HttpClient或OKHttp

    本次示例代码的文件结构如下图所示。 Feign 发送 HTTP 请求时,底层会使用到别的客户端。下面列出常用的 3 种 HTTP 客户端。 HTTP客户端 特点 URLConnection Feign 的默认实现,不支持连接池 Apache HttpClient 支持连接池 OKHttp 支持连接池 其中, URLConnection 是 Feign 默认使用的 HTTP 客户端

    2024年02月14日
    浏览(53)
  • Apache Flink连载(十八):Flink On Yarn运行原理及环境准备

     🏡 个人主页:IT贫道-CSDN博客  🚩 私聊博主:私聊博主加WX好友,获取更多资料哦~  🔔 博主个人B栈地址:豹哥教你学编程的个人空间-豹哥教你学编程个人主页-哔哩哔哩视频 目录 1. Flink On Yarn运行原理

    2024年02月03日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包