flink对状态ttl进行单元测试

这篇具有很好参考价值的文章主要介绍了flink对状态ttl进行单元测试。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景

在处理键值分区状态时,使用ttl设置过期时间是我们经常使用的,但是任何代码的修改都需要首先进行单元测试,本文就使用单元测试来验证一下状态ttl的设置是否正确

测试状态ttl超时的单元测试

首先看一下处理函数:

// 处理函数
public class MyStateProcessFunction extends KeyedProcessFunction<String, String, String> {
 
    // 键值分区状态
    ValueState<String> previousInput;
 
    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<String>("previousInput", Types.STRING);
        // 状态ttl超时时间设置
        StateTtlConfig ttlConfig =
                StateTtlConfig.newBuilder(Time.minutes(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                        // check 10 keys for every state access
                        .cleanupIncrementally(10, false).build();
        stateDescriptor.enableTimeToLive(ttlConfig);
        previousInput = getRuntimeContext().getState(stateDescriptor);
    }
 
    @Override
    public void processElement(String in, Context context, Collector<String> collector) throws Exception {
        context.timerService().registerProcessingTimeTimer(100);
        String out = (Objects.nonNull(previousInput.value()) ? previousInput.value() : "") + in;
        collector.collect(out);
        if (!in.contains("NotUpdate")) {// 为了模仿有访问状态,但是不更新状态,正常情况下业务逻辑是访问其他key组的其它state,而一直没有访问的key的状态会在超时时间到之后被清理掉
            previousInput.update(in);
        }
    }
 
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        if (Objects.nonNull(previousInput.value())) {
            out.collect(String.format("timer trigger %s", previousInput.value()));
        } else {
            out.collect(String.format("timer trigger state clear", previousInput.value()));
        }
    }
 
}

单元测试代码:

/**
 * 测试状态处理函数,包含状态的ttl配置,以及ontimer方法
 **/
@Test
public void testKeyedStateProcessFunction() throws Exception {
    MyStateProcessFunction myStateProcessFunction = new MyStateProcessFunction();
    OneInputStreamOperatorTestHarness<String, String> testHarness =
            ProcessFunctionTestHarnesses.forKeyedProcessFunction(myStateProcessFunction, x -> "1", Types.STRING);
    testHarness.open();
    testHarness.processElement("hello", 10);
    // 注册了一个定时器,定时器100后过期
    Assert.assertEquals(1, testHarness.numProcessingTimeTimers());
    // 测试输出
    Assert.assertEquals(Lists.newArrayList("hello"), testHarness.extractOutputValues());
    ValueState<String> previousInput = myStateProcessFunction.getRuntimeContext()
            .getState(new ValueStateDescriptor<>("previousInput", Types.STRING));
    // 查看下状态应该已经被设置
    Assert.assertEquals("hello", previousInput.value());
 
    testHarness.processElement("world", 10);
    // 再次测试输出
    Assert.assertEquals(Lists.newArrayList("hello", "helloworld"), testHarness.extractOutputValues());
    // 再次查看下状态应该已经被设置
    Assert.assertEquals("world", previousInput.value());
 
    // 设置时间为1分钟,让状态超时
    testHarness.setStateTtlProcessingTime(Time.minutes(1).toMilliseconds());
    // 触发下状态访问,这样flink就会清理,正常生产中不需要这一步,访问状态本来就一直在进行中,只是可能是其他key分组的状态
    testHarness.processElement("NotUpdate1", System.currentTimeMillis());
    // 查看下状态应该已经被清理
    Assert.assertNull(previousInput.value());
 
    // 设置让定时器过期,顺带确认下状态已经被清理
    testHarness.setProcessingTime(100);
 
    // 测试输出(包含两个输入+一个定时器的输出)
    Assert.assertEquals(Lists.newArrayList("hello", "helloworld", "NotUpdate1", "timer trigger state clear"),
            testHarness.extractOutputValues());
    testHarness.close();
}

测试代码中已经包含了详细的注解,我们实现自己的ttl单元测试时可以参考下文章来源地址https://www.toymoban.com/news/detail-753564.html

到了这里,关于flink对状态ttl进行单元测试的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【实战】一次简单的log4j漏洞测试

    更新时间:2021.12.19 参考文档:https://www.yuque.com/u8021480/crow/dg9xax 在去年 log4j 漏洞刚爆发的时候,很多平台都存在漏洞,当时也在第一时间在有授权的情况下对某论坛进行了渗透测试,结果发现存在漏洞,报告之后,漏洞也被很快修复。 本次对该渗透过程进行一个简单的记录

    2023年04月26日
    浏览(49)
  • SpringBoot初级开发--加入Log4j进行日志管理打印(6)

      日志记录在整个java工程开发中占着很重要的比重,因为很多问题的排查需要通过日志分析才能确认。在SpringBoot中我用得最多的就是log4j这个日志框架。接下来我们具体配置log4j.   log4j定义了8个级别的log(除去OFF和ALL,可以说分为6个级别),优先级从高到低依次为:

    2024年02月11日
    浏览(57)
  • JAVA中使用log4j及slf4j进行日志输出的方法

    JAVA中输出日志比较常用的是log4j,这里讲下log4j的配置和使用方法,以及slf4j的使用方法。 一、下载log4j的架包,并导入项目中,如下: 二、创建log4j.properties配置文件 1、log4j配置文件的位置: (1)如果是java project项目,则在项目的根目录下创建log4j.properties而不是在src目录下

    2024年02月07日
    浏览(46)
  • Springboot配置Log4j日志系统,并将日志存入数据库

    Log4j是apache公司开发的一款日志管理系统,可以高效的管理系统中出现的BUG或者各种信息,并且可以已文本的方式或者数据库存入的方式来记录数据 在pom.xml中导入Log4j依赖 在Resources文件夹下创建一个log4j.properties文件 编写配置文件 这是个测试类 可以看见,控制台和数据库表

    2024年02月08日
    浏览(94)
  • 【flink番外篇】10、对有状态或及时 UDF 和自定义算子进行单元测试

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月02日
    浏览(53)
  • elasticsearch-7.13.3 升级log4j 到log4j-2.17.1

    1、升级原因 log4j低版本存在严重漏洞,根据需要升级到安全版本,不一定是最新。 log4j-2.17.1 jar包下载地址https://archive.apache.org/dist/logging/log4j/2.17.1/ 2、下载后解压apache-log4j-2.17.1-bin.tar.gz 升级需要用到截图中四个jar包 3、升级 删除旧版本log4j 进入elasticsearch-7.13.3目录 $ rm -rf l

    2024年02月12日
    浏览(53)
  • Log4J

    为什么要用日志? -- 方便调试代码 什么时候用?什么时候不用? ​ 出错调试代码时候用 生产环境下就不需要,就需要删除 怎么用? -- 输出语句 ​ log4j是Apache的一个开放源代码的项目,通过使用log4j,我们可以控制日志信息输送的目的地是控制台、文件、GUI组件、甚至是套接口服

    2024年02月08日
    浏览(43)
  • 【日志加载 log4j】

    2.编写配置 3.获取日志对象 4.1 Loggers 记录器 4.2 Appenders 输出源 4.3 Layouts 布局 5. 配置文件 log4j.properties

    2024年02月11日
    浏览(91)
  • Log4j源码解析

    Log4j源码解析 主要流程 Logger logger = Logger.getLogger(Main.class); 1、通过Logger.getLogger(Class clazz) 或 Logger.getLogger(String name)进入。 2、加载LogManager进jvm, 执行静态代码块执行初始化, 创建出RepositorySelector实例及LoggerRepository实例(Hierarchy)。 3、调用OptionConverter.selectAndConfigure(URL url, String

    2024年02月15日
    浏览(49)
  • log4j漏洞详解

    log4j全名就是(log for java),就是apache的一个开源的日志记录组件 ,它在Java项目中使用的比较广泛。 使用方法:                 1.pom引入依赖                 2.获取logger实例                 3.logger.info() debug() error() warn()... 优点:功能丰富,易于集成

    2024年02月16日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包