利用谷歌云Pub/Sub 实现多任务并行分发处理方案

这篇具有很好参考价值的文章主要介绍了利用谷歌云Pub/Sub 实现多任务并行分发处理方案。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景

目前老梁团队负责的Global Data Integration Platform每天有大量文件需要从来自不同地区的上游下载文件并进行处理后再发送到不同下游。老梁的数据集成平台集群有6个服务器节点,老梁希望所有机器的资源都能利用上,提升大量文件并行处理能力,并且不同机器节点的任务必须不能重复,否则可能造成文件下载或处理失败。

原有的服务是使用Quarz集群,通过定时调度去下载,但是Quartz调度框架虽然本身支持负载均衡,但是其Cluster每个节点都不是均衡分配任务,假如某一节点具有竞争资源优势,有机会一直持有任务,导致其他节点空闲下来,服务器可能某天资源消耗过大而导致宕机,这并不是老梁想要的效果。后来也尝试使用生产者消费者模型,通过F5负载均衡+API通知+异步回调方式后,服务多节点并行处理能力有所增强,但由于使用Http方式进行通信导致服务之间存在直接依赖,当消费者服务进行重启或者停机,存在生产者API通知失败的可能,需要做额外的补偿处理。如下图所示:

生产者消费者模型:
利用谷歌云Pub/Sub 实现多任务并行分发处理方案

解决思路

目前老梁公司已经完成了谷歌云和公司机房的网络搭建,并且公司的自有数据中心跟谷歌云可以直接通过谷歌的Dedicated Interconnect服务,也就是可以通过专线直接进行连接。虽然老梁的数据集成平台还部署在自有数据中心,但相对于文件下载的时间和速度损耗,谷歌云上的服务通过专线进行通信所带来的性能损耗几乎可以忽略(大约几百毫秒),老梁公司的架构战略方向是优先使用云组件,减少On-Premise部署。最后老梁选择采用谷歌云Pub/Sub服务作为事件消息服务,利用Pub/Sub高可用、使用简单并天然支持多消息并行传输的特性,来对现有的数据集成平台进行改造。

Pub/Sub介绍:
Pub/Sub 是一种设计为高度可靠且可伸缩的异步消息传递服务。该服务以十多年来许多 Google 产品都在依赖的核心 Google 基础架构组件为基础而构建。其实可以理解成云上的Kafka。官网:https://cloud.google.com/pubsub/architecture?hl=zh-cn

  • Pub/Sub 是一种可扩缩的异步消息传递服务,可将生成消息的服务与处理这些消息的服务分离开来。
  • Pub/Sub 允许服务异步通信,延迟时间大约为 100 毫秒。
  • Pub/Sub 用于流式分析和数据集成流水线,以注入和分发数据。无论是作为用于消息整合的消息传递中间件,还是作为并行处理任务的队列,它都非常有效。
  • 通过 Pub/Sub,您可以创建事件提供方和使用方的系统,称为发布者和订阅者。发布者通过广播事件而不是同步远程过程调用 (RPC) 与订阅者异步通信。
  • 发布者将事件发送到 Pub/Sub 服务,而不考虑如何或何时处理这些事件。然后,Pub/Sub 会将事件传送到对其做出响应的所有服务。在通过 RPC 进行通信的系统中,发布商必须等待订阅者接收数据。但是,Pub/Sub 中的异步集成可以提高整个系统的灵活性和稳健性。

** 基于Pub/Sub改造后的模型: **
各个消费者节点所拿到的事件都不会重复
利用谷歌云Pub/Sub 实现多任务并行分发处理方案

大概实施方案

这里只使用模拟场景展示大概思路,具体细节还需要根据各自项目进行优化。

注意事项:

  1. 首先你要创建你应用要使用的TopicSubscription,这里需要注意的是SubscriptionACK截止时间建议设置大点,否者假如你消费者如果消费事件所消耗的时间>ACK截止时间,Pub/Sub将会对消息进行重发,这时候会存在重复事件消息。也就是说,你要确保你的消费节点能在ACK截止时间之前处理好事件并且响应ACKPub/SUb
    利用谷歌云Pub/Sub 实现多任务并行分发处理方案
  2. 建议你服务使用Pull方式从Pub/SubSubscription拉取消息,因为这样可以在你Consumer代码里自由配置你请求所需要的参数,例如setMaxMessages方法可以让你自由定义你每次拉取多少事件,更好地基于你服务器的能力去配置,并且也可以避免在做负载均衡的时候某些机器节点所拿到的任务事件太多导致服务器节点的资源没办法充分利用。
    利用谷歌云Pub/Sub 实现多任务并行分发处理方案
    利用谷歌云Pub/Sub 实现多任务并行分发处理方案
  3. 使用Pub/Sub的自定义Event(事件)必须要自定义一个唯一标识,这样可以在Consumer逻辑加上幂等控制,否则当刚好消费者没有及时处理事件而Pub/Sub因为消费者ACK超时进行补偿重发,这可能会因为重复处理事件给业务带来严重后果。GCP Pub/Sub采用的是至少一次投递的策略,也就是可能对同一消息投递多次,虽然实际应用中不常见,以下官方文档说明了会重复投递的情况,通常就是上面所说的ACK超时导致的
    利用谷歌云Pub/Sub 实现多任务并行分发处理方案

完成流程

这里只截取小部分文件下载的流程作为示范,其他类似需要并行处理的任务都可以参考。
利用谷歌云Pub/Sub 实现多任务并行分发处理方案

  1. File Watch Dog 从上游远程服务器基于File Pattern去监测有没有新文件
  2. File Watch Dog 把监测到的新文件信息组装成事件分别推送到GCP Pub/Sub Topic
  3. File Process Engine所有节点并行从GCP Pub/Sub Subscription拉取任务,分别拉到不同的事件消息
  4. File Process Engine所有节点分别基于事件消息里的DatafeedId去配置中心查找该Datafeed的连接信息
  5. File Process Engine所有节点分别去上游远程服务器下载自己接收到的事件对应的文件

简单测试

这里使用官方提供的示例代码,简单测试下发布多个消息,看看消费者代码是否会重复消费相同事件。
参考示例:https://cloud.google.com/pubsub/docs/pull#java

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeAsyncExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeAsyncExample(projectId, subscriptionId);
  }

  public static void subscribeAsyncExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Received MessageId: " + message.getMessageId()+"Data: " + message.getData().toStringUtf8());
          consumer.ack();
          System.out.println("Message has been acknowledge")
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

这里我在Topic发布了5条带有序号的消息,分别是:test:1test:2test:3test:4test:5,然后开了三个进程去监听Subscription,看看会不会每个进程会不会出现重复的消息

进程1
利用谷歌云Pub/Sub 实现多任务并行分发处理方案
进程2
利用谷歌云Pub/Sub 实现多任务并行分发处理方案
进程3
利用谷歌云Pub/Sub 实现多任务并行分发处理方案文章来源地址https://www.toymoban.com/news/detail-460046.html

到了这里,关于利用谷歌云Pub/Sub 实现多任务并行分发处理方案的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 支付宝定时任务怎么做?三层分发任务处理框架介绍

      一、背景介绍 技术同学对定时任务肯定不陌生。定时任务一般用来定时批量进行业务处理。支付宝卡包券到期提醒、删除过期失效券,五福大促批量给用户发放添福红包等场景,都是通过定时任务触发来完成的。 作者有幸参与了2023兔年五福大促的开发,主导完成了福气乐

    2023年04月12日
    浏览(63)
  • SpringBoot异步任务及并行事务实现

            上一篇介绍了原生Java如何实现串行/并行任务,主要使用了线程池 + Future + CountDownLatch,让主线程等待子线程返回后再向下进行。而在SpringBoot中,利用@Async和AOP对异步任务提供了更加便捷的支持,下面就针对SpringBoot使用异步任务需要注意的细节做一些分析。      

    2024年02月02日
    浏览(43)
  • THRUST:一个开源的、面向异构系统的并行编程语言:编程模型主要包括:数据并行性、任务并行性、内存管理、内存访问控制、原子操作、同步机制、错误处理机制、混合编程模型、运行时系统等

    作者:禅与计算机程序设计艺术 https://github.com/NVIDIA/thrust 2021年8月,当代科技巨头Facebook宣布其开发了名为THRUST的高性能计算语言,可用于在设备、集群和云环境中进行并行计算。它具有“易于学习”、“简单易用”等特征,正在逐步取代C++、CUDA、OpenCL等传统编程模型,成为

    2024年02月07日
    浏览(48)
  • (四)RabbitMQ高级特性(消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列

    Lison dreamlison@163.com , v1.0.0 , 2023.06.23 之前我们讲过MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。 1、 生产者批量发送消息 2、消费端配置限流机制 3、消费者监听队列 在RabbitMQ中,多个消费者监听同一条队列,则队列

    2024年02月15日
    浏览(42)
  • 【C#】并行编程实战:任务并行性(上)

             本教程对应学习工程:魔术师Dix / HandsOnParallelProgramming · GitCode                  在 .NET 的初始版本中,我们只能依赖线程(线程可以直接创建或者使用 ThreadPool 类创建)。ThreadPool 类提供了一个托管 抽象层 ,但是开发人员仍然需要依靠 Thread 类来进行更

    2024年02月09日
    浏览(43)
  • 【C#】并行编程实战:任务并行性(中)

            本教程对应学习工程:魔术师Dix / HandsOnParallelProgramming · GitCode                  本章继续介绍任务并行性,因篇幅所限,本章为中篇。         .NET Framework 提供了以下两个类来支持任务取消: CancellationTokenSource :此类负责创建取消令牌,并将取消请求传

    2024年02月09日
    浏览(44)
  • go 利用channel实现定时任务

    想5秒内结束就注释掉select{} 在linux上后台执行的话,可以这样

    2024年04月15日
    浏览(36)
  • 数字信号处理-10-并行FIR滤波器MATLAB与FPGA实现

    本文介绍了设计滤波器的FPGA实现步骤,并结合杜勇老师的书籍中的并行FIR滤波器部分进行一步步实现硬件设计,对书中的架构做了复现以及解读,并进行了仿真验证。 FIR滤波器的结构形式时,介绍了直接型、级联型、频率取样型和快速卷积型4种。在FPGA实现时,最常用的是最

    2023年04月09日
    浏览(48)
  • NLP(8)--利用RNN实现多分类任务

    前言 仅记录学习过程,有问题欢迎讨论 循环神经网络RNN(recurrent neural network): 主要思想:将整个序列划分成多个时间步,将每一个时间步的信息依次输入模型,同时将模型输出的结果传给下一个时间步 自带了tanh的激活函数 代码 发现RNN效率高很多 可以对model 优化一下

    2024年04月26日
    浏览(33)
  • 【面试问题】事务中执行了异步任务分发数据,由于事务未提交,导致异步任务无法执行

    客户数据分发CRMS系统的时候异常,分发任务强依赖于事务内有没有提交,异常由事务未及时提交导致异步任务无法及时查到数据,现将异步任务调整为事务提交后处理 添加事务同步管理器,声明异步是在事务提交后执行

    2024年02月13日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包