【Flink】Flink 的八种分区策略(源码解读)

这篇具有很好参考价值的文章主要介绍了【Flink】Flink 的八种分区策略(源码解读)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink 包含 8 种分区策略,这 8 种分区策略(分区器)分别如下面所示,本文将从源码的角度解读每个分区器的实现方式。

flink分区策略,# Flink 核心篇,flink,分区,分区策略,分区器,Partitioner,Partition

  • GlobalPartitioner
  • ShufflePartitioner
  • RebalancePartitioner
  • RescalePartitioner
  • BroadcastPartitioner
  • ForwardPartitioner
  • KeyGroupStreamPartitioner
  • CustomPartitionerWrapper

1.继承关系图

1.1 接口:ChannelSelector

public interface ChannelSelector<T extends IOReadableWritable> {

    /**
     * 初始化channels数量,channel可以理解为下游Operator的某个实例(并行算子的某个subtask).
     */
    void setup(int numberOfChannels);

    /**
     *根据当前的record以及Channel总数,
     *决定应将record发送到下游哪个Channel。
     *不同的分区策略会实现不同的该方法。
     */
    int selectChannel(T record);

    /**
    *是否以广播的形式发送到下游所有的算子实例
     */
    boolean isBroadcast();
}

1.2 抽象类:StreamPartitioner

public abstract class StreamPartitioner<T> implements
        ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {
    private static final long serialVersionUID = 1L;

    protected int numberOfChannels;

    @Override
    public void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    @Override
    public boolean isBroadcast() {
        return false;
    }

    public abstract StreamPartitioner<T> copy();
}

1.3 继承关系图

flink分区策略,# Flink 核心篇,flink,分区,分区策略,分区器,Partitioner,Partition

2.分区策略

2.1 GlobalPartitioner

该分区器会将所有的数据都发送到下游的某个算子实例(subtask id = 0)。

flink分区策略,# Flink 核心篇,flink,分区,分区策略,分区器,Partitioner,Partition

/**
 * 发送所有的数据到下游算子的第一个task(ID = 0)
 * @param <T>
 */
@Internal
public class GlobalPartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        //只返回0,即只发送给下游算子的第一个task
        return 0;
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "GLOBAL";
    }
}

2.2 ShufflePartitioner

随机选择一个下游算子实例进行发送。

flink分区策略,# Flink 核心篇,flink,分区,分区策略,分区器,Partitioner,Partition

/**
 * 随机的选择一个channel进行发送
 * @param <T>
 */
@Internal
public class ShufflePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    private Random random = new Random();

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        //产生[0,numberOfChannels)伪随机数,随机发送到下游的某个task
        return random.nextInt(numberOfChannels);
    }

    @Override
    public StreamPartitioner<T> copy() {
        return new ShufflePartitioner<T>();
    }

    @Override
    public String toString() {
        return "SHUFFLE";
    }
}

2.3 BroadcastPartitioner

发送到下游所有的算子实例。

flink分区策略,# Flink 核心篇,flink,分区,分区策略,分区器,Partitioner,Partition

/**
 * 发送到所有的channel
 */
@Internal
public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;
    /**
     * Broadcast模式是直接发送到下游的所有task,所以不需要通过下面的方法选择发送的通道
     */
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");
    }

    @Override
    public boolean isBroadcast() {
        return true;
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "BROADCAST";
    }
}

2.4 RebalancePartitioner

通过循环的方式依次发送到下游的 task

flink分区策略,# Flink 核心篇,flink,分区,分区策略,分区器,Partitioner,Partition

/**
 *通过循环的方式依次发送到下游的task
 * @param <T>
 */
@Internal
public class RebalancePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    private int nextChannelToSendTo;

    @Override
    public void setup(int numberOfChannels) {
        super.setup(numberOfChannels);
        //初始化channel的id,返回[0,numberOfChannels)的伪随机数
        nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
    }

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        //循环依次发送到下游的task,比如:nextChannelToSendTo初始值为0,numberOfChannels(下游算子的实例个数,并行度)值为2
        //则第一次发送到ID = 1的task,第二次发送到ID = 0的task,第三次发送到ID = 1的task上...依次类推
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "REBALANCE";
    }
}

2.5 RescalePartitioner

基于上下游 Operator 的并行度,将记录以循环的方式输出到下游 Operator 的每个实例。

举例:

  • 上游并行度是 2,下游是 4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。
  • 若上游并行度是 4,下游并行度是 2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。

flink分区策略,# Flink 核心篇,flink,分区,分区策略,分区器,Partitioner,Partition

@Internal
public class RescalePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    private int nextChannelToSendTo = -1;

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        if (++nextChannelToSendTo >= numberOfChannels) {
            nextChannelToSendTo = 0;
        }
        return nextChannelToSendTo;
    }

    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "RESCALE";
    }
}

Flink 中的执行图可以分成四层:StreamGraphJobGraphExecutionGraph物理执行图

  • StreamGraph:是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
  • JobGraph:StreamGraph 经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化 / 反序列化 / 传输消耗。
  • ExecutionGraph:JobManager 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的并行化版本,是调度层最核心的数据结构。
  • 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个 TaskManager 上部署 Task 后形成的 “”,并不是一个具体的数据结构。

StreamingJobGraphGenerator 就是 StreamGraph 转换为 JobGraph。在这个类中,把 ForwardPartitionerRescalePartitioner 列为 POINTWISE 分配模式,其他的为 ALL_TO_ALL 分配模式。代码如下:

if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {
            jobEdge = downStreamVertex.connectNewDataSetAsInput(
                headVertex,
                // 上游算子(生产端)的实例(subtask)连接下游算子(消费端)的一个或者多个实例(subtask)
                DistributionPattern.POINTWISE,
                resultPartitionType);
        } else {
            jobEdge = downStreamVertex.connectNewDataSetAsInput(
                headVertex,
                // 上游算子(生产端)的实例(subtask)连接下游算子(消费端)的所有实例(subtask)
                DistributionPattern.ALL_TO_ALL,
                resultPartitionType);
        }

2.6 ForwardPartitioner

发送到下游对应的第一个 task,保证上下游算子并行度一致,即上游算子与下游算子是 1 : 1 1:1 1:1 的关系。

flink分区策略,# Flink 核心篇,flink,分区,分区策略,分区器,Partitioner,Partition

/**
 * 发送到下游对应的第一个task
 * @param <T>
 */
@Internal
public class ForwardPartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        return 0;
    }

    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "FORWARD";
    }
}

在上下游的算子没有指定分区器的情况下,如果上下游的算子并行度一致,则使用 ForwardPartitioner,否则使用 RebalancePartitioner,对于 ForwardPartitioner,必须保证上下游算子并行度一致,否则会抛出异常。

//在上下游的算子没有指定分区器的情况下,如果上下游的算子并行度一致,则使用ForwardPartitioner,否则使用RebalancePartitioner
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
    partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
    partitioner = new RebalancePartitioner<Object>();
}

if (partitioner instanceof ForwardPartitioner) {
    //如果上下游的并行度不一致,会抛出异常
    if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
        throw new UnsupportedOperationException("Forward partitioning does not allow " +
            "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
            ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
            " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
    }
}

2.7 KeyGroupStreamPartitioner

根据 key 的分组索引选择发送到相对应的下游 subtask

flink分区策略,# Flink 核心篇,flink,分区,分区策略,分区器,Partitioner,Partition

  • org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner
/**
 * 根据key的分组索引选择发送到相对应的下游subtask
 * @param <T>
 * @param <K>
 */
@Internal
public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implements ConfigurableStreamPartitioner {
...

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
        }
        //调用KeyGroupRangeAssignment类的assignKeyToParallelOperator方法,代码如下所示
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
    }
...
}
  • org.apache.flink.runtime.state.KeyGroupRangeAssignment
public final class KeyGroupRangeAssignment {
...

    /**
     * 根据key分配一个并行算子实例的索引,该索引即为该key要发送的下游算子实例的路由信息,
     * 即该key发送到哪一个task
     */
    public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
        Preconditions.checkNotNull(key, "Assigned key must not be null!");
        return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    }

    /**
     *根据key分配一个分组id(keyGroupId)
     */
    public static int assignToKeyGroup(Object key, int maxParallelism) {
        Preconditions.checkNotNull(key, "Assigned key must not be null!");
        //获取key的hashcode
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }

    /**
     * 根据key分配一个分组id(keyGroupId),
     */
    public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {

        //与maxParallelism取余,获取keyGroupId
        return MathUtils.murmurHash(keyHash) % maxParallelism;
    }

    //计算分区index,即该key group应该发送到下游的哪一个算子实例
    public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }
...

2.8 CustomPartitionerWrapper

通过 Partitioner 实例的 Partition 方法(自定义的)将记录输出到下游。

public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    Partitioner<K> partitioner;
    KeySelector<T, K> keySelector;

    public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
        this.partitioner = partitioner;
        this.keySelector = keySelector;
    }

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
        }
		//实现Partitioner接口,重写partition方法
        return partitioner.partition(key, numberOfChannels);
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "CUSTOM";
    }
}

比如:文章来源地址https://www.toymoban.com/news/detail-845960.html

public class CustomPartitioner implements Partitioner<String> {
      // key: 根据key的值来分区
      // numPartitions: 下游算子并行度
      @Override
      public int partition(String key, int numPartitions) {
         return key.length() % numPartitions;//在此处定义分区策略
      }
  }

到了这里,关于【Flink】Flink 的八种分区策略(源码解读)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • STM32的八种工作模式

    STM32单片机具有高性能、低成本、低功耗的优点,与它打交道就必须先了解它的几种工作模式,它共有八种IO口模式,分别是:模拟输入、浮空输入、上拉输入、下拉输入、开漏输出、推挽输出、复用开漏输出和复用推挽输出。 在这八种工作模式中分为: 1、四种 输入 模式

    2024年02月10日
    浏览(28)
  • Spring 事务失效的八种场景

    原因:Spring 默认只会回滚非检查异常 解法:配置 rollbackFor 属性 @Transactional(rollbackFor = Exception.class) 原因:事务通知只有捉到了目标抛出的异常,才能进行后续的回滚处理,如果目标自己处理掉异常,事务通知无法知悉 解法1:异常原样抛出: 在 catch 块添加 throw new RuntimeExc

    2024年02月14日
    浏览(26)
  • python安装包(模块)的八种方法

    easy_install 这应该是最古老的包安装方式了,目前基本没有人使用了。下面是 easy_install 的一些安装示例 pip 是最主流的包管理方案,使用 pip install xxx 就可以从 PYPI 上搜索并安装 xxx (如果该包存在的话)。 下面仅列出一些常用的 pip install 的安装示例 更多 pip 的使用方法,可

    2024年02月05日
    浏览(24)
  • [数据挖掘] 数据分析的八种方法

    不 同类型的数据分析包括描述性、诊断性、探索性、推理性、预测性、因果性、机械性和规范性。以下是您需要了解的有关每个的信息。本文对于前人归纳的8种进行叙述。

    2024年02月13日
    浏览(44)
  • HTTP/1.1协议中的八种请求

    2023年8月29日,周二晚上 目录 概述八种请求 GET请求 POST请求 PUT请求 PATCH请求 DELETE请求 HEAD请求 OPTIONS请求 TRACE请求  HTTP/1.1协议中定义了8种常用的请求方法,分别是: 1. GET 用途:请求指定的页面信息,并返回实体主体。 例子:获取一个网页、图片等静态内容。 2. POST  用途:向指定

    2024年02月09日
    浏览(25)
  • Mysql 提升索引效率优化的八种方法

    目录 1. 选择唯一性索引 2. 为经常需要排序、分组和联合操作的字段建立索引 3. 为常作为查询条件的字段建立索引 4. 限制索引的数目 5. 尽量使用数据量少的索引 6. 数据量小的表最好不要使用索引 7. 尽量使用前缀来索引 8. 删除不再使用或者很少使用的索引 总结 索引的设计可

    2024年04月26日
    浏览(23)
  • Selenium元素定位的八种方法(建议收藏)

    自动化一般需要四步操作:获取元素,操作元素,获取返回结果,断言(返回结果与期望结果是否一致),最后自动出测试报告。Selenium提供8种元素定位的方法:id,name,class name,link text,xpath,css selector,tag name ,partial link tex。 这八种元素定位方法用python语言表示为: find_element_b

    2024年02月09日
    浏览(27)
  • STM32 (三)GPIO的八种模式及其原理

    GPIO就是通用I/O(输入/输出)端口,是STM32可控制的引脚。 STM32芯片的GPIO引脚与外部设备连接起来,可实现与外部通讯、控制外部硬件或者采集外部硬件数据的功能。 1. 四种输入模式     GPIO_Mode_IN_FLOATING 浮空输入模式     GPIO_Mode_IPU 上拉输入模式     GPIO_Mode_IPD 下拉输入模式

    2024年02月02日
    浏览(26)
  • 单例模式的八种写法、单例和并发的关系

    为什么需要单例? 节省内存和计算 保证结果正确 方便管理 无状态的工具类:比如日志工具类,不管是在哪里使用,我们需要的只是它帮我们记录日志信息,除此之外,并不需要在它的实例对象上存储任何状态,这时候我们就只需要一个实例对象即可。 全局信息类:比如我

    2024年01月18日
    浏览(37)
  • Java List集合取交集的八种不同实现方式

    码到三十五 : 个人主页 心中有诗画,指尖舞代码,目光览世界,步履越千山,人间尽值得 ! 在Java中,取两个List集合的交集可以通过多种方式实现,包括使用Java 8的Stream API、传统的for循环遍历、使用集合的retainAll方法,以及使用Apache Commons Collections库等。 方法一:使用Jav

    2024年04月12日
    浏览(25)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包