flink重温笔记(四):Flink 流批一体 API 开发——物理分区(上)

这篇具有很好参考价值的文章主要介绍了flink重温笔记(四):Flink 流批一体 API 开发——物理分区(上)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink学习笔记

前言:今天是学习flink的第四天啦!学习了物理分区的知识点,这一次学习了前4个简单的物理分区,称之为简单分区篇!
Tips:我相信自己会越来会好的,明天攻克困难分区篇,加油!

二、Flink 流批一体 API 开发

3. 物理分区

3.1 Global Partitioner

该分区器会将所有的数据都发送到下游的某个算子实例(subtask id = 0)

flink重温笔记(四):Flink 流批一体 API 开发——物理分区(上),Flink重温笔记,flink,笔记,大数据,学习方法,数据仓库

实例:编写Flink程序,接收socket的单词数据,以进程标记查看分区数据情况。

package cn.itcast.day04.partition;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * @author lql
 * @time 2024-02-15 22:54:35
 * @description TODO
 */
public class GlobalPartitioningDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //Source是一个非并行的Source
        //并行度是1
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        // 对每个输入的数据进行映射处理,给每个单词添加上一个字符串以及当前所在的子任务编号
        SingleOutputStreamOperator<String> mapped = lines.map(new RichMapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                return value + " : " + indexOfThisSubtask;
            }
        }).setParallelism(3); // 针对算子将并行度设置为 3;

        // 对数据流进行 global,将其随机均匀地划分到每个分区中
        DataStream<String> global = mapped.global();

        // 定义一个 sink 函数,输出每个单词和所在的子任务编号
        global.addSink(new RichSinkFunction<String>(){
            @Override
            public void invoke(String value, Context context) throws Exception {
                int index = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println(value + "->" + index);
            }
        });

        env.execute();
    }
}

结果:

hadoop : 1->0
hadoop : 2->0
hadoop : 0->0
spark : 1->0
spark : 2->0

总结:

  • 1- 多个进程处理的数据,汇总到 sink 第一个分区第一个进程
  • 2- 数据多出梳理,合并一处的现象
  • 3- getRuntimeContext()方法在 Rich Function 中,最后的 addSink()用心良苦!
  • 4- 并行任务之间共享相同状态的场景,如全局计数器等
3.2 Shuffer Partition

根据均匀分布随机划分元素。

flink重温笔记(四):Flink 流批一体 API 开发——物理分区(上),Flink重温笔记,flink,笔记,大数据,学习方法,数据仓库

实例:编写Flink程序,接收socket的单词数据,并将每个字符串均匀的随机划分到每个分区。

package cn.itcast.day04.partition;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * @author lql
 * @time 2024-02-15 23:26:49
 * @description TODO:编写Flink程序,接收socket的单词数据,并将每个字符串均匀的随机划分到每个分区
 */
public class ShufflePartitioningDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //Source是一个非并行的Source
        //并行度是1
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        //并行度2
        SingleOutputStreamOperator<String> mapped = lines.map(new RichMapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                return value + " : " + indexOfThisSubtask;
            }
        }).setParallelism(1);

        //shuffle!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
        DataStream<String> shuffled = mapped.shuffle();

        shuffled.addSink(new RichSinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
                int index = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println(value + " -> " + index);
            }
        });
        env.execute();
    }
}

结果:

结果现象:(没有规律)
           hadoop : 0 -> 0
           hadoop : 0 -> 4
           flink  : 0 -> 6
           flink  : 0 -> 7

总结:

  • 1- 它将数据均匀地分配到下游任务的每个并行实例中,然后再对每个并行任务的数据进行分区
  • 2- 这种分发方式适用于数据量比较大的场景,可以减少网络传输压力和降低数据倾斜的概率。
3.3 Broadcast Partition

发送到下游所有的算子实例

flink重温笔记(四):Flink 流批一体 API 开发——物理分区(上),Flink重温笔记,flink,笔记,大数据,学习方法,数据仓库

实例:编写Flink程序,接收socket的单词数据,并将每个字符串广播到每个分区。

package cn.itcast.day04.partition;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * @author lql
 * @time 2024-02-15 23:35:59
 * @description TODO
 */
public class BroadcastPartitioningDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //Source是一个非并行的Source
        //并行度是1
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        //并行度2
        SingleOutputStreamOperator<String> mapped = lines.map(new RichMapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                return value + " : " + indexOfThisSubtask;
            }
        }).setParallelism(1);

        //广播,上游的算子将一个数据广播到下游所以的subtask
        DataStream<String> shuffled = mapped.broadcast();

        shuffled.addSink(new RichSinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
                int index = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println(value + " -> " + index);
            }
        });

        env.execute();
    }
}

结果:

hadoop : 0 -> 0
hadoop : 0 -> 2
hadoop : 0 -> 1
hadoop : 0 -> 3
hadoop : 0 -> 4
hadoop : 0 -> 6
hadoop : 0 -> 5
hadoop : 0 -> 7
spark : 0 -> 3
spark : 0 -> 2
spark : 0 -> 6
spark : 0 -> 4
spark : 0 -> 0
spark : 0 -> 1
spark : 0 -> 5
spark : 0 -> 7

总结:

  • 均匀广播数据
3.4 Rebalance Partition

通过循环的方式依次发送到下游的task

flink重温笔记(四):Flink 流批一体 API 开发——物理分区(上),Flink重温笔记,flink,笔记,大数据,学习方法,数据仓库

实例:轮询发送数据

package cn.itcast.day04.partition;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-15 23:41:46
 * @description TODO: flink的数据倾斜解决方案:轮询发送(当设置并行度为1时)
 */
public class RebalanceDemo {
    public static void main(String[] args) throws Exception {
        /**
         * 构建批处理运行环境
         * 使用 env.generateSequence 创建0-100的并行数据
         * 使用 fiter 过滤出来 大于8 的数字
         * 使用map操作传入 RichMapFunction ,将当前子任务的ID和数字构建成一个元组
         * 在RichMapFunction中可以使用 getRuntimeContext.getIndexOfThisSubtask 获取子任务序号
         * 打印测试
         */
        //TODO 构建批处理运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //TODO 使用 env.generateSequence 创建0-100的并行数据
        DataStream<Long> dataSource = env.generateSequence(0, 100);

        //TODO 使用 fiter 过滤出来 大于8 的数字
        SingleOutputStreamOperator<Long> filteredDataSource = dataSource.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long aLong) throws Exception {
                return aLong > 8;
            }
        });

        //解决数据倾斜的问题
        DataStream<Long> rebalance = filteredDataSource.rebalance();

        //TODO 使用map操作传入 RichMapFunction ,将当前子任务的ID和数字构建成一个元组
        //查看92条数据分别被哪些线程处理的,可以看到每个线程处理的数据条数
        //spark中查看数据属于哪个分区使用哪个函数?mapPartitionWithIndex
        //TODO 在RichMapFunction中可以使用 getRuntimeContext.getIndexOfThisSubtask 获取子任务序号
        SingleOutputStreamOperator<Tuple2<Long, Integer>> tuple2MapOperator = rebalance.map(new RichMapFunction<Long, Tuple2<Long, Integer>>() {
            @Override
            public Tuple2<Long, Integer> map(Long aLong) throws Exception {
                return Tuple2.of(aLong, getRuntimeContext().getIndexOfThisSubtask());
            }
        });

        //TODO 打印测试
        tuple2MapOperator.print();

        env.execute();
    }
}

结果:

 * 0-0
 * 0-1
 * 0-2
 * 0-0
 * 0-1
 * 0-2

总结:文章来源地址https://www.toymoban.com/news/detail-825254.html

  • 1- 轮询发送数据
  • 2- 解决数据倾斜问题

到了这里,关于flink重温笔记(四):Flink 流批一体 API 开发——物理分区(上)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink流批一体计算(10):PyFlink Tabel API

    简述 PyFlink 是 Apache Flink 的 Python API ,你可以使用它构建可扩展的批处理和流处理任务,例如实时数据处理管道、大规模探索性数据分析、机器学习( ML )管道和 ETL 处理。 如果你对 Python 和 Pandas 等库已经比较熟悉,那么 PyFlink 可以让你更轻松地利用 Flink 生态系统的全部功

    2024年02月11日
    浏览(28)
  • Flink流批一体计算(16):PyFlink DataStream API

    目录 概述 Pipeline Dataflow 代码示例WorldCount.py 执行脚本WorldCount.py 概述 Apache Flink 提供了 DataStream API,用于构建健壮的、有状态的流式应用程序。它提供了对状态和时间细粒度控制,从而允许实现高级事件驱动系统。 用户实现的Flink程序是由Stream和Transformation这两个基本构建块组

    2024年02月11日
    浏览(36)
  • Flink流批一体计算(20):DataStream API和Table API互转

    目录 举个例子 连接器 下载连接器(connector)和格式(format)jar 包 依赖管理  如何使用连接器 举个例子 StreamExecutionEnvironment 集成了DataStream API,通过额外的函数扩展了TableEnvironment。 下面代码演示两种API如何互转 TableEnvironment 将采用StreamExecutionEnvironment所有的配置选项。 建

    2024年02月10日
    浏览(32)
  • Flink流批一体计算(19):PyFlink DataStream API之State

    目录 keyed state Keyed DataStream 使用 Keyed State 实现了一个简单的计数窗口 状态有效期 (TTL) 过期数据的清理 全量快照时进行清理 增量数据清理 在 RocksDB 压缩时清理 Operator State算子状态 Broadcast State广播状态 keyed state Keyed DataStream 使用 keyed state,首先需要为DataStream指定 key(主键)

    2024年02月10日
    浏览(26)
  • Flink流批一体计算(17):PyFlink DataStream API之StreamExecutionEnvironment

    目录 StreamExecutionEnvironment Watermark watermark策略简介 使用 Watermark 策略 内置水印生成器 处理空闲数据源 算子处理 Watermark 的方式 创建DataStream的方式 通过list对象创建 ​​​​​​使用DataStream connectors创建 使用Table SQL connectors创建 StreamExecutionEnvironment 编写一个 Flink Python DataSt

    2024年02月11日
    浏览(34)
  • Flink流批一体计算(12):PyFlink Tabel API之构建作业

    目录 1.创建源表和结果表。 创建及注册表名分别为 source 和 sink 的表 使用 TableEnvironment.execute_sql() 方法,通过 DDL 语句来注册源表和结果表 2. 创建一个作业 3. 提交作业Submitting PyFlink Jobs 1.创建源表和结果表。 创建及注册表名分别为 source 和 sink 的表 其中,源表 source 有一列

    2024年02月13日
    浏览(35)
  • Flink流批一体计算(11):PyFlink Tabel API之TableEnvironment

    目录 概述 设置重启策略 什么是flink的重启策略(Restartstrategy) flink的重启策略(Restartstrategy)实战 flink的4种重启策略 FixedDelayRestartstrategy(固定延时重启策略) FailureRateRestartstrategy(故障率重启策略) NoRestartstrategy(不重启策略) 配置State Backends 以及 Checkpointing Checkpoint 启用和配置

    2024年02月13日
    浏览(31)
  • Flink流批一体计算(14):PyFlink Tabel API之SQL查询

    举个例子 查询 source 表,同时执行计算 Table API 查询 Table 对象有许多方法,可以用于进行关系操作。 这些方法返回新的 Table 对象,表示对输入 Table 应用关系操作之后的结果。 这些关系操作可以由多个方法调用组成,例如 table.group_by(...).select(...)。 Table API 文档描述了流和批

    2024年02月12日
    浏览(30)
  • Flink流批一体计算(18):PyFlink DataStream API之计算和Sink

    目录 1. 在上节数据流上执行转换操作,或者使用 sink 将数据写入外部系统。 2. File Sink File Sink Format Types  Row-encoded Formats  Bulk-encoded Formats  桶分配 滚动策略 3. 如何输出结果 Print 集合数据到客户端,execute_and_collect方法将收集数据到客户端内存 将结果发送到DataStream sink conne

    2024年02月11日
    浏览(29)
  • Flink流批一体计算(13):PyFlink Tabel API之SQL DDL

    1. TableEnvironment 创建 TableEnvironment TableEnvironment 是 Table API 和 SQL 集成的核心概念。 TableEnvironment 可以用来: ·创建 Table ·将 Table 注册成临时表 ·执行 SQL 查询 ·注册用户自定义的 (标量,表值,或者聚合) 函数 ·配置作业 ·管理 Python 依赖 ·提交作业执行 创建 source 表 创建 sink

    2024年02月12日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包