flink处理函数--副输出功能

这篇具有很好参考价值的文章主要介绍了flink处理函数--副输出功能。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景

在flink中,如果你想要访问记录的处理时间或者事件时间,注册定时器,或者是将记录输出到多个输出流中,你都需要处理函数的帮助,本文就来通过一个例子来讲解下副输出

副输出

本文还是基于streaming-with-flink这本书的例子作为演示,它实现一个把温度低于32度的记录输出到副输出的功能,正常的记录还是从主输出中输出.代码如下:

package wikiedits.processfunc.job;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.OutputTag;

import wikiedits.processfunc.pojo.SensorReading;
import wikiedits.processfunc.process.FreezingMonitor;
import wikiedits.processfunc.source.SensorSource;

public class SideOutPutJob {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<SensorReading> readings = see.addSource(new SensorSource());

        SingleOutputStreamOperator<SensorReading> monitoredReadings = readings.process(new FreezingMonitor());
        // 打印附输出
        monitoredReadings.getSideOutput(new OutputTag<String>("freezing-alarms"){}).print();
        // 打印主输出
        monitoredReadings.print();
        see.execute();
    }
}


package wikiedits.processfunc.process;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import wikiedits.processfunc.pojo.SensorReading;

public class FreezingMonitor extends ProcessFunction<SensorReading, SensorReading> {

    private OutputTag<String> freezingAlarmOutput = new OutputTag<String>("freezing-alarms") {};


    @Override
    public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
        if (value.temperature < 32.0) {
            ctx.output(freezingAlarmOutput, "freezing alarm for " + value.id + " :" + value.temperature);
        }
        out.collect(value);
    }

}
package wikiedits.processfunc.source;

/*
 * Copyright 2015 Fabian Hueske / Vasia Kalavri
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import wikiedits.processfunc.pojo.SensorReading;

import java.util.Calendar;
import java.util.Random;

/**
 * Flink SourceFunction to generate SensorReadings with random temperature values.
 *
 * Each parallel instance of the source simulates 10 sensors which emit one sensor reading every 100 ms.
 *
 * Note: This is a simple data-generating source function that does not checkpoint its state.
 * In case of a failure, the source does not replay any data.
 */
public class SensorSource extends RichParallelSourceFunction<SensorReading> {

    // flag indicating whether source is still running
    private boolean running = true;

    /** run() continuously emits SensorReadings by emitting them through the SourceContext. */
    @Override
    public void run(SourceContext<SensorReading> srcCtx) throws Exception {

        // initialize random number generator
        Random rand = new Random();
        // look up index of this parallel task
        int taskIdx = this.getRuntimeContext().getIndexOfThisSubtask();

        // initialize sensor ids and temperatures
        String[] sensorIds = new String[10];
        double[] curFTemp = new double[10];
        for (int i = 0; i < 10; i++) {
            sensorIds[i] = "sensor_" + (taskIdx * 10 + i);
            curFTemp[i] = 65 + (rand.nextGaussian() * 20);
        }

        while (running) {

            // get current time
            long curTime = Calendar.getInstance().getTimeInMillis();

            // emit SensorReadings
            for (int i = 0; i < 10; i++) {
                // update current temperature
                curFTemp[i] += rand.nextGaussian() * 0.5;
                // emit reading
                srcCtx.collect(new SensorReading(sensorIds[i], curTime, curFTemp[i]));
            }

            // wait for 100 ms
            Thread.sleep(3000);
        }
    }

    /** Cancels this SourceFunction. */
    @Override
    public void cancel() {
        this.running = false;
    }
}

程序运行结果:
flink处理函数--副输出功能,flink,大数据,flink,算法,大数据文章来源地址https://www.toymoban.com/news/detail-728017.html

到了这里,关于flink处理函数--副输出功能的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink学习-处理函数

    处理函数是Flink底层的函数,工作中通常用来做一些更复杂的业务处理,处理函数分好几种,主要包括基本处理函数,keyed处理函数,window处理函数。 Flink提供了8种不同处理函数: ProcessFunction :dataStream KeyedProcessFunction :用于KeyedStream,keyBy之后的流处理 CoProcessFunction :用于

    2024年02月03日
    浏览(33)
  • Flink处理函数(一)

    目录  7.1 基本处理函数(ProcessFunction) 7.1.1 处理函数的功能和使用 7.1.2 ProcessFunction 解析 7.1.3 处理函数的分类 7.2 按键分区处理函数(KeyedProcessFunction) 7.2.1 定时器(Timer)和定时服务(TimerService) 7.2.2 KeyedProcessFunction 的使用 7.3 窗口处理函数 7.3.1 窗口处理函数的使用 7.3.

    2024年02月03日
    浏览(35)
  • 《Flink学习笔记》——第七章 处理函数

    为了让代码有更强大的表现力和易用性,Flink 本身提供了多层 API 在更底层,我们可以不定义任何具体的算子(比如 map,filter,或者 window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口

    2024年02月10日
    浏览(50)
  • Flink处理函数解析(ProcessFunction和KeyedProcessFunction)

    Flink中的处理函数(ProcessFunction和KeyedProcessFunction)在对于数据进行颗粒化的精确计算时使用较多,处理函数提供了一个定时服务(TimerService),可以向未来注册一个定时服务,我们可以把它理解为一个闹钟,当闹钟响起时,就调用ProcessFunction中的onTimer()方法,会对数据进行一

    2024年02月04日
    浏览(35)
  • Flink学习——处理函数ProcessFunction及多流转换

            在DataStream的更底层,我们可以不定义任何具体的算子(如map(),filter()等)二只提炼出一个统一的“处理”(process)操作 。它是所有转换算子的概括性的表达。可以自定义处理逻辑。         所以这一层接口就被叫做“ 处理函数 ”( process function )         处理

    2024年02月14日
    浏览(46)
  • flink重温笔记(六):Flink 流批一体 API 开发—— 数据输出 sink

    前言:今天是学习 flink 的第七天啦!学习了 flink 中 sink(数据槽) 部分知识点,这一部分只要是解决数据处理之后,数据到哪里去的问题,我觉得 flink 知识点虽然比较难理解,但是代码跑通后,逻辑还是比较有趣的! Tips:毛爷爷说过:“宜将剩勇追穷寇,不可沽名学霸王

    2024年02月21日
    浏览(42)
  • Flink DataStream之输出数据到File中

    新建类 启动程序并启动nc -lp 输入数据: 正在写入的文件会有inprogress的标识(在指定的目录下生成文件时会按照日期的年月日时进行分目录,因为我在执行时的时间是2023/7/12 22点,所以它就会自动生成一个2023-07-12--22目录,分桶策略也可以自己在代码中配置。):  当满足滚

    2024年02月15日
    浏览(39)
  • C语言(输入输出函数getchar,putchar、gets、puts,scanf,printf的功能以及用法)

    int getchar( void ); 返回值为int,所以需要用一个int变量来接收,不管输入几个字符,每次都只接收第一个字符,常与while和putchar配合使用。 从下面这张图可以看出,输入一个空格也会打印 当然,获取一个字符用得不多,每次都需要获取一串,所以我们可以配合while来使用。 用

    2024年02月02日
    浏览(50)
  • 二次开发Flink-coGroup算子支持迟到数据通过测输出流提取

    目录 1.背景 2.coGroup算子源码分析 2.1完整的coGroup算子调用流程 2.2coGroup方法入口 2.3 CoGroupedStreams对象分析 2.4WithWindow内部类分析 2.5CoGroupWindowFunction函数分析 3.修改源码支持获取迟到数据测输出流 3.1复制CoGroupedStreams 3.2新增WithWindow.sideOutputLateData方法 3.3新增WithWindow构造方法 3

    2024年04月11日
    浏览(44)
  • 商业智能系统的主要功能包括数据仓库、数据ETL、数据统计输出、分析功能

    ETL服务内容包含: 数据迁移 数据合并 数据同步 数据交换 数据联邦 数据仓库

    2024年02月07日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包