Flink异步IO初步了解

这篇具有很好参考价值的文章主要介绍了Flink异步IO初步了解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

        之前使用Flink查询Redis数据的过程中,由于对数据一致性的要求并不是很高,当时是用MapFunction +  State 的方案。先缓存一大堆数据到State中,达到一定数量之后,将批量Key提交到Redis中进行查询。

        由于Redis性能极高,所以并没有出现什么问题,后来了解到了Flink异步IO机制,感觉使用异步IO机制实现会更加优雅一点。本文就是记录下自己对Flink异步IO的一个初步认识。

异步算子主要应用于和外部系统交互,提高吞吐量,减少等待延迟。用户只需关注业务逻辑即可,消息顺序性和一致性由Flink框架来处理:

图来自官网:

       Flink异步IO初步了解,flink,大数据

异步IO支持输出无序和有序,也支持watermark以及ExactlyOnce语义:

异步IO的核心代码都在AsyncWaitOperator里面:

switch (outputMode) {
   case ORDERED:
      queue = new OrderedStreamElementQueue<>(capacity);
      break;
   case UNORDERED:
      queue = new UnorderedStreamElementQueue<>(capacity);
      break;
   default:
      throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
}

        orderedWait(有序):消息的发送顺序与接收到的顺序完全相同(包括 watermark )。

        unorderWait(无序):在ProcessingTime中是完全无序的,即哪个先完成先发送(最低延迟和消耗);在EventTime中,以watermark为边界,介于两个watermark之间的消息是乱序的,但是多个watermark之间的消息是有序的。

        异步IO处理内部会执行 userFunction.asyncInvoke(element.getValue(), resultHandler) 调用用户自己编写的方法来处理数据。userFunction就是用户自己编写的自定义方法。resultHandler就是用户在完成异步调用自己,如何把结果传入到异步IO算子中:

(ps: userFunction是基于CompletableFuture来完成开发的。CompletableFuture是 Java 8 中引入的一个类,它实现了CompletionStage接口,提供了一组丰富的方法来处理异步操作和多个任务的结果。它支持链式操作,可以方便地处理任务的依赖关系和结果转换。相比于传统的Future接口,CompletableFuture更加灵活和强大。具体demo可以看官网示例 或者 看下面参考中的链接)

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
   // add element first to the queue
   final ResultFuture<OUT> entry = addToWorkQueue(element);

   // 这里的ResultHandler就是对数据和ResultFuture的一个封装
   final ResultHandler resultHandler = new ResultHandler(element, entry);

   // register a timeout for the entry if timeout is configured
   if (timeout > 0L) {
      final long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();

      final ScheduledFuture<?> timeoutTimer = getProcessingTimeService().registerTimer(
         timeoutTimestamp,
         timestamp -> userFunction.timeout(element.getValue(), resultHandler));

      resultHandler.setTimeoutTimer(timeoutTimer);
   }

   // 调用用户编写的方法。 传入的resultHandler就是让用户在异步完成的时候传值用的
   userFunction.asyncInvoke(element.getValue(), resultHandler);
}
@Override
// resultHandler类内部的complete方法就是在用户自定义函数中传结果用的,最终执行结果会调用processInMainBox(results)方法,将结果发送给下游算子
public void complete(Collection<OUT> results) {
   Preconditions.checkNotNull(results, "Results must not be null, use empty collection to emit nothing");

   // already completed (exceptionally or with previous complete call from ill-written AsyncFunction), so
   // ignore additional result
   if (!completed.compareAndSet(false, true)) {
      return;
   }

   processInMailbox(results);
}

orderedWait 实现:

        有序的话很简单,就是创建一个队列,然后从队首取元素即可

public OrderedStreamElementQueue(int capacity) {
   Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");

   this.capacity = capacity;
   // 所有的元素都放在这么一个队列里面
   this.queue = new ArrayDeque<>(capacity);
}

@Override
public boolean hasCompletedElements() {
   // 然后FIFO就好了
   return !queue.isEmpty() && queue.peek().isDone();
}

unorderWait 实现:

        无序的话实现就会稍微复杂点。queue里面放的不是一条条数据,而是一个个segment。数据存放在segment中,中间使用watermark分隔(每条watermark都会有自己单独的segment)。

public UnorderedStreamElementQueue(int capacity) {
   Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");

   this.capacity = capacity;
   // most likely scenario are 4 segments <elements, watermark, elements, watermark>
   this.segments = new ArrayDeque<>(4);
   this.numberOfEntries = 0;
}
// 每个segment内部会有两个队列:未完成 和 已完成。未完成的数据在完成之后会放置到已完成队列里面,然后发送到下游算子
static class Segment<OUT> {
   /** Unfinished input elements. */
   private final Set<StreamElementQueueEntry<OUT>> incompleteElements;

   /** Undrained finished elements. */
   private final Queue<StreamElementQueueEntry<OUT>> completedElements;

   Segment(int initialCapacity) {
      incompleteElements = new HashSet<>(initialCapacity);
      completedElements = new ArrayDeque<>(initialCapacity);
   }

   /**
    * Signals that an entry finished computation.
    */
   void completed(StreamElementQueueEntry<OUT> elementQueueEntry) {
      // adding only to completed queue if not completed before
      // there may be a real result coming after a timeout result, which is updated in the queue entry but
      // the entry is not re-added to the complete queue
      if (incompleteElements.remove(elementQueueEntry)) {
         completedElements.add(elementQueueEntry);
      }
   }

一致性实现:

        一致性实现看起来很简单,就是将queue中未完成/已完成的数据备份下来。这里的queue就是上面的 OrderedStreamElementQueue 和 UnorderedStreamElementQueue:

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
   super.snapshotState(context);

   ListState<StreamElement> partitionableState =
      getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
   partitionableState.clear();

   // 这里的queue == OrderedStreamElementQueue / UnorderedStreamElementQueue
   try {
      partitionableState.addAll(queue.values());
   } catch (Exception e) {
      partitionableState.clear();

      throw new Exception("Could not add stream element queue entries to operator state " +
         "backend of operator " + getOperatorName() + '.', e);
   }
}

参考:

      https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/dev/datastream/operators/asyncio/(官网文章)

        [Flink] Flink异步I/O原理和实现 - 知乎 

flink 异步 io(Async I/O) 示例_java.util.concurrent.cancellationexception flink-CSDN博客文章来源地址https://www.toymoban.com/news/detail-769946.html

到了这里,关于Flink异步IO初步了解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【flink番外篇】14、Flink异步I/O访问外部数据示例

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年01月16日
    浏览(45)
  • 大数据学习之Flink、10分钟了解Flink的核心组件以及它们的工作原理

     第一章、Flink的容错机制 第二章、Flink核心组件和工作原理 第三章、Flink的恢复策略 第四章、Flink容错机制的注意事项 第五章、Flink的容错机制与其他框架的容错机制相比较 目录 第二章、Flink核心组件和工作原理 Ⅰ、核心组件 1. Checkpoint组件: 2. Savepoint组件: 3. Barrier组件

    2024年01月23日
    浏览(39)
  • 55、Flink之用于外部数据访问的异步 I/O介绍及示例

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年01月17日
    浏览(46)
  • 大数据学习之Flink算子、了解DataStream API(基础篇一)

    注: 本文只涉及DataStream 原因:随着大数据和流式计算需求的增长,处理实时数据流变得越来越重要。因此,DataStream由于其处理实时数据流的特性和能力,逐渐替代了DataSet成为了主流的数据处理方式。 目录 DataStream API (基础篇) 前摘: 一、执行环境 1. 创建执行环境 2. 执

    2024年01月23日
    浏览(52)
  • 大数据学习之Flink算子、了解(Transformation)转换算子(基础篇三)

    目录 Transformation转换算子(基础篇三) 三、转换算子(Transformation) 1.基本转换算子 1.1 映射(Map) 1.2 过滤(filter) 1.3 扁平映射(flatmap) 1.4基本转换算子的例子 2.聚合算子(Aggregation) 2.1 按键分区(keyBy) 2.2 简单聚合 2.3 归约聚合(reduce) 3.用户自定义函数(UDF) 3.1 函

    2024年02月20日
    浏览(40)
  • [Flink01] 了解Flink

    Flink入门系列文章主要是为了给想学习Flink的你建立一个大体上的框架,助力快速上手Flink。学习Flink最有效的方式是先入门了解框架和概念,然后边写代码边实践,然后再把官网看一遍。 Flink入门分为四篇,第一篇是《了解Flink》,第二篇《架构和原理》,第三篇是《DataStre

    2024年02月19日
    浏览(32)
  • Flink实战(1)-了解Flink

            😄伙伴们,好久不见!这里是 叶苍ii          ❀   作为一名大数据博主,我一直致力于分享最新的技术趋势和实战经验。近期,我在参加Flink的顾客营销项目,使用了PyFlink项目进行数据处理和分析。          ❀   在这个文章合集中,我将与大家分享我的实战

    2024年01月16日
    浏览(31)
  • 关于Flink的旁路缓存与异步操作

    将数据库中的数据,比较经常访问的数据,保存起来,以减少和硬盘数据库的交互 比如: 我们使用mysql时 经常查询一个表 , 而这个表又一般不会变化,就可以放在内存中,查找时直接对内存进行查找,而不需要再和mysql交互 dim层使用的是hbase存储,因为dim层可能会出现大表,出现数据量过

    2024年02月01日
    浏览(32)
  • 怎么理解flink的异步检查点机制

    flink的checkpoint监控页面那里有两个指标Sync Duration 和Async Duration,一个是开始进行同步checkpoint所需的时间,一个是异步checkpoint过程所需的时间,你是否也有过疑惑,是否只是同步过程中的时间才会阻塞正常的数据处理,而异步checkpoint的时间不会影响正常的数据处理流程? 这

    2024年02月09日
    浏览(58)
  • 一、Flink使用异步算子+线程池查询MySQL

    目录 Flink异步算子使用介绍 使用Flink异步算子+多线程异步查询MySQL 相关阅读 1 Flink使用异步算子请求高德地图获取位置信息 1、概述 1)Flink异步算子使用介绍 1.异步与同步概述 同步:向数据库发送一个请求然后一直等待,直到收到响应。在许多情况下,等待占据了函数运行的

    2024年02月14日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包