基于Promise.resolve实现Koa请求队列中间件

这篇具有很好参考价值的文章主要介绍了基于Promise.resolve实现Koa请求队列中间件。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本文作者为360奇舞团前端工程师

前言

最近在做一个 AIGC 项目,后端基于 Koa2 实现。其中有一个需求就是调用兄弟业务线服务端 AIGC 能力生成图片。但由于目前兄弟业务线的 AIGC 项目也是处于测试阶段,能够提供的服务器资源有限,当并发请求资源无法满足时,会响应【服务器繁忙】,这样对于 C 端展示的我们是非常不友好的。基于当前的困境,第一想到的解决方案就是KafkaRabbitMQ,但实际上对于我们目前的用户体量来说,简直就是大材小用。于是转换思路,是不是可以利用js模拟队列的方式解决问题呢,答案是:可以,PromiseResolve 队列!

分析

Resolve 的理解

Promise 的核心用法就是利用 Resolve 函数做链式传递。例如:

new Promise(resolve => {
  resolve('ok')
}).then(res => {
  console.log(res)
})
// 输出结果:ok

通过上边的例子我们可以理解,ResolvePromise 对象的状态从 pending 变为 fullfilled ,在异步操作成功时调用,并将异步操作的结果,作为参数传递出去。

核心点:异步

此时抛出一个问题:假如我把 resolve 回调函数都放入一个队列里,Promise 是不是一直处于pending 状态?pending 状态就意味着then函数一直处于 waitting 状态,直到队列中的 resolve 函数执行后,then 函数才能被执行?

制造阻塞的 Promise 函数

const queue = []
new Promise(resolve => {
  queue.push(resolve)
}).then(res => {
  console.log(res)
})
// 输出结果:Promise {<pending>}

queue[0]('ok')
// 输出结果:ok

为了佐证,直接贴图:

基于Promise.resolve实现Koa请求队列中间件,中间件
image.png

异步转同步

Koa2 属于洋葱模型,当请求过来以后需要调用 next 函数继续穿透,而我们的需求是限流,这意味着我们要阻塞请求,此时此刻,await举起了双手,阻塞这种不要脸的事我在行呀!

const queue = []
const fn = async = () => {
  await new Promise(resolve => {
    queue.push(resolve)
  })
  // ...一大波操作
}
// queue[0]()

如果 queue[0] 不执行,代码就会一直处于阻塞状态。那我们就可以利用await写一个中间件实现阻塞某些 api 的需求了。

// 阻塞所有请求,知道queue中的resolve函数被执行才会执行next
const queue = []
module.exports = function () {
  return async function (ctx, next) {
    await new Promise(resolve => {
      queue.push(resolve)
    })
    await next();
  };
};

实现中间件

原理和思路都捋直了,那就开搞吧。话不多说,贴代码:

const resolveMap = {};

/**
 * 请求队列
 * @param {*} ctx
 * @param {*} ifer 是否是图生图
 * @param {*} maxReqNumber 最大请求数量
 * @returns
 * @description
 * 使用promise解决请求队列问题
 * 1. 用于限制aicg的并发请求
 * 2. 当文生图是,根据风格分类存储resolve,当前请求响应完成时,触发消费队列中下一个请求
 * 3. 当图生图是,直接存储resolve到image风格,当前请求响应完成时,触发消费队列中下一个请求
 * 4. 同时处理的请求数量不超过maxReqNumber个,否则加入队列等待。
 */
function requestQueue(ctx, maxReqNumber) {
  const params = ctx.request.body ?? ctx.request.query ?? ctx.request.params ?? {};
  const style = params.style ?? 'pruned_cgfull';

  resolveMap[style] = resolveMap[style] || { list: [], processNumber: 0 };
  const currentResolve = resolveMap[style];

  ((currentResolve) => {
    ctx.res.on('close', () => {
      saveNumberMinus(currentResolve);
      // 当前请求响应完成时,触发消费队列中下一个请求
      if (currentResolve.list.length !== 0) {
        const node = currentResolve.list.shift();
        node.resolve();
        currentResolve.processNumber++;
      }
      currentResolve = null;
    });
  })(currentResolve);
  // 当前请求正在处理中,将resolve存储到队列中
  if (currentResolve.processNumber + 1 > maxReqNumber) {
    // 利用promise阻塞请求
    return new Promise((resolve, reject) => {
      // 当前请求正在处理中,将resolve存储到队列中
      currentResolve.list.push({ resolve, reject, timeStamp: Date.now(), params });
    });
  } else {
    currentResolve.processNumber++;
    return Promise.resolve();
  }
}

module.exports = function (options = {}) {
  const { maxReqNumber = 2, apis = [] } = options;
  return async function (ctx, next) {
    const url = ctx.url;
    if (apis.includes(url)) {
      try {
        await requestQueue(ctx, maxReqNumber);
      } catch (error) {
        console.log(error);
        ctx.body = {
          code: 0,
          msg: error,
        };
        return;
      }
    }
    await next();
  };
};

const fiveMinutes = 5 * 60 * 1000;
setInterval(() => {
  Object.values(resolveMap).forEach((item) => {
    const { timeStamp, resolve } = item;
    if (Date.now() - timeStamp > fiveMinutes) {
      resolve(); // 执行并释放请求,防止用户请求因异常积压导致一直挂起
      saveNumberMinus(item);
    }
  });
}, 5 * 60 * 1000);

这里要着重提示一点,闭包的使用。之所以使用闭包是为了保证当前请求的close事件触发时能够使用currentResolve对象。因为当前请求是放在自身对应风格的数组中,close时要消费下一个等待的请求,同时也不要忘了手动释放资源。

app.js 逻辑部分

const requsetQueue = require('./app/middleware/request-queue');
const app = new Koa();
app.use(
  requsetQueue({
    maxReqNumber: 1,
    apis: ['/api/aigc/image', '/api/aigc/textToImage', '/api/aigc/img2img'],
  })
);
app.listen(process.env.NODE_ENV === 'development' ? '9527' : '3000');

总结

其实基于 PromiseResolve 队列,我们还可以实现一些其他的功能,比如:前端代码中未登录状态下收集某些请求,等到登录成功后发送请求。也希望大家一起探索和讨论Promise的其他解决能力的实现方案。


- END -

关于奇舞团

奇舞团是 360 集团最大的大前端团队,代表集团参与 W3C 和 ECMA 会员(TC39)工作。奇舞团非常重视人才培养,有工程师、讲师、翻译官、业务接口人、团队 Leader 等多种发展方向供员工选择,并辅以提供相应的技术力、专业力、通用力、领导力等培训课程。奇舞团以开放和求贤的心态欢迎各种优秀人才关注和加入奇舞团。
文章来源地址https://www.toymoban.com/news/detail-646655.html

到了这里,关于基于Promise.resolve实现Koa请求队列中间件的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 基于golang多消息队列中间件的封装nsq,rabbitmq,kafka

    场景 在创建个人的公共方法库中有这样一个需求,就是不同的项目会用到不同的消息队列中间件,我的思路把所有的消息队列中间件进行封装一个消息队列接口(MQer)有两个方法一个生产一个消费,那么在实例化对象的时候根据配置文件指定当前项目使用的那个消息队列中

    2024年02月14日
    浏览(64)
  • 基于Laravel封装一个强大的请求响应日志记录中间件

    记录全面: 包含请求路径、请求方法、客户端IP、设备标识、荷载数据、文件上传、请求头、业务逻辑处理时间、业务逻辑所耗内存、用户id、HTTP响应状态码、以及响应数据。 配置简单: 默认不需要写任何逻辑可开箱即用,靠前5个方法,就可指定某些url不记录日志,某些

    2024年02月08日
    浏览(53)
  • 消息队列中间件,RabbitMQ的使用,死信队列,延迟队列,利用枚举实现队列,交换机,RountKey的声明

    目录 0.交换机种类和区别 1.声明队列和交换机以及RountKey 2.初始化循环绑定 3.声明交换机 4.监听队列 4.1 监听普通队列 4.2监听死信队列  5.削峰填谷的实现 Direct Exchange(直连交换机) : 直连交换机将消息发送到与消息的路由键完全匹配的队列。它是最简单的交换机类型之一。

    2024年04月23日
    浏览(155)
  • 消息队列中间件(一)

    流量削峰 应用解耦 异步处理 ActiveMQ 优:单机吞吐万级,时效性ms级,可用性高(主从架构),可靠性高(丢失率低) 缺:官方维护少,高吞吐场景较少使用 Kafka 大数据 - 数据采集,传输,存储 优:高吞吐量(百万级),时效性ms级,可用性高,日志成熟 缺:短轮询,失败

    2024年02月11日
    浏览(55)
  • 消息队列(中间件)

    通信协议: 为了实现客户端和服务器之间的通信来完成的逻辑,基于TCP实现的自定义应用层协议。通过这个协议,完成客户端–服务器远程方法调用。 序列化/反序列化: 通过网络传输对象把对象存储到硬盘上。 序列化:把对象转化为二进制的数据序列,反序列化:把二进制数

    2024年02月07日
    浏览(56)
  • 消息队列中间件介绍

    消息队列介绍   消息队列中间件是大型系统中的重要组件,已经逐渐成为企业系统内部通信的核心手段。它具有松耦合、异步消息、流量削峰、可靠投递、广播、流量控制、最终一致性等一系列功能,已经成为异步RPC的主要手段之一。 目前常见的消息中间件有ActiveMQ、Rabbi

    2024年02月04日
    浏览(55)
  • 中间件RabbitMQ消息队列介绍

    1.1 什么是 MQ MQ ( message queue ),从字面意思上看,本质是个队列, FIFO 先入先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中, MQ 是一种非常常 见的上下游 逻辑解耦+物理解耦 的消息通信服务。使用了 MQ 之

    2024年02月13日
    浏览(72)
  • RabbitMQ 消息中间件 消息队列

    RabbitMQ 1、RabbitMQ简介 RabbiMQ是⽤Erang开发的,集群⾮常⽅便,因为Erlang天⽣就是⼀⻔分布式语⾔,但其本身并 不⽀持负载均衡。支持高并发,支持可扩展。支持AJAX,持久化,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 2、RabbitMQ 特点 可

    2024年02月03日
    浏览(69)
  • 消息队列中间件 MetaQ/RocketMQ

    推荐电子书:云原生架构白皮书 2022版-藏经阁-阿里云开发者社区 (aliyun.com) 简介—— 消息队列中间件 MetaQ/RocketMQ 中间件 MetaQ 是一种基于队列模型的消息中间件,MetaQ 据说最早是受 Kafka 的影响开发的,第一版的名字 \\\"metamorphosis\\\",是奥地利作家卡夫卡的名作——《变形记》。

    2024年02月14日
    浏览(57)
  • 消息队列中间件(二)- RabbitMQ(一)

    接收,存储,转发消息 生产者 交换机 队列 消费者 简单模式 工作模式 发布 路由模式 主题模式 发布订阅模式 Broker 接收和分发消息的应用 Virtual host 虚拟分组 Connection: TCP连接 Channel: 节省连接,每次访问建立一次Connection消耗太大,所以使用信道代替连接 交换机 队列 www.r

    2024年02月11日
    浏览(68)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包