SSE实现消息实时推送,前端渐进式学习、实践,真香

这篇具有很好参考价值的文章主要介绍了SSE实现消息实时推送,前端渐进式学习、实践,真香。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、SSE概念

SSE(Server Sent Event),直译为服务器发送事件,顾名思义,也就是客户端可以获取到服务器发送的事件。我们常见的 http 交互方式是客户端发起请求,服务端响应,然后一次请求完毕;但是在 sse 的场景下,客户端发起请求,连接一直保持,服务端有数据就可以返回数据给客户端,这个返回可以是多次间隔的方式

二、SSE应用场景

在web端消息推送功能中,由于传统的HTTP协议是由客户端主动发起请求,服务端才会响应。基本的ajax轮询技术便是如此。而在SSE中,浏览发送一个请求给服务端,通过响应头中的Content-Type:text/event-stream等向客户端声名这是一个长连接,发送的是流数据,这样客户端就不会关闭连接,一直等待服务端发送数据。

如果服务器返回的数据中包含了事件标识符,浏览器会记录最后一次接收的事件的标识符。如果与服务器的连接中断,当浏览器再次进行连接时,会通过头来声明最后一次接收的事件的标识符。服务器端可以通过浏览器发送的事件标识符来确定从哪个事件来继续连接

三、前端使用方法、问题

1、get方式

使用eventsource完成get请求

缺点:客户端无法通过一个get请求完成数据传递

参考文档:

EventSource - Web API 接口参考 | MDN

实现流程:

  1. 后端提供了两个接口,一个是:post,用以完成前端信息的传递,我这边是做大语言模型的,所以包括了模型必要参数、问题等;二、get接口,完成流式输出的接口,配置相应的具名事件、请求头等
  2. 前端通过调用post接口拿到本次会话id,将id携带在get请求里,完成信息传递
  3. 前端处理SSE流式返回

代码实现:

const eventSourceRef = useRef<any>(null)
const contact = async (messageData: any) => {
  eventSourceRef.current = new EventSource(
    `${API_BASE}/v1/model/stream?id=${id}`,
  )
  if (!eventSourceRef.current) return
  // 监听 SSE 事件,因为后端定义了具名事件,所以这儿要用addEventListener监听,而不是onmessage
  eventSourceRef.current.addEventListener('add', function (e: any) {
	// 处理数据展示
  })
  eventSourceRef.current.addEventListener('finish', function (e: any) {
   // 结束标识finish
    eventSourceRef.current.close() // 关闭连接
  })
  eventSourceRef.current.addEventListener('error', function (e: any) {
    if (e.status === 401) {
      // 用户登录状态失效处理
    }
    // error报错处理
    console.log('Error occurred:', e)
    // 关闭连接
    eventSourceRef.current.close()
  })
}

2、post方式

使用fetch-event-source完成连接,仅需一个接口,支持添加请求头

缺点:在浏览器返回的text/eventstream里看不到具体返回,无法进行预览

参考文档:

@microsoft/fetch-event-source

实现流程:

  1. 后端提供一个接口,支持前端传参、流式返回
  2. 前端通过fetch-event-source,完成传参、请求头添加等
  3. 处理返回数据

具体实现:

  const eventSourceRef = useRef<any>(null)
  const [abortController, setAbortController] = useState(new AbortController())
  // 通信事件
  const contact = async (messageData: any) => {
    messageData = { ...messageData, do_stream: modelArg.do_stream } // 请求参数
    receivedDataRef.current = ''
    const token: string = getLocal('AUTHCODE') || '' 
    fetchEventSource(`${MAAS_API_BASE}/v1/model_api/invoke`, {
      method: 'POST',
      // 添加请求头
      headers: {
        Authorization: token,
        'Content-Type': 'application/json',
      }, 
       // 传参必须保证是json
      body: JSON.stringify(messageData),
      // abortController.signal 提供了一个信号对象给 fetchEventSource 函数。
      // 如果在任何时候你想取消正在进行的 fetch 操作,你可以调用 
      // abortController.abort()。这会发出关联任务的信号,你可以使用 
      // AbortController 的信号来检查异步操作是否已被取消。
      signal: abortController.signal, 
      openWhenHidden: true, // 切换标签页时连接不关闭
      async onopen(resp) {
        // 处理登录失效
        if (resp.status === 401) {
          message.warning('登录过期')
          return
        }
      },
      onmessage(msg: any) {
        const eventType = msg.event // 监听event的具名事件
        switch (eventType) {
          case 'add':
            // 流式输出事件,add每次会返回具体字符,前端负责拼接展示
            break
          case 'finish':
            setStatu('finish') // 结束标识
            break
          case 'error':
            if (msg.status === 401) {
               message.warning('登录过期')
            }
            console.log('Error occurred:', e)
            break
        }
      },
      onerror(err) {
        throw err // 连接遇到http错误时,如跨域等,必须要throw才能停止,不然会一直重连
      },
      onclose() {},
    })
  }

  // 终止连接方法,比如在切换模型时,你可能有必要终止上一次连接来避免问答串联
  const closeSSE = () => {
    abortController.abort()
    setAbortController(new AbortController())
  }

3、一种接口同时兼容流式/非流式

同上post方法

    fetchEventSource(sseUrl, {
      method: 'POST',
      headers,
      signal: abortController.signal,
      body: JSON.stringify(customInferData),
      openWhenHidden: true,
    /**
    *在onopen阶段处理
    第一步:判断resp.headers.get('content-type'),如果不包含text/event-stream,
    则代表非流式
    第二步:需要在onopen阶段处理非流式返回,即json返回,读取json返回并渲染,注意异常也要处理
    第三步:
     */
      async onopen(resp) {
        const contentype = resp.headers.get('content-type') || ''
        console.log('contentype =>', contentype)
        console.log('resp.ok =>', resp.ok)
        if (resp.ok && !contentype.includes('text/event-stream')) {
          // 读取json数据
          const responseData = await resp.json()
          if (responseData.code !== 0) {
             // 报错处理+关闭连接
          } else {
            //处理数据渲染+关闭连接
            
            stopSession()
          }
        } else if (resp.status === 401) {
          message.warning('登录过期')
          // 报错处理+关闭连接
          stopSession()
        
        }
      },
      onmessage(msg: any) {
        const eventType = msg.event
        const messages: any = cloneDeep(chatState.sessionMessages)
        let lastMessage: any = messages[messages.length - 1] || {}

        switch (eventType) {
          case 'add':
            lastMessage = {
              ...lastMessage,
              text: `${lastMessage.text}${msg.data || ' '}`,
              loading: false,
            }
            messages.splice(messages.length - 1, 1, lastMessage)
            chatAction.updateSessionMessages(messages)
            break
          case 'finish':
            console.log('finish lastMessage =>', lastMessage)
            chatAction.updateSessionStatu(SessionStatuTypes.ready)
            chatAction.updateContext(msg.data)
            break
          case 'info':
            {
              const messages: any = cloneDeep(chatState.sessionMessages)
              let lastMessage: any = messages[messages.length - 1] || {}
              lastMessage = {
                referenceDocs: JSON.parse(msg.data).reference_by_docs,
                ...lastMessage,
              }
              messages.splice(messages.length - 1, 1, lastMessage)
              chatAction.updateSessionMessages(messages)
            }
            break
          case 'error':
            if (msg.status === 401) {
              chatAction.updateSessionStatu(SessionStatuTypes.ready)
            } else {
              errorItemFn(msg?.msg || msg?.data || '抱歉,暂无法回答问题')
            }
            break
        }
      },
      onerror(err: any) {
        errorItemFn(err?.msg || '抱歉,暂无法回答该问题')
        console.log('eventSource error: ', `${err}`)
        throw err  // 连接遇到http错误时,如跨域等,必须要throw才能停止,不然会一直重连
      },
      onclose() {
        console.log('eventSource close')
      },
    })

  // 终止会话
  const stopSession = () => {
    abortController.abort()
    setAbortController(new AbortController())
  }

四、常见问题汇总

1、无法添加请求头

应用fetch-event-source解决

2、一个方法需要同时兼容流式和非流式

应用fetch-event-source在onopen阶段处理非流式输出,如报错、接口json返回等

3、遇到跨域时候,请求一直连接

应用fetch-event-source在监听具名事件时,如error,将错误throw err,否则无法中断连接文章来源地址https://www.toymoban.com/news/detail-832115.html

4、fetch方法如何终止

  const stopSession = () => {
    abortController.abort()
    setAbortController(new AbortController())
  }

到了这里,关于SSE实现消息实时推送,前端渐进式学习、实践,真香的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 写点东西《渐进式网络应用入门》

    PWA 是一种渐进式网络应用程序,它结合了应用程序的功能和网络技术。 您可以说它们是使用网络技术构建的应用程序,但感觉和功能都像原生应用程序。 网络应用程序似乎变得有限,因为大多数人更喜欢构建移动应用程序,以便用户可以将它们保存在手机上,而不是构建网

    2024年01月19日
    浏览(46)
  • Redis-渐进式遍历scan的使用

    目录 1、为什么使用渐进式遍历? 2、scan的使用 3、渐进式遍历的缺点 4、补充知识点:redis中也区分database          前面的博客中,我们有提到使用keys *来获取所有的key,但这种办法,当Redis中存储的有很多key时,实行此命令所耗费的时长就会很长,不符合使用规范,redis一

    2024年02月07日
    浏览(46)
  • Vue3 Flask 渐进式入门笔记

    以下均在Windows 10环境下实现。 安装node.js的过程略过。 1、在cmd命令行中执行以下命令: 2、查看vue版本 注意,如果电脑中以前有vue2版本,则需要卸载后重启电脑再重新安装,否则有可能安装失败。 1、执行以下命令以创建项目 第一步需要填写项目名称;后面的除router建议选

    2024年02月09日
    浏览(43)
  • 渐进式编程之旅:探寻PHP函数的奇妙世界

    目录 前言 一、函数的定义和调用 1.1 初识函数 1.1.1 函数分类 1.1.2 自定义函数 1.1.3 return 1.2 参数设置 1.2.1 无参函数 1.2.2 按值传递参数 1.2.3 引用传参 1.2.4 设置参数默认值 1.2.5 指定参数类型(弱) 1.3 变量的作用域 1.3.1 变量分类 1.3.2 全局变量的使用 1.3.3 global关键

    2024年02月08日
    浏览(65)
  • 渐进式web全栈:blazor web app

    本文要说的这种开发模式,这种模式并不是只有blazor支持,js中有一样的方案next.js nuxt.js;blazor还有很多其它内容,本文近关注渐进式开发模式。 是的,前后端是主流,不过以下情况也许前后端分离并不是最好的选择: 小公司,人员不多,利润不高,创业阶段能省则省 个人

    2024年02月05日
    浏览(51)
  • 【GitOps系列】如何实施自动化渐进式交付?

    前言 在实施金丝雀发布的过程中,我们通过 Argo Rollout 的金丝雀策略将发布过程分成了 3 个阶段,每个阶段金丝雀的流量比例都不同,经过一段时间之后,金丝雀环境变成了新的生产环境。实际上,这也是一种渐进式的交付方式,它通过延长发布时间来保护生产环境,降低了

    2024年02月14日
    浏览(48)
  • Unity教程||Unity 渐进式光照贴图烘焙详解

    随着各大计算平台的算力稳步增长,特别是GPU技术的不断进化,原先可望而不可及的技术比如实时光线追踪技术开始逐步走入玩家的视野。一些先锋厂商甚至已经超出Demo的范畴,开始正式推出支持实时光追的游戏。 不过目前的实时光追技术还只能在配备了最新Nvidia RTX 20系列

    2024年02月08日
    浏览(53)
  • Vue.js:构建用户界面的渐进式框架

    Vue.js是一种流行的JavaScript前端框架,用于构建用户界面。本文将介绍Vue.js的基本概念、特点、应用场景以及与其他框架的对比。 一、引言 在当今的前端开发领域,Vue.js已经成为了一个备受瞩目的框架。它的简洁、灵活和易于上手的特性使得开发人员能够快速高效地构建用户

    2024年01月23日
    浏览(56)
  • 2023-06-17:说一说redis中渐进式rehash?

    2023-06-17:说一说redis中渐进式rehash? 答案2023-06-17: 在Redis中,如果哈希表的数组一直保持不变,就会增加哈希冲突的可能性,从而降低检索效率。为了解决这个问题,Redis会对数组进行扩容,通常是将数组大小扩大为原来的两倍。然而,这个扩容过程会引起元素在哈希桶中的

    2024年02月09日
    浏览(43)
  • redis到底是怎么样进行渐进式Rehash的

    Redis 是一个开源(BSD许可)的,内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件。那么redis的底层是如何来存储数据的呢? 一、redis如何在存储大量的key时候,查询速度还能接近O(1)呢? 查询速度接近O(1)的数据结构通常让我们想到的就是HashMap结构,那下面

    2024年02月09日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包