SSE[Server-Sent Events]实现页面流式数据输出(模拟ChatGPT流式输出)

这篇具有很好参考价值的文章主要介绍了SSE[Server-Sent Events]实现页面流式数据输出(模拟ChatGPT流式输出)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

        服务端向客户端推送消息,除了用WebSocket可实现,还有一种服务器发送事件(Server-Sent Events)简称 SSE,这是一种服务器端到客户端(浏览器)的单向消息推送。ChatGPT 就是采用的 SSE。对于需要长时间等待响应的对话场景,ChatGPT 采用了一种巧妙的策略:它会将已经计算出的数据“推送”给用户,并利用 SSE 技术在计算过程中持续返回数据。这样做的好处是可以避免用户因等待时间过长而选择关闭页面。

SSE 简介

        SSE 基于 HTTP 协议的,我们知道一般意义上的 HTTP 协议是无法做到服务端主动向客户端推送消息的,但 SSE 是个例外,它变换了一种思路。
SSE[Server-Sent Events]实现页面流式数据输出(模拟ChatGPT流式输出),Spring 框架研究,开源框架,chatgpt
SSE 在服务器和客户端之间打开一个单向通道,服务端响应的不再是一次性的数据包而是text/event-stream类型的数据流信息,在有数据变更时从服务器流式传输到客户端。整体的实现思路有点类似于在线视频播放,视频流会连续不断的推送到浏览器,你也可以理解成,客户端在完成一次用时很长(网络不畅)的下载。
SSE[Server-Sent Events]实现页面流式数据输出(模拟ChatGPT流式输出),Spring 框架研究,开源框架,chatgpt
SSE 与 WebSocket 作用相似,都可以建立服务端与浏览器之间的通信,实现服务端向客户端推送消息,但还是有些许不同:

  • SSE 是基于 HTTP 协议的,它们不需要特殊的协议或服务器实现即可工作;WebSocket 需单独服务器来处理协议。
  • SSE 单向通信,只能由服务端向客户端单向通信;WebSocket 全双工通信,即通信的双方可以同时发送和接受信息。
  • SSE 实现简单开发成本低,无需引入其他组件;WebSocket 传输数据需做二次解析,开发门槛高一些
  • SSE 默认支持断线重连;WebSocket 则需要自己实现。
  • SSE 只能传送文本消息,二进制数据需要经过编码后传送;WebSocket 默认支持传送二进制数据。

应用场景区分

  • WebSocket: 提供更丰富的协议来执行双向、全双工通信。对于游戏、即时通信以及需要双向近乎实时更新的场景,拥有双向通道更具吸引力。
  • 而某些情况下,不需要从客户端发送数据,而只需要一些服务器操作的更新。比如:站内信、未读消息数、状态更新、股票行情、监控数量等场景。SSE 具有 WebSocket 在设计上缺乏的多种功能,例如:自动重新连接、事件 ID 和发送任意事件的能力。前端只需进行一次 HTTP 请求,带上唯一 ID,打开事件流,监听服务端推送的事件就可以反向不定期的推送实时数据。SEE 从实现的难易和成本上都更有优势。

浏览器支撑性

SSE 不支持 IE 浏览器,对其他主流浏览器兼容性做的还不错。
SSE[Server-Sent Events]实现页面流式数据输出(模拟ChatGPT流式输出),Spring 框架研究,开源框架,chatgpt

实现过程

环境提示:JDK8 、 Spring Boot 2.5.15 、Spring framework 5.3.27
1、配置与初始化工作
导入pom依赖

<dependency>
    <groupId>com.unfbx</groupId>
    <artifactId>chatgpt-java</artifactId>
    <version>1.0.12</version>
</dependency>

启动类自定义OkHttpClient客户端和OpenAi流客户端OpenAiStreamClient使用Bean对象

package com.merak;
import com.unfbx.chatgpt.OpenAiStreamClient;
import com.unfbx.chatgpt.function.KeyRandomStrategy;
import com.unfbx.chatgpt.interceptor.OpenAILogger;
import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
/**
 * 启动程序
 */
@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
@ComponentScan(basePackages = "com.*")
public class MerakAdminApplication {

    public static void main(String[] args)
    {
        SpringApplication.run(MerakAdminApplication.class, args);
    }

    @Bean
    public OpenAiStreamClient openAiStreamClient() {
        //本地开发需要配置代理地址
//        Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress("127.0.0.1", 8002));
        HttpLoggingInterceptor httpLoggingInterceptor = new HttpLoggingInterceptor(new OpenAILogger());
        //!!!!!!测试或者发布到服务器千万不要配置Level == BODY!!!!
        httpLoggingInterceptor.setLevel(HttpLoggingInterceptor.Level.HEADERS);
        //自定义OkHttpClient客户端
        OkHttpClient okHttpClient = new OkHttpClient
                .Builder()
//                .proxy(proxy)
                .addInterceptor(httpLoggingInterceptor)
                .connectTimeout(30, TimeUnit.SECONDS)
                .writeTimeout(600, TimeUnit.SECONDS)
                .readTimeout(600, TimeUnit.SECONDS)
                .build();
        //OpenAi流客户端OpenAiStreamClient,http://127.0.0.1:8080/ 为服务侧查询数据的流式接口,也必须返回text/event-stream类型
        return OpenAiStreamClient
                .builder()
                .apiHost("http://127.0.0.1:8080/")
                .apiKey(Arrays.asList("1","2"))
                //自定义key使用策略 默认随机策略
                .keyStrategy(new KeyRandomStrategy())
                .okHttpClient(okHttpClient)
                .build();
    }
}

定义SSE服务类[SseService]和实现类[SseServiceImpl]

public interface SseService {
    /**
     * 创建SSE
     * @param uid
     * @return
     */
    SseEmitter createSse(String uid);

    /**
     * 关闭SSE
     * @param uid
     */
    void closeSse(String uid);

    /**
     * 客户端发送消息到服务端
     */
    boolean sseChat(String uid, String chatRequestInfo, JSONObject jsonObject, Long userId);
}

实现类代码见源代码

LocalCache配置类
定义缓存对象CACHE存储每个请求UUid和SseEmitter关系,并定时清理

package com.merak.web.controller.stream.config;

import cn.hutool.cache.CacheUtil;
import cn.hutool.cache.impl.TimedCache;
import cn.hutool.core.date.DateUnit;

/**
 * 定义缓存对象CACHE存储每个请求UUid和SseEmitter关系,并定时清理
 *
 */
public class LocalCache {
    /**
     * 缓存时长
     */
    public static final long TIMEOUT = 5 * DateUnit.MINUTE.getMillis();
    /**
     * 清理间隔
     */
    private static final long CLEAN_TIMEOUT = 5 * DateUnit.MINUTE.getMillis();
    /**
     * 缓存对象
     */
    public static final TimedCache<String, Object> CACHE = CacheUtil.newTimedCache(TIMEOUT);

    static {
        //启动定时任务
        CACHE.schedulePrune(CLEAN_TIMEOUT);
    }
}

定义OpenAISSEEventSourceListener类
继承、重载EventSourceListener抽象类建立sse连接-onOpen、监听事件->解析响应数据-onEvent、关闭-onClosed以及异常onFailure 等方法。

package com.merak.web.controller.stream.listener;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.unfbx.chatgpt.entity.chat.MerakChatCompletionResponse;
import com.unfbx.chatgpt.entity.chat.Message;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.*;
/**
 * 描述:定义OpenAISSEEventSourceListener类,继承EventSourceListener抽象类
 */
@Slf4j
public class OpenAISSEEventSourceListener extends EventSourceListener {
    private static final Logger log = LoggerFactory.getLogger(OpenAISSEEventSourceListener.class);
    private SseEmitter sseEmitter;
    private String uid;
    private JSONObject jsonObject;
    private Long userId;

    /**
     * 结构化
     */
    public OpenAISSEEventSourceListener(SseEmitter sseEmitter, String uid, JSONObject jsonObject, Long userId) {
        this.sseEmitter = sseEmitter;
        this.uid = uid;
        this.jsonObject = jsonObject;
        this.userId = userId;
    }

    /**
     * 建立sse连接
     */
    @Override
    public void onOpen(EventSource eventSource, Response response) {
        log.info("response=" + response);
        log.info("sseEmitter uid=" + uid);
        log.info("OpenAI建立sse连接...");
    }

    /**
     * 监听事件->解析响应数据
     */
    @Override
    public void onEvent(EventSource eventSource, String id, String type, String data) {
        log.info("sseEmitter uid=" + uid + ",OpenAI返回数据:{}", data);
        if (data.equals("[DONE]")) {
            log.info("OpenAI返回数据结束了");
            try {
                String answerContext = "[DONE]";
                sseEmitter.send(answerContext);// .id("[DONE]" reconnectTime(30000)
                // 传输完成后自动关闭sse
                sseEmitter.complete();
                return;
            } catch (IOException e) {
                log.error("onEvent error msg=" + e.getMessage());
            }
        }
        ObjectMapper mapper = new ObjectMapper();
        //由于业务特性需要自定义MerakChatCompletionResponse -> 重写原响应消息对象CompletionResponse [说明:没要求可以不重写]
        MerakChatCompletionResponse completionResponse = null;
        String answerContext = "";
        try {
            completionResponse = mapper.readValue(data, MerakChatCompletionResponse.class); //转换成自定义MerakChatCompletionResponse
            String finishReason = completionResponse.getChoices().get(0).getFinishReason();
            if ("stop".equals(finishReason)) {
                //在原始Json串的"delta"节点增加自定义的业务数据节点bussinessDataNodeJson
                String bussinessDataNodeJson = "{\"userId\":\"" + userId + "\"}";
                //{"id": "chatcmpl-3BpHEcUKNMUk7jbWkKB2gU", "object": "chat.completion.chunk", "created": 1699844126, "model": "chatglm2-6b-32k", "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]}
                JSONObject originalObj = JSON.parseObject(data);
                originalObj.remove("body");
                JSONObject targetObj = JSON.parseObject(bussinessDataNodeJson);
                //将自定义的目标Json串添加到原始Json串的"delta"节点后面
                originalObj.getJSONArray("choices").getJSONObject(0).put("delta", targetObj);
                String lastData = originalObj.toJSONString();
                sseEmitter.send(SseEmitter.event()
                        .id(completionResponse.getId())
                        .data(lastData)
                        .reconnectTime(3000));
            } else {
                //过滤Message delta节点有内容的有效内容
                Message delta = completionResponse.getChoices().get(0).getDelta();
                if (null != delta) {
                    answerContext = delta.getContent();
                    if (null != answerContext && !StringUtils.isBlank(answerContext)) {
                        Thread.sleep(50);
                        log.info("休眠50毫秒");
                        sseEmitter.send(SseEmitter.event()
                                .id(completionResponse.getId())
                                .data(data)
                                .reconnectTime(3000));
                    }
                }
            }
        } catch (Exception e) {
            log.error("sse信息推送失败!");
            eventSource.cancel();
            log.error("sse信息推送失败,error msg=" + e.getMessage());
        }
    }

    @Override
    public void onClosed(EventSource eventSource) {
        log.info("OpenAI关闭sse连接...");
    }


    @SneakyThrows
    @Override
    public void onFailure(EventSource eventSource, Throwable t, Response response) {
        log.info("OpenAI 连接操作失败...");
        if (Objects.isNull(response)) {
            return;
        }
        ResponseBody body = response.body();
        if (Objects.nonNull(body)) {
            try {
                log.error("OpenAI sse连接异常data:{},异常:{}", body.string(), t);
            } catch (IOException e) {
                log.error("OpenAI sse连接异常,error msg=" + e.getMessage());
            }
        } else {
            log.error("OpenAI  sse连接异常data:{},异常:{}", response, t);
        }
        eventSource.cancel();
    }

}

2、业务控制类:智能问答,接收Web客户端请求并向调用数据服务

package com.merak.web.controller.knowledge;
import com.alibaba.fastjson2.JSONObject;
import com.merak.common.core.controller.BaseController;
import com.merak.common.utils.uuid.UUID;
import com.merak.web.controller.stream.config.LocalCache;
import com.merak.web.service.AianswerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
/**
 * 4.7.机器人智能问答管理
 */
@RestController
@RequestMapping("/aianswer")
public class AianswerController extends BaseController {
    @Autowired
    private AianswerService aianswerService;

    /**
     * 机器人智能问答:客户端发送消息到服务端-流式响应
     * @param jsonObject 智能问答对象 {"robotId":"1","aiQuestion":"请介绍chatgpt"}
     * @return emitter SseEmitter
     * produces:request请求头中的(Accept)类型包含text/event-stream时,指定返回内容类型text/event-stream;
     */
    @PostMapping(path = "/streamAsk", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public ResponseBodyEmitter streamAskBak(@RequestBody JSONObject jsonObject) {
        SseEmitter emitter = null;
        try {
            String uid = UUID.randomUUID().toString().replace("-", "");
            Long userId = 1L;
            boolean blnAnswer = aianswerService.aianswerStreamAsk(jsonObject, userId, uid);
            logger.info("sse 完成连接和发送请求,blnAnswer=" + blnAnswer);
            if (blnAnswer) {
                emitter = (SseEmitter) LocalCache.CACHE.get(uid);
            }
        } catch (Exception e) {
            logger.info("机器人智能问答新增error msg=" + e.getMessage());
        }
        return emitter;
    }

}

智能问答实现方法-aianswerStreamAsk

package com.merak.web.service;
import com.alibaba.fastjson2.JSONObject;
import com.merak.web.controller.stream.service.SseService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.*;
/**
 * 智能问答-服务层
 */
@Service
public class AianswerService {
    private static final Logger logger = LoggerFactory.getLogger(AianswerService.class);
    @Autowired
    private SseService sseService;

    public boolean aianswerStreamAsk(JSONObject jsonObject, Long userId, String uid) {
        boolean bln = false;
        String prompt = "";//提示词
        try {
            //1.拼装问答提问入参chatCompletion
            String robotId = (String) jsonObject.get("robotId");//提问机器人
            String originalQuestion = (String) jsonObject.get("aiQuestion");//提问问题
            Map<String, Object> chatInfo = new HashMap();
            chatInfo.put("source_question", originalQuestion);
            chatInfo.put("prompt", prompt);
            chatInfo.put("temperature", 1);
            chatInfo.put("top_p", 1);
            chatInfo.put("history", null);
            String chatCompletion = JSONObject.toJSONString(chatInfo);

            //2.创建SSE
            sseService.createSse(uid);
            logger.info("chatCompletion=" + chatCompletion);
            //3.客户端发送消息到服务端
            bln = sseService.sseChat(uid, chatCompletion, jsonObject, userId);
        } catch (Exception e) {
            logger.error("机器人智能问答失败,error msg=" + e.getMessage());
        }
        return bln;
    }
}

客户端发送消息到服务端-sseService.sseChat

@Override
    public boolean sseChat(String uid, String chatCompletion, JSONObject jsonObject, Long userId) {
        boolean bln = true;
        try {
            //从缓存对象TimedCache内获取SseEmitter
            SseEmitter sseEmitter = (SseEmitter) LocalCache.CACHE.get(uid);
            if (sseEmitter == null) {
                log.info("消息推送失败uid:[{}],没有创建连接,请重试。", uid);
                throw new BaseException("消息推送失败uid:[{}],没有创建连接,请重试。~");
            }
            OpenAISSEEventSourceListener openAIEventSourceListener = new OpenAISSEEventSourceListener(sseEmitter, uid, jsonObject, userId);
            openAiStreamClient.streamChatCompletion(chatCompletion, openAIEventSourceListener);
        }catch (Exception e){
            bln = false;
            log.info("[{}]消息推送失败失败!", uid);
        }
        return bln;
    }

服务端调用ChatGPT API方法streamChatCompletion 实现同最底层数据提供服务方通信
即调用openAiStreamClient.streamChatCompletion(chatCompletion, openAIEventSourceListener);方法
发送的API接口地址为:String streamUrl = this.apiHost + "v1/completions";

响应的实时数据

{"id": "chatcmpl-42rNbjJ9WFAnMGF9cNZTwn", "model": "chatglm2-6b-32k", "choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": null}]}
{"id": "chatcmpl-42rNbjJ9WFAnMGF9cNZTwn", "model": "chatglm2-6b-32k", "choices": [{"index": 0, "delta": {"content": "尊敬"}, "finish_reason": null}]}
{"id": "chatcmpl-42rNbjJ9WFAnMGF9cNZTwn", "model": "chatglm2-6b-32k", "choices": [{"index": 0, "delta": {"content": "的"}, "finish_reason": null}]}
{"id": "chatcmpl-42rNbjJ9WFAnMGF9cNZTwn", "model": "chatglm2-6b-32k", "choices": [{"index": 0, "delta": {"content": "客户"}, "finish_reason": null}]}
{"id": "chatcmpl-42rNbjJ9WFAnMGF9cNZTwn", "model": "chatglm2-6b-32k", "choices": [{"index": 0, "delta": {"content": ","}, "finish_reason": null}]}
{"id": "chatcmpl-42rNbjJ9WFAnMGF9cNZTwn", "model": "chatglm2-6b-32k", "choices": [{"index": 0, "delta": {"content": "您好"}, "finish_reason": null}]}
。。。。。。
{"id": "chatcmpl-42rNbjJ9WFAnMGF9cNZTwn", "model": "chatglm2-6b-32k", "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]}
[DONE]

3、监听响应的实时数据
EventSourceListener onEvent()监听事件->解析响应数据:

    /**
     * 监听事件->解析响应数据
     */
    @Override
    public void onEvent(EventSource eventSource, String id, String type, String data) {
        log.info("sseEmitter uid=" + uid + ",OpenAI返回数据:{}", data);
        if (data.equals("[DONE]")) {
            log.info("OpenAI返回数据结束了");
            try {
                String answerContext = "[DONE]";
                sseEmitter.send(answerContext);// .id("[DONE]" reconnectTime(30000)
                // 传输完成后自动关闭sse
                sseEmitter.complete();
                return;
            } catch (IOException e) {
                log.error("onEvent error msg=" + e.getMessage());
            }
        }
        ObjectMapper mapper = new ObjectMapper();
        //由于业务特性需要自定义MerakChatCompletionResponse -> 重写原响应消息对象CompletionResponse [说明:没要求可以不重写]
        MerakChatCompletionResponse completionResponse = null;
        String answerContext = "";
        try {
            completionResponse = mapper.readValue(data, MerakChatCompletionResponse.class); //转换成自定义MerakChatCompletionResponse
            String finishReason = completionResponse.getChoices().get(0).getFinishReason();
            if ("stop".equals(finishReason)) {
                //在原始Json串的"delta"节点增加自定义的业务数据节点bussinessDataNodeJson
                String bussinessDataNodeJson = "{\"userId\":\"" + userId + "\"}";
                //{"id": "chatcmpl-3BpHEcUKNMUk7jbWkKB2gU", "object": "chat.completion.chunk", "created": 1699844126, "model": "chatglm2-6b-32k", "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]}
                JSONObject originalObj = JSON.parseObject(data);
                originalObj.remove("body");
                JSONObject targetObj = JSON.parseObject(bussinessDataNodeJson);
                //将自定义的目标Json串添加到原始Json串的"delta"节点后面
                originalObj.getJSONArray("choices").getJSONObject(0).put("delta", targetObj);
                String lastData = originalObj.toJSONString();
                sseEmitter.send(SseEmitter.event()
                        .id(completionResponse.getId())
                        .data(lastData)
                        .reconnectTime(3000));
            } else {
                //过滤Message delta节点有内容的有效内容
                Message delta = completionResponse.getChoices().get(0).getDelta();
                if (null != delta) {
                    answerContext = delta.getContent();
                    if (null != answerContext && !StringUtils.isBlank(answerContext)) {
                        Thread.sleep(50);
                        log.info("休眠50毫秒");
                        sseEmitter.send(SseEmitter.event()
                                .id(completionResponse.getId())
                                .data(data)
                                .reconnectTime(3000));
                    }
                }
            }
        } catch (Exception e) {
            log.error("sse信息推送失败!");
            eventSource.cancel();
            log.error("sse信息推送失败,error msg=" + e.getMessage());
        }
    }

4、sseEmitter向Web客户端实时推送数据,将数据实时写入outputMessage Body体内

 sseEmitter.send(SseEmitter.event().id(completionResponse.getId())
           .data(lastData).reconnectTime(3000));

流程说明:

  sseEmitter.send() -> 
  ResponseBodyEmitter.sendInternal(Object object, @Nullable MediaType mediaType)  ->
  this.handler.send(object, mediaType) ->
  ResponseBodyEmitterReturnValueHandler.sendInternal(T data, @Nullable MediaType mediaType)  ->
   {  converter.write(data, mediaType, this.outputMessage);
     this.outputMessage.flush(); 
   }                                  ->
   
   public final void write(final T t, @Nullable MediaType contentType, HttpOutputMessage outputMessage) throws IOException, HttpMessageNotWritableException {
        HttpHeaders headers = outputMessage.getHeaders();
        this.addDefaultHeaders(headers, t, contentType);
        if (outputMessage instanceof StreamingHttpOutputMessage) {
            StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage)outputMessage;
            streamingOutputMessage.setBody((outputStream) -> {
                this.writeInternal(t, new HttpOutputMessage() {
                    public OutputStream getBody() {
                        return outputStream;
                    }
                    public HttpHeaders getHeaders() {
                        return headers;
                    }
                });
            });
        } else {
            this.writeInternal(t, outputMessage);
            outputMessage.getBody().flush();
        }

    }

Web VUE核心解析数据代码

<script setup name="Chat">

  // 核心解析服务端方法requestStreamingChat
async function requestStreamingChat(message,index,subBussiness) {
  const url = `${window.location.origin}${import.meta.env.VITE_APP_BASE_API}/aianswer/streamAsk`

  controller = new AbortController()
  const reqTimeoutId = setTimeout(() => controller.abort(), 30000)

  try {
    let respString = ''
    fetchEventSource(url,{
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${getToken()}`,
      },
      signal: controller.signal,
      body: JSON.stringify({
        robotId: currentChatRobot.value?.robotId,
        knowledgeId:currentChatRobot.value?.knowledgeId,
        aiQuestion: message,
        aiQuestionHistory: subBussiness?bussinessProcess.historyQa:history.value,
        classifyFrom:'1',
        bussinessType:subBussiness?bussinessProcess.bussinessType:'',
        bussinessContent:subBussiness?bussinessProcess.formData:''
      }),
      async onopen(response) {
        console.log(response)
        if (response.ok && response.headers.get('content-type')?.includes('text/event-stream')) {
          // everything's good
          console.log('everything\'s good')
        } else if (response.status >= 400 && response.status < 500 && response.status !== 429) {
          chatStore.updateChat(
            index,
            {
              dateTime: dayjs().format('YYYY-MM-DD HH:mm:ss'),
              text: 'Stream Error',
              inversion: false,
              error: true,
              loading: false,
              streaming: false,
              requestOptions: { prompt: message },
            },
          )
          scrollToBottomIfAtBottom()
        } else {
          console.log('其他错误')
          finish()
        }
      },
      async onmessage(event) {
        console.log(event)
        // 表示整体结束
        if (event.data === '[DONE]') {
          console.log('结束')
          finish()
          return
        }
        if (event.data) {
          const jsonData = JSON.parse(event.data)
          // 如果等于stop表示结束
          if (jsonData.choices[0].finish_reason === 'stop') {
            //接收会话信息
            const info = jsonData.choices[0].delta
            chatStore.updateChatSome(
              index,
              {
                dateTime: info?.occurTime,
                aiId: info?.aiId,
                solveIs: info?.solveIs
              }
            )
            return
          }
          // 判断role存在,进行排除
          if (jsonData.choices[0].delta.role !== undefined) {
            respString = jsonData.choices[0].delta.role + ': '
            return
          }
          if (jsonData.choices[0].delta.content !== undefined) {
            respString += jsonData.choices[0].delta.content
            console.log(respString)
            chatStore.updateChat(
              index,
              {
                dateTime: dayjs().format('YYYY-MM-DD HH:mm:ss'),//res?.data?.occurTime,
                text: respString,
                error: false,
                loading: false,
                streaming: true,
                requestOptions: { prompt: message },
              },
            )
            scrollToBottomIfAtBottom()
          }
        }        
      },
      async onerror(error) {
        console.error('Error:', error)
        finish()
      },
      async onclose() {
        loading.value = false
        finish()
        // if the server closes the connection unexpectedly, retry:
        console.log('关闭连接')
      }
    })

    const finish = () => {      
      loading.value = false
      chatStore.updateChatSome(index, { streaming: false,loading: false })
      reqTimeoutId && clearTimeout(reqTimeoutId)
      controller.abort()
    }
  } catch (error) {
    loading.value = false
    const errorMessage = error?.message ?? '好像出错了,请稍后再试。'
    if (error.message === 'canceled') {
      chatStore.updateChatSome(
        index,
        {
          loading: false,
        },
      )
      scrollToBottomIfAtBottom()
      return
    }
    chatStore.updateChat(
      index,
      {
        dateTime: dayjs().format('YYYY-MM-DD HH:mm:ss'),
        text: errorMessage,
        inversion: false,
        error: true,
        loading: false,
        streaming: false,
        requestOptions: { prompt: message },
      },
    )
    scrollToBottomIfAtBottom()
  }
  finally {
    chatStore.updateChatTime(robotId.value,dayjs().format('YYYY-MM-DD HH:mm:ss'))
  }
}

function handleClear() {
  //针对那些回答异常没有会话ID的情况
  if(!dataSources.value?.[dataSources.value.length-1]?.aiId) {
    chatStore.updateChatSome(dataSources.value.length-1, { cleanIs: 1 })
    const element = chatWrapper.value;
    if(element) {
      element.style.height = element.offsetHeight + 300 + 'px'
      scrollToBottom()
    }
    return
  }
  clearChat({
    aiId:dataSources.value?.[dataSources.value.length-1]?.aiId,
    robotId:robotId.value,
    knowledgeId:currentChatRobot?.knowledgeId
  }).then(res => {
    if(res.code == 200) {
      chatStore.updateChatSome(dataSources.value.length-1, { cleanIs: 1 })
      const element = chatWrapper.value;
      if(element) {
        element.style.height = element.offsetHeight + 300 + 'px'
        scrollToBottom()
      }
    }
  })  
}

function handleEnter(event) {
  if (event.key === 'Enter' && !event.shiftKey) {
    event.preventDefault()
    onConversation()
  }
}

function handleStop() {
  if (loading.value) {
    controller.abort()
    loading.value = false
  }
}

const searchRobotDo = () => {
  listRobot({robotName:searchRobot.value,robotStatus:0,pageNum:1,pageSize:100}).then(res => {
    if(res.code == 200) {
      robots.value = res.rows
    }
    if(!searchRobot.value) {
      upTopRobot()
    }
  })
}

//当前对话的机器人置顶
function upTopRobot(rid) {
  if(robotRef.value) {
    rid = rid || robotId.value
    const _up = robots.value.find(item=>item.robotId==rid)
    let _filter = robots.value.filter(item=>item.robotId!=rid)
    _filter.unshift(_up)
    robots.value = _filter
    robotRef.value.scrollTop = 0
  }  
}

function handleFeedResult(index,aiId,type) {
  if(type == 1) {
    feedBackChat({aiId,solveIs:type}).then(res => {
      proxy.$modal.msgSuccess('您的反馈我们已经收到,谢谢您的认可!')
      chatStore.updateChatSome(index,{solveIs:type})
    })
  } else {
    console.log(index)
    curFeedChat.value = dataSources.value[index]
    curFeedChat.value['index'] = index
    feedBackText.value = curFeedChat.value?.feedBack
    feedbackShow.value = true
  }
}

function submitFeedback() {
  if(!feedBackText.value) {
    proxy.$modal.msgWarning('请详细描述需要反馈的问题')
    return
  }
  feedBackChat({aiId:curFeedChat.value?.aiId,robotId:robotId.value,feedBack:feedBackText.value,solveIs:-1}).then(res => {
    if(res.code == 200) {
      proxy.$modal.msgSuccess('感谢您的反馈,我们将不断改进~~')
      chatStore.updateChatSome(curFeedChat.value.index,{feedBack:feedBackText.value,solveIs:-1})
    }
  }).finally(() => {
    feedbackShow.value = false
  })
}

onMounted(() => {  
  //查询所有机器人
  listRobot({robotStatus:0,pageNum:1,pageSize:100}).then(res => {
    if(res.code == 200 && res.rows?.length>0) {
      robots.value = res.rows
      if(!robotId.value || res.rows.findIndex(item=>item.robotId==robotId.value) == -1) {
        chatStore.setActive(res.rows[0]?.robotId)
      } else {
        upTopRobot()
      }
      //查询当前对话机器人的问答记录
      chatStore.getCurChat().then(res => {
        setTimeout(()=>{
          scrollToBottom()
        },0)
      })
    } else {
      proxy.$modal.msgWarning('暂无机器人可以对话~~')
    }
  })
  if (inputRef.value)
    inputRef.value?.focus()
})

onUnmounted(() => {
  if (loading.value)
    controller.abort()
})


const formatChatTime = (time) => {
  if(dayjs(time).isAfter(dayjs().startOf('day'))) {
    return dayjs(time).format('HH:mm')
  } else {
    return dayjs(time).format('MM/DD')
  }
}

const deleteChat = () => {
  proxy.$modal.confirm('是否确认清除当前AI应用的会话记录吗!').then(function () {
    return deleteChatContext({robotId:currentChatRobot.value?.robotId,classifyFrom:1})
  }).then(() => {
    chatStore.getCurChat()
    proxy.$modal.msgSuccess("清除成功");
  }).catch(() => { });
}

const viewAnswerLog = (aiId) => {
  getChatLog(aiId).then(res => {
    if(res.code == 200 && res.data) {
      logDetail.value = res.data
      showLogDetail.value = true
    } else {
      proxy.$modal.msgError("日志查询失败");
    }
  })
}

provide('handleFindRef', (aiId) => {
  showRefAiId.value = aiId
  showRefDialog.value = true
})

const onCloseDialog = () => {
  showRefDialog.value = false;
  showRefAiId.value = ''
}

provide('handleSumbitForm',async (type, flowStep, data) => {
  console.log(type, flowStep, data)
  if(type == -1) {
    proxy.$modal.confirm(`是否提前终止报价流程,已提交的数据将丢失?`).then(()=>{
      bussinessProcess.running = false
      chatStore.updateChatSome(dataSources.value.length - 1,{flowFlag:false})
      bussinessProcess.formData = {}
      bussinessProcess.historyQa = []

      prompt.value = '取消报价'
      onConversation()
    }).catch(() => { });
  } else {
    const qs = replaceTemplate(flowStep?.questionTpl,data)
    
    bussinessProcess.formData = {...bussinessProcess.formData,...data}

    chatStore.updateChatSome(dataSources.value.length - 1,{flowFlag:false})

    chatStore.addChat(
      {
        dateTime: dayjs().format('YYYY-MM-DD HH:mm:ss'),
        text: qs,
        inversion: true,
        error: false,
        requestOptions: { prompt: qs },
      },
    )
    scrollToBottom()
    chatStore.updateChatTime(robotId.value,dayjs().format('YYYY-MM-DD HH:mm:ss'))
    loading.value = true

    chatStore.addChat(
      {
        dateTime: dayjs().format('YYYY-MM-DD HH:mm:ss'),
        text: '',
        loading: true,
        inversion: false,
        error: false,
        requestOptions: { prompt: qs },
      },
    )
    scrollToBottom()

    if(flowStep?.step < bussinessProcess.totalStep) {
      let aw = ''
      try {
        const _tpl = JSON.parse(flowSteps.value[flowStep.step]?.formTpl)
        aw = _tpl?.rule?.[0].value
      } catch (error) {}
      bussinessProcess.historyQa.push([qs,aw])

      setTimeout(() => {
        chatStore.updateChat(
          dataSources.value.length - 1,
          {
            dateTime: dayjs().format('YYYY-MM-DD HH:mm:ss'),
            text: aw,
            inversion: false,
            error: false,
            loading: false,
            requestOptions: { prompt: qs },
            flowFlag:true,
            flowStep:flowSteps.value[flowStep?.step]
          },
        )
        scrollToBottom()
        loading.value = false
      },800)
    } else {

      await currentChatRobot.value.openStream == 'true' ? requestStreamingChat(qs,dataSources.value.length - 1,true) : requestChat(qs,dataSources.value.length - 1,true)
      
      loading.value = false
      bussinessProcess.running = false
      bussinessProcess.formData = {}
      bussinessProcess.historyQa = []
      chatStore.updateChatTime(robotId.value,dayjs().format('YYYY-MM-DD HH:mm:ss'))
    }
  }
})
</script>

实例demo

demo源代码

参考

基于Spring ApplicationEventPublisherAware推送事件实现

Java语言作为后端对接 chatgpt文章来源地址https://www.toymoban.com/news/detail-813587.html

到了这里,关于SSE[Server-Sent Events]实现页面流式数据输出(模拟ChatGPT流式输出)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 前端Server-Sent Events(SSE)请求如何用post

    现在非常流行AI问答,AI回答的时候一般都是流式输出,一个字几个字几个字地慢慢加载完,要实现这个效果,我们一般可以用WebSocket和Server-Sent来实现。 我会选择使用SSE,为什么不用WebSocket呢? 1. WebSocket是双向通信,这个功能只需要服务器一直向我们输出。 2.SSE是一个htt

    2024年02月02日
    浏览(41)
  • html5学习笔记19-SSE服务器发送事件(Server-Sent Events)

    https://www.runoob.com/html/html5-serversentevents.html 允许网页获得来自服务器的更新。类似设置回调函数。 demo_sse.php demo_sse.aspx

    2024年02月09日
    浏览(51)
  • SSE(Server-Sent Events,服务器推送事件)和sockets(套接字)通信区别

    SSE(Server-Sent Events,服务器推送事件)和sockets(套接字)都是用于实现实时通信的技术,但它们具有不同的特点和应用场景。 SSE 的优点: 简单易用:SSE 是基于HTTP协议的一种实时通信技术,使用简单,只需要在客户端通过EventSource对象监听服务器推送的事件即可。 可靠性:

    2024年02月15日
    浏览(52)
  • Server-Sent Events(以下简称 SSE)及event-source-polyfill使用单向长连接(后台主动向前端推送)

    SSE 与 WebSocket 作用相似,都是建立浏览器与服务器之间的通信渠道,然后服务器向浏览器推送信息SSE 是单向通道,只能服务器向浏览器发送,因为流信息本质上就是下载。如果浏览器向服务器发送信息,就变成了另一次 HTTP 请求 使用方法  Server-Sent Events 教程 - 阮一峰的网络

    2024年02月12日
    浏览(47)
  • 结合Server-sent events与 EventSource使用,实现服务端主动向客户端发送数据

    当前解决服务端推送的方案有这几个: 客户端长轮询(不推荐使用) websocket双向连接 iframe永久帧(不推荐使用) EventSource 长轮训虽然可以避免短轮训造成的服务端过载,但在服务端返回数据后仍需要客户端主动发起下一个长轮训请求,等待服务端响应,这样仍需要底层的连

    2024年02月04日
    浏览(76)
  • Java Server-Sent Events通信

    后端可以向前端发送信息,类似于websocket,但是websocket是双向通信,但是sse为单向通信,服务器只能向客户端发送文本信息,效率比websocket高。 单向通信 :SSE只支持服务器到客户端的单向通信。这对于那些只需要服务器推送数据而无需客户端响应的场景非常有效,例如实时

    2024年01月23日
    浏览(44)
  • 介绍Server-Sent Events,以及使用,超级简单!

    严格地说,HTTP 协议无法做到服务器主动推送信息。但是,有一种变通方法,就是服务器向客户端声明,接下来要发送的是流信息(streaming)。 也就是说,发送的不是一次性的数据包,而是一个数据流,会连续不断地发送过来。这时,客户端不会关闭连接,会一直等着服务器

    2024年02月11日
    浏览(42)
  • 浅谈PHP结合JavaScript SSE(Server Sent Events)实现服务器实时推送功能

    如配置后Nginx遇到502/504的,请参考这两篇文章的解决方案 PHP-FPM与Nginx通信报 502 Bad Gateway或504 Gateway Timeout终极解决方案(适用于PHP执行耗时任务情况下的报错) Linux系统下配置Nginx使部分URL使用多套自定义的PHP-FPM配置 SSE 的全称是 Server Sent Events,即服务器推送事件。它是一种

    2024年02月08日
    浏览(46)
  • Go 中的Server-Sent Events:一种高效的实时通信替代方案

    在当今的软件工程领域,实时通信在许多现代应用程序中发挥着至关重要的作用。Server-Sent Events (SSE) 是该领域广受欢迎的一项技术。 在本文中,我们将探讨Server-Sent Events 是什么,将它们的功能与 WebSocket 进行比较,提供 Go 和 JavaScript 代码示例,讨论使用服务器发送事件的优

    2024年02月11日
    浏览(43)
  • Spring Boot 整合 SSE(Server Sent Event)

    服务器发送事件(Server-Sent Events),简称 SSE。这是一种服务器端到客户端的单向消息推送。SSE 基于 HTTP 协议的,SSE 在服务器和客户端之间打开一个单向通道,服务端响应的不再是一次性的数据包而是text/event-stream类型的数据流信息 后端代码: 细节: 创建SseEmitter 对象时需要返

    2024年02月16日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包