背景
处理函数中处理输出主输出的数据流数据外,也可以输出多个其他的副输出的数据流数据,当我们的处理函数有副输出时,我们需要测试他们功能的正确性,本文就提供一个测试flink副输出单元测试的例子
测试flink副输出单元测试
首先看一下处理函数,其中包含副输出逻辑文章来源:https://www.toymoban.com/news/detail-773275.html
public class MySideOutputProcessFunction extends ProcessFunction<String, String> {
public static final OutputTag<String> OUTPUT_TAG = new OutputTag<String>("sideoutput") {};
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
out.collect("normal:" + value);
ctx.output(OUTPUT_TAG, "side:" + value);
}
}
其次,看下对应的单元测试文章来源地址https://www.toymoban.com/news/detail-773275.html
/**
* 测试sideOutput的输出功能
*/
@Test
public void testSideOutput() throws Exception {
MySideOutputProcessFunction mySideOutputProcessFunction = new MySideOutputProcessFunction();
OneInputStreamOperatorTestHarness<String, String> testHarness =
ProcessFunctionTestHarnesses.forProcessFunction(mySideOutputProcessFunction);
testHarness.open();
testHarness.processElement("hello", 10);
// 测试主输出
Assert.assertEquals(Lists.newArrayList("normal:hello"), testHarness.extractOutputValues());
ConcurrentLinkedQueue<StreamRecord<String>> sideOutPutQueue =
testHarness.getSideOutput(MySideOutputProcessFunction.OUTPUT_TAG);
// 测试副输出
Assert.assertEquals(Lists.newArrayList("side:hello"),
sideOutPutQueue.stream().map(StreamRecord::getValue).collect(Collectors.toList()));
testHarness.close();
}
到了这里,关于flink的副输出sideoutput单元测试的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!