Golang 搭建 WebSocket 应用(三) - 实现一个消息推送中心

这篇具有很好参考价值的文章主要介绍了Golang 搭建 WebSocket 应用(三) - 实现一个消息推送中心。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

有了前两篇的铺垫,相信大家已经对 Golang 中 WebSocket 的使用有一定的了解了,
今天我们以一个更加真实的例子来学习如何在 Golang 中使用 WebSocket

需求背景

在实际的项目中,往往有一些任务耗时比较长,然后我们会把这些任务做异步的处理,但是又要及时给客户端反馈任务的处理进度。

对于这种场景,我们可以使用 WebSocket 来实现。其他可以使用 WebSocket 进行通知的场景还有像管理后台一些通知(比如新订单通知)等。

在本篇文章中,就是要实现一个这样的消息推送系统,具体来说,它会有以下功能:

  1. 可以给特定的用户推送:建立连接的时候,就建立起 WebSocket 连接与用户 ID 之间的关联
  2. 断开连接的时候,移除 WebSocket 连接与用户的关联,并且关闭这个 WebSocket 连接
  3. 业务系统可以通过 HTTP 接口来给特定的用户推送 WebSocket 消息:只要传递用户 ID 以及需要推送的消息即可

基础框架

下面是一个最简单版本的框架图:

Golang 搭建 WebSocket 应用(三) - 实现一个消息推送中心,go,golang,websocket,python

它包含如下几个角色:

  1. Client 客户端,也就是实际中接收消息通知的浏览器
  2. Server 服务端,在我们的例子中,服务端实际不处理业务逻辑,只处理跟客户端的消息交互:维持 WebSocket 连接,推送消息到特定的 WebSocket 连接
  3. 业务逻辑:这个实际上不属于 demo 的一部分,但是 Server 推送的数据是来自业务逻辑处理的结果

设计成这样的目的是为了将技术跟业务进行分离,业务逻辑上的变化不影响到底层技术,同样的,WebSocket 推送中心的技术上的变动也不会影响到实际的业务。

开始开发

一些结构体变动

  1. Client 结构体的变化
type Client struct {
	hub *Hub
	conn *websocket.Conn
	send chan []byte
    // 新增字段
    uid int
}

因为我们需要建立起 WebSocket 连接与用户之间的关联,因此我们需要一个额外的字段来记录用户 ID,也就是上面的 uid 字段。

这个字段会在客户端建立连接后写入。

  1. Hub 结构体的变化
type Hub struct {
	clients map[*Client]bool
	register chan *Client
	unregister chan *Client

	// 记录 uid 跟 client 的对应关系
	userClients map[int]*Client

    // 读写锁,保护 userClients 以及 clients 的读写
	sync.RWMutex
}
  1. 因为我们不再需要做广播,所以会移除 Hub 中的 broadcast 字段。

取而代之的是,我们会直接在消息推送接口中写入到 uid 对应的 Clientsend 通道。
当然我们也可以在 Hub 中另外加一个字段来记录要推送给不同 uid 的消息,但是我们的 Hubrun 方法是一个协程处理的,当需要推送的数据较多或者其中有
网络延迟的时候,会直接影响到推送给其他用户的消息。当然我们也可以改造一下 run 方法,启动多个协程来处理,不过这样比较复杂,本文会在 writePump 中处理。
(也就是建立 WebSocket 连接时的那个写操作协程)

  1. 同时为了更加快速地通过 uid 来获取对应的 WebSocket 连接,新增了一个 userClients 字段。

这是一个 map 类型的字段,keyuid,值是对应的 Client 指针。

  1. 最后新增了一个 Mutex 互斥锁

因为,在用户实际进行登录的时候需要写入 userClients 字段,而这是一个 map 类型字段,并不支持并发读写。
如果我们在接受并发连接的时候同时修改 userClients 的时候会导致 panic,因此我们使用了一个互斥锁来保证 userClients 的读写安全。

同时,clients 也是一个 map,但上一篇文章中没有使用 sync.Mutex 来保护它的读写,在并发操作的时候也是会有问题的,
所以 Mutex 同时也需要保护 clients 的读写。

func (h *Hub) run() {
	for {
		select {
		case client := <-h.register:
			h.Lock()
			h.clients[client] = true
			h.Unlock()
		case client := <-h.unregister:
			if _, ok := h.clients[client]; ok {
				h.Lock()
				delete(h.userClients, client.uid)
				delete(h.clients, client)
				h.Unlock()
				close(client.send)
			}
		}
	}
}

最后,我们会在 Hubrun 方法中写 userClients 或者 clients 字段的时候,先获取锁,写成功的时候释放锁。

建立连接

在本篇中,将会继续沿用上一篇的代码,只是其中一些细节会有所改动。建立连接这步操作,跟上一篇的一样:

// 将 HTTP 转换为 WebSocket 连接的 Upgrader
var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
}

// 处理 WebSocket 连接请求
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
    // 升级为 WebSocket 连接
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println(err)
		return
	}
    // 新建一个 Client
	client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
    // 注册到 Hub
	client.hub.register <- client

    // 推送消息的协程
	go client.writePump()
    // 结束消息的协程
	go client.readPump()
}

接收消息

由于我们要做的只是一个推送消息的系统,所以我们只处理用户发来的登录请求,其他的消息会全部丢弃:

func (c *Client) readPump() {
	defer func() {
		c.hub.unregister <- c
		_ = c.conn.Close()
	}()
	c.conn.SetReadLimit(maxMessageSize)
	c.conn.SetReadDeadline(time.Time{}) // 永不超时
	for {
		// 从客户端接收消息
		_, message, err := c.conn.ReadMessage()
		if err != nil {
			log.Println("readPump error: ", err)
			break
		}

		// 只处理登录消息
		var data = make(map[string]string)
		err = json.Unmarshal(message, &data)
		if err != nil {
			break
		}

		// 写入 uid 以及 Hub 的 userClients
		if uid, ok := data["uid"]; ok {
			c.uid = uid
			c.hub.Lock()
			c.hub.userClients[uid] = c
			c.hub.Unlock()
		}
	}
}

在本文中,假设客户端的登录消息格式为 {"uid": "123456"} 这种 json 格式。

在这里也操作了 userClients 字段,同样需要使用互斥锁来保证操作的安全性。

发送消息

  1. 在我们的系统中,可以提供一个 HTTP 接口来跟业务系统进行交互:
// 发送消息的接口
// 参数:
// 1. uid:接收消息的用户 ID
// 2. message:需要发送给这个用户的消息
http.HandleFunc("/send", func(w http.ResponseWriter, r *http.Request) {
    send(hub, w, r)
})

// 发送消息的方法
func send(hub *Hub, w http.ResponseWriter, r *http.Request) {
	uid := r.FormValue("uid")
	// 参数错误
	if uid == "" {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	// 从 hub 中获取 client
	hub.Lock()
	client, ok := hub.userClients[uid]
	hub.Unlock()
	// 尚未建立连接
	if !ok {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	// 发送消息
	message := r.FormValue("message")
	client.send <- []byte(message)
}
  1. 实际发送消息的操作

writePump 方法中,我们会将从 /send 接收到的数据发送给对应的用户:

// 发送消息的协程
func (c *Client) writePump() {
	defer func() {
		_ = c.conn.Close()
	}()
	for {
		select {
		case message, ok := <-c.send:
			// 设置写超时时间
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			// 连接已经被关闭了
			if !ok {
				c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}

			// 获取一个发送消息的 Writer
			w, err := c.conn.NextWriter(websocket.TextMessage)
			if err != nil {
				return
			}
			// 写入消息到 Writer
			w.Write(message)

            // 关闭 Writer
			if err := w.Close(); err != nil {
				return
			}
		}
	}
}

在这个方法中,我们会从 c.send 这个 chan 中获取需要发送给客户端的消息,然后进行发送操作。

测试

  1. 启动 main 程序
go run main.go
  1. 打开一个浏览器的控制台,执行以下代码
ws = new WebSocket('ws://127.0.0.1:8181/ws')
ws.send('{"uid": "123"}')

这两行代码的作用是与 WebSocket 服务器建立连接,然后发送一个登录信息。

然后我们打开控制台的 Network -> WS -> Message 就可以看到浏览器发给服务端的消息:

Golang 搭建 WebSocket 应用(三) - 实现一个消息推送中心,go,golang,websocket,python

  1. 使用 HTTP 客户端发送消息给 uid 为 123 的用户

假设我们的 WebSocket 服务器绑定的端口为 8181

打开终端,执行以下命令:

curl "http://localhost:8181/send?uid=123&message=Hello%20World"

然后我们可以在 Network -> WS -> Message 看到接收到了消息 Hello World

Golang 搭建 WebSocket 应用(三) - 实现一个消息推送中心,go,golang,websocket,python

结束了

到此为止,我们已经实现了一个初步可工作的 WebSocket 应用,当然还有很多可以优化的地方,
比如:

  1. 错误处理
  2. Hub 状态目前对外部来说是一个黑盒子,我们可以加个接口返回一下 Hub 的当前状态,比如当前连接数
  3. 日志:出错的时候,日志可以帮助我们快速定位问题

这些功能会在后续继续完善,今天就到此为止了。文章来源地址https://www.toymoban.com/news/detail-817768.html

到了这里,关于Golang 搭建 WebSocket 应用(三) - 实现一个消息推送中心的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • WebSocket+Redis实现消息推送机制以及离线消息推送(vue+sping boot)

    vue端涉及业务就不贴了 WebSocket 是一种在单个TCP连接上进行全双工通信的协议。WebSocket通信协议于2011年被IETF定为标准RFC 6455,并由RFC7936补充规范。WebSocket API也被W3C定为标准。 WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在

    2024年02月09日
    浏览(51)
  • SpringBoot整合Netty+Websocket实现消息推送

           Netty是一个高性能、异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。以下是Netty的主要优势: 高性能 :Netty基于NIO(非阻塞IO)模型,采用事件驱动的设计,具有高性能的特点。它通过零拷贝技术、内存池化技术等手段,进一步提高

    2024年01月20日
    浏览(45)
  • Spring Boot集成WebSocket实现消息推送

    项目中经常会用到消息推送功能,关于推送技术的实现,我们通常会联想到轮询、comet长连接技术,虽然这些技术能够实现,但是需要反复连接,对于服务资源消耗过大,随着技术的发展,HtML5定义了WebSocket协议,能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。

    2023年04月08日
    浏览(45)
  • 分布式WebSocket消息推送系统设计与实现

    作者:禅与计算机程序设计艺术 现如今,随着物联网、云计算、移动互联网、大数据等新技术的兴起,分布式系统成为越来越多企业面临的挑战。在分布式系统中,服务间通信是一个重要且复杂的课题,基于TCP/IP协议族的传输层协议之上的应用层协议比如HTTP协议、RPC(Remo

    2024年02月05日
    浏览(41)
  • Springboot整合WebSocket实现主动向前端推送消息

            在上篇文章tcp编程中,我们实现了C++客户端与java服务器之间的通信,客户端发送了一个消息给服务器,今天我们要实现基于WebSocket实现服务器主动向前端推送消息,并且以服务器接收到C++客户端的消息主动向前端推送消息的触发条件。 WebSocket 的诞生背景       

    2024年03月16日
    浏览(43)
  • Springboot集成websocket实现消息推送和在线用户统计

    在启动类上添加一个bean 核心代码 实现消息推送只要在业务代码中调用sendMessageSpecial()方法即可。 然后调用刚才的业务接口测试:http://localhost:8080/websocket/t1 调用成功后可以看到三个窗口中都收到了消息

    2023年04月08日
    浏览(55)
  • 记录--你还在使用websocket实现实时消息推送吗?

    在日常的开发中,我们经常能碰见服务端需要主动推送给客户端数据的业务场景,比如数据大屏的实时数据,比如消息中心的未读消息,比如聊天功能等等。 本文主要介绍SSE的使用场景和如何使用SSE。 我们常规实现这些需求的方案有以下三种 轮询 websocket SSE 在很久很久以前

    2024年02月19日
    浏览(40)
  • 一个超级牛逼的消息推送系统Gotify 使用Gotify来搭建你的消息推送系统

    目录 先看效果 简介 1.1创建目录 3.访问服务端 3.1示例 3.2创建应用 4.安装apk 4.1下载apk 4.2安装 4.3配置服务器地址 5.推送消息测试 5.1服务器执行 5.2手机端查看 支持删除 6.源码地址  打开应用 gotify 支持的功能如下 可以通过 restapi 发送消息 可以通过 websocket 接收消息 可以管理用

    2024年01月17日
    浏览(41)
  • 前端实现消息推送、即时通信、SSE、WebSocket、http简介

    服务端主动向客户端推送消息,使客户端能够即时接收到信息。 场景 页面接收到点赞,消息提醒 聊天功能 弹幕功能 实时更新数据功能 短轮询 浏览器(客户端)每隔一段时间向服务器发送http请求,服务器端在收到请求后,不论是否有数据更新,都直接进行响应。 本质:客

    2024年02月16日
    浏览(43)
  • SpringBoot集成WebSocket实现消息实时推送(提供Gitee源码)

    前言:在最近的工作当中,客户反应需要实时接收消息提醒,这个功能虽然不大,但不过也用到了一些新的技术,于是我这边写一个关于我如何实现这个功能、编写、测试到部署服务器,归纳到这篇博客中进行总结。 目录 一、什么是WebSocket 二、后端实现 2.1、引入pom.xml依赖

    2024年02月11日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包