【通用消息通知服务】0x3 - 发送我们第一条消息(Websocket)

这篇具有很好参考价值的文章主要介绍了【通用消息通知服务】0x3 - 发送我们第一条消息(Websocket)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

【通用消息通知服务】0x3 - 发送我们第一条消息

项目地址: A generic message notification system[Github]

实现接收/发送Websocket消息

Websocket Connection Pool

import asyncio
from asyncio.queues import Queue
from asyncio.queues import QueueEmpty
from contextlib import suppress
from typing import Any

import async_timeout
import orjson
from sanic.log import logger
from ulid import ULID

from common.depend import Dependency

PING = "#ping"
PONG = "#pong"


class WebsocketConnectionPoolDependency(
    Dependency, dependency_name="WebsocketPool", dependency_alias="ws_pool"
):
    def __init__(self, app) -> None:
        super().__init__(app)
        self.lock = asyncio.Lock()
        self.connections = {}	# 存储websocket connections
        self.send_queues = {}   # 各websocket发送队列
        self.recv_queues = {}   # 各websocket接收消息队列
        self.close_callbacks = {} # websocket销毁回调
        self.listeners = {} # 连接监听函数

    def _gen_id(self) -> str:
        return str(ULID())

    async def add_connection(self, connection) -> str:
        async with self.lock:
            id = self._gen_id()
            self.connections[id] = connection
            self.send_queues[id] = Queue()
            self.app.add_task(
                self.send_task(self.send_queues[id], connection),
                name=f"websocket_{id}_send_task",
            )
            self.recv_queues[id] = Queue()
            self.app.add_task(
                self.recv_task(self.recv_queues[id], connection),
                name=f"websocket_{id}_recv_task",
            )
            self.app.add_task(self.notify_task(id), name=f"websocket_{id}_notify_task")
            self.app.add_task(
                self.is_alive_task(id), name=f"websocket_{id}_is_alive_task"
            )
            setattr(connection, "_id", id)
            return connection._id

    def get_connection(self, connection_id: str):
        return self.connections.get(connection_id)

    async def add_listener(self, connection_id, handler) -> str:
        async with self.lock:
            id = self._gen_id()
            self.listeners.setdefault(connection_id, {}).update({id: handler})
            return id

    async def remove_listener(self, connection_id, listener_id):
        async with self.lock:
            self.listeners.get(connection_id, {}).pop(listener_id, None)

    async def add_close_callback(self, connection_id, callback):
        async with self.lock:
            self.close_callbacks.setdefault(connection_id, []).append(callback)

    def is_alive(self, connection_id: str):
        if hasattr(connection_id, "_id"):
            connection_id = connection_id._id
        return connection_id in self.connections

    async def remove_connection(self, connection: Any):
        if hasattr(connection, "_id"):
            connection_id = connection._id
        else:
            connection_id = connection

            if connection_id not in self.connections:
                # removed already
                return

        async with self.lock:
            logger.info(f"remove connection: {connection_id}")

            with suppress(Exception):
                await self.app.cancel_task(f"websocket_{connection_id}_send_task")
            with suppress(Exception):
                await self.app.cancel_task(f"websocket_{connection_id}_recv_task")
            with suppress(Exception):
                await self.app.cancel_task(f"websocket_{connection_id}_notify_task")
            with suppress(Exception):
                await self.app.cancel_task(f"websocket_{connection_id}_is_alive_task")

            if connection_id in self.send_queues:
                del self.send_queues[connection_id]

            if connection_id in self.recv_queues:
                del self.recv_queues[connection_id]

            if connection_id in self.listeners:
                del self.listeners[connection_id]

            if connection_id in self.close_callbacks:
                await self.do_close_callbacks(connection_id)
                del self.close_callbacks[connection_id]

            if connection_id in self.connections:
                del self.connections[connection_id]

    async def do_close_callbacks(self, connection_id):
        for cb in self.close_callbacks.get(connection_id, []):
            self.app.add_task(cb(connection_id))

    async def prepare(self):
        self.is_prepared = True
        logger.info("dependency:WebsocketPool is prepared")
        return self.is_prepared

    async def check(self):
        return True

    async def send_task(self, queue, connection):
        while self.is_alive(connection):
            try:
                data = queue.get_nowait()
            except QueueEmpty:
                await asyncio.sleep(0)
                continue
            try:
                if isinstance(data, (bytes, str, int)):
                    await connection.send(data)
                else:
                    await connection.send(orjson.dumps(data).decode())
                queue.task_done()
            except Exception as err:
                break

    async def recv_task(self, queue, connection):
        while self.is_alive(connection):
            try:
                data = await connection.recv()
                await queue.put(data)
                logger.info(f"recv message: {data} from connection: {connection._id}")
            except Exception as err:
                break

    async def notify_task(self, connection_id):
        while self.is_alive(connection_id):
            try:
                logger.info(f"notify connection: {connection_id}'s listeners")
                data = await self.recv_queues[connection_id].get()
                for listener in self.listeners.get(connection_id, {}).values():
                    await listener(connection_id, data)
            except Exception as err:
                pass

    async def is_alive_task(self, connection_id: str):
        if hasattr(connection_id, "_id"):
            connection_id = connection_id._id

        get_pong = asyncio.Event()

        async def wait_pong(connection_id, data):
            if data != PONG:
                return
            get_pong.set()

        while True:
            get_pong.clear()
            await self.send(connection_id, PING)
            listener_id = await self.add_listener(connection_id, wait_pong)

            with suppress(asyncio.TimeoutError):
                async with async_timeout.timeout(
                    self.app.config.WEBSOCKET_PING_TIMEOUT
                ):
                    await get_pong.wait()

            await self.remove_listener(connection_id, listener_id)
            if get_pong.is_set():
                # this connection is closed
                await asyncio.sleep(self.app.config.WEBSOCKET_PING_INTERVAL)
            else:
                await self.remove_connection(connection_id)

    async def wait_closed(self, connection_id: str):
        """
        if negative=True, only release when client close this connection.
        """
        while self.is_alive(connection_id):
            await asyncio.sleep(0)
        return False

    async def send(self, connection_id: str, data: Any) -> bool:
        if not self.is_alive(connection_id):
            return False
        if connection_id not in self.send_queues:
            return False
        await self.send_queues[connection_id].put(data)

        return True

Websocket Provider

from typing import Dict
from typing import List
from typing import Union

from pydantic import BaseModel
from pydantic import field_serializer
from sanic.log import logger

from apps.message.common.constants import MessageProviderType
from apps.message.common.constants import MessageStatus
from apps.message.common.interfaces import SendResult
from apps.message.providers.base import MessageProviderModel
from apps.message.validators.types import EndpointExID
from apps.message.validators.types import EndpointTag
from apps.message.validators.types import ETag
from apps.message.validators.types import ExID
from utils import get_app


class WebsocketMessageProviderModel(MessageProviderModel):
    class Info:
        name = "websocket"
        description = "Bio-Channel Communication"
        type = MessageProviderType.WEBSOCKET

    class Capability:
        is_enabled = True
        can_send = True

    class Message(BaseModel):
        connections: List[Union[EndpointTag, EndpointExID, str]]
        action: str
        payload: Union[List, Dict, str, bytes]

        @field_serializer("connections")
        def serialize_connections(self, connections):
            return list(set(map(str, connections)))

    async def send(self, provider_id, message: Message) -> SendResult:
        app = get_app()
        websocket_pool = app.ctx.ws_pool

        sent_list = set()

        connections = []
        for connection in message.connections:
            if isinstance(connection, ETag):
                connections.extend(
                    [
                        w
                        for c in await connection.decode()
                        for w in c.get("websockets", [])
                    ]
                )
            elif isinstance(connection, ExID):
                endpoint = await connection.decode()
                if endpoint:
                    connections.extend(endpoint.get("websockets", []))
            else:
                connections.append(connection)

        connections = list(
            set(filter(lambda x: app.ctx.ws_pool.is_alive(connection), connections))
        )

        # logger.info(f"sending websocket message to {connections}")
        for connection in connections:
            if await websocket_pool.send(
                connection, data=message.model_dump_json(exclude=["connections"])
            ):
                sent_list.add(connection)

        if sent_list:
            return SendResult(
                provider_id=provider_id, message=message, status=MessageStatus.SUCCEEDED
            )
        else:
            return SendResult(
                provider_id=provider_id, message=message, status=MessageStatus.FAILED
            )

websocket接口


@app.websocket("/websocket")
async def handle_websocket(request, ws):
    from apps.endpoint.listeners import register_websocket_endpoint
    from apps.endpoint.listeners import unregister_websocket_endpoint

    con_id = None
    try:
        ctx = request.app.ctx
        con_id = await ctx.ws_pool.add_connection(ws)
        logger.info(f"new connection connected -> {con_id}")
        await ctx.ws_pool.add_listener(con_id, register_websocket_endpoint)
        await ctx.ws_pool.add_close_callback(con_id, unregister_websocket_endpoint)
        await ctx.ws_pool.send(
            con_id, data={"action": "on.connect", "payload": {"connection_id": con_id}}
        )
        await ctx.ws_pool.wait_closed(con_id) # 等待连接断开
    finally:
    	# 如果连接被客户端断开, handle_websocket将会被直接销毁, 所以销毁处理需要放在finally。
        request.app.add_task(request.app.ctx.ws_pool.remove_connection(con_id))

结果截图

【通用消息通知服务】0x3 - 发送我们第一条消息(Websocket),通用消息通知服务,websocket,python,网络协议文章来源地址https://www.toymoban.com/news/detail-686478.html

到了这里,关于【通用消息通知服务】0x3 - 发送我们第一条消息(Websocket)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • java发送公众号/服务通知模板消息到指定用户(完整流程|亲测可用)

    准备: 获取当前微信小程序appId(小程序appId)获取当前小程序的秘钥secret 新建模板消息 选用后勾选需要的字段并提交 一次订阅: 指用户订阅一次,服务号可不限时间地下发一条对应的订阅通知; 长期订阅: 指用户订阅一次,服务号可长期多次下发通知,长期订阅通知仅

    2024年02月10日
    浏览(43)
  • JAVA 使用WebSocket发送通知消息

    注: 1、jdk必须要1.8及以上 2、项目若有接口拦截过滤,WebSocket接口必须要配置拦截,使其可以验证通过 WebSocket 业务类 发送消息的方法 前端代码

    2024年02月11日
    浏览(52)
  • Python:使用钉钉dingtalk发送通知消息

    通过钉钉的开放API接口,可以很容易的将消息发送到钉钉dingtalk,比起邮件发送更稳定,及时 文档 官网:https://www.dingtalk.com/ API Explorer调试 https://open-dev.dingtalk.com/apiExplorer 文档:https://open.dingtalk.com/document/robots/custom-robot-access 使用场景:发送消息到聊天群 前期准备:需要新建

    2024年02月11日
    浏览(47)
  • 十二、Jenkins构建完成发送飞书消息通知

    得到一个飞书webhook地址: https://open.feishu.cn/open-apis/bot/v2/hook/2d0b6357-333a-4077-9fcd-61e361a3e51e send_notice.py最后面 代码send_notice.py中拿到Jenkins 引用变量参数,放到https://www.sojson.com/ 进行格式压缩

    2024年02月16日
    浏览(46)
  • uniapp - 微信小程序平台模板消息订阅功能,唤起订阅模板消息弹框、微信公众号向用户发送 “服务通知“ 实现全过程示例代码,支持一次性订阅与永久性订阅(注释详细,一键复制开箱即用)

    本博客实现了uniapp微信小程序端,详细实现公众号订阅通知模板消息完整示例源码,一次性订阅与永久订阅均可,注释详细新手一看就懂! 效果如图所示,uniapp编译的微信小程序内点击按钮后,唤起模板消息订阅申请弹框,后续微信内会收到通知。

    2024年02月13日
    浏览(67)
  • 关于0x3f和0x3f3f3f3f

    在做题时经常将0x3f3f3f3设为INF(正无穷) 相比0x7fffffff,0x3f3f3f3f在做图论题时,(如Dijkstra算法) 相加时不会使正无穷溢出导致变成负数, 使用0x3f的场景一般是 因为memset()是对char操作,即一个字节一个字节的操作,而如果p恰好是int型(四字节),就可以将int的四个字节中的

    2024年02月14日
    浏览(36)
  • Outlook无需API开发连接钉钉群机器人,实现新增会议日程自动发送群消息通知

    Outlook用户使用场景: 在企业中,会议和活动的顺利举行对于业务运转和团队协作至关重要。然而,计划的变动总是无法避免,这可能会导致其他人的计划受到影响,打乱原有的安排。为了解决这个问题,许多企业开始使用各种工具和技术来确保信息的及时传递和更新。其中

    2024年02月09日
    浏览(41)
  • 微信公众号 - Java推送小程序订阅消息给用户_java 通过微信公众号发送订阅通知

    不啰嗦,我们直接开始! 本文使用体验版小程序进行调试。 1、登录微信公众平台 点开下面链接,使用微信扫码 微信公众平台 然后选择一个小程序并登录 2、在小程序后台找到Appid、AppSecret、Token、EncodingAESKey等参数 AppSecret忘记了可以自行重置 往下翻,在消息推送这儿能看到

    2024年04月26日
    浏览(49)
  • 5.图论(0x3f:从周赛中学算法 2022下)

    来自0x3f【从周赛中学算法 - 2022 年周赛题目总结(下篇)】:https://leetcode.cn/circle/discuss/WR1MJP/ 周赛中的图论题目比较少,除了下面选的 DFS、BFS、拓扑排序、基环树、二分图判定 等,还有 最短路、DFS 时间戳 等(这些可以在上半年的周赛题目中学到)。 注:偶见于周赛第三题

    2024年02月14日
    浏览(47)
  • 7.思维题(0x3f:从周赛中学算法 2022下)

    来自0x3f【从周赛中学算法 - 2022 年周赛题目总结(下篇)】:https://leetcode.cn/circle/discuss/WR1MJP/ 【【灵茶山艾府】2022 年周赛题目总结(上篇)】https://leetcode.cn/circle/discuss/G0n5iY/ 包含贪心、脑筋急转弯等,挑选一些比较有趣的题目。 注:常见于周赛第二题(约占 21%)、第三题

    2023年04月19日
    浏览(75)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包