Flink流处理API大合集:掌握所有flink流处理技术

这篇具有很好参考价值的文章主要介绍了Flink流处理API大合集:掌握所有flink流处理技术。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink流处理API大合集:掌握所有flink流处理技术

前言

在之前的文章中有提到过,一个flink应用程序开发的步骤大致为五个步骤:构建执行环境、获取数据源、操作数据源、输出到外部系统、触发程序执行。由这五个模块组成了一个flink任务,接下来围绕着每个模块对应的API进行梳理。

以下所有的代码案例都已收录在本人的Gitee仓库,有需要的同学点击链接直接获取:

Gitee地址:https://gitee.com/xiaoZcode/flink_test

Flink流处理API,flink流处理技术

一、构建流执行环境(Environment)

getExecutionEnvironment()

创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境。它会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

代码如下:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();

createLocalEnvironment()

返回本地执行环境,需要在调用时指定默认的并行度。

代码如下:

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

createRemoteEnvironment()

返回集群执行环境,将 Jar 提交到远程服务器。需要在调用时指定 JobManager的 IP 和端口号,并指定要在集群中运行的 Jar 包。

代码如下:

StreamExecutionEnvironment env = 

StreamExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123, "YOURPATH//xxx.jar");

二、加载数据源(Source)

案例场景:

工业物联网的背景下,收集传感器的温度值,将收集到不同传感器的温度值进行计算分析操作。

注:以下代码都围绕此场景进行编写,获取更完整源代码请移步文章开头部分。

创建传感器对象:SensorReading

public class SensorReading {



    private String id;

    private Long timestamp;

    private Double temperature;



    public SensorReading() {



    }



    public SensorReading(String id, Long timestamp, Double temperature) {

        this.id = id;

        this.timestamp = timestamp;

        this.temperature = temperature;

    }



    public String getId() {

        return id;

    }



    public void setId(String id) {

        this.id = id;

    }



    public Long getTimestamp() {

        return timestamp;

    }



    public void setTimestamp(Long timestamp) {

        this.timestamp = timestamp;

    }



    public Double getTemperature() {

        return temperature;

    }



    public void setTemperature(Double temperature) {

        this.temperature = temperature;

    }



    @Override

    public String toString() {

        return "SensorReading{" +

                "id='" + id + '\'' +

                ", timestamp=" + timestamp +

                ", temperature=" + temperature +

                '}';

    }

}

从集合读取数据

public class SourceTest1_Collection {

    public static void main(String[] args) throws Exception {

        // 创建执行环境

        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

        //设置并行度为 1

        env.setParallelism(1);



        //从集合中读取数据

        DataStream<SensorReading> dataStream  = env.fromCollection(Arrays.asList(

                new SensorReading("sensor_1", 1547718199L, 35.8),

                new SensorReading("sensor_2", 1547718199L, 35.0),

                new SensorReading("sensor_3", 1547718199L, 38.8),

                new SensorReading("sensor_4", 1547718199L, 39.8)

        ));



        DataStream<Integer> integerDataStream = env.fromElements(1, 2, 3, 4, 5, 789);



        //打印输出

        dataStream.print("data");

        integerDataStream.print("int");



        //执行程序

        env.execute();

    }

}

从文件读取数据

从文件中获取数据源的核心代码部分:

DataStream<String> dataStream = env.readTextFile("xxx ");


public class SourceTest2_File {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);



        //从文件读取数据

        DataStream<String> dataStream = env.readTextFile("sensor.txt");



         dataStream.print();



         env.execute();

    }

}

从Kafka读取数据

首先需要引入Kafka的以来到工程中

<dependency>

 	<groupId>org.apache.flink</groupId>

 	<artifactId>flink-connector-kafka-0.11_2.12</artifactId>

 	<version>1.10.1</version>

</dependency>
public class SourceTest3_Kafka {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);



        Properties properties=new Properties();

        properties.setProperty("bootstrap.servers","localhost:9092");

        properties.setProperty("group.id","consumer-group");

        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        properties.setProperty("auto.offset.reset","latest");



        DataStream<String> dataStream=env.addSource(new FlinkKafkaConsumer011<String>("sensor",new SimpleStringSchema(),properties));



        dataStream.print();



        env.execute();



    }

}

自定义数据源Source

除了从集合、文件以及Kafka中获取数据外,还给我们提供了一个自定义source的方式,需要传入sourceFunction函数。核心代码如下:

DataStream<SensorReading> dataStream = env.addSource( new MySensor());
public class SourceTest4_UDF {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);



        //从文件读取数据

        DataStream<SensorReading> dataStream = env.addSource(new MySensorSource());



        dataStream.print();



        env.execute();

    }



    // 实现自定义数据源

    public static class MySensorSource implements SourceFunction<SensorReading>{

        // 定义一个标记位,控制数据产生

        private boolean running = true;



        @Override

        public void run(SourceContext<SensorReading> ctv) throws Exception {

            // 随机数

            Random random=new Random();



            //设置10个初始温度

            HashMap<String, Double> sensorTempMap = new HashMap<>();

            for (int i = 0; i < 10; i++) {

                sensorTempMap.put("sensor_"+(i+1), 60 + random.nextGaussian() * 20); // 正态分布

            }

            while (running){

                for (String sensorId: sensorTempMap.keySet()) {

                    Double newTemp = sensorTempMap.get(sensorId) + random.nextGaussian();

                    sensorTempMap.put(sensorId,newTemp);

                    ctv.collect(new SensorReading(sensorId,System.currentTimeMillis(),newTemp));

                }

                Thread.sleep(1000);

            }

        }



        @Override

        public void cancel() {

            running=false;

        }

    }

}

三、转换算子(Transform)

获取到指定的数据源后,还要对数据源进行分析计算等操作,

基本转换算子:Map、flatMap、Filter

Flink流处理API,flink流处理技术

public class TransformTest1_Base {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);



        //从文件读取数据

        DataStream<String> inputStream = env.readTextFile("sensor.txt");



        // 1. map 把String转换成长度生成

        DataStream<Integer> mapStream = inputStream.map(new MapFunction<String, Integer>() {

            @Override

            public Integer map(String value) throws Exception {

                return value.length();

            }

        });



    //   2. flatmap 按逗号切分字段

        DataStream<String> flatMapStream = inputStream.flatMap(new FlatMapFunction<String, String>() {

            @Override

            public void flatMap(String value, Collector<String> out) throws Exception {

                String[] fields=value.split(",");

                for (String field : fields){

                    out.collect(field);

                }

            }

        });



    //    3. filter ,筛选sensor_1 开头对id对应的数据

        DataStream<String> filterStream=inputStream.filter(new FilterFunction<String>() {

            @Override

            public boolean filter(String value) throws Exception {

                return value.startsWith("sensor_1");

            }

        });



    //    打印输出

        mapStream.print("map");

        flatMapStream.print("flatMap");

        filterStream.print("filter");



    // 执行程序

        env.execute();

    }

}

KeyBy、滚动聚合算子【sum()、min()、max()、minBy()、maxBy()】

  • KeyBy:DataStream → KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素,在内部以 hash 的形式实现的。

  • 如上算子可以针对 KeyedStream 的每一个支流做聚合。

Flink流处理API,flink流处理技术

public class TransformTest2_RollingAggregation {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);



        //从文件读取数据

        DataStream<String> inputStream = env.readTextFile("sensor.txt");



        // 转换成SensorReading类型

        DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {

            @Override

            public SensorReading map(String s) throws Exception {

                String[] fields=s.split(",");

                return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));

            }

        });



        // DataStream<SensorReading> dataStream = inputStream.map(line -> {

        //     String[] fields = line.split(",");

        //     return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));

        // });



        // 分组

        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");

        // KeyedStream<SensorReading, String> keyedStream1 = dataStream.keyBy(SensorReading::getId);



        //滚动聚合,取当前最大的温度值

        // DataStream<SensorReading> resultStream = keyedStream.maxBy("temperature");

        DataStream<SensorReading> resultStream = keyedStream.maxBy("temperature");



        resultStream.print();



        env.execute();

    }

}

Reduce

KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

public class TransformTest3_Reduce {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);



        //从文件读取数据

        DataStream<String> inputStream = env.readTextFile("sensor.txt");



        // 转换成SensorReading类型

        DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {

            @Override

            public SensorReading map(String s) throws Exception {

                String[] fields=s.split(",");

                return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));

            }

        });

        // 分组

        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");



        // reduce 聚合,取最大的温度,以及当前最新对时间戳

        DataStream<SensorReading> resultStream = keyedStream.reduce(new ReduceFunction<SensorReading>() {

            @Override

            public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {

                return new SensorReading(value1.getId(), value2.getTimestamp(), Math.max(value1.getTemperature(), value2.getTemperature()));

            }

        });

        resultStream.print();

        env.execute();

    }

}

分流【Split 、Select】、合流【Connect 、CoMap、union】

Split

DataStream → SplitStream:根据某些特征把一个 DataStream 拆分成两个或者多个 DataStream。

Flink流处理API,flink流处理技术

Select

SplitStream→DataStream:从一个 SplitStream 中获取一个或者多个DataStream。

Flink流处理API,flink流处理技术

Connect

DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

Flink流处理API,flink流处理技术

CoMap、CoFlatMap

ConnectedStreams → DataStream:作用于 ConnectedStreams 上,功能与 map和 flatMap 一样,对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap处理。

Flink流处理API,flink流处理技术

Union

DataStream → DataStream:对两个或者两个以上的 DataStream 进行 union 操作,产生一个包含所有 DataStream 元素的新 DataStream。

Flink流处理API,flink流处理技术

DataStream<SensorReading> unionStream = xxxstream.union(xxx);


Connect 与 Union 区别:

  • Union 之前两个流的类型必须是一样,Connect 可以不一样,在之后的 coMap中再去调整成为一样的。

  • Connect 只能操作两个流,Union 可以操作多个。

public class TransformTest4_MultipleStreams {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);



        //从文件读取数据

        DataStream<String> inputStream = env.readTextFile("sensor.txt");



        // 转换成SensorReading类型

        DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {

            @Override

            public SensorReading map(String s) throws Exception {

                String[] fields=s.split(",");

                return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));

            }

        });



        // 1。分流 按照温度值30度为界进行分流

        SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {

            @Override

            public Iterable<String> select(SensorReading value) {

                return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");

            }

        });

        // 通过条件选择对应流数据

        DataStream<SensorReading> highTempStream = splitStream.select("high");

        DataStream<SensorReading> lowTempStream = splitStream.select("low");

        DataStream<SensorReading> allTempStream = splitStream.select("high","low");



        highTempStream.print("high");

        lowTempStream.print("low");

        allTempStream.print("all");



        // 2。合流 connect,先将高温流转换为二元组,与低温流合并后,输出状态信息。

        DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {

            @Override

            public Tuple2<String, Double> map(SensorReading value) throws Exception {

                return new Tuple2<>(value.getId(), value.getTemperature());

            }

        });



        // 只能是两条流进行合并,但是两条流的数据类型可以不一致

        ConnectedStreams<Tuple2<String, Double>, SensorReading> connectStream = warningStream.connect(lowTempStream);

        DataStream<Object> resultStream = connectStream.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {

            @Override

            public Object map1(Tuple2<String, Double> value) throws Exception {

                return new Tuple3<>(value.f0, value.f1, "high temp warning");

            }



            @Override

            public Object map2(SensorReading value) throws Exception {

                return new Tuple2<>(value.getId(), "normal");

            }

        });



        resultStream.print();



        // 3。union联合多条流 限制就是每条流数据类型必须一致

        DataStream<SensorReading> union = highTempStream.union(lowTempStream, allTempStream);

        union.print("union stream");



        env.execute();

    }

}

四、数据输出(Sink)

Flink官方提供了一部分框架的Sink,用户也可以自定义实现Sink。flink将任务进行输出的操作核心代码:stream.addSink(new MySink(xxxx))

Kafka

引入Kafka依赖:

<dependency>

 <groupId>org.apache.flink</groupId>

 <artifactId>flink-connector-kafka-0.11_2.12</artifactId>

 <version>1.10.1</version>

</dependency>


public class SinkTest1_Kafka {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);



        //从文件读取数据

        DataStream<String> inputStream = env.readTextFile("/Volumes/Update/flink/flink_test/src/main/resources/sensor.txt");



        // 转换成SensorReading类型

        DataStream<String> dataStream=inputStream.map(new MapFunction<String, String>() {

            @Override

            public String map(String s) throws Exception {

                String[] fields=s.split(",");

                return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2])).toString();

            }

        });



        //输出到外部系统

        dataStream.addSink(new FlinkKafkaProducer011<String>("localhost:9092","sinktest",new SimpleStringSchema()));



        env.execute();

    }

}

Redis

引入Redis依赖:

<dependency>

 <groupId>org.apache.bahir</groupId>

 <artifactId>flink-connector-redis_2.11</artifactId>

 <version>1.0</version>

</dependency>


文章来源地址https://www.toymoban.com/news/detail-853055.html

public class SinkTest2_Redis {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);



        //从文件读取数据

        DataStream<String> inputStream = env.readTextFile("/Volumes/Update/flink/flink_test/src/main/resources/sensor.txt");



        // 转换成SensorReading类型

        DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {

            @Override

            public SensorReading map(String s) throws Exception {

                String[] fields=s.split(",");

                return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));

            }

        });

        // jedis配置

        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()

                .setHost("localhost")

                .setPort(6379)

                .build();

        dataStream.addSink(new RedisSink<>(config,new MyRedisMapper()));



        env.execute();

    }

    // 自定义RedisMapper

    public static class MyRedisMapper implements RedisMapper<SensorReading>{

        //自定义保存数据到Redis的命令,存成hash表Hset

        @Override

        public RedisCommandDescription getCommandDescription() {

            return new RedisCommandDescription(RedisCommand.HSET,"sensor_temp");

        }



        @Override

        public String getKeyFromData(SensorReading data) {



c String getKeyFromData(SensorReading data) {

到了这里,关于Flink流处理API大合集:掌握所有flink流处理技术的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink:处理大规模复杂数据集的最佳实践深入探究Flink的数据处理和性能优化技术

    作者:禅与计算机程序设计艺术 随着互联网、移动互联网、物联网等新型网络技术的不断发展,企业对海量数据的处理日益依赖,而大数据分析、决策支持、风险控制等领域都需要海量的数据处理能力。如何高效、快速地处理海量数据、提升处理效率、降低成本,是当下处理

    2024年02月13日
    浏览(59)
  • 实时大数据流处理技术:Spark Streaming与Flink的深度对比

    引言 在当前的大数据时代,企业和组织越来越多地依赖于实时数据流处理技术来洞察和响应业务事件。实时数据流处理不仅能够加快数据分析的速度,还能提高决策的效率和准确性。Apache Spark Streaming和Apache Flink是目前两个主要的实时数据流处理框架,它们各自拥有独特的特

    2024年03月10日
    浏览(62)
  • 什么是API网关,解释API网关的作用和特点?解释什么是数据流处理,如Apache Flink和Spark Streaming的应用?

    API网关是一种在分布式系统中的组件,用于管理不同系统之间的通信和交互。API网关的作用是在不同系统之间提供统一的接口和协议,从而简化系统之间的集成和互操作性。 API网关的特点包括: 路由和分发请求:API网关可以根据请求的URL、方法、参数等信息,将请求分发到

    2024年02月11日
    浏览(48)
  • 处理大数据的基础架构,OLTP和OLAP的区别,数据库与Hadoop、Spark、Hive和Flink大数据技术

    2022找工作是学历、能力和运气的超强结合体,遇到寒冬,大厂不招人,可能很多算法学生都得去找开发,测开 测开的话,你就得学数据库,sql,oracle,尤其sql要学,当然,像很多金融企业、安全机构啥的,他们必须要用oracle数据库 这oracle比sql安全,强大多了,所以你需要学

    2024年02月08日
    浏览(61)
  • Flink 学习二 Flink 编程基础API

    如果要使用Scala API ,需要替换 flink-java 为flink-scala_2.12 flink-streaming-java_2.12 为 flink-streaming-scala_2.12 DataStream 代表数据流,可以有界也可以无界 DataStream 类似于 java的集合 ,但是是不可变的immutable ,数据本身不可变 无法对一个 DataStream 进行添加或者删除数据 只可以通过算子对

    2024年02月10日
    浏览(50)
  • Flink 学习六 Flink 窗口计算API

    窗口 window 是处理无限流的核心就是把无界的数据流,按照一定的规则划分成一段一段的有界的数据流(桶),然后再这个有界的数据流里面去做计算; 2.1 滚动窗口 相邻窗口之间是没有数据重合 window 大小可以是时间,可以是数据长度 按照数据流是否可以是 keyed , 在分类,nonkey windo

    2024年02月09日
    浏览(47)
  • Flink学习笔记(二)Flink常用API详解

            提供了对时间和状态的细粒度控制,简洁性和易用性较差,主要应用在对一些复杂事件的处理逻辑上。         要提供了针对流数据和离线数据的处理,对低级API进行了一些封装,提供了filter、sum、max、min等高级函数,简单且易用,所以在工作中应用比较广泛

    2023年04月08日
    浏览(44)
  • 掌握实时数据流:使用Apache Flink消费Kafka数据

            导读:使用Flink实时消费Kafka数据的案例是探索实时数据处理领域的绝佳方式。不仅非常实用,而且对于理解现代数据架构和流处理技术具有重要意义。         Apache Flink  是一个在 有界 数据流和 无界 数据流上进行有状态计算分布式处理引擎和框架。Flink 设计旨

    2024年02月03日
    浏览(81)
  • 【Apache Flink】Flink DataStream API的基本使用

    Flink DataStream API的基本使用 Flink DataStream API主要用于处理无界和有界数据流 。 无界数据流 是一个持续生成数据的数据源,它没有明确的结束点,例如实时的交易数据或传感器数据。这种类型的数据流需要使用Apache Flink的实时处理功能来连续地处理和分析。 有界数据流 是一个

    2024年02月06日
    浏览(39)
  • Flink|《Flink 官方文档 - DataStream API - 概览》学习笔记

    学习文档:Flink 官方文档 - DataStream API - 概览 学习笔记如下: Flink 的 DataStream API: 数据里的起始是各种 source,例如消息队列、socket 流、文件等; 对数据流进行转换,例如过滤、更新状态、定义窗口、聚合等; 结果通过 sink 返回,例如可以将数据写入文件或标准输出。 Da

    2024年01月23日
    浏览(55)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包