flink重温笔记(三):Flink 流批一体 API 开发——Transformation 重要算子操作

这篇具有很好参考价值的文章主要介绍了flink重温笔记(三):Flink 流批一体 API 开发——Transformation 重要算子操作。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink学习笔记

前言:今天是学习 flink 第三天啦,学习了高级 api 开发中11 中重要算子,查找了好多资料理解其中的原理,以及敲了好几个小时代码抓紧理解原理。
Tips:虽然学习进度有点慢,希望自己继续努力,不断猜想 api 原理,通过敲代码不断印证自己的想法,转码大数据之路一定会越来越好的!

二、Flink 流批一体 API 开发

2. Transfromation

2.1 Map

将 DataStream 中的每一个元素转化为另一个元素,类似于之前 wordcount 案例中 word—> (word,1)

flink重温笔记(三):Flink 流批一体 API 开发——Transformation 重要算子操作,Flink重温笔记,flink,笔记,大数据,学习方法,数据仓库

案例:使用map操作,读取 apache.log 文件中的字符串数据转换成 ApacheLogEvent 对象

# 日志数据
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
package cn.itcast.day02.transformation;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.text.SimpleDateFormat;

/**
 * @author lql
 * @time 2024-02-13 19:44:52
 * @description TODO:使用map操作,读取apache.log文件中的字符串数据转换成ApacheLogEvent对象
 */
public class MapDemo {
    public static void main(String[] args) throws Exception {
        /**
         * 获取ExecutionEnvironment运行环境
         * 使用readTextFile读取数据构建数据源
         * 创建一个ApacheLogEvent类
         * 使用map操作执行转换
         * 打印测试
         */

        //TODO 获取ExecutionEnvironment运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //TODO 使用readTextFile读取数据构建数据源
        DataStream<String> lines = env.readTextFile("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\apache2.log");

        //TODO 创建一个ApacheLogEvent类
        //TODO 使用map操作执行转换
        /**
         * String:传入值类型
         * ApacheEvent:返回值类型
         */
        SingleOutputStreamOperator<ApacheEvent> apacheEventBean = lines.map(new MapFunction<String, ApacheEvent>() {
            @Override
            public ApacheEvent map(String line) throws Exception {
                String[] elements = line.split(" ");
                String ip = elements[0];
                int userId = Integer.parseInt(elements[1]);
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
                long timestamp = simpleDateFormat.parse(elements[2]).getTime();
                String method = elements[3];
                String path = elements[4];
                return new ApacheEvent(ip, userId, timestamp, method, path);
            }
        });

        //TODO 打印测试
        apacheEventBean.print();

        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class ApacheEvent{
        String ip;      // 访问ip
        int userId;     // 用户id
        long timestamp; // 访问时间戳
        String method;  // 访问方法
        String path;    // 访问路径
    }
}
# 打印数据
2> MapDemo.ApacheEvent(ip=10.0.0.1, userId=10003, timestamp=1431829613000, method=POST, path=/presentations/logstash-monitorama-2013/css/print/paper.css)

总结:

  • 1- env.readTextFile 返回的类型是:DataStream,而不是 DataStreamSource 类型,不然会报错,这里用 var 快捷键需要注意!
  • 2- 重写 map 方法,切割后列表形式,以脚标形式取值
  • 3- Intger.parseInt 可以将字符串转化为整数类型
  • 4- new 一个 SimpleDateFormat()进行日期格式化处理
  • 5- simpleDateFormat.parse(字符串).getTime() 可以获取指定格式的日期
2.2 FlatMap

将 DataStream 中的每一个元素转化为 0……n 个元素,类似于 wordcount 案例中以空格切割单词

flink重温笔记(三):Flink 流批一体 API 开发——Transformation 重要算子操作,Flink重温笔记,flink,笔记,大数据,学习方法,数据仓库

实例:读取 flatmap.log 文件中的数据

将数据:
张三,苹果手机,联想电脑,华为平板
李四,华为手机,苹果电脑,小米平板

转化为:
张三有苹果手机
张三有联想电脑
张三有华为平板
李四有…
package cn.itcast.day02.transformation;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author lql
 * @time 2024-02-14 21:04:09
 * @description TODO:读取 flatmap.log文件中的数据,以上数据为一条转换为三条
 */
public class FlatMapDemo {
    public static void main(String[] args) throws Exception {
        /**
         * 开发步骤:
         * 构建批处理运行环境
         * 构建本地集合数据源
         * 使用flatMap将一条数据经过处理转换为三条数据
         * 使用逗号分隔字段
         * 分别构建三条数据
         * 打印输出
         */
        // TODO 1: 构建 flink 流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO 2: 获取本地数据源
        DataStream<String> lines = env.readTextFile("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\flatmap.log");

        // TODO 3: 使用flatMap将一条数据经过处理转换为三条数据
        SingleOutputStreamOperator<String> result = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] elements = line.split(",");
                collector.collect(elements[0] + "有" + elements[1]);
                collector.collect(elements[0] + "有" + elements[2]);
                collector.collect(elements[0] + "有" + elements[3]);
            }
        });
        result.print();
        env.execute();
    }
}

结果:

8> 李四有华为手机
8> 李四有苹果电脑
8> 李四有小米平板
5> 张三有苹果手机
5> 张三有联想电脑
5> 张三有华为平板

总结:collect 可以多行书写

2.3 Filter

过滤出来符合条件的元素

flink重温笔记(三):Flink 流批一体 API 开发——Transformation 重要算子操作,Flink重温笔记,flink,笔记,大数据,学习方法,数据仓库

实例:读取 apache.log 文件中的访问日志数据,过滤出来以下访问IP是 83.149.9.216 的访问日志。

83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
package cn.itcast.day02.transformation;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 21:20:30
 * @description TODO:读取apache.log文件中的访问日志数据,过滤出来以下访问IP是83.149.9.216的访问日志。
 */
public class FilterDemo {
    public static void main(String[] args) throws Exception {
        /**
         * 获取ExecutionEnvironment运行环境
         * 使用fromCollection构建数据源
         * 使用filter操作执行过滤
         * 打印测试
         */
        //TODO 获取ExecutionEnvironment运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //TODO 使用fromCollection构建数据源
        DataStream<String> lines = env.readTextFile("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\apache.log");

        //TODO 使用filter操作执行过滤(66.249.73.135)
        SingleOutputStreamOperator<String> result = lines.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String line) throws Exception {
                return line.contains("83.149.9.216");
            }
        });
        result.print();
        env.execute();
    }
}

结果:

2> 83.149.9.216 - - 17/05/2015:10:05:03 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-search.png
2> 83.149.9.216 - - 17/05/2015:10:05:43 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png
2> 83.149.9.216 - - 17/05/2015:10:05:47 +0000 GET /presentations/logstash-monitorama-2013/plugin/highlight/highlight.js
2> 83.149.9.216 - - 17/05/2015:10:05:12 +0000 GET /presentations/logstash-monitorama-2013/plugin/zoom-js/zoom.js
2> 83.149.9.216 - - 17/05/2015:10:05:07 +0000 GET /presentations/logstash-monitorama-2013/plugin/notes/notes.js
2> 83.149.9.216 - - 17/05/2015:10:05:34 +0000 GET /presentations/logstash-monitorama-2013/images/sad-medic.png

总结:contains 方法可以达到过滤效果

2.4 KeyBy

流处理中没有 groupBy,而是 keyBy

flink重温笔记(三):Flink 流批一体 API 开发——Transformation 重要算子操作,Flink重温笔记,flink,笔记,大数据,学习方法,数据仓库

实例:读取本地数据源, 进行单词的计数

package cn.itcast.day02.transformation;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 21:29:52
 * @description TODO:读取本地元组数据源, 进行单词的计数
 */
public class KeyByDemo {
    public static void main(String[] args) throws Exception {
        // TODO 1: 初始化 Fink 环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO 2: 读取本地数据源
        DataStreamSource<Tuple2<String, Integer>> source  = env.fromElements(
                Tuple2.of("篮球", 1),
                Tuple2.of("篮球", 2),
                Tuple2.of("篮球", 3),
                Tuple2.of("足球", 3),
                Tuple2.of("足球", 2),
                Tuple2.of("足球", 3)
        );

        // 在流计算内,来一条算一条,就是每个组的数据,挨个进行计算,求和累加,所以结果中最后一个打印的数据才是最终的求和结果
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.keyBy(t -> t.f0).sum(1);

        // 如果不分组的话, sum的结果是 1+2+3+3+2+3 = 14 分组后是 篮球 6  足球 8
        sum.print();
        env.execute();
    }
}

结果:

4> (足球,3)
4> (足球,5)
4> (足球,8)
5> (篮球,1)
5> (篮球,3)
5> (篮球,6)

总结:

  • 1- keyBy 是流式分组
  • 2- keyBy () 可以填写 t -> f0, 也可以直接填 0
2.5 Reduce

可以对一个 dataset 或者一个 group 来进行聚合计算,最终聚合成一个元素

flink重温笔记(三):Flink 流批一体 API 开发——Transformation 重要算子操作,Flink重温笔记,flink,笔记,大数据,学习方法,数据仓库

实例:读取 apache.log 日志,统计ip地址访问pv数量,使用 reduce 操作聚合成一个最终结果

83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
package cn.itcast.day02.transformation;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 21:43:10
 * @description TODO: 读取apache.log日志,统计ip地址访问pv数量,使用 reduce 操作聚合成一个最终结果
 */
public class ReduceDemo {
    public static void main(String[] args) throws Exception {
        /**
         * 获取 ExecutionEnvironment 运行环境
         * 使用 readTextFile 构建数据源
         * 使用 reduce 执行聚合操作
         * 打印测试
         */
        //TODO 获取 ExecutionEnvironment 运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //TODO 使用 readTextFile 构建数据源
        DataStream<String> lines = env.readTextFile("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\apache.log");

        //TODO 使用 reduce 执行聚合操作
        SingleOutputStreamOperator<Tuple2<String, Integer>> ipAndOne  = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String[] dataArray = line.split(" ");
                return Tuple2.of(dataArray[0], 1);
            }
        });

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = ipAndOne.keyBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> tuple1, Tuple2<String, Integer> tuple2) throws Exception {
                return Tuple2.of(tuple1.f0, tuple1.f1 + tuple2.f1);
            }
        });

        result.print();
        env.execute();
    }
}

结果:

3> (74.218.234.48,3)
3> (74.218.234.48,4)
3> (74.218.234.48,5)
3> (74.218.234.48,6)

总结:

  • 1- reduce 类似于 sum 操作
  • 2- 重写方法注意返回值写法:return Tuple2.of(tuple1.f0, tuple1.f1 + tuple2.f1)
2.6 minBy 和 maxBy

获取指定字段的最大值、最小值

flink重温笔记(三):Flink 流批一体 API 开发——Transformation 重要算子操作,Flink重温笔记,flink,笔记,大数据,学习方法,数据仓库
flink重温笔记(三):Flink 流批一体 API 开发——Transformation 重要算子操作,Flink重温笔记,flink,笔记,大数据,学习方法,数据仓库

2.6.1 场景一:

实例:Tuple2 情况

package cn.itcast.day02.transformation;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 21:57:18
 * @description TODO:分组后,求组内最值
 */
public class MinMaxByDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("node1", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String[] fields = line.split(",");
                String word = fields[0];
                int count = Integer.parseInt(fields[1]);
                return Tuple2.of(word, count);
            }
        });
        KeyedStream<Tuple2<String, Integer>, String> keyd = wordAndCount.keyBy(t -> t.f0);
        keyd.minBy(1).print("最小数据>>>");
        keyd.maxBy(1).print("最大数据>>>");
        env.execute();
    }
}

结果:

最大数据>>>:1> (spark,2)
最小数据>>>:1> (spark,2)
最小数据>>>:1> (spark,2)
最大数据>>>:1> (spark,5)
最大数据>>>:8> (hadoop,7)
最大数据>>>:8> (hadoop,7)
最小数据>>>:8> (hadoop,3)
最小数据>>>:8> (hadoop,3)
2.6.2 场景二

实例:Tuple3 情况

package cn.itcast.day02.transformation;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 21:57:18
 * @description TODO:分组后,求组内最值
 */
public class MinMaxByDemo2 {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //调用Source创建DataStream
        //辽宁,沈阳,1000
        //北京,朝阳,8000
        //辽宁,朝阳,1000
        //辽宁,朝阳,1000
        //辽宁,沈阳,2000
        //北京,朝阳,1000
        //辽宁,大连,3000
        //辽宁,铁岭,500
        DataStream<String> lines = env.socketTextStream("node1", 9999);
        SingleOutputStreamOperator<Tuple3<String, String, Double>> pcm = lines.map(new MapFunction<String, Tuple3<String, String, Double>>() {

            @Override
            public Tuple3<String, String, Double> map(String value) throws Exception {
                String[] fields = value.split(",");
                String province = fields[0];
                String city = fields[1];
                double money = Double.parseDouble(fields[2]);
                return Tuple3.of(province, city, money);
            }
        });

        KeyedStream<Tuple3<String, String, Double>, String> keyed = pcm.keyBy(t -> t.f0);

        // considerTimestamps 设置为 false,则 Flink 在比较时不会考虑元素的时间戳,而只会根据指定的字段
        SingleOutputStreamOperator<Tuple3<String, String, Double>> res = keyed.minBy(2, false);

        res.print();
        env.execute();
    }
}

结果:

5> (辽宁,沈阳,1000.0)
4> (北京,朝阳,8000.0)
5> (辽宁,朝阳,1000.0)
5> (辽宁,朝阳,1000.0)
5> (辽宁,朝阳,1000.0)
4> (北京,朝阳,1000.0)
5> (辽宁,朝阳,1000.0)
5> (辽宁,铁岭,500.0)
2.7 min max 和 minBy maxBy 的区别
package cn.itcast.day02.transformation;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.awt.event.TextEvent;

/**
 * @author lql
 * @time 2024-02-14 22:52:36
 * @description TODO
 */
public class MinVSMinByDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple3<Integer, Integer, Integer>> source = env.fromElements(
                Tuple3.of(1, 3, 2),
                Tuple3.of(1, 1, 2),
                Tuple3.of(1, 2, 3),
                Tuple3.of(1, 111, 1),
                Tuple3.of(1, 1, 1),
                Tuple3.of(1, 2, 0),
                Tuple3.of(1, 33, 2)
        );
        source.keyBy(t -> t.f0).min(2).print("min>>>");
        source.keyBy(t->t.f0).minBy(2).printToErr("minBy>>>");

        env.execute();
    }
}

结果:

minBy>>>:6> (1,3,2)
minBy>>>:6> (1,3,2)
minBy>>>:6> (1,3,2)
minBy>>>:6> (1,111,1)
minBy>>>:6> (1,111,1)
minBy>>>:6> (1,2,0)
minBy>>>:6> (1,2,0)

min>>>:6> (1,3,2)
min>>>:6> (1,3,2)
min>>>:6> (1,3,2)
min>>>:6> (1,3,1)
min>>>:6> (1,3,1)
min>>>:6> (1,3,0)
min>>>:6> (1,3,0)

总结:

  • 1- minBy 和 maxBy 会返回整个对象数据(包括最小值所在的前缀
  • 2- min 和 max 只会返回最小值以及第一次最小值的前缀
2.8 Union

将多个DataSet合并成一个DataSet,union合并的DataSet的类型必须是一致的

flink重温笔记(三):Flink 流批一体 API 开发——Transformation 重要算子操作,Flink重温笔记,flink,笔记,大数据,学习方法,数据仓库

package cn.itcast.day02.transformation;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 23:06:20
 * @description TODO:
 *  * 使用union实现
 *  * 将以下数据进行取并集操作
 *  * 数据集1
 *  * "hadoop", "hive","flume"
 *  * 数据集2
 *  * "hadoop","hive","spark"
 *  *
 *  * 注意:
 *  * 1:合并后的数据不会自动去重
 *  * 2:要求数据类型必须一致
 *  */

public class UnionDemo {
    public static void main(String[] args) throws Exception {
        /**
         * 实现步骤:
         * 1)初始化flink的流处理的运行环境
         * 2)加载/创建数据源
         * 3)处理数据
         * 4)打印输出
         * 5)递交执行作业
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> ds1 = env.fromElements("hadoop", "hive", "flume");
        DataStreamSource<String> ds2 = env.fromElements("hadoop","hive","spark");
        DataStream<String> result = ds1.union(ds2);
        result.printToErr();
        env.execute();
    }
}

结果:

2> hive
6> flume
3> spark
1> hadoop
4> hadoop
5> hive

总结:

  • 1- Uinon 合并 dataset, 数据集类型必须一致
  • 2- Union 合并不会去除
  • 3- Union 合并出来的数据集是乱序的
2.9 Connect

DataStream,DataStream → ConnectedStreams,流相互独立, 作为对比Union后是真的变成一个流了

flink重温笔记(三):Flink 流批一体 API 开发——Transformation 重要算子操作,Flink重温笔记,flink,笔记,大数据,学习方法,数据仓库

package cn.itcast.day02.transformation;

/**
 * @author lql
 * @time 2024-02-14 23:10:14
 * @description TODO
 */

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.concurrent.TimeUnit;

/**
 * 读取两个数据流(生成两个不同类型的数据流),使用connect进行合并输出
 * 和union类似,但是connect只能连接两个流,两个流之间的数据类型可以不同,对两个流的数据可以分别应用不同的处理逻辑
 */
public class ConnectDemo {
    public static void main(String[] args) throws Exception {
        /**
         * 实现步骤:
         * 1)初始化flink流处理的运行环境
         * 2)构建两个不同类型数据的数据流
         * 3)对连接后的流数据进行业务处理
         * 4)打印输出
         * 5)启动作业
         */

        //TODO 1)初始化flink流处理的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //TODO 2)构建两个不同类型数据的数据流
        DataStream<Long> longDataStreamSource = env.addSource(new MyNoParallelSource());
        DataStream<Long> longDataStreamSource2 = env.addSource(new MyNoParallelSource());

        //TODO 3)对连接后的流数据进行业务处理
        SingleOutputStreamOperator<String> strDataStreamSource = longDataStreamSource2.map(new MapFunction<Long, String>() {
            @Override
            public String map(Long aLong) throws Exception {
                return "str_" + aLong;
            }
        });

        ConnectedStreams<Long, String> connectedStreams = longDataStreamSource.connect(strDataStreamSource);
        //对连接后的流应用不同的业务逻辑
        SingleOutputStreamOperator<Object> result = connectedStreams.map(new CoMapFunction<Long, String, Object>() {
            @Override
            public Object map1(Long value) throws Exception {
                return value;
            }

            @Override
            public Object map2(String value) throws Exception {
                return value;
            }
        });

        //TODO 4)打印输出
        result.print();

        //TODO 5)启动作业
        env.execute();
    }

    public static class MyNoParallelSource implements SourceFunction<Long> {
        //定义一个变量,是否循环生成数据
        private boolean isRunning = true;
        private Long count = 0L;

        /**
         * 这是主要的方法,启动一个数据源
         * 实现数据的生成操作
         * @param ctx
         * @throws Exception
         */
        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            //不断生成订单数据
            while (isRunning){
                count+=1;
                //收集数据返回
                ctx.collect(count);

                //每隔一秒钟生成一条订单数据
                TimeUnit.SECONDS.sleep(1);
            }
        }

        /**
         * 取消数据的生成操作
         */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}

结果:

3> 1
5> str_1
4> 2
6> str_2
5> 3
7> str_3

总结:

  • Connect 两个流可以类型不一样
2.10 split、select 和 Side Outputs

flink重温笔记(三):Flink 流批一体 API 开发——Transformation 重要算子操作,Flink重温笔记,flink,笔记,大数据,学习方法,数据仓库

Split 就是将一个 DataStream 分成两个或者多个 DataStream

Select 就是获取分流后对应的数据

Tips:

  • 简单认为就是, Split会给数据打上标记,然后通过Select, 选择标记来划分出不同的Stream,效果类似KeyBy分流,但是比KeyBy更自由些,可以自由打标记并进行分流。
  • Side Outputs:split 过期啦,可以使用process方法对流中数据进行处理,并针对不同的处理结果将数据收集到不同的OutputTag中
package cn.itcast.day02.transformation;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @author lql
 * @time 2024-02-14 23:25:38
 * @description TODO
 */
public class StreamSplitDemo {
    public static void main(String[] args) throws Exception {

        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 当设置为 AUTOMATIC 时,Flink 会自动选择最佳的并行度来执行作业。

        //TODO 1.source
        DataStreamSource<Integer> ds = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        //TODO 2.transformation
        //需求:对流中的数据按照奇数和偶数拆分并选择
        OutputTag<Integer> oddTag = new OutputTag<>("奇数", TypeInformation.of(Integer.class));
        OutputTag<Integer> evenTag = new OutputTag("偶数", TypeInformation.of(Integer.class));

        SingleOutputStreamOperator<Integer> result = ds.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                //out收集完的还是放在一起的,ctx可以将数据放到不同的OutputTag
                if (value % 2 == 0) {
                    ctx.output(evenTag, value);
                } else {
                    ctx.output(oddTag, value);
                }

            }
        });

        DataStream<Integer> oddResult = result.getSideOutput(oddTag);
        DataStream<Integer> evenResult = result.getSideOutput(evenTag);

        //TODO 3.sink
        System.out.println(oddTag);//OutputTag(Integer, 奇数)
        System.out.println(evenTag);//OutputTag(Integer, 偶数)
        oddResult.print("奇数:");
        evenResult.print("偶数:");

        //TODO 4.execute
        env.execute();
    }
}

结果:

OutputTag(Integer, 奇数)
OutputTag(Integer, 偶数)
奇数::3> 1
偶数::8> 6
偶数::6> 4
偶数::4> 2
奇数::5> 3
奇数::1> 7
奇数::7> 5
偶数::2> 8
偶数::4> 10
奇数::3> 9

总结:

  • 1- OutputTag 对象用于定义输出类型
  • 2- process 可以分流
  • 3- 引流数据使用:getSideOutput 方法
2.11 Iterate

在流中创建“反馈(feedback)”循环,通过将一个算子的输出重定向到某个先前的算子。

迭代的数据流向:DataStream → IterativeStream → DataStream

package cn.itcast.day02.transformation;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 23:34:23
 * @description TODO:Iterate迭代流式计算
 */
public class IterateDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //10
        DataStreamSource<String> strs = env.socketTextStream("node1", 9999);

        DataStream<Long> numbers = strs.map(Long::parseLong);

        //调用iterate方法 DataStream -> IterativeStream
        //对Nums进行迭代(不停的输入int的数字)
        IterativeStream<Long> iteration = numbers.iterate();

        //IterativeStream -> DataStream
        //对迭代出来的数据进行运算 //对输入的数据应用更新模型,即输入数据的处理逻辑
        DataStream<Long> iterationBody = iteration.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("iterate input =>" + value);
                return value -= 2;
            }
        });
        //只要满足value > 0的条件,就会形成一个回路,重新的迭代,即将前面的输出作为输入,在进行一次应用更新模型,即输入数据的处理逻辑
        DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value > 0;
            }
        });
        //传入迭代的条件
        iteration.closeWith(feedback);

        //不满足迭代条件的最后要输出
        DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value <= 0;
            }
        });

        //数据结果
        output.printToErr("output value:");
        env.execute();
    }

}

结果:

iterate input =>7
iterate input =>5
iterate input =>3
iterate input =>1
output value::2> -1
iterate input =>6
iterate input =>4
output value::3> 0
iterate input =>2

总结:

  • 1- 更新模型,更新参数较为常见
  • 2- 算子迭代,需要理解应用

ction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**

  • @author lql

  • @time 2024-02-14 23:34:23

  • @description TODO:Iterate迭代流式计算
    */
    public class IterateDemo {
    public static void main(String[] args) throws Exception {

     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
     //10
     DataStreamSource<String> strs = env.socketTextStream("node1", 9999);
    
     DataStream<Long> numbers = strs.map(Long::parseLong);
    
     //调用iterate方法 DataStream -> IterativeStream
     //对Nums进行迭代(不停的输入int的数字)
     IterativeStream<Long> iteration = numbers.iterate();
    
     //IterativeStream -> DataStream
     //对迭代出来的数据进行运算 //对输入的数据应用更新模型,即输入数据的处理逻辑
     DataStream<Long> iterationBody = iteration.map(new MapFunction<Long, Long>() {
         @Override
         public Long map(Long value) throws Exception {
             System.out.println("iterate input =>" + value);
             return value -= 2;
         }
     });
     //只要满足value > 0的条件,就会形成一个回路,重新的迭代,即将前面的输出作为输入,在进行一次应用更新模型,即输入数据的处理逻辑
     DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
         @Override
         public boolean filter(Long value) throws Exception {
             return value > 0;
         }
     });
     //传入迭代的条件
     iteration.closeWith(feedback);
    
     //不满足迭代条件的最后要输出
     DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
         @Override
         public boolean filter(Long value) throws Exception {
             return value <= 0;
         }
     });
    
     //数据结果
     output.printToErr("output value:");
     env.execute();
    

    }

}


结果:

```java
iterate input =>7
iterate input =>5
iterate input =>3
iterate input =>1
output value::2> -1
iterate input =>6
iterate input =>4
output value::3> 0
iterate input =>2

总结:文章来源地址https://www.toymoban.com/news/detail-827393.html

  • 1- 更新模型,更新参数较为常见
  • 2- 算子迭代,需要理解应用

到了这里,关于flink重温笔记(三):Flink 流批一体 API 开发——Transformation 重要算子操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink流批一体计算(10):PyFlink Tabel API

    简述 PyFlink 是 Apache Flink 的 Python API ,你可以使用它构建可扩展的批处理和流处理任务,例如实时数据处理管道、大规模探索性数据分析、机器学习( ML )管道和 ETL 处理。 如果你对 Python 和 Pandas 等库已经比较熟悉,那么 PyFlink 可以让你更轻松地利用 Flink 生态系统的全部功

    2024年02月11日
    浏览(40)
  • Flink流批一体计算(16):PyFlink DataStream API

    目录 概述 Pipeline Dataflow 代码示例WorldCount.py 执行脚本WorldCount.py 概述 Apache Flink 提供了 DataStream API,用于构建健壮的、有状态的流式应用程序。它提供了对状态和时间细粒度控制,从而允许实现高级事件驱动系统。 用户实现的Flink程序是由Stream和Transformation这两个基本构建块组

    2024年02月11日
    浏览(45)
  • Flink流批一体计算(20):DataStream API和Table API互转

    目录 举个例子 连接器 下载连接器(connector)和格式(format)jar 包 依赖管理  如何使用连接器 举个例子 StreamExecutionEnvironment 集成了DataStream API,通过额外的函数扩展了TableEnvironment。 下面代码演示两种API如何互转 TableEnvironment 将采用StreamExecutionEnvironment所有的配置选项。 建

    2024年02月10日
    浏览(40)
  • Flink流批一体计算(19):PyFlink DataStream API之State

    目录 keyed state Keyed DataStream 使用 Keyed State 实现了一个简单的计数窗口 状态有效期 (TTL) 过期数据的清理 全量快照时进行清理 增量数据清理 在 RocksDB 压缩时清理 Operator State算子状态 Broadcast State广播状态 keyed state Keyed DataStream 使用 keyed state,首先需要为DataStream指定 key(主键)

    2024年02月10日
    浏览(37)
  • Flink流批一体计算(11):PyFlink Tabel API之TableEnvironment

    目录 概述 设置重启策略 什么是flink的重启策略(Restartstrategy) flink的重启策略(Restartstrategy)实战 flink的4种重启策略 FixedDelayRestartstrategy(固定延时重启策略) FailureRateRestartstrategy(故障率重启策略) NoRestartstrategy(不重启策略) 配置State Backends 以及 Checkpointing Checkpoint 启用和配置

    2024年02月13日
    浏览(52)
  • Flink流批一体计算(12):PyFlink Tabel API之构建作业

    目录 1.创建源表和结果表。 创建及注册表名分别为 source 和 sink 的表 使用 TableEnvironment.execute_sql() 方法,通过 DDL 语句来注册源表和结果表 2. 创建一个作业 3. 提交作业Submitting PyFlink Jobs 1.创建源表和结果表。 创建及注册表名分别为 source 和 sink 的表 其中,源表 source 有一列

    2024年02月13日
    浏览(42)
  • Flink流批一体计算(17):PyFlink DataStream API之StreamExecutionEnvironment

    目录 StreamExecutionEnvironment Watermark watermark策略简介 使用 Watermark 策略 内置水印生成器 处理空闲数据源 算子处理 Watermark 的方式 创建DataStream的方式 通过list对象创建 ​​​​​​使用DataStream connectors创建 使用Table SQL connectors创建 StreamExecutionEnvironment 编写一个 Flink Python DataSt

    2024年02月11日
    浏览(42)
  • Flink流批一体计算(14):PyFlink Tabel API之SQL查询

    举个例子 查询 source 表,同时执行计算 Table API 查询 Table 对象有许多方法,可以用于进行关系操作。 这些方法返回新的 Table 对象,表示对输入 Table 应用关系操作之后的结果。 这些关系操作可以由多个方法调用组成,例如 table.group_by(...).select(...)。 Table API 文档描述了流和批

    2024年02月12日
    浏览(41)
  • Flink流批一体计算(18):PyFlink DataStream API之计算和Sink

    目录 1. 在上节数据流上执行转换操作,或者使用 sink 将数据写入外部系统。 2. File Sink File Sink Format Types  Row-encoded Formats  Bulk-encoded Formats  桶分配 滚动策略 3. 如何输出结果 Print 集合数据到客户端,execute_and_collect方法将收集数据到客户端内存 将结果发送到DataStream sink conne

    2024年02月11日
    浏览(38)
  • Flink流批一体计算(13):PyFlink Tabel API之SQL DDL

    1. TableEnvironment 创建 TableEnvironment TableEnvironment 是 Table API 和 SQL 集成的核心概念。 TableEnvironment 可以用来: ·创建 Table ·将 Table 注册成临时表 ·执行 SQL 查询 ·注册用户自定义的 (标量,表值,或者聚合) 函数 ·配置作业 ·管理 Python 依赖 ·提交作业执行 创建 source 表 创建 sink

    2024年02月12日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包