前言
服务端向客户端推送消息,除了用WebSocket可实现,还有一种服务器发送事件(Server-Sent Events)简称 SSE,这是一种服务器端到客户端(浏览器)的单向消息推送。ChatGPT 就是采用的 SSE。对于需要长时间等待响应的对话场景,ChatGPT 采用了一种巧妙的策略:它会将已经计算出的数据“推送”给用户,并利用 SSE 技术在计算过程中持续返回数据。这样做的好处是可以避免用户因等待时间过长而选择关闭页面。
SSE 简介
SSE 基于 HTTP 协议的,我们知道一般意义上的 HTTP 协议是无法做到服务端主动向客户端推送消息的,但 SSE 是个例外,它变换了一种思路。
SSE 在服务器和客户端之间打开一个单向通道,服务端响应的不再是一次性的数据包而是text/event-stream类型的数据流信息,在有数据变更时从服务器流式传输到客户端。整体的实现思路有点类似于在线视频播放,视频流会连续不断的推送到浏览器,你也可以理解成,客户端在完成一次用时很长(网络不畅)的下载。
SSE 与 WebSocket 作用相似,都可以建立服务端与浏览器之间的通信,实现服务端向客户端推送消息,但还是有些许不同:
- SSE 是基于 HTTP 协议的,它们不需要特殊的协议或服务器实现即可工作;WebSocket 需单独服务器来处理协议。
- SSE 单向通信,只能由服务端向客户端单向通信;WebSocket 全双工通信,即通信的双方可以同时发送和接受信息。
- SSE 实现简单开发成本低,无需引入其他组件;WebSocket 传输数据需做二次解析,开发门槛高一些
- SSE 默认支持断线重连;WebSocket 则需要自己实现。
- SSE 只能传送文本消息,二进制数据需要经过编码后传送;WebSocket 默认支持传送二进制数据。
应用场景区分
- WebSocket: 提供更丰富的协议来执行双向、全双工通信。对于游戏、即时通信以及需要双向近乎实时更新的场景,拥有双向通道更具吸引力。
- 而某些情况下,不需要从客户端发送数据,而只需要一些服务器操作的更新。比如:站内信、未读消息数、状态更新、股票行情、监控数量等场景。SSE 具有 WebSocket 在设计上缺乏的多种功能,例如:自动重新连接、事件 ID 和发送任意事件的能力。前端只需进行一次 HTTP 请求,带上唯一 ID,打开事件流,监听服务端推送的事件就可以反向不定期的推送实时数据。SEE 从实现的难易和成本上都更有优势。
浏览器支撑性
SSE 不支持 IE 浏览器,对其他主流浏览器兼容性做的还不错。
实现过程
环境提示:JDK8 、 Spring Boot 2.5.15 、Spring framework 5.3.27
1、配置与初始化工作
导入pom依赖
<dependency>
<groupId>com.unfbx</groupId>
<artifactId>chatgpt-java</artifactId>
<version>1.0.12</version>
</dependency>
启动类自定义OkHttpClient客户端和OpenAi流客户端OpenAiStreamClient使用Bean对象
package com.merak;
import com.unfbx.chatgpt.OpenAiStreamClient;
import com.unfbx.chatgpt.function.KeyRandomStrategy;
import com.unfbx.chatgpt.interceptor.OpenAILogger;
import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
/**
* 启动程序
*/
@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
@ComponentScan(basePackages = "com.*")
public class MerakAdminApplication {
public static void main(String[] args)
{
SpringApplication.run(MerakAdminApplication.class, args);
}
@Bean
public OpenAiStreamClient openAiStreamClient() {
//本地开发需要配置代理地址
// Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress("127.0.0.1", 8002));
HttpLoggingInterceptor httpLoggingInterceptor = new HttpLoggingInterceptor(new OpenAILogger());
//!!!!!!测试或者发布到服务器千万不要配置Level == BODY!!!!
httpLoggingInterceptor.setLevel(HttpLoggingInterceptor.Level.HEADERS);
//自定义OkHttpClient客户端
OkHttpClient okHttpClient = new OkHttpClient
.Builder()
// .proxy(proxy)
.addInterceptor(httpLoggingInterceptor)
.connectTimeout(30, TimeUnit.SECONDS)
.writeTimeout(600, TimeUnit.SECONDS)
.readTimeout(600, TimeUnit.SECONDS)
.build();
//OpenAi流客户端OpenAiStreamClient,http://127.0.0.1:8080/ 为服务侧查询数据的流式接口,也必须返回text/event-stream类型
return OpenAiStreamClient
.builder()
.apiHost("http://127.0.0.1:8080/")
.apiKey(Arrays.asList("1","2"))
//自定义key使用策略 默认随机策略
.keyStrategy(new KeyRandomStrategy())
.okHttpClient(okHttpClient)
.build();
}
}
定义SSE服务类[SseService]和实现类[SseServiceImpl]
public interface SseService {
/**
* 创建SSE
* @param uid
* @return
*/
SseEmitter createSse(String uid);
/**
* 关闭SSE
* @param uid
*/
void closeSse(String uid);
/**
* 客户端发送消息到服务端
*/
boolean sseChat(String uid, String chatRequestInfo, JSONObject jsonObject, Long userId);
}
实现类代码见源代码
LocalCache配置类
定义缓存对象CACHE存储每个请求UUid和SseEmitter关系,并定时清理
package com.merak.web.controller.stream.config;
import cn.hutool.cache.CacheUtil;
import cn.hutool.cache.impl.TimedCache;
import cn.hutool.core.date.DateUnit;
/**
* 定义缓存对象CACHE存储每个请求UUid和SseEmitter关系,并定时清理
*
*/
public class LocalCache {
/**
* 缓存时长
*/
public static final long TIMEOUT = 5 * DateUnit.MINUTE.getMillis();
/**
* 清理间隔
*/
private static final long CLEAN_TIMEOUT = 5 * DateUnit.MINUTE.getMillis();
/**
* 缓存对象
*/
public static final TimedCache<String, Object> CACHE = CacheUtil.newTimedCache(TIMEOUT);
static {
//启动定时任务
CACHE.schedulePrune(CLEAN_TIMEOUT);
}
}
定义OpenAISSEEventSourceListener类
继承、重载EventSourceListener抽象类建立sse连接-onOpen、监听事件->解析响应数据-onEvent、关闭-onClosed以及异常onFailure 等方法。
package com.merak.web.controller.stream.listener;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.unfbx.chatgpt.entity.chat.MerakChatCompletionResponse;
import com.unfbx.chatgpt.entity.chat.Message;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.*;
/**
* 描述:定义OpenAISSEEventSourceListener类,继承EventSourceListener抽象类
*/
@Slf4j
public class OpenAISSEEventSourceListener extends EventSourceListener {
private static final Logger log = LoggerFactory.getLogger(OpenAISSEEventSourceListener.class);
private SseEmitter sseEmitter;
private String uid;
private JSONObject jsonObject;
private Long userId;
/**
* 结构化
*/
public OpenAISSEEventSourceListener(SseEmitter sseEmitter, String uid, JSONObject jsonObject, Long userId) {
this.sseEmitter = sseEmitter;
this.uid = uid;
this.jsonObject = jsonObject;
this.userId = userId;
}
/**
* 建立sse连接
*/
@Override
public void onOpen(EventSource eventSource, Response response) {
log.info("response=" + response);
log.info("sseEmitter uid=" + uid);
log.info("OpenAI建立sse连接...");
}
/**
* 监听事件->解析响应数据
*/
@Override
public void onEvent(EventSource eventSource, String id, String type, String data) {
log.info("sseEmitter uid=" + uid + ",OpenAI返回数据:{}", data);
if (data.equals("[DONE]")) {
log.info("OpenAI返回数据结束了");
try {
String answerContext = "[DONE]";
sseEmitter.send(answerContext);// .id("[DONE]" reconnectTime(30000)
// 传输完成后自动关闭sse
sseEmitter.complete();
return;
} catch (IOException e) {
log.error("onEvent error msg=" + e.getMessage());
}
}
ObjectMapper mapper = new ObjectMapper();
//由于业务特性需要自定义MerakChatCompletionResponse -> 重写原响应消息对象CompletionResponse [说明:没要求可以不重写]
MerakChatCompletionResponse completionResponse = null;
String answerContext = "";
try {
completionResponse = mapper.readValue(data, MerakChatCompletionResponse.class); //转换成自定义MerakChatCompletionResponse
String finishReason = completionResponse.getChoices().get(0).getFinishReason();
if ("stop".equals(finishReason)) {
//在原始Json串的"delta"节点增加自定义的业务数据节点bussinessDataNodeJson
String bussinessDataNodeJson = "{\"userId\":\"" + userId + "\"}";
//{"id": "chatcmpl-3BpHEcUKNMUk7jbWkKB2gU", "object": "chat.completion.chunk", "created": 1699844126, "model": "chatglm2-6b-32k", "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]}
JSONObject originalObj = JSON.parseObject(data);
originalObj.remove("body");
JSONObject targetObj = JSON.parseObject(bussinessDataNodeJson);
//将自定义的目标Json串添加到原始Json串的"delta"节点后面
originalObj.getJSONArray("choices").getJSONObject(0).put("delta", targetObj);
String lastData = originalObj.toJSONString();
sseEmitter.send(SseEmitter.event()
.id(completionResponse.getId())
.data(lastData)
.reconnectTime(3000));
} else {
//过滤Message delta节点有内容的有效内容
Message delta = completionResponse.getChoices().get(0).getDelta();
if (null != delta) {
answerContext = delta.getContent();
if (null != answerContext && !StringUtils.isBlank(answerContext)) {
Thread.sleep(50);
log.info("休眠50毫秒");
sseEmitter.send(SseEmitter.event()
.id(completionResponse.getId())
.data(data)
.reconnectTime(3000));
}
}
}
} catch (Exception e) {
log.error("sse信息推送失败!");
eventSource.cancel();
log.error("sse信息推送失败,error msg=" + e.getMessage());
}
}
@Override
public void onClosed(EventSource eventSource) {
log.info("OpenAI关闭sse连接...");
}
@SneakyThrows
@Override
public void onFailure(EventSource eventSource, Throwable t, Response response) {
log.info("OpenAI 连接操作失败...");
if (Objects.isNull(response)) {
return;
}
ResponseBody body = response.body();
if (Objects.nonNull(body)) {
try {
log.error("OpenAI sse连接异常data:{},异常:{}", body.string(), t);
} catch (IOException e) {
log.error("OpenAI sse连接异常,error msg=" + e.getMessage());
}
} else {
log.error("OpenAI sse连接异常data:{},异常:{}", response, t);
}
eventSource.cancel();
}
}
2、业务控制类:智能问答,接收Web客户端请求并向调用数据服务
package com.merak.web.controller.knowledge;
import com.alibaba.fastjson2.JSONObject;
import com.merak.common.core.controller.BaseController;
import com.merak.common.utils.uuid.UUID;
import com.merak.web.controller.stream.config.LocalCache;
import com.merak.web.service.AianswerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
/**
* 4.7.机器人智能问答管理
*/
@RestController
@RequestMapping("/aianswer")
public class AianswerController extends BaseController {
@Autowired
private AianswerService aianswerService;
/**
* 机器人智能问答:客户端发送消息到服务端-流式响应
* @param jsonObject 智能问答对象 {"robotId":"1","aiQuestion":"请介绍chatgpt"}
* @return emitter SseEmitter
* produces:request请求头中的(Accept)类型包含text/event-stream时,指定返回内容类型text/event-stream;
*/
@PostMapping(path = "/streamAsk", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ResponseBodyEmitter streamAskBak(@RequestBody JSONObject jsonObject) {
SseEmitter emitter = null;
try {
String uid = UUID.randomUUID().toString().replace("-", "");
Long userId = 1L;
boolean blnAnswer = aianswerService.aianswerStreamAsk(jsonObject, userId, uid);
logger.info("sse 完成连接和发送请求,blnAnswer=" + blnAnswer);
if (blnAnswer) {
emitter = (SseEmitter) LocalCache.CACHE.get(uid);
}
} catch (Exception e) {
logger.info("机器人智能问答新增error msg=" + e.getMessage());
}
return emitter;
}
}
智能问答实现方法-aianswerStreamAsk:
package com.merak.web.service;
import com.alibaba.fastjson2.JSONObject;
import com.merak.web.controller.stream.service.SseService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.*;
/**
* 智能问答-服务层
*/
@Service
public class AianswerService {
private static final Logger logger = LoggerFactory.getLogger(AianswerService.class);
@Autowired
private SseService sseService;
public boolean aianswerStreamAsk(JSONObject jsonObject, Long userId, String uid) {
boolean bln = false;
String prompt = "";//提示词
try {
//1.拼装问答提问入参chatCompletion
String robotId = (String) jsonObject.get("robotId");//提问机器人
String originalQuestion = (String) jsonObject.get("aiQuestion");//提问问题
Map<String, Object> chatInfo = new HashMap();
chatInfo.put("source_question", originalQuestion);
chatInfo.put("prompt", prompt);
chatInfo.put("temperature", 1);
chatInfo.put("top_p", 1);
chatInfo.put("history", null);
String chatCompletion = JSONObject.toJSONString(chatInfo);
//2.创建SSE
sseService.createSse(uid);
logger.info("chatCompletion=" + chatCompletion);
//3.客户端发送消息到服务端
bln = sseService.sseChat(uid, chatCompletion, jsonObject, userId);
} catch (Exception e) {
logger.error("机器人智能问答失败,error msg=" + e.getMessage());
}
return bln;
}
}
客户端发送消息到服务端-sseService.sseChat
@Override
public boolean sseChat(String uid, String chatCompletion, JSONObject jsonObject, Long userId) {
boolean bln = true;
try {
//从缓存对象TimedCache内获取SseEmitter
SseEmitter sseEmitter = (SseEmitter) LocalCache.CACHE.get(uid);
if (sseEmitter == null) {
log.info("消息推送失败uid:[{}],没有创建连接,请重试。", uid);
throw new BaseException("消息推送失败uid:[{}],没有创建连接,请重试。~");
}
OpenAISSEEventSourceListener openAIEventSourceListener = new OpenAISSEEventSourceListener(sseEmitter, uid, jsonObject, userId);
openAiStreamClient.streamChatCompletion(chatCompletion, openAIEventSourceListener);
}catch (Exception e){
bln = false;
log.info("[{}]消息推送失败失败!", uid);
}
return bln;
}
服务端调用ChatGPT API方法streamChatCompletion 实现同最底层数据提供服务方通信
即调用openAiStreamClient.streamChatCompletion(chatCompletion, openAIEventSourceListener);方法
发送的API接口地址为:String streamUrl = this.apiHost + "v1/completions";
响应的实时数据
{"id": "chatcmpl-42rNbjJ9WFAnMGF9cNZTwn", "model": "chatglm2-6b-32k", "choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": null}]}
{"id": "chatcmpl-42rNbjJ9WFAnMGF9cNZTwn", "model": "chatglm2-6b-32k", "choices": [{"index": 0, "delta": {"content": "尊敬"}, "finish_reason": null}]}
{"id": "chatcmpl-42rNbjJ9WFAnMGF9cNZTwn", "model": "chatglm2-6b-32k", "choices": [{"index": 0, "delta": {"content": "的"}, "finish_reason": null}]}
{"id": "chatcmpl-42rNbjJ9WFAnMGF9cNZTwn", "model": "chatglm2-6b-32k", "choices": [{"index": 0, "delta": {"content": "客户"}, "finish_reason": null}]}
{"id": "chatcmpl-42rNbjJ9WFAnMGF9cNZTwn", "model": "chatglm2-6b-32k", "choices": [{"index": 0, "delta": {"content": ","}, "finish_reason": null}]}
{"id": "chatcmpl-42rNbjJ9WFAnMGF9cNZTwn", "model": "chatglm2-6b-32k", "choices": [{"index": 0, "delta": {"content": "您好"}, "finish_reason": null}]}
。。。。。。
{"id": "chatcmpl-42rNbjJ9WFAnMGF9cNZTwn", "model": "chatglm2-6b-32k", "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]}
[DONE]
3、监听响应的实时数据
EventSourceListener onEvent()监听事件->解析响应数据:
/**
* 监听事件->解析响应数据
*/
@Override
public void onEvent(EventSource eventSource, String id, String type, String data) {
log.info("sseEmitter uid=" + uid + ",OpenAI返回数据:{}", data);
if (data.equals("[DONE]")) {
log.info("OpenAI返回数据结束了");
try {
String answerContext = "[DONE]";
sseEmitter.send(answerContext);// .id("[DONE]" reconnectTime(30000)
// 传输完成后自动关闭sse
sseEmitter.complete();
return;
} catch (IOException e) {
log.error("onEvent error msg=" + e.getMessage());
}
}
ObjectMapper mapper = new ObjectMapper();
//由于业务特性需要自定义MerakChatCompletionResponse -> 重写原响应消息对象CompletionResponse [说明:没要求可以不重写]
MerakChatCompletionResponse completionResponse = null;
String answerContext = "";
try {
completionResponse = mapper.readValue(data, MerakChatCompletionResponse.class); //转换成自定义MerakChatCompletionResponse
String finishReason = completionResponse.getChoices().get(0).getFinishReason();
if ("stop".equals(finishReason)) {
//在原始Json串的"delta"节点增加自定义的业务数据节点bussinessDataNodeJson
String bussinessDataNodeJson = "{\"userId\":\"" + userId + "\"}";
//{"id": "chatcmpl-3BpHEcUKNMUk7jbWkKB2gU", "object": "chat.completion.chunk", "created": 1699844126, "model": "chatglm2-6b-32k", "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]}
JSONObject originalObj = JSON.parseObject(data);
originalObj.remove("body");
JSONObject targetObj = JSON.parseObject(bussinessDataNodeJson);
//将自定义的目标Json串添加到原始Json串的"delta"节点后面
originalObj.getJSONArray("choices").getJSONObject(0).put("delta", targetObj);
String lastData = originalObj.toJSONString();
sseEmitter.send(SseEmitter.event()
.id(completionResponse.getId())
.data(lastData)
.reconnectTime(3000));
} else {
//过滤Message delta节点有内容的有效内容
Message delta = completionResponse.getChoices().get(0).getDelta();
if (null != delta) {
answerContext = delta.getContent();
if (null != answerContext && !StringUtils.isBlank(answerContext)) {
Thread.sleep(50);
log.info("休眠50毫秒");
sseEmitter.send(SseEmitter.event()
.id(completionResponse.getId())
.data(data)
.reconnectTime(3000));
}
}
}
} catch (Exception e) {
log.error("sse信息推送失败!");
eventSource.cancel();
log.error("sse信息推送失败,error msg=" + e.getMessage());
}
}
4、sseEmitter向Web客户端实时推送数据,将数据实时写入outputMessage Body体内
sseEmitter.send(SseEmitter.event().id(completionResponse.getId())
.data(lastData).reconnectTime(3000));
流程说明:
sseEmitter.send() ->
ResponseBodyEmitter.sendInternal(Object object, @Nullable MediaType mediaType) ->
this.handler.send(object, mediaType) ->
ResponseBodyEmitterReturnValueHandler.sendInternal(T data, @Nullable MediaType mediaType) ->
{ converter.write(data, mediaType, this.outputMessage);
this.outputMessage.flush();
} ->
public final void write(final T t, @Nullable MediaType contentType, HttpOutputMessage outputMessage) throws IOException, HttpMessageNotWritableException {
HttpHeaders headers = outputMessage.getHeaders();
this.addDefaultHeaders(headers, t, contentType);
if (outputMessage instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage)outputMessage;
streamingOutputMessage.setBody((outputStream) -> {
this.writeInternal(t, new HttpOutputMessage() {
public OutputStream getBody() {
return outputStream;
}
public HttpHeaders getHeaders() {
return headers;
}
});
});
} else {
this.writeInternal(t, outputMessage);
outputMessage.getBody().flush();
}
}
Web VUE核心解析数据代码
<script setup name="Chat">
// 核心解析服务端方法requestStreamingChat
async function requestStreamingChat(message,index,subBussiness) {
const url = `${window.location.origin}${import.meta.env.VITE_APP_BASE_API}/aianswer/streamAsk`
controller = new AbortController()
const reqTimeoutId = setTimeout(() => controller.abort(), 30000)
try {
let respString = ''
fetchEventSource(url,{
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${getToken()}`,
},
signal: controller.signal,
body: JSON.stringify({
robotId: currentChatRobot.value?.robotId,
knowledgeId:currentChatRobot.value?.knowledgeId,
aiQuestion: message,
aiQuestionHistory: subBussiness?bussinessProcess.historyQa:history.value,
classifyFrom:'1',
bussinessType:subBussiness?bussinessProcess.bussinessType:'',
bussinessContent:subBussiness?bussinessProcess.formData:''
}),
async onopen(response) {
console.log(response)
if (response.ok && response.headers.get('content-type')?.includes('text/event-stream')) {
// everything's good
console.log('everything\'s good')
} else if (response.status >= 400 && response.status < 500 && response.status !== 429) {
chatStore.updateChat(
index,
{
dateTime: dayjs().format('YYYY-MM-DD HH:mm:ss'),
text: 'Stream Error',
inversion: false,
error: true,
loading: false,
streaming: false,
requestOptions: { prompt: message },
},
)
scrollToBottomIfAtBottom()
} else {
console.log('其他错误')
finish()
}
},
async onmessage(event) {
console.log(event)
// 表示整体结束
if (event.data === '[DONE]') {
console.log('结束')
finish()
return
}
if (event.data) {
const jsonData = JSON.parse(event.data)
// 如果等于stop表示结束
if (jsonData.choices[0].finish_reason === 'stop') {
//接收会话信息
const info = jsonData.choices[0].delta
chatStore.updateChatSome(
index,
{
dateTime: info?.occurTime,
aiId: info?.aiId,
solveIs: info?.solveIs
}
)
return
}
// 判断role存在,进行排除
if (jsonData.choices[0].delta.role !== undefined) {
respString = jsonData.choices[0].delta.role + ': '
return
}
if (jsonData.choices[0].delta.content !== undefined) {
respString += jsonData.choices[0].delta.content
console.log(respString)
chatStore.updateChat(
index,
{
dateTime: dayjs().format('YYYY-MM-DD HH:mm:ss'),//res?.data?.occurTime,
text: respString,
error: false,
loading: false,
streaming: true,
requestOptions: { prompt: message },
},
)
scrollToBottomIfAtBottom()
}
}
},
async onerror(error) {
console.error('Error:', error)
finish()
},
async onclose() {
loading.value = false
finish()
// if the server closes the connection unexpectedly, retry:
console.log('关闭连接')
}
})
const finish = () => {
loading.value = false
chatStore.updateChatSome(index, { streaming: false,loading: false })
reqTimeoutId && clearTimeout(reqTimeoutId)
controller.abort()
}
} catch (error) {
loading.value = false
const errorMessage = error?.message ?? '好像出错了,请稍后再试。'
if (error.message === 'canceled') {
chatStore.updateChatSome(
index,
{
loading: false,
},
)
scrollToBottomIfAtBottom()
return
}
chatStore.updateChat(
index,
{
dateTime: dayjs().format('YYYY-MM-DD HH:mm:ss'),
text: errorMessage,
inversion: false,
error: true,
loading: false,
streaming: false,
requestOptions: { prompt: message },
},
)
scrollToBottomIfAtBottom()
}
finally {
chatStore.updateChatTime(robotId.value,dayjs().format('YYYY-MM-DD HH:mm:ss'))
}
}
function handleClear() {
//针对那些回答异常没有会话ID的情况
if(!dataSources.value?.[dataSources.value.length-1]?.aiId) {
chatStore.updateChatSome(dataSources.value.length-1, { cleanIs: 1 })
const element = chatWrapper.value;
if(element) {
element.style.height = element.offsetHeight + 300 + 'px'
scrollToBottom()
}
return
}
clearChat({
aiId:dataSources.value?.[dataSources.value.length-1]?.aiId,
robotId:robotId.value,
knowledgeId:currentChatRobot?.knowledgeId
}).then(res => {
if(res.code == 200) {
chatStore.updateChatSome(dataSources.value.length-1, { cleanIs: 1 })
const element = chatWrapper.value;
if(element) {
element.style.height = element.offsetHeight + 300 + 'px'
scrollToBottom()
}
}
})
}
function handleEnter(event) {
if (event.key === 'Enter' && !event.shiftKey) {
event.preventDefault()
onConversation()
}
}
function handleStop() {
if (loading.value) {
controller.abort()
loading.value = false
}
}
const searchRobotDo = () => {
listRobot({robotName:searchRobot.value,robotStatus:0,pageNum:1,pageSize:100}).then(res => {
if(res.code == 200) {
robots.value = res.rows
}
if(!searchRobot.value) {
upTopRobot()
}
})
}
//当前对话的机器人置顶
function upTopRobot(rid) {
if(robotRef.value) {
rid = rid || robotId.value
const _up = robots.value.find(item=>item.robotId==rid)
let _filter = robots.value.filter(item=>item.robotId!=rid)
_filter.unshift(_up)
robots.value = _filter
robotRef.value.scrollTop = 0
}
}
function handleFeedResult(index,aiId,type) {
if(type == 1) {
feedBackChat({aiId,solveIs:type}).then(res => {
proxy.$modal.msgSuccess('您的反馈我们已经收到,谢谢您的认可!')
chatStore.updateChatSome(index,{solveIs:type})
})
} else {
console.log(index)
curFeedChat.value = dataSources.value[index]
curFeedChat.value['index'] = index
feedBackText.value = curFeedChat.value?.feedBack
feedbackShow.value = true
}
}
function submitFeedback() {
if(!feedBackText.value) {
proxy.$modal.msgWarning('请详细描述需要反馈的问题')
return
}
feedBackChat({aiId:curFeedChat.value?.aiId,robotId:robotId.value,feedBack:feedBackText.value,solveIs:-1}).then(res => {
if(res.code == 200) {
proxy.$modal.msgSuccess('感谢您的反馈,我们将不断改进~~')
chatStore.updateChatSome(curFeedChat.value.index,{feedBack:feedBackText.value,solveIs:-1})
}
}).finally(() => {
feedbackShow.value = false
})
}
onMounted(() => {
//查询所有机器人
listRobot({robotStatus:0,pageNum:1,pageSize:100}).then(res => {
if(res.code == 200 && res.rows?.length>0) {
robots.value = res.rows
if(!robotId.value || res.rows.findIndex(item=>item.robotId==robotId.value) == -1) {
chatStore.setActive(res.rows[0]?.robotId)
} else {
upTopRobot()
}
//查询当前对话机器人的问答记录
chatStore.getCurChat().then(res => {
setTimeout(()=>{
scrollToBottom()
},0)
})
} else {
proxy.$modal.msgWarning('暂无机器人可以对话~~')
}
})
if (inputRef.value)
inputRef.value?.focus()
})
onUnmounted(() => {
if (loading.value)
controller.abort()
})
const formatChatTime = (time) => {
if(dayjs(time).isAfter(dayjs().startOf('day'))) {
return dayjs(time).format('HH:mm')
} else {
return dayjs(time).format('MM/DD')
}
}
const deleteChat = () => {
proxy.$modal.confirm('是否确认清除当前AI应用的会话记录吗!').then(function () {
return deleteChatContext({robotId:currentChatRobot.value?.robotId,classifyFrom:1})
}).then(() => {
chatStore.getCurChat()
proxy.$modal.msgSuccess("清除成功");
}).catch(() => { });
}
const viewAnswerLog = (aiId) => {
getChatLog(aiId).then(res => {
if(res.code == 200 && res.data) {
logDetail.value = res.data
showLogDetail.value = true
} else {
proxy.$modal.msgError("日志查询失败");
}
})
}
provide('handleFindRef', (aiId) => {
showRefAiId.value = aiId
showRefDialog.value = true
})
const onCloseDialog = () => {
showRefDialog.value = false;
showRefAiId.value = ''
}
provide('handleSumbitForm',async (type, flowStep, data) => {
console.log(type, flowStep, data)
if(type == -1) {
proxy.$modal.confirm(`是否提前终止报价流程,已提交的数据将丢失?`).then(()=>{
bussinessProcess.running = false
chatStore.updateChatSome(dataSources.value.length - 1,{flowFlag:false})
bussinessProcess.formData = {}
bussinessProcess.historyQa = []
prompt.value = '取消报价'
onConversation()
}).catch(() => { });
} else {
const qs = replaceTemplate(flowStep?.questionTpl,data)
bussinessProcess.formData = {...bussinessProcess.formData,...data}
chatStore.updateChatSome(dataSources.value.length - 1,{flowFlag:false})
chatStore.addChat(
{
dateTime: dayjs().format('YYYY-MM-DD HH:mm:ss'),
text: qs,
inversion: true,
error: false,
requestOptions: { prompt: qs },
},
)
scrollToBottom()
chatStore.updateChatTime(robotId.value,dayjs().format('YYYY-MM-DD HH:mm:ss'))
loading.value = true
chatStore.addChat(
{
dateTime: dayjs().format('YYYY-MM-DD HH:mm:ss'),
text: '',
loading: true,
inversion: false,
error: false,
requestOptions: { prompt: qs },
},
)
scrollToBottom()
if(flowStep?.step < bussinessProcess.totalStep) {
let aw = ''
try {
const _tpl = JSON.parse(flowSteps.value[flowStep.step]?.formTpl)
aw = _tpl?.rule?.[0].value
} catch (error) {}
bussinessProcess.historyQa.push([qs,aw])
setTimeout(() => {
chatStore.updateChat(
dataSources.value.length - 1,
{
dateTime: dayjs().format('YYYY-MM-DD HH:mm:ss'),
text: aw,
inversion: false,
error: false,
loading: false,
requestOptions: { prompt: qs },
flowFlag:true,
flowStep:flowSteps.value[flowStep?.step]
},
)
scrollToBottom()
loading.value = false
},800)
} else {
await currentChatRobot.value.openStream == 'true' ? requestStreamingChat(qs,dataSources.value.length - 1,true) : requestChat(qs,dataSources.value.length - 1,true)
loading.value = false
bussinessProcess.running = false
bussinessProcess.formData = {}
bussinessProcess.historyQa = []
chatStore.updateChatTime(robotId.value,dayjs().format('YYYY-MM-DD HH:mm:ss'))
}
}
})
</script>
实例demo
demo源代码
参考
基于Spring ApplicationEventPublisherAware推送事件实现文章来源:https://www.toymoban.com/news/detail-813587.html
Java语言作为后端对接 chatgpt文章来源地址https://www.toymoban.com/news/detail-813587.html
到了这里,关于SSE[Server-Sent Events]实现页面流式数据输出(模拟ChatGPT流式输出)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!