这里提到的endpointId是一个负载了数据的逻辑点,就像一根水管的出口,有新数据来就会根据后端记录的endpointId推送到用户正在查看的endpointId。用户没有正在查看的endpoint就不会有新数据推送。这里如果如果对endpoint加上权限就相当于实现对实时数据的准确推送。文章来源:https://www.toymoban.com/news/detail-791259.html
main包
mian.go
package main
import (
"mcs/backend/core"
"mcs/backend/global"
)
func main() {
global.DS_LOG.Info("Server Starting.......")
core.RunWindowsServer()
}
core包
server.go
package core
import (
"context"
"fmt"
"mcs/backend/global"
)
func RunWindowsServer() {
//初始化global.DS_ROUTER变量
InitRouter()
address := fmt.Sprintf(":%d", global.DS_CONFIG.System.Addr)
global.DS_LOG.Infof("服务端口:%s", address)
go global.DS_WS_MANAGER.Start()
global.DS_ROUTER.Run(address)
}
router.go
package core
import (
"mcs/backend/controller/register"
"mcs/backend/global"
"mcs/backend/middleware"
"github.com/gin-gonic/gin"
)
func InitRouter() {
global.DS_ROUTER = gin.Default()
rootGroup := global.DS_ROUTER.Group("api")
publicGroup := rootGroup.Group("v1")
{
// 健康监测
register.HealthRouter.InitHealthRouter(publicGroup)
}
privateGroup := rootGroup.Group("v1")
privateGroup.Use(middleware.JWTAuth())
{
register.WebSocketRouter.InitWebsocketRouter(privateGroup)
}
global.DS_LOG.Info("路由注册完成")
}
api包
webSocket.go
package api
import (
"net/http"
"mcs/backend/global"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
type webSocketApi struct{}
func (wsa *webSocketApi) PingV2(c *gin.Context) {
// 升级get请求为websocket协议
ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
return
}
client := global.NewWSClient(ws)
go client.Write()
go client.Read()
}
global包
ws.go
package global
import (
"encoding/json"
"github.com/gorilla/websocket"
uuid "github.com/satori/go.uuid"
"github.com/sirupsen/logrus"
)
// ws客户端管理
type ClientManager struct {
clients map[string]*Client
broadcast chan []byte
register chan *Client
unregister chan *Client
}
// ws客户端
type Client struct {
Alive bool // 是否还存活
id string // 客户端自身的id,多个客户端管理
// UserId uint // 唯一标识客户端属于哪个请求的用户
EndpointIdMap map[string]interface{} // 放当前该用户正在查看的endpoint,到时就只推送这个几个endpoint的新数据
socket *websocket.Conn
send chan []byte
}
func NewWSClient(socket *websocket.Conn) *Client {
client := &Client{socket: socket, id: uuid.NewV4().String(), send: make(chan []byte)}
DS_LOG.Infof("New user with uuid %s", client.id)
// 客户端注册
DS_WS_MANAGER.register <- client
return client
}
// 发送到前端的消息结构体,前端可以根据endpointId选择把数据推送到指定的位置
type WSMessage struct {
EndpointId *string `json:"endpointId"`
Code uint `json:"code"` // 消息代码
Content []byte `json:"content"` // 消息内容
}
func (manager *ClientManager) Start() {
logrus.Info("Websocket manager start")
for {
select {
case client := <-manager.register:
client.Alive = true
manager.clients[client.id] = client
// go func() {
// time.Sleep(1 * time.Second)
// msg := &Message{Code: UserCount, Content: manager.ClientsTotal()}
// manager.Broadcast(msg)
// }()
case client := <-manager.unregister:
if _, ok := manager.clients[client.id]; ok {
DS_LOG.Infof("管道【%s】关闭", client.id)
close(client.send)
delete(manager.clients, client.id)
// msg := &Message{Code: UserCount, Content: manager.ClientsTotal()}
// manager.send(msg.JSON(), client)
DS_LOG.Infof("管道【%s】已经关闭", client.id)
}
case message := <-manager.broadcast:
msg := jsonUnmarshall(message)
if msg.EndpointId != nil {
for clientId := range manager.clients {
if _, ok := manager.clients[clientId].EndpointIdMap[*msg.EndpointId]; ok {
select {
case manager.clients[clientId].send <- msg.Content:
DS_LOG.Info("数据开始推送")
default:
/*logrus.Error("broadcast closed")
close(conn.send)
delete(manager.clients, conn)*/
}
}
}
}
}
}
}
func (msg *WSMessage) JSONMarshal() []byte {
c, _ := json.Marshal(&msg)
return c
}
func jsonUnmarshall(b []byte) WSMessage {
msg := WSMessage{}
json.Unmarshal(b, &msg)
return msg
}
func (manager *ClientManager) send(message []byte, ignore *Client) {
for userId := range manager.clients {
if manager.clients[userId] != ignore {
manager.clients[userId].send <- message
}
}
}
func (manager *ClientManager) Broadcast(msg *WSMessage) {
select {
// TODO: 需要为每个websocket管道配置单独的channel
case manager.broadcast <- msg.JSONMarshal():
default:
DS_LOG.Error("无法立即写入通道,协程结束")
}
}
func (manager *ClientManager) ClientsTotal() int {
return len(manager.clients)
}
const (
UserOnline = 101 // 用户上线
UserOffline = 102 // 用户下线
UserCount = 103 // 用户总数
NewMsg = 104 // 新消息
)
func (c *Client) Write() {
defer func() {
DS_LOG.Infof("User:%s closed conn", c.id)
c.socket.Close()
}()
for {
select {
case message, ok := <-c.send:
if !ok {
c.socket.WriteMessage(websocket.CloseMessage, []byte{})
return
}
c.socket.WriteMessage(websocket.TextMessage, message)
DS_LOG.Info("数据已经推送")
}
}
}
func (c *Client) Read() {
defer func() {
DS_WS_MANAGER.unregister <- c
c.socket.Close()
}()
for {
t, b, err := c.socket.ReadMessage()
if err != nil {
DS_LOG.Error(err)
DS_WS_MANAGER.unregister <- c
c.socket.Close()
break
}
DS_LOG.Info(t)
DS_LOG.Info(string(b))
endpointIdArr := EndpointIdList{}
err = json.Unmarshal(b, &endpointIdArr)
if err != nil {
DS_LOG.Error("WS--READ ERR:", err)
continue
}
// DS_LOG.Info("endpointArr=", endpointIdArr)
endpointIdMap := make(map[string]interface{})
for i := 0; i < len(endpointIdArr.EndpointIds); i++ {
endpointIdMap[endpointIdArr.EndpointIds[i]] = nil
}
c.EndpointIdMap = endpointIdMap
}
}
type EndpointIdList struct {
EndpointIds []string `json:"endpointIds"`
}
在其他包里使用
byteMsgData:=[]byte("ahdaasdsada")
msg := global.WSMessage{
EndpointId: &endpointId,
Content: byteMsgData,
}
global.DS_WS_MANAGER.Broadcast(&msg)
这里的代码并不能复制之后直接使用,但是websocket部分已经全部在这文章来源地址https://www.toymoban.com/news/detail-791259.html
到了这里,关于golang --gin+websocket实现指定的数据点推送的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!