前言
WebSocket是一种在Web应用程序中实现双向通信的技术,它允许服务器和客户端之间实时交换数据。近期在项目中使用websocket的一个过程,以及遇到的一些问题的解决思路。
1.websocket基本逻辑
连接的建立,处理消息的发送,接收
const SOCKET_URL = 'wss://*****************************'
class mySocket {
this.socketUrl = SOCKET_URL
//地址
this.socketUrl = SOCKET_URL
//开启socket报错和关闭监听
this.monitorSocketError()
this.monitorSocketClose()
init(callback) {
const _this = this
console.log('WebSocket初始化')
//建立连接
this.task = uni.connectSocket({
url: this.socketUrl,
method: 'GET',
success: function(res) {
console.log('WebSocket success', res)
console.log(SOCKET_URL)
},
fail: function(err) {
console.log('WebSocket fail', err)
},
})
//连接打开
uni.onSocketOpen(function(res) {
console.log('WebSocket连接已打开!')
//监听消息接收
_this.socketReceive()
//打开心跳
_this.getHeartbeat()
//执行回调
callback && callback()
})
}
socketReceive() {
const _this = this
uni.onSocketMessage(function(res) {
console.log('收到消息:', res)
}
}
//监听Socket关闭
monitorSocketClose() {
const _this = this
uni.onSocketClose(function(res) {
console.log('WebSocket 已关闭!')
})
}
//监听Socket错误
monitorSocketError() {
const _this = this
uni.onSocketError(function(res) {
console.log('WebSocket连接打开失败,请检查!')
})
}
//心跳
getHeartbeat() {
const _this = this
this.send(
{
message: ‘这是心跳’
},
(val) => {
timer = setTimeout(() => {
if (val) {
_this.getHeartbeat()
} else {
_this.init()
}
}, 10000)
}
)
}
}
sendFunc(data, callback) {
uni.sendSocketMessage({
data: data,
success: (res) => {
console.log('发送成功!')
},
fail: (res) => {
callback && callback(false)
},
complete: (res) => {
console.log(res)
},
})
}
var mySocket = new mySocketF()
export default mySocket
- 打开报错关闭监听,报错监听。
- 建立连接,连接已建立时监听WebSocket接受到服务器的消息事件,并打开心跳。
心跳的作用:心跳机制的目的是定期发送小的探测消息,以确保连接仍然有效,如果连接断开或出现问题,可以及时发现并采取措施。 - 消息发送的格式,和接收的格式和后端开发人员定义好。
2.使用字节流进行传输
由于后端要求,所以我这边使用arraybuffer进行通信
//字符串转arraybuffer
function stringToUint8Array(str){
var arr = [];
for (var i = 0, j = str.length; i < j; ++i) {
arr.push(str.charCodeAt(i));
}
var tmpUint8Array = new Uint8Array(arr);
return tmpUint8Array
}
//arraybuffer转字符串
function arrayBufferToString(arr) {
if (typeof arr === 'string') {
return arr
}
var dataview = new DataView(arr.data)
var ints = new Uint8Array(arr.data.byteLength)
for (var i = 0; i < ints.length; i++) {
ints[i] = dataview.getUint8(i)
}
arr = ints
var str = '',
_arr = arr
for (var i = 0; i < _arr.length; i++) {
var one = _arr[i].toString(2),
v = one.match(/^1+?(?=0)/)
if (v && one.length == 8) {
var bytesLength = v[0].length
var store = _arr[i].toString(2).slice(7 - bytesLength)
for (var st = 1; st < bytesLength; st++) {
store += _arr[st + i].toString(2).slice(2)
}
str += String.fromCharCode(parseInt(store, 2))
i += bytesLength - 1
} else {
str += String.fromCharCode(_arr[i])
}
}
return str
}
websocket是支持发送string/arraybuffer类型的数据的,可以往消息加入两位或三位去标记消息的类型,长度等信息,可以校验消息的可靠性。
const bufHeader = new ArrayBuffer(2)
const hearder = new Uint8Array(bufHeader)
hearder[0] = 0xff & 5
hearder[1] = (data.length >> 8) & 0xff
const bufByte = new ArrayBuffer(2 + data.length)
const byte = new Uint8Array(bufByte)
byte[0] = hearder[0]
byte[1] = hearder[1]
for (let i = 0; i < data.length; i++) {
byte[2 + i] = data[i]
}
this.send(byte.buffer)
3.使用protobuf处理消息
这次后端要求使用protobuf对数据进行处理,下面介绍一下protobuf
Protocol Buffers(protobuf)是一种由Google开发的二进制序列化格式,用于在不同系统之间进行有效的数据通信。使用protobuf进行通信有一些优势:
- 高效的序列化和反序列化: Protobuf使用二进制格式进行数据序列化,相对于基于文本的格式(如JSON和XML),它的序列化和反序列化速度更快,产生的数据包也更小。这使得它在网络通信中效率更高。
- 协议版本兼容性: Protobuf支持在消息结构发生变化时进行向前和向后兼容的演化。通过向消息添加或删除字段,而不影响现有字段的顺序,可以更容易地进行协议的升级。
- 跨语言支持: Protobuf定义了一个跨语言的接口描述语言,可以在多种编程语言中生成相应的代码。这意味着你可以使用不同的编程语言开发客户端和服务器端,并且它们可以轻松地进行通信。
- 更小的数据大小: 由于使用二进制编码,Protobuf产生的数据包通常比使用文本格式编码的数据包更小。这在网络传输和存储方面都有利。
- 可读性: 虽然Protobuf生成的二进制数据不像JSON那样易读,但Protobuf的接口描述文件是可读的,可以用来理解和维护数据结构。
- 性能: 由于数据包更小,序列化和反序列化速度更快,因此Protobuf通常在性能方面表现更好,特别是在大规模数据通信的场景中。
- 自动代码生成: Protobuf提供了一个编译器(protoc),它可以根据接口描述文件生成相应语言的代码。这简化了开发过程,减少了手动编写序列化和反序列化代码的工作量。
总体而言,使用Protobuf进行通信的主要优势在于其高效性、兼容性、跨语言支持以及在网络通信和存储方面的性能优势。
使用protobufjs对后端给过来的proto文件进行处理
proto文件大概长这样,类似接口文档,对发送的消息字段和类型进行限制
syntax = "proto3";
option java_outer_classname="SentBodyProto";
message Send {
string key = 1;
int64 timestamp = 2;
map<string,string> data_p = 3;
}
我们是不能直接使用这个文件的,需要使用到protobufjs把这些文件转换为我们能使用的js文件
npx pbjs -t static-module -w commonjs -o ./protobuf/your_proto.js ./protobuf/your_proto.proto
在项目中引入
import { Send } from '../protobuf/your_proto.js'
//数据
const data = JSON.stringify({
key: 110,
timestamp:new Date().getTime(),
dataP: {
message: '你好'
}
})
const message = Send.create(data)
const binaryData = Send.encode(message).finish()
const bufHeader = new ArrayBuffer(2)
const hearder = new Uint8Array(bufHeader)
hearder[0] = 0xff & 5
hearder[1] = (binaryData.length >> 8) & 0xff
const bufByte = new ArrayBuffer(2 + binaryData.length)
const byte = new Uint8Array(bufByte)
byte[0] = hearder[0]
byte[1] = hearder[1]
for (let i = 0; i < binaryData.length; i++) {
byte[2 + i] = binaryData[i]
}
this.send(byte.buffer)
能把string直接转换为uint8Array格式的数据,因此不需要再进行字符串转arraybuffer的操作了
4.分片传输
使用分片传输的原因:
在发送消息的时候,出现了后端接收到的消息不完整的问题,经过多次测试,发现出现的频率随着消息长度的增加逐渐增加,经过讨论,决定采用分片传输的方式去发送消息。文章来源:https://www.toymoban.com/news/detail-790491.html
分片传输的实现:
// 随机生成4个无符号8位整数,生成数组
function generateRandomIntegers() {
const integers = []
for (let i = 0; i < 4; i++) {
// 生成范围在 0 到 255 之间的随机整数
const randomInt = Math.floor(Math.random() * 256)
integers.push(randomInt)
}
return integers
}
const message = Send.create(data)
const binaryData = Send.encode(message).finish()
// 分片的顺序
let index = 0
// 4个无符号8位整数组成的数组,用来当作消息的id
const result = generateRandomIntegers()
while (binaryData.length > 255 * index) {
index++
// 对消息进行分隔,内容的长度为255,为什么使用255,因为无符号8位整数最大为255
const chunk = binaryData.slice((index - 1) * 255, 255 * index)
// 生成一个arraybuffer容器,长度前8位为头部,放置消息的信息,后255位为消息本体
const bufByte = new ArrayBuffer(8 + 255)
const byte = new Uint8Array(bufByte)
const bufHeader = new ArrayBuffer(2)
const hearder = new Uint8Array(bufHeader)
// 消息的长度
hearder[0] = chunk.length & 0xff
// 固定为0
hearder[1] = (chunk.length >> 8) & 0xff
// 消息类型
byte[0] = 0xff & 5
byte[1] = hearder[0]
byte[2] = hearder[1]
// 第4位定义分片的顺序,从1开始,当分片为最后一片时设置为0
if (255 * index > binaryData.length) {
byte[3] = 0
} else {
byte[3] = index
}
// 第5-8位设置分片的id,同一消息的分片id都是一样的
for (let i = 0; i < 4; i++) {
byte[i + 4] = result[i]
}
for (let i = 0; i < chunk.length; i++) {
byte[8 + i] = chunk[i]
}
// 将分片放入数组中
messageStack.push(byte)
}
while (messageStack.length) {
const byte = messageStack.shift()
this.send(byte, callback)
}
前8位是和后端协商过,去进行统一设置的。文章来源地址https://www.toymoban.com/news/detail-790491.html
5.解决粘包和丢包问题
使用分包之后,出现了新的问题,这些分包发送到服务端的时候部分粘到一起了,这导致了服务端不能正确读取数据。解决办法:
- 前端设置一个map,以消息的id+分片的顺序为key,发送的内容和时间戳为value。
- 服务端接收到后,当消息可读,则对前端进行回复,带上消息的id和分片的顺序,类似 ACK,messageId,Index的字符串,前端收到该消息,则把该消息从map中delete掉;
- 如果是丢包,且后端能正确读取消息的id+分片的顺序,则回复前端 ERROR,messageId,Index,前端收到该消息,则从map中取出value中的内容重新进行发送;
- 在服务端收到消息,但是内容不可读的时候,不会回复前端,则前端开启定时器,当map的size不为0时,重新将map中的所有内容取出发送,由于前面保存了时间戳,当时间超过2s时自动清除该消息(猜测为消息有问题),避免定时器长期存活,当map为空时关闭定时器。
const bufferMap = new Map()
// 轮询方法
function polling(_this) {
console.log('我开启了轮询')
if (pollingTimer) {
clearInterval(pollingTimer)
}
pollingTimer = setInterval(() => {
if (bufferMap.size > 0) {
for (const byte of bufferMap) {
let time = new Date().getTime()
if (time - byte[1].timestamp > 2000) {
bufferMap.delete(byte[0])
} else {
console.log(byte[0])
_this.sendFunc(byte[1].byte)
}
}
} else {
clearInterval(pollingTimer)
}
}, 100)
}
// 消息处理
{
const message = Send.create(data)
const binaryData = Send.encode(message).finish()
let index = 0
const result = generateRandomIntegers()
let messageId = result.join('')
while (binaryData.length > 255 * index) {
index++
const chunk = binaryData.slice((index - 1) * 255, 255 * index)
const bufByte = new ArrayBuffer(8 + 255)
const byte = new Uint8Array(bufByte)
const bufHeader = new ArrayBuffer(2)
const hearder = new Uint8Array(bufHeader)
hearder[0] = chunk.length & 0xff
hearder[1] = (chunk.length >> 8) & 0xff
byte[0] = 0xff & 5
byte[1] = hearder[0]
byte[2] = hearder[1]
if (255 * index > binaryData.length) {
byte[3] = 0
} else {
byte[3] = index
}
for (let i = 0; i < 4; i++) {
byte[i + 4] = result[i]
}
for (let i = 0; i < chunk.length; i++) {
byte[8 + i] = chunk[i]
}
messageStack.push(byte)
}
while (messageStack.length) {
const byte = messageStack.shift()
const bufferKey = '' + messageId + byte[3]
if (!bufferMap.has(bufferKey)) {
bufferMap.set(bufferKey, {
byte: byte,
timestamp: new Date().getTime(),
})
}
this.send(byte.buffer)
}
}
send(byte, callback) {
const _this = this
if (pollingTimer) {
clearInterval(pollingTimer)
}
polling(_this)
uni.sendSocketMessage({
data: byter,
success: (res) => {
},
fail: (res) => {
callback && callback(false)
},
complete: (res) => {
console.log(res)
},
})
}
//Socket接收服务器发送过来的消息
socketReceive() {
const _this = this
uni.onSocketMessage(function(res) {
const ackString = arrayBufferToString({
data: res.data,
})
const ackArr = ackString.split(',')
const bufferKey = ackArr[1] + ackArr[3]
if (bufferMap.get(bufferKey) && ackArr[0] === 'ACK') {
bufferMap.delete(bufferKey)
} else if (bufferMap.get(bufferKey) && ackArr[0] === 'ERROR') {
let bufferItem = bufferMap.get(bufferKey)
_this.send(bufferItem.byte.buffer)
}
})
}
6.完整代码
import { SOCKET_URL } from './conster.js'
// proto转换
import protobuf from 'protobufjs'
import { Send, Reply } from '../protobuf/your_proto.js'
let timer
let pollingTimer
// 分片栈
let messageStack = []
// 缓冲区
const bufferMap = new Map()
function generateRandomIntegers() {
const integers = []
for (let i = 0; i < 4; i++) {
// 生成范围在 0 到 255 之间的随机整数
const randomInt = Math.floor(Math.random() * 256)
integers.push(randomInt)
}
return integers
}
function arrayBufferToString(arr) {
if (typeof arr === 'string') {
return arr
}
var dataview = new DataView(arr.data)
var ints = new Uint8Array(arr.data.byteLength)
for (var i = 0; i < ints.length; i++) {
ints[i] = dataview.getUint8(i)
}
arr = ints
var str = '',
_arr = arr
for (var i = 0; i < _arr.length; i++) {
var one = _arr[i].toString(2),
v = one.match(/^1+?(?=0)/)
if (v && one.length == 8) {
var bytesLength = v[0].length
var store = _arr[i].toString(2).slice(7 - bytesLength)
for (var st = 1; st < bytesLength; st++) {
store += _arr[st + i].toString(2).slice(2)
}
str += String.fromCharCode(parseInt(store, 2))
i += bytesLength - 1
} else {
str += String.fromCharCode(_arr[i])
}
}
return str
}
function polling(_this) {
console.log('我开启了轮询')
if (pollingTimer) {
clearInterval(pollingTimer)
}
pollingTimer = setInterval(() => {
if (bufferMap.size > 0) {
for (const byte of bufferMap) {
let time = new Date().getTime()
if (time - byte[1].timestamp > 2000) {
bufferMap.delete(byte[0])
} else {
console.log(byte[0])
_this.sendFunc(byte[1].byte)
}
}
} else {
clearInterval(pollingTimer)
}
}, 100)
}
class mySocketF {
constructor(options) {
//地址
this.socketUrl = SOCKET_URL
this.task = null
this.monitorSocketError()
this.monitorSocketClose()
}
init(callback) {
const _this = this
console.log('WebSocket初始化')
this.task = uni.connectSocket({
url: SOCKET_URL,
method: 'GET',
success: function(res) {
console.log('WebSocket success', res)
},
fail: function(err) {
console.log('WebSocket fail', err)
},
})
uni.onSocketOpen(function(res) {
console.log('WebSocket连接已打开!')
if (pollingTimer) {
clearInterval(pollingTimer)
}
polling(_this)
_this.socketReceive()
setTimeout(() => {
_this.getHeartbeat()
}, 10000)
callback && callback()
})
}
//Socket给服务器发送消息
send(data, callback) {
const _this = this
console.log('发送消息:', JSON.stringify(data))
const message = Send.create(data)
const binaryData = Send.encode(message).finish()
let index = 0
const result = generateRandomIntegers()
let messageId = result.join('')
while (binaryData.length > 255 * index) {
index++
const chunk = binaryData.slice((index - 1) * 255, 255 * index)
const bufByte = new ArrayBuffer(8 + 255)
const byte = new Uint8Array(bufByte)
const bufHeader = new ArrayBuffer(2)
const hearder = new Uint8Array(bufHeader)
hearder[0] = chunk.length & 0xff
hearder[1] = (chunk.length >> 8) & 0xff
byte[0] = 0xff & 5
byte[1] = hearder[0]
byte[2] = hearder[1]
if (255 * index > binaryData.length) {
byte[3] = 0
} else {
byte[3] = index
}
for (let i = 0; i < 4; i++) {
byte[i + 4] = result[i]
}
for (let i = 0; i < chunk.length; i++) {
byte[8 + i] = chunk[i]
}
messageStack.push(byte)
}
while (messageStack.length) {
const byte = messageStack.shift()
const bufferKey = '' + messageId + byte[3]
if (!bufferMap.has(bufferKey)) {
bufferMap.set(bufferKey, {
byte: byte,
timestamp: new Date().getTime(),
})
}
_this.sendFunc(byte, callback)
}
}
sendFunc(byte, callback) {
const _this = this
if (pollingTimer) {
clearInterval(pollingTimer)
}
polling(_this)
uni.sendSocketMessage({
data: byte.buffer,
success: (res) => {
},
fail: (res) => {
callback && callback(false)
if (pollingTimer) {
clearInterval(pollingTimer)
}
},
complete: (res) => {
console.log(res)
},
})
}
//Socket接收服务器发送过来的消息
socketReceive() {
const _this = this
uni.onSocketMessage(function(res) {
const ackString = arrayBufferToString({
data: res.data,
})
const ackArr = ackString.split(',')
const bufferKey = ackArr[1] + ackArr[3]
if (bufferMap.get(bufferKey) && ackArr[0] === 'ACK') {
bufferMap.delete(bufferKey)
} else if (bufferMap.get(bufferKey) && ackArr[0] === 'ERROR') {
let bufferItem = bufferMap.get(bufferKey)
_this.sendFunc(bufferItem.byte)
}
})
}
//关闭Socket
closeSocket() {
const _this = this
let code = 4000
uni.closeSocket({
code: code,
success(res) {
console.log('WebSocket关闭成功!', res)
clearTimeout(timer)
if (pollingTimer) {
clearInterval(pollingTimer)
}
},
fail: function(res) {
console.log('WebSocket关闭失败!', res)
clearTimeout(timer)
if (pollingTimer) {
clearInterval(pollingTimer)
}
},
})
}
//监听Socket关闭
monitorSocketClose() {
const _this = this
uni.onSocketClose(function(res) {
clearTimeout(timer)
if (pollingTimer) {
clearInterval(pollingTimer)
}
console.log('WebSocket 已关闭!')
})
}
//监听Socket错误
monitorSocketError() {
const _this = this
uni.onSocketError(function(res) {
console.log('WebSocket连接打开失败,请检查!')
})
}
//心跳
getHeartbeat() {
const _this = this
this.send(
{
},
(val) => {
timer = setTimeout(() => {
if (val) {
_this.getHeartbeat()
} else {
_this.closeSocket()
_this.init()
}
}, 10000)
}
)
}
}
var mySocket = new mySocketF()
export default mySocket
到了这里,关于记一次在uniapp中使用websocket的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!