Python Tornado 实现SSE服务端主动推送方案

这篇具有很好参考价值的文章主要介绍了Python Tornado 实现SSE服务端主动推送方案。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、SSE 服务端消息推送

SSEServer-Sent Events 的简称, 是一种服务器端到客户端(浏览器)的单项消息推送。对应的浏览器端实现 Event Source 接口被制定为HTML5 的一部分。相比于 WebSocket,服务器端和客户端工作量都要小很多、简单很多,而 Tornado 又是Python中的一款优秀的高性能web框架,本文带领大家一起实践下 Tornado SSE 的实现。

本文主要探索两个方面的实践:一个是客户端发送请求,服务端的返回是分多次进行传输的,直到传输完成,这种情况下请求结束后,就可以考虑关闭 SSE了,所以这种连接可以认为是暂时的。另一种是由服务端在特定的时机下主动推送消息给到客户端,推送的时机具有不确定性,随时性,所以这种情况下需要客户端和服务端保持长久连接。

本次使用的 Tornado 版本:

tornado==6.3.2

二、短暂性场景下的 SSE 实现

短暂性场景下就是对应上面的第一点,客户端主动发送请求后,服务端分多次传输,直到完成,数据获取完成后连接就可以断开了,适用于一些接口复杂,操作步骤多的场景,可以提前告诉客户端现在进行到了哪一步了,并且这种方式也有利于服务端的横向扩展。

Tornado 中实现,需要注意的是要关闭 _auto_finish ,这样的话就不会被框架自己主动停止连接了,下面是一个实现的案例:

import time
from tornado.concurrent import run_on_executor
from tornado.web import RequestHandler
import tornado.gen
from concurrent.futures.thread import ThreadPoolExecutor

class SSE(RequestHandler):

    def initialize(self):
        # 关闭自动结束
        self._auto_finish = False
        print("initialize")

    def set_default_headers(self):
        # 设置为事件驱动模式
        self.set_header('Content-Type', "text/event-stream")
        # 不使用缓存
        self.set_header('Content-Control', "no-cache")
        # 保持长连接
        self.set_header('Connection', "keep-alive")
        # 允许跨域
        self.set_header('Access-Control-Allow-Origin', "*")

    def prepare(self):
        # 准备线程池
        self.executor = self.application.pool

    @tornado.gen.coroutine
    def get(self):
        result = yield self.doHandle()
        self.write(result)
        # 结束
        self.finish()

    @run_on_executor
    def doHandle(self):
        tornado.ioloop.IOLoop.current()
        # 分十次推送信息
        for i in range(10):
            time.sleep(1)
            self.flush()
            self.callback(f"current: {i}")
        return f"data: end\n\n"

    def callback(self, message):
        # 事件推送
        message = f"data: {message}\n\n"
        self.write(message)
        self.flush()


class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            ("/sse", SSE),
            ("/(.*)$", tornado.web.StaticFileHandler, {
                "path": "resources/static",
                "default_filename": "index.html"
            })
        ]
        super(Application, self).__init__(handlers)
        self.pool = ThreadPoolExecutor(200)


def startServer(port):
    app = Application()
    httpserver = tornado.httpserver.HTTPServer(app)
    httpserver.listen(port)

    print(f"Start server success", f"The prot = {port}")

    tornado.ioloop.IOLoop.current().start()


if __name__ == '__main__':
    startServer(8020)

运行后可以到浏览器访问:http://localhost:8020/sse,此时就可以看到服务端在不断地推送数据过来了:

tornado服务端,微服务,python,python,tornado,开发语言

那如何在前端用 JS 获取数据呢,前面提到在 JS 层面,有封装好的 Event Source 组件可以直接拿来使用,例如:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>测试服务器推送技术</title>
</head>
<body>
    <div id="messages"></div>
</body>
<script>
    const eventSource = new EventSource('http://localhost:8020/sse');

    // 事件回调
    eventSource.onmessage = (event) => {
      console.log(event.data)
      const messagesDiv = document.getElementById('messages');
      messagesDiv.innerHTML += '<p>' + event.data + '</p>';
    };

    // 异常
    eventSource.onerror = (error) => {
      console.error('EventSource failed:', error);
      eventSource.close();
    };

    eventSource.onopen = ()=>{
        console.log("开启")
    }
  </script>
</html>

运行后可以看到服务端分阶段推送过来的数据:

tornado服务端,微服务,python,python,tornado,开发语言

三、长连接场景下的 SSE 实现

上面实现了客户端请求后,分批次返回,但是有些情况下是客户端连接后没有东西返回,而是在某个特定的时机下返回给某几个客户端,所以这种情况,我们需要和客户端保持长久的连接,同时进行客户端连接的缓存,因为同时有可能有 100 个用户,但是推送时可能只需要给 10 个用户推送,这种方式相当于将一个客户端和一个服务端进行了绑定,一定程度上不利于服务端的横向扩展,但也可以通过一些消息订阅的方式解决类似问题。

下面是一个实现案例:

import time
from tornado.concurrent import run_on_executor
from tornado.web import RequestHandler
import tornado.gen
from concurrent.futures.thread import ThreadPoolExecutor


# 单例
def singleton(cls):
    instances = {}
    def wrapper(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]
    return wrapper


# 订阅推送工具类
@singleton
class Pusher():

    def __init__(self):
        self.clients = {}

    def add_client(self, client_id, callback):
        if client_id not in self.clients:
            self.clients[client_id] = callback
            print(f"{client_id} 连接")

    def send_all(self, message):
        for client_id in self.clients:
            callback = self.clients[client_id]
            print("发送消息给:", client_id)
            callback(message)

    def send(self, client_id, message):
        callback = self.clients[client_id]
        print("发送消息给:", client_id)
        callback(message)


class SSE(RequestHandler):
    # 定义推送者
    pusher = Pusher()

    def initialize(self):
        # 关闭自动结束
        self._auto_finish = False
        print("initialize")

    def set_default_headers(self):
        # 设置为事件驱动模式
        self.set_header('Content-Type', "text/event-stream")
        # 不使用缓存
        self.set_header('Content-Control', "no-cache")
        # 保持长连接
        self.set_header('Connection', "keep-alive")
        # 允许跨域
        self.set_header('Access-Control-Allow-Origin', "*")

    @tornado.gen.coroutine
    def get(self):
        # 客户端唯一标识
        client_id = self.get_argument("client_id")
        self.pusher.add_client(client_id, self.callback)

    def callback(self, message):
        # 事件推送
        message = f"data: {message}\n\n"
        self.write(message)
        self.flush()


# 定义推送接口,模拟推送
class Push(RequestHandler):
    # 定义推送者
    pusher = Pusher()

    def prepare(self):
        # 准备线程池
        self.executor = self.application.pool

    @tornado.gen.coroutine
    def get(self):
        # 客户端标识
        client_id = self.get_argument("client_id")
        # 推送的消息
        message = self.get_argument("message")
        result = yield self.doHandle(client_id, message)
        self.write(result)

    @run_on_executor
    def doHandle(self, client_id, message):
        tornado.ioloop.IOLoop.current()
        self.pusher.send(client_id, message)
        return "success"


class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            ("/sse", SSE),
            ("/push", Push),
            ("/(.*)$", tornado.web.StaticFileHandler, {
                "path": "resources/static",
                "default_filename": "index.html"
            })
        ]
        super(Application, self).__init__(handlers)
        self.pool = ThreadPoolExecutor(200)

def startServer(port):
    app = Application()
    httpserver = tornado.httpserver.HTTPServer(app)
    httpserver.listen(port)

    print(f"Start server success", f"The prot = {port}")

    tornado.ioloop.IOLoop.current().start()


if __name__ == '__main__':
    startServer(8020)

这里我定义了一个 Pusher 订阅推送工具类,用来存储客户端的连接,以及给指定客户端或全部客户端发送消息,然后我又定义 Push 接口,模拟不定时的指定客户端发送信息的场景。

同样前端也要修改,需要给自己定义 client_id ,例如:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>测试服务器推送技术</title>
</head>
<body>
    <div id="client"></div>
    <div id="messages"></div>
</body>
<script>
    function generateUUID() {
      let uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
        const r = Math.random() * 16 | 0;
        const v = c === 'x' ? r : (r & 0x3 | 0x8);
        return v.toString(16);
      });

      return uuid;
    }
    // 利用uuid 模拟生成唯一的客户端ID
    let client_id = generateUUID();
    document.getElementById('client').innerHTML = "当前 client_id = "+client_id;
    const eventSource = new EventSource('http://localhost:8020/sse?client_id='+client_id);

    // 事件回调
    eventSource.onmessage = (event) => {
      console.log(event.data)
      const messagesDiv = document.getElementById('messages');
      messagesDiv.innerHTML += '<p>' + event.data + '</p>';
    };

    // 异常
    eventSource.onerror = (error) => {
      console.error('EventSource failed:', error);
      eventSource.close();
    };

    eventSource.onopen = ()=>{
        console.log("开启")
    }
  </script>
</html>

这里我用 uuid 模拟客户端的唯一ID,在真实使用时可不要这么做。

下面使用浏览器打开三个页面,可以看到三个不同的 client_id :

tornado服务端,微服务,python,python,tornado,开发语言

tornado服务端,微服务,python,python,tornado,开发语言

tornado服务端,微服务,python,python,tornado,开发语言
在服务端的日志中也能看到这三个客户端的连接:

tornado服务端,微服务,python,python,tornado,开发语言

下面调用 push 接口来给任意一个客户端发送消息,例如这里发给client_id = 2493045e-84dd-4118-8d96-0735c4ac186b 的用户 :

tornado服务端,微服务,python,python,tornado,开发语言

下面看到 client_id2493045e-84dd-4118-8d96-0735c4ac186b的页面:

tornado服务端,微服务,python,python,tornado,开发语言
已经成功收到推送的消息,反之看另外两个:

tornado服务端,微服务,python,python,tornado,开发语言
tornado服务端,微服务,python,python,tornado,开发语言
都没有消息,到这里就实现了长连接下不定时的服务端消息推送方案。文章来源地址https://www.toymoban.com/news/detail-832932.html

到了这里,关于Python Tornado 实现SSE服务端主动推送方案的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 在Linux服务器上部署Tornado项目

    1、准备服务器: 确保你的服务器上安装了Python。Tornado通常与Python 3兼容,因此建议安装Python 3.x。 安装和配置一个Web服务器,如Nginx或Apache,以用作反向代理,将请求传递给Tornado应用。这有助于提高性能并增加安全性。 配置域名和DNS,以便将域名解析到服务器的IP地址(如果

    2024年02月09日
    浏览(40)
  • 试玩python的web框架 flask、fastapi、tornado、django

    先解决一下IDEA使用远程解释器,本地代码编辑无法代码提示问题 常用的4个Python Web框架对比 注意 1.这里使用linux 192.168.72.126上远程解释器,需要 /usr/bin/pip3 install flask ,host参数不要使用localhost/127.0.0.1,即只监听本地的访问,会导致windows无法访问到flask app 2.运行方式增加main方法

    2024年02月17日
    浏览(61)
  • Python学习笔记_进阶篇(一)_浅析tornado web框架

    1、tornado概述 Tornado就是我们在 FriendFeed 的 Web 服务器及其常用工具的开源版本。Tornado 和现在的主流 Web 服务器框架(包括大多数 Python 的框架)有着明显的区别:它是非阻塞式服务器,而且速度相当快。得利于其 非阻塞的方式和对epoll的运用,Tornado 每秒可以处理数以千计的

    2024年02月12日
    浏览(42)
  • 【websocket - Tornado】简易聊天应用

    项目测试的过程中需要自己搭建一个webscoket站点,确保此类服务接入后台系统后访问不受影响。python的服务框架常用的有Flask、Django、Tornado,每个框架的侧重点不同,导致使用的场景就会有所差异。 Flask轻量级,采用常规的同步编程方式,需要安装其他模块辅助,主要用于传

    2024年02月14日
    浏览(36)
  • Server-Sent Events(以下简称 SSE)及event-source-polyfill使用单向长连接(后台主动向前端推送)

    SSE 与 WebSocket 作用相似,都是建立浏览器与服务器之间的通信渠道,然后服务器向浏览器推送信息SSE 是单向通道,只能服务器向浏览器发送,因为流信息本质上就是下载。如果浏览器向服务器发送信息,就变成了另一次 HTTP 请求 使用方法  Server-Sent Events 教程 - 阮一峰的网络

    2024年02月12日
    浏览(47)
  • 使用HTTP/2实现服务端主动推送消息给客户端

    77. 使用HTTP/2实现服务端主动推送消息给客户端 HTTP/2 协议的服务器主动推送机制是通过服务器在接收到客户端请求后,主动向客户端推送相关资源的方式来实现的。下面将详细解释如何在服务器端和客户端实现 HTTP/2 的服务器主动推送,并给出相应的代码示例。 客户端实现:

    2024年02月11日
    浏览(63)
  • Flask、Django和Tornado怎么选

    Flask、Django和Tornado是三个常用的PythonWeb框架,它们在设计理念、功能和适用场景上有所不同。下面是它们的对比: 1. 设计理念: - Flask是一个轻量级的框架,它提供了最基本的功能,但是具有高度的灵活性和可扩展性。它鼓励开发者使用自己喜欢的库和工具来完成特定任务

    2024年04月12日
    浏览(38)
  • 9-tornado-Template优化方法、个人信息案例、tornado中ORM的使用(peewee的使用、peewee_async)、WTForms的使用

    common/base.html shop2.html Tornado中支持累死Vue中的组件功能,就是也公共的内容提取出来当成组件。 具体的使用方式是用 tornado.web.UIModule 环境搭建 在网站中,少了数据CRUD的操作,但在tornado中,我们知道若想操作的话尽量使用异步的操作,这样效率才会高。 那应该如何编写呢,

    2024年02月04日
    浏览(32)
  • 实时通信的服务器推送机制 EventSource(SSE) 简介,附 go 实现示例

    不知道大家有没有见过 Content-Type:text/event-stream 的请求头,这是 HTML5 中的 EventSource 是一项强大的 API ,通过服务器推送实现实时通信。 与 WebSocket 相比, EventSource 提供了一种简单而可靠的单向通信机制(服务器-客户端),实现简单,适用于许多实时应用场景。 本文将介绍

    2024年02月10日
    浏览(85)
  • 6-tornado配置文件的使用(命令行解析、文件设置)

    使用 tornado.options.define 前定义,通常在模块的顶层。 然后,可以将这些选项作为以下属性的属性进行访问 tornado.options.options 但要解析命令行参数时,需要使用 tornado.options.parse_command_line 来解析参数 具体代码如下: 创建文件 server.conf ,将必要的参数直接写入即可 设置好文件

    2024年02月04日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包