使用 Spring Kafka 进行非阻塞重试的集成测试

这篇具有很好参考价值的文章主要介绍了使用 Spring Kafka 进行非阻塞重试的集成测试。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

​Kafka的非阻塞重试是通过为主题配置重试主题来实现的。如果需要,还可以配置额外的死信主题。如果所有重试都耗尽,事件将被转发到DLT。在公共领域中有很多资源可用于了解技术细节。对于代码中的重试机制编写集成测试确实是一项具有挑战性的工作。以下是一些测试方法,可以用来验证重试机制的正确性:

  1. 验证事件已经按照所需的次数进行了重试:
  • 在测试中,模拟一个会触发重试的事件,并设置重试次数为所需的次数。

  • 使用断言来验证事件是否被重试了指定的次数。

  1. 验证只有在特定的异常发生时才进行重试,而不是其他异常:
  • 在测试中,模拟不同的异常情况,包括需要重试的异常和不需要重试的异常。

  • 使用断言来验证只有特定的异常触发了重试,而其他异常没有触发重试。

  1. 验证如果前一次重试已经解决了异常,不会进行另一次重试:
  • 在测试中,模拟一个会触发重试的事件,并在每次重试之间解决异常。

  • 使用断言来验证只有在异常没有被解决的情况下才进行重试。

  1. 验证在前面的 (n-1) 次重试失败后,第 n 次重试成功:
  • 在测试中,模拟一个会触发重试的事件,并设置重试次数为 n。

  • 使用断言来验证在前面的 (n-1) 次重试失败后,第 n 次重试成功。

  1. 验证如果所有的重试尝试都失败,事件是否已经发送到了死信队列:
  • 在测试中,模拟一个会触发重试的事件,并设置重试次数为一个较小的值。
  • 使用断言来验证当所有的重试尝试都失败后,事件是否已经发送到了死信队列。

设置可重试的消费者

@Slf4j
@Component
@RequiredArgsConstructor
public class CustomEventConsumer {
​
    private final CustomEventHandler handler;
​
    @RetryableTopic(attempts = "${retry.attempts}",
            backoff = @Backoff(
                    delayExpression = "${retry.delay}",
                    multiplierExpression = "${retry.delay.multiplier}"
            ),
            topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
            dltStrategy = FAIL_ON_ERROR,
            autoStartDltHandler = "true",
            autoCreateTopics = "false",
            include = {CustomRetryableException.class})
    @KafkaListener(topics = "${topic}", id = "${default-consumer-group:default}")
    public void consume(CustomEvent event, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        try {
            log.info("Received event on topic {}", topic);
            handler.handleEvent(event);
        } catch (Exception e) {
            log.error("Error occurred while processing event", e);
            throw e;
        }
    }
​
    @DltHandler
    public void listenOnDlt(@Payload CustomEvent event) {
        log.error("Received event on dlt.");
        handler.handleEventFromDlt(event);
    }
​
}

如果您注意上面的代码片段,参数@RetryableTopic中包含includes。这告诉消费者只在方法抛出CustomRetryableException时进行重试。您可以添加任意数量的异常类型。还有一个exclude参数,但一次只能使用其中一个。在将事件发布到死信队列之前,事件处理最多应重试指定的次数。

设置测试基础设施

为了编写集成测试,您需要确保拥有一个正常运行的Kafka代理(最好是嵌入式的)和一个完全运行的发布者。让我们设置我们的基础设施:

@EnableKafka
@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
@EmbeddedKafka(partitions = 1,
        brokerProperties = {"listeners=" + "${kafka.broker.listeners}", 
                            "port=" + "${kafka.broker.port}"},
        controlledShutdown = true,
        topics = {"test", "test-retry-0", "test-retry-1", "test-dlt"}
)
@ActiveProfiles("test")
class DocumentEventConsumerIntegrationTest {
​
  @Autowired
  private KafkaTemplate<String, CustomEvent> testKafkaTemplate;
​
​
    // tests
​
}

配置从application-test.yml文件中导入。当使用嵌入式Kafka代理时,重要的是要提及要创建的主题。它们不会自动创建。在这种情况下,我们将创建四个主题,分别是:

"test", "test-retry-0", "test-retry-1", "test-dlt"

我们将最大重试次数设置为三次。每个主题对应于每次重试尝试。因此,如果三次重试都耗尽,事件应该被转发到DLT(死信队列)。

测试用例

如果在第一次尝试中成功消费,就不应该进行重试。可以通过方法只被调用一次来测试这一点。还可以添加对日志语句的进一步测试。

 @Test
    void test_should_not_retry_if_consumption_is_successful() throws ExecutionException, InterruptedException {
        CustomEvent event = new CustomEvent("Hello");
        // GIVEN
        doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));
​
        // WHEN
        testKafkaTemplate.send("test", event).get();
​
        // THEN
        verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));
        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
    }

如果引发了不可重试的异常,就不应该进行重试。在这种情况下,方法CustomEventHandler#handleEvent应该只被调用一次。

 @Test    void test_should_not_retry_if_non_retryable_exception_raised() throws ExecutionException, InterruptedException {        CustomEvent event = new CustomEvent("Hello");        // GIVEN        doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));​        // WHEN        testKafkaTemplate.send("test", event).get();​        // THEN        verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));    }

如果抛出了RetryableException,则应该按照配置的最大重试次数进行重试,当重试次数耗尽时,事件应该被发布到死信主题。在这种情况下,方法CustomEventHandler#handleEvent应该被调用三次(maxRetries次),而方法CustomEventHandler#handleEventFromDlt应该只被调用一次。

 @Test
    void test_should_not_retry_if_non_retryable_exception_raised() throws ExecutionException, InterruptedException {
        CustomEvent event = new CustomEvent("Hello");
        // GIVEN
        doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));
​
        // WHEN
        testKafkaTemplate.send("test", event).get();
​
        // THEN
        verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));
        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
    }

在验证阶段添加了相当长的超时时间,以便在测试完成之前考虑指数退避延迟。这是很重要的,如果没有正确设置,可能会导致断言失败。应该重试直到RetryableException被解决,并且如果引发了不可重试的异常或者最终成功消费,就不应该继续重试。测试已经设置为首先抛出RetryableException,然后再抛出NonRetryableException,以便进行一次重试。

@Test
    void test_should_retry_until_retryable_exception_is_resolved_by_non_retryable_exception() throws ExecutionException,
            InterruptedException {
        CustomEvent event = new CustomEvent("Hello");
        // GIVEN
        doThrow(CustomRetryableException.class).doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));
​
        // WHEN
        testKafkaTemplate.send("test", event).get();
​
        // THEN
        verify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));
        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
    }ndleEventFromDlt(any(CustomEvent.class));    }
 @Test
    void test_should_retry_until_retryable_exception_is_resolved_by_successful_consumption() throws ExecutionException,
            InterruptedException {
        CustomEvent event = new CustomEvent("Hello");
        // GIVEN
        doThrow(CustomRetryableException.class).doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));
​
        // WHEN
        testKafkaTemplate.send("test", event).get();
​
        // THEN
        verify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));
        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
        }

结论

因此,您可以看到集成测试是一种混合和匹配的策略,超时时间,延迟和验证,以确保您的Kafka事件驱动架构的重试机制是可靠的。

作者: Mukut Bhattacharjee

更多技术干货请关注公众号“云原生数据库

squids.cn,目前可体验全网zui低价RDS,免费的迁移工具DBMotion、SQL开发工具等文章来源地址https://www.toymoban.com/news/detail-683258.html

到了这里,关于使用 Spring Kafka 进行非阻塞重试的集成测试的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 接口请求重试的8种方法

    日常业务开发过程中,可能第三方的服务器分布在世界的各个角落,所以请求三方接口的时候,难免会遇到一些网络问题,这时候需要加入重试机制了,这期就给大家分享几个接口重试的写法。   这是最简单也最直接的一种方式。在请求接口的代码块中加入循环,如果请求失

    2024年02月04日
    浏览(48)
  • 大厂都是怎么做Redis重试的?

    潜心打造国内一流,国际领先的技术干货。 文章收录在我的 GitHub 仓库,欢迎Star/fork: JavaEdge-Interview 受网络和运行环境影响,应用程序可能遇到暂时性故障,如瞬时网络抖动、服务暂时不可用、服务繁忙导致超时等。 自动重试机制可大幅避免此类故障,保障操作成功执行。

    2024年02月05日
    浏览(71)
  • docker安装kafka,并集成springboot进行测试

    大家好,今天我们开始学习kafka中间件,今天我们改变一下策略,不刷视频学习,改为实践学习,在网上找一些案例功能去做,来达到学习实践的目的。 首先,是安装相关组件。 1. docker安装 安装 1.1 yum-utils软件包 1.2 设置阿里云镜像 1.3 安装docker 1.4 启动docker 1.5 测试 至此

    2023年04月25日
    浏览(40)
  • java阻塞队列/kafka/spring整合kafka

    queue增加删除元素 增加元素 add方法在添加元素的时候,若超出了度列的长度会直接抛出异常: put方法,若向队尾添加元素的时候发现队列已经满了会发生阻塞一直等待空间,以加入元素 offer方法在添加元素时,如果发现队列已满无法添加的话,会直接返回false 删除元素 pol

    2024年02月12日
    浏览(46)
  • 使用Spring Boot集成中间件:Kafka的高级使用案例讲解

    在实际应用中,Kafka作为一种强大的分布式消息系统,广泛应用于实时数据处理和消息传递。本文将通过一个全面的使用案例,详细介绍如何使用Spring Boot集成Kafka,并展示其在实际场景中的应用。 在开始之前,我们需要确保已经完成以下准备工作: 安装并启动Kafka集群 创建

    2024年02月01日
    浏览(54)
  • kafka--技术文档--spring-boot集成基础简单使用

            查阅了很多资料了解到,使用了spring-boot中整合的kafka的使用是被封装好的。也就是说这些使用其实和在linux中的使用kafka代码的使用其实没有太大关系。但是逻辑是一样的。这点要注意! 核心配置为: 如果在下面规定了spring-boot的版本那么就不需要再使用版本号,如

    2024年02月11日
    浏览(48)
  • 如何使用Java进行集成测试?

    在Java中进行集成测试有很多种方法,以下介绍一种比较常见的基于JUnit框架的集成测试方法: 确定需要测试的代码 首先需要确定需要进行集成测试的代码,可以是整个应用程序,也可以是特定的模块或者方法。 配置测试环境 在测试环境中创建测试数据库、配置文件、mock对

    2024年02月11日
    浏览(41)
  • 使用TestContainers在Docker中进行集成测试

    现代软件应用很少独立工作。典型的应用程序会与几个外部系统进行通信,如: 数据库、 消息系统、 缓存提供商 其他第三方服务。 你应该编写测试确保一切正常运行。 单元测试 有助于隔离地测试业务逻辑,不涉及任何外部服务。它们易于编写并提供几乎即时的反馈。 有了

    2024年02月08日
    浏览(40)
  • SpringBoot 如何使用 TestEntityManager 进行 JPA 集成测试, 如何使用

    Spring Boot 是一个非常流行的 Java Web 开发框架,它简化了开发过程,提高了开发效率。在开发过程中,我们通常需要使用 JPA 操作数据库,为了保证代码的质量和正确性,我们需要进行集成测试。TestEntityManager 是 Spring Boot 提供的用于 JPA 集成测试的工具,它可以模拟 EntityManag

    2024年02月13日
    浏览(69)
  • SpringBoot 如何使用 MockMvc 进行 Web 集成测试

    SpringBoot 是一个流行的 Java Web 开发框架,它提供了一些强大的工具和库,使得开发 Web 应用程序变得更加容易。其中之一是 MockMvc,它提供了一种测试 SpringBoot Web 应用程序的方式,可以模拟 HTTP 请求和响应的行为。 在本文中,我们将介绍 SpringBoot 中的 MockMvc,演示如何使用它

    2024年02月16日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包