大数据-玩转数据-Flink-Transform(上)

这篇具有很好参考价值的文章主要介绍了大数据-玩转数据-Flink-Transform(上)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、Transform

转换算子可以把一个或多个DataStream转成一个新的DataStream.程序可以把多个复杂的转换组合成复杂的数据流拓扑.
大数据-玩转数据-Flink-Transform(上),大数据-玩转数据-FLINK,大数据,flink

二、基本转换算子

2.1、map(映射)

将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素

package com.lyh.flink05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

public class Transform_map {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Integer> s_num = svn.fromElements(1, 2, 3, 4, 5);
        s_num.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer values) throws Exception {
                return values*values;
            }
        }).print();

    }
}

2.2、filter(过滤)

根据指定的规则将满足条件(true)的数据保留,不满足条件(false)的数据丢弃

package com.lyh.flink05;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

public class Transform_filter {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Integer> elements = svn.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9);
//        elements.filter(new FilterFunction<Integer>() {
//            @Override
//            public boolean filter(Integer integer) throws Exception {
//                if (integer % 2 == 0 )
//                        return false;
//                        else {
//                            return true;
//                }
//            }
//        }).print();
        elements.filter(value -> value % 2 != 0).print();
    }
}

2.3、flatMap(扁平映射)

消费一个元素并产生零个或多个元素

package com.lyh.flink05;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;

import java.util.concurrent.ExecutionException;

public class flatMap {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Integer> dataSource = svn.fromElements(1, 2, 3);
        dataSource.flatMap(new FlatMapFunction<Integer, Integer>() {
            @Override
            public void flatMap(Integer integer, Collector<Integer> collector) throws Exception {
                collector.collect(integer*integer);
                collector.collect(integer*integer*integer);
            }
        }).print();
    }
}

三、聚合算子

3.1、keyBy(按键分区)

把流中的数据分到不同的分区(并行度)中.具有相同key的元素会分到同一个分区中

package com.lyh.flink05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class keyBy_s {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
        env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0) // 以数组的第一个元素作为key
                .map((MapFunction<Tuple2<Long, Long>, String>) longLongTuple2 -> "key:" + longLongTuple2.f0 + ",value:" + longLongTuple2.f1)
                .print();

        env.execute("execute");
    }
}

3.2、sum,min,max,minBy,maxBy(简单聚合)

KeyedStream的每一个支流做聚合。执行完成后,会将聚合的结果合成一个流返回,所以结果都是DataStream

package com.lyh.flink05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class keyBy_s {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
        env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0) // 以数组的第一个元素作为key
                .sum(1)
                .print();

        env.execute("execute");
    }
}

3.3、reduce(归约聚合)

一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
大数据-玩转数据-Flink-Transform(上),大数据-玩转数据-FLINK,大数据,flink

package com.lyh.flink05;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class keyByReduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.fromElements(Tuple2.of(1L,2L),Tuple2.of(2L,4L),Tuple2.of(2L,9L),Tuple2.of(1L,9L),Tuple2.of(1L,2L),Tuple2.of(2L,3L))
                .keyBy(0)
                .reduce(new ReduceFunction<Tuple2<Long, Long>>() {
                    @Override
                    public Tuple2<Long, Long> reduce(Tuple2<Long, Long> values1, Tuple2<Long, Long> values2) throws Exception {
                        return new Tuple2<>(values1.f0,values1.f1+values2.f1);
                    }
                })
                .print();
        env.execute();
    }
}

3.4、process(底层处理)

process算子在Flink算是一个比较底层的算子, 很多类型的流上都可以调用, 可以从流中获取更多的信息(不仅仅数据本身),process 用法比较灵活,后面再做专门研究。

package com.lyh.flink05;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class Process_s {

    public static void main(String[] args) throws Exception {

        // 获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);
        SingleOutputStreamOperator<Integer> processed = streamSource.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                if (value % 3 == 0) {
                    //测流数据
                    ctx.output(new OutputTag<Integer>("3%0", TypeInformation.of(Integer.class)), value);
                }
                if (value % 3 == 1) {
                    //测流数据
                    ctx.output(new OutputTag<Integer>("3%1", TypeInformation.of(Integer.class)), value);
                }
                //主流 ,数据
                out.collect(value);
            }
        });

        DataStream<Integer> output0 = processed.getSideOutput(new OutputTag<>("3%0",TypeInformation.of(Integer.class)));
        DataStream<Integer> output1 = processed.getSideOutput(new OutputTag<>("3%1",TypeInformation.of(Integer.class)));
        output1.print();

        env.execute();
    }
}

四、合流算子

4.1、connect(连接)

在某些情况下,我们需要将两个不同来源的数据流进行连接,实现数据匹配,比如订单支付和第三方交易信息,这两个信息的数据就来自于不同数据源,连接后,将订单支付和第三方交易信息进行对账,此时,才能算真正的支付完成。
Flink中的connect算子可以连接两个保持他们类型的数据流,两个数据流被connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。文章来源地址https://www.toymoban.com/news/detail-631749.html

package com.lyh.flink05;

import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.protocol.types.Field;

public class connect_s {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> data1 = env.fromElements(1, 2, 3, 4, 5);
        DataStreamSource<String> data2 = env.fromElements("a", "b", "c");
        ConnectedStreams<Integer, String> data3 = data1.connect(data2);
        data3.getFirstInput().print("data1");
        data3.getSecondInput().print("data2");
        env.execute();
    }
}

4.2、union(合并)

package com.lyh.flink05;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class union_s {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> data1 = env.fromElements(1, 2, 3);
        DataStreamSource<Integer> data2 = env.fromElements(555,666);
        DataStreamSource<Integer> data3 = env.fromElements(999);
        DataStream<Integer> data = data1.union(data2).union(data3);
        data.print();
        env.execute();
    }
}

到了这里,关于大数据-玩转数据-Flink-Transform(上)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据-玩转数据-Flink RedisSink

    具体版本根据实际情况确定 参见大数据-玩转数据-Redis 安装与使用 可以根据要写入的redis的不同数据类型进行调整

    2024年02月13日
    浏览(38)
  • 大数据-玩转数据-Flink状态编程(上)

    有状态的计算是流处理框架要实现的重要功能,因为稍复杂的流处理场景都需要记录状态,然后在新流入数据的基础上不断更新状态。 SparkStreaming在状态管理这块做的不好, 很多时候需要借助于外部存储(例如Redis)来手动管理状态, 增加了编程的难度。 Flink的状态管理是它的优

    2024年02月09日
    浏览(44)
  • 大数据-玩转数据-Flink恶意登录监控

    对于网站而言,用户登录并不是频繁的业务操作。如果一个用户短时间内频繁登录失败,就有可能是出现了程序的恶意攻击,比如密码暴力破解。 因此我们考虑,应该对用户的登录失败动作进行统计,具体来说,如果同一用户(可以是不同IP)在2秒之内连续两次登录失败,就

    2024年02月07日
    浏览(41)
  • 大数据-玩转数据-Flink定时器

    基于处理时间或者事件时间处理过一个元素之后, 注册一个定时器, 然后指定的时间执行. Context和OnTimerContext所持有的TimerService对象拥有以下方法: currentProcessingTime(): Long 返回当前处理时间 currentWatermark(): Long 返回当前watermark的时间戳 registerProcessingTimeTimer(timestamp: Long): Unit 会注

    2024年02月10日
    浏览(36)
  • 大数据-玩转数据-Flink 网站UV统计

    在实际应用中,我们往往会关注,到底有多少不同的用户访问了网站,所以另外一个统计流量的重要指标是网站的独立访客数(Unique Visitor,UV)。 对于UserBehavior数据源来说,我们直接可以根据userId来区分不同的用户。 将userid放到SET集合里面,统计集合长度,便可以统计到网

    2024年02月11日
    浏览(45)
  • 大数据-玩转数据-Flink状态后端(下)

    每传入一条数据,有状态的算子任务都会读取和更新状态。由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务(子任务)都会在本地维护其状态,以确保快速的状态访问。 状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(

    2024年02月09日
    浏览(44)
  • 大数据-玩转数据-Flink时间滚动动窗口

    在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集

    2024年02月11日
    浏览(45)
  • 大数据-玩转数据-FLINK-从kafka消费数据

    大数据-玩转数据-Kafka安装 运行本段代码,等待kafka产生数据进行消费。

    2024年02月14日
    浏览(36)
  • 大数据-玩转数据-Flink 海量数据实时去重

    大数据|阿里实时计算|Flink 借助redis的Set,需要频繁连接Redis,如果数据量过大, 对redis的内存也是一种压力;使用Flink的MapState,如果数据量过大, 状态后端最好选择 RocksDBStateBackend; 使用布隆过滤器,布隆过滤器可以大大减少存储的数据的数据量。 如果想判断一个元素是不

    2024年02月07日
    浏览(37)
  • 玩转数据-大数据-Flink SQL 中的时间属性

    时间属性是大数据中的一个重要方面,像窗口(在 Table API 和 SQL )这种基于时间的操作,需要有时间信息。我们可以通过时间属性来更加灵活高效地处理数据,下面我们通过处理时间和事件时间来探讨一下Flink SQL 时间属性。 2.1、准备WaterSensor类,方便使用 2.2、DataStream 到

    2024年02月07日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包