websocket协议简介
WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。WebSocket 通信协议于 2011 年被 IETF 定为标准 RFC6455,并由 RFC7936 补充规范。(RFC 是一系列以编号排定的文件,它由一系列草案和标准组成。几乎所有互联网通信协议均记录在 RFC 中,例如 HTTP 协议标准、 WebSocket 协议标准、Base64 编码规范等。除此之外,RFC 还加入了许多论题。)
WebSocket 协议的来源
在 WebSocket 协议出现以前,网站通常使用轮询来实现类似“数据实时更新”这样的效果。要注意的是,这里的“数据实时更新”是带有引号的,这表示并不是真正意义上的数据实时更新。轮询指的是在特定的时间间隔内,由客户端主动向服务端发起 HTTP 请求,以确认是否有新数据的行为。下图描述了轮询的过程:
这种一问一答的方式有着明显的缺点,即浏览器需要不断的向服务器发出请求。由于 HTTP 请求包含较长的头部信息(例如 User-Agent、Referer 和 Host 等),其中真正有效的数据可能只是很小的一部分,所以这样会浪费很多的带宽资源。
短轮询
本质
浏览器发送HTTP request从服务器获取最新的数据.
实现
特定的的时间间隔(如每1秒),由浏览器对服务器发出HTTP request,服务器不管有没有最新数据都直接响应并关闭连接。
应用场景
传统的web通信模式。后台处理数据,需要一定时间,前端想要知道后端的处理结果,就要不定时的向后端发出请求以获得最新情况。
优缺点
优点:
与普通HTTP请求无异,前后端程序编写较为容易
缺点:
-
请求中有大半是无用,难于维护,浪费带宽和服务器资源;
-
响应的结果没有顺序(因为是异步请求,当发送的请求没有返回结果的时候,后面的请求又被发送。而此时如果后面的请求比前面的请 求要先返回结果,那么当前面的请求返回结果数据时已经是过时无效的数据了).
长轮询
本质
浏览器发送HTTP request从服务器获取最新的数据.
实现
客户端向服务器发送Ajax请求,服务器接到请求后hold住连接,直到有新消息才返回响应信息并关闭连接,客户端处理完响应信息后再向服务器发送新的请求。
应用场景
聊天网页,WebQQ,FacebookIM等.
优缺点
优点:
在无新数据的情况下不会频繁的请求,耗费资源小。
缺点:
服务器阻塞连接会消耗资源,返回数据顺序无保证,难于管理维护。
WebSocket协议
HTML5规范在传统的web交互基础上为我们带来了众多的新特性,随着web技术被广泛 用于web APP的开发,这些新特性得以推广和使用,而websocket作为一种新的web通信,技术具有巨大意义。
websocket定义及与HTPP的关系
客户端与服务端连接成功之前,使用的通信协议是 HTTP。连接成功后,使用的才是 WebSocket 协议。
- WebSocket与Http协议一样都是基于TCP的应用层协议。
- 首先,客户端向服务端发出一个 HTTP 请求,请求中携带了服务端规定的信息,并在信息中表明希望将协议升级为 WebSocket。这个请求被称为升级请求,双端升级协议的整个过程叫做握手。然后服务端验证客户端发送的信息,如果符合规范则将协议替换成 WebSocket,并将升级成功的信息响应给客户端。最后,双方就可以基于 WebSocket 协议互相推送信息了。
相对于http,websocket的优点
相对于 HTTP 协议来说,WebSocket 具有开销少、实时性高、支持二进制消息传输、支持扩展和更好的压缩等优点。这些优点如下所述:
-
较少的开销:WebSocket 只需要一次握手,在每次传输数据时只传输数据帧即可。而 HTTP 协议下,每次请求都需要携带完整的请求头信息,例如 User-Agent、Referer 和 Host 等。所以 WebSocket 的开销相对于 HTTP 来说会少很多。
-
更强的实时性:由于协议是全双工的,所以服务器可以随时主动给客户端下发数据。相对于一问一答的 HTTP 来说,WebSocket 协议下的数据传输的延迟明显更少。
-
支持二进制消息传输:WebSocket 定义了二进制帧,可以更轻松地处理二进制内容。
-
支持扩展:开发者可以扩展协议,或者实现部分自定义的子协议。
-
更好的压缩:Websocket 在适当的扩展支持下,可以沿用之前内容的上下文。这样在传递类似结构的数据时,可以显著地提高压缩率。
http 101 状态码(告诉服务器我要升级请求协议)
101状态代码被发送作为对包括一个请求的响应"Upgrade"报头信号,该请求的接收方愿意升级到所期望的协议之一。如果"101 Switching Protocols"返回了状态码,则标头还必须包含Connection和Upgrade标头,以描述所选协议。
说明:
-
WebSocket未加密连接为ws://端口80,加密后的协议为wss://端口 443.
-
WebSocket是类似Socket的TCP长连接,一旦WebSocket连接建立后,服务器和浏览器之间可以自由的进行数据传递.
-
WebSocket协议中浏览器与服务器是完全平等的,可以相互发送请求亦可以主动断开连接.
实现原理
请求头中重要的字段:
响应头中重要的字段:
参数说明:
过程:
客户端带上一段随机生成的base64码(Sec-WebSocket-Key),发给服务器。
如果服务器正好支持升级成websocket协议。就会走websocket握手流程,同时根据客户端生成的base64码,用某个公开的算法变成另一段字符串,放在HTTP响应的 Sec-WebSocket-Accept 头里,同时带上101状态码,发回给浏览器。
之后,浏览器也用同样的公开算法将base64码转成另一段字符串,如果这段字符串跟服务器传回来的字符串一致,那验证通过。
就这样经历了一来一回两次HTTP握手,websocket就建立完成了,后续双方就可以使用webscoket的数据格式进行通信了。
应用场景
· 即时聊天通信
· 多玩家游戏
· 在线协同编辑/编辑
· 实时数据流的拉取与推送
· 体育/游戏实况(自动寻路)
· 实时地图位置
websocket的消息格式
数据包在websocket中被叫做帧。
-
opcode字段:这个是用来标志这是个什么类型的数据帧。比如:
- 等于1时是指text类型(string)的数据包
- 等于2是二进制数据类型([]byte)的数据包
- 等于8是关闭连接的信号
-
payload字段:存放的是真正想要传输的数据的长度,单位是字节。比如发送的数据是字符串"111",那它的长度就是3。
- 如果最开始的7bit的值是 0~125,那么它就表示了 payload 全部长度,只读最开始的7个bit就完事了。
- 如果是126(0x7E)。那它表示payload的长度范围在 126~65535 之间,接下来还需要再读16bit。这16bit会包含payload的真实长度。
- 如果是127(0x7F)。那它表示payload的长度范围>=65536,接下来还需要再读64bit。这64bit会包含payload的长度。这能放2的64次方byte的数据,换算一下好多个TB,肯定够用了。
- 如果最开始的7bit的值是 0~125,那么它就表示了 payload 全部长度,只读最开始的7个bit就完事了。
-
payload data字段:这里存放的就是真正要传输的数据,在知道了上面的payload长度后,就可以根据这个值去截取对应的数据。
关闭状态码
这些状态码是WebSocket协议中定义的关闭代码,用于指示连接关闭的原因。根据RFC 6455(WebSocket协议规范)第11.7节的定义,以下是一些常见的关闭代码及其含义:
-
CloseNormalClosure
(1000): 正常关闭连接。 -
CloseGoingAway
(1001): 终端离开,表示终端正在关闭连接,例如浏览器标签被关闭或导航离开当前页面。 -
CloseProtocolError
(1002): 协议错误,表示由于协议错误导致连接关闭。 -
CloseUnsupportedData
(1003): 不支持的数据,表示接收到了不支持的数据类型或格式。 -
CloseNoStatusReceived
(1005): 未接收到状态码,表示连接关闭时未接收到预期的状态码。 -
CloseAbnormalClosure
(1006): 异常关闭,表示连接意外关闭,无法确定具体原因。 -
CloseInvalidFramePayloadData
(1007): 无效的帧载荷数据,表示接收到的帧载荷数据不符合协议规范。 -
ClosePolicyViolation
(1008): 协议违规,表示违反了协议的约束或策略。 -
CloseMessageTooBig
(1009): 消息过大,表示接收到的消息超过了允许的最大大小。 -
CloseMandatoryExtension
(1010): 必需扩展,表示服务器要求使用一个或多个扩展,但客户端未提供。 -
CloseInternalServerErr
(1011): 内部服务器错误,表示服务器在处理连接时遇到了内部错误。 -
CloseServiceRestart
(1012): 服务重启,表示服务器正在重新启动。 -
CloseTryAgainLater
(1013): 请稍后重试,表示服务器暂时无法处理连接,请求客户端稍后再试。 -
CloseTLSHandshake
(1015): TLS握手错误,表示TLS握手过程中发生错误。
这些关闭代码可以用于指示连接关闭的原因,以便在处理WebSocket连接时做出适当的处理。
关闭帧
WebSocket的关闭状态码是通过特定的控制帧(Control Frames)在WebSocket报文中传输的。在关闭连接时,发送方会发送一个关闭帧(Close Frame),其中包含状态码。
关闭帧的结构如下:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+---------------------------------------------------------------+
关闭帧的Payload Data字段中包含了两个字节的状态码。状态码位于Payload Data字段的开始位置,紧随着Masking-key字段(如果设置了掩码)。
要读取或处理状态码,您需要解析WebSocket报文并提取关闭帧中的Payload Data字段,并从中读取两个字节的状态码。请注意,Payload Data字段中的数据是以网络字节顺序(Big Endian)表示的。
需要注意的是,关闭帧的状态码是由发送方发送给接收方的,并且接收方可以根据状态码执行适当的操作或响应。
为什么不直接用tcp
TCP协议本身就是全双工,但直接使用纯裸TCP去传输数据,会有粘包的"问题"。为了解决这个问题,上层协议一般会用消息头+消息体的格式去重新包装要发的数据。
而消息头里一般含有消息体的长度,通过这个长度可以去截取真正的消息体。
HTTP协议和大部分RPC协议,以及websocket协议,都是这样设计的。
websocket完美继承了TCP协议的全双工能力,并且还贴心的提供了解决粘包的方案。它适用于需要服务器和客户端(浏览器)频繁交互的大部分场景。比如网页/小程序游戏,网页聊天室,以及一些类似飞书这样的网页协同办公软件。
回到文章开头的问题,在使用websocket协议的网页游戏里,怪物移动以及攻击玩家的行为是服务器逻辑产生的,对玩家产生的伤害等数据,都需要由服务器主动发送给客户端,客户端获得数据后展示对应的效果。
go使用websocket
包的选择
官方包:golang.org/x/net/websocket
三方包:github.com/gorilla/websocket
其中gorilla/websocket更常用些,go的另外一个web框架iris中,就使用的是gorilla/websocket库。
下载:go get github.com/gorilla/websocket
在线调试网站
http://www.jsons.cn/websocket/
应用
简单实现websocket发送消息
package main
import (
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"log"
"net/http"
"time"
)
var connectPool = make([]*websocket.Conn, 0)
func main() {
engine := gin.Default()
upGrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
engine.GET("/ws", func(context *gin.Context) {
conn, err := upGrader.Upgrade(context.Writer, context.Request, nil)
if err != nil {
log.Println(err)
http.NotFound(context.Writer, context.Request)
return
}
log.Println("连接建立成功", conn.RemoteAddr())
for {
_ = conn.WriteMessage(websocket.TextMessage, []byte("hello world"))
time.Sleep(time.Second)
}
})
log.Fatalln(engine.Run("127.0.0.1:8080"))
}
简单聊天室
- 当有人
上线
时,加入UID-Socket映射 - 通过解析websocket的data获取用户行为,如:
- 当to=="0"时,代表着广播消息,遍历映射给所有人发消息
- 当to=="xx"时,代表着私发消息,找到对应的socket给其发消息。
演示:
- Generalzy3广播
大家好
其他两人均收到了,
- Generalzy2对Generalzy3私发
雪豹闭嘴
,只有Generalzy3收到了
package main
import (
"encoding/json"
"fmt"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"log"
"net/http"
)
var ConnUIDMapping = make(map[string]*websocket.Conn)
func ParseInfo(msg []byte) (map[string]string, error) {
result := make(map[string]string)
err := json.Unmarshal(msg, &result)
if err != nil {
return nil, err
}
return result, nil
}
func main() {
engine := gin.Default()
upGrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
engine.GET("/Online", func(context *gin.Context) {
username := context.Query("username")
conn, err := upGrader.Upgrade(context.Writer, context.Request, nil)
if err != nil {
log.Println(err)
http.NotFound(context.Writer, context.Request)
return
}
ConnUIDMapping[username] = conn
for {
fmt.Println(ConnUIDMapping)
msgType, content, err := conn.ReadMessage()
if err != nil {
log.Println(err)
break
} else {
if msgType == websocket.CloseMessage {
log.Println("client closed!")
break
} else {
res, err := ParseInfo(content)
if err != nil {
log.Println(err)
break
} else {
to, msg := res["to"], res["msg"]
switch to {
case "0":
for _, c := range ConnUIDMapping {
_ = c.WriteMessage(websocket.TextMessage, []byte(msg))
}
default:
if c, ok := ConnUIDMapping[to]; ok {
_ = c.WriteMessage(websocket.TextMessage, []byte(msg))
} else {
_ = conn.WriteMessage(websocket.TextMessage, []byte(to+"不存在!"))
}
}
}
}
}
}
_ = conn.WriteMessage(websocket.CloseMessage, []byte("closed"))
delete(ConnUIDMapping, username)
_ = conn.Close()
})
log.Fatalln(engine.Run("127.0.0.1:8080"))
}
Q&A?
在Golang中存储WebSocket连接的最佳方法是什么?
使用map存储
针对在线用户,使用map存储,onlineUser = map[int] *websocket.Conn
。
var userConnMap = make(map[string]*websocket.Conn)
// 存储用户 Conn
func storeUserConn(userID string, conn *websocket.Conn) {
userConnMap[userID] = conn
}
// 获取用户 Conn
func getUserConn(userID string) *websocket.Conn {
return userConnMap[userID]
}
// 删除用户 Conn
func deleteUserConn(userID string) {
delete(userConnMap, userID)
}
在使用 Map 存储 Conn 时,需要考虑并发安全问题。可以使用锁或者通道等机制来保证并发安全。另外,如果在线用户较多,存储在内存中的 Map 可能会占用过多的内存,这时可以考虑使用 Redis 等外部存储来存储在线用户的 Conn。
使用redis存储(待补充)
- chatgpt给出的代码全部都是错误的根本不可能使用,且,socket这样的对象序列化了再反序列化回来,也不是原来的那个对象了
不存储
以redis的发布订阅为例,当用户登录且想与某个人发消息时,可以以对方ID为key建立一个topic,然后本用户publish userId msg
发送消息,这样对方只要在线且监听自己的topic就可以接收到消息,但注意一点,由于redis不支持消费离线数据,所以可以用专用的mq
去代替或者使用redis的Sorted Set存储消息,然后等用户上线了从zset中读取消息,发送给自己。
var upGrader = websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {
return true
}}
func ChatWithPerson(ctx *gin.Context) {
uInterface, _ := ctx.Get("user")
u := uInterface.(*models.User)
conn, err := upGrader.Upgrade(ctx.Writer, ctx.Request, nil)
if err != nil {
ctx.Error(err)
return
}
defer func() {
err = conn.Close()
if err != nil {
ctx.Error(err)
}
}()
wg := &sync.WaitGroup{}
wg.Add(2)
// write
go func(wg *sync.WaitGroup, conn *websocket.Conn, ctx *gin.Context) {
defer wg.Done()
for {
msg := new(models.Message)
if err := conn.ReadJSON(msg); err != nil {
global.GlobalLogger.Error(err.Error())
break
}
msg.UserID = u.ID
msg.MsgType = models.PrivateMsg
// 入库
if err = global.GlobalMysqlClient.Create(msg).Error; err != nil {
global.GlobalLogger.Error(err.Error())
break
}
// {"dst":"02195049","from":"02195048","msgType":1,"msg":xxx"}
// 序列化结构体
msgBinary, err := json.Marshal(*msg)
if err != nil {
global.GlobalLogger.Error(err.Error())
break
}
// 给对方频道发送
dstChan := strconv.FormatUint(uint64(msg.Target), 10)
if err = global.GlobalRedisClient.Publish(context.TODO(), dstChan, string(msgBinary)).Err(); err != nil {
global.GlobalLogger.Error(err.Error())
break
}
// 写入前端
if err = conn.WriteMessage(websocket.BinaryMessage, msgBinary); err != nil {
global.GlobalLogger.Error(err.Error())
break
}
}
global.GlobalLogger.Info("write exit(0)")
}(wg, conn, ctx)
// read
go func(wg *sync.WaitGroup, conn *websocket.Conn, ctx *gin.Context) {
defer wg.Done()
// 监听自己
ownChan := strconv.FormatUint(uint64(u.ID), 10)
sub := global.GlobalRedisClient.Subscribe(context.TODO(), ownChan)
for {
if msg, err := sub.ReceiveMessage(context.TODO()); err != nil {
global.GlobalLogger.Error(err.Error())
break
} else {
msgStruct := new(models.Message)
err := json.Unmarshal([]byte(msg.Payload), msgStruct)
if err != nil {
global.GlobalLogger.Error(err.Error())
break
}
err = conn.WriteMessage(websocket.TextMessage, []byte(msg.Payload))
if err != nil {
global.GlobalLogger.Error(err.Error())
break
}
}
}
global.GlobalLogger.Info("read exit(0)")
}(wg, conn, ctx)
wg.Wait()
}
处理离线信息
对于离线用户,您可以考虑使用离线消息推送(Offline Message Push)的方式。离线消息推送是指在用户不在线时,服务器将消息缓存起来,并在用户再次上线时将消息推送给用户。
Redis的Sorted Set存储
实现离线消息推送的一种方式是使用Redis的Sorted Set数据结构,其中以用户ID为Key,消息发送时间为Score,消息内容为Value存储离线消息。具体实现步骤如下:
在用户离线时,将消息缓存到Redis的Sorted Set中:
score := time.Now().Unix()
err := rdb.ZAdd(ctx, userId, &redis.Z{
Score: float64(score),
Member: message,
}).Err()
if err != nil {
// 处理错误
}
在用户上线时,从Redis的Sorted Set中取出离线消息推送给用户:
start := "-inf"
end := strconv.FormatInt(time.Now().Unix(), 10)
res, err := rdb.ZRangeByScore(ctx, userId, &redis.ZRangeBy{
Min: start,
Max: end,
}).Result()
if err != nil {
// 处理错误
}
for _, msg := range res {
// 发送消息给客户端
err = conn.WriteMessage(websocket.TextMessage, []byte(msg))
if err != nil {
// 处理错误
}
}
// 删除已推送的消息
err = rdb.ZRemRangeByScore(ctx, userId, start, end).Err()
if err != nil {
// 处理错误
}
在第1步中,我们将消息存储到以用户ID为Key的Sorted Set中,以消息发送时间为Score,消息内容为Value。在第2步中,我们从Sorted Set中取出离线消息并推送给用户,然后将已经推送的消息从Sorted Set中删除。
需要注意的是,由于离线消息是存储在Redis中的,因此需要保证Redis服务的可用性和稳定性,避免数据丢失或者消息推送失败。此外,离线消息的大小和数量也需要进行控制,避免对系统性能造成过大的影响。
使用消息队列(Message Queue)来处理离线消息
除了使用Redis的Sorted Set数据结构来实现离线消息推送外,还可以考虑使用消息队列(Message Queue)来处理离线消息。相比Redis的Sorted Set,消息队列的可靠性更高,支持更复杂的消息处理逻辑,同时也更容易进行水平扩展。
以下是使用消息队列实现离线消息推送的一种方案:
用户发送消息时,将消息发送到消息队列中:
message := fmt.Sprintf("%s:%s", senderId, content)
err := mq.SendMessage(message)
if err != nil {
// 处理错误
}
在用户上线时,从消息队列中取出离线消息推送给用户:
for {
message, err := mq.ReceiveMessage()
if err != nil {
// 处理错误
}
if message == nil {
break
}
parts := strings.SplitN(message.Body, ":", 2)
senderId := parts[0]
content := parts[1]
if senderId == userId {
continue
}
// 发送消息给客户端
err = conn.WriteMessage(websocket.TextMessage, []byte(content))
if err != nil {
// 处理错误
}
}
在第1步中,我们将消息发送到消息队列中。在第2步中,我们从消息队列中取出离线消息并推送给用户。需要注意的是,为了避免将消息推送给发送消息的用户,我们在推送消息之前先判断消息的发送者ID和当前用户ID是否相同。
使用消息队列实现离线消息推送需要考虑消息的持久化、消息的去重、消息的过期等问题,因此需要选择适合业务需求的消息队列系统,并进行相应的配置和优化。
能否用redis的发布订阅实现?
Redis的发布订阅机制(Pub/Sub)可以实现简单的消息通信,但并不适合作为离线消息推送的实现方式。原因如下:
Redis的发布订阅机制只支持最新消息的消费,不支持消费历史消息。如果使用Redis的发布订阅机制实现离线消息推送,就需要在订阅者断线时将消息持久化存储,然后在订阅者重新连接时读取历史消息。这样会增加实现复杂度,并且需要考虑历史消息的存储和清理问题。
Redis的发布订阅机制在分布式环境下需要考虑订阅者的负载均衡问题。如果有多个订阅者,需要保证消息能够均匀地分配给各个订阅者,避免某个订阅者的负载过高。文章来源:https://www.toymoban.com/news/detail-735349.html
相比之下,专门设计的消息队列系统通常支持持久化存储、消息去重、消息过期等功能,并且能够支持分布式集群,能够更好地满足离线消息推送的需求。因此,如果需要实现离线消息推送,建议选择专门的消息队列系统,而不是使用Redis的发布订阅机制。文章来源地址https://www.toymoban.com/news/detail-735349.html
到了这里,关于websocket协议以及在gin中的应用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!