目录
一、消息队列
二、服务端
三、设备功能
四、主线程
五、客户端
六、更新
思路:
1.websockets需要从客户端接收消息,由于websockets创建服务端只能绑定一个端口,所以需要单独占用一个线程。收到的消息,我们需要共享给主线程,然后主线程根据设备(多线程)分发消息给各线程
2.消息中心需要独立出来,websockets服务端放消息,主线程去消息
3.根据思路设计模块:
1.消息库
2.服务端
3.主线程
4.多线程
先运行Main.py,再运行websocket_client.py(客户端),客户端发送的消息可能不一样,所以统一消息里面必须有device_id 或者device_name
修改websocket_client.py中data的信息,发送不同消息
一、消息队列
message_base.py
根据设备,创建储存设备消息,提取设备消息的功能
from queue import Queue
class MessageBase:
def __init__(self):
self.data = dict()
def add(self, device, data):
if device in self.data:
self.data[device].put(data)
else:
self.data[device] = Queue()
self.data[device].put(data)
def get(self, device):
data_queue: Queue = self.data.get(device)
if not data_queue or data_queue.empty():
return None
data = data_queue.get()
return data
if __name__ == '__main__':
mb = MessageBase()
mb.add("a", "asdasd")
mb.add("a", "11111111")
print(mb.data)
data = mb.get("a")
print(data)
二、服务端
websocket_server.py
从客户端接收消息,并存到消息队列
import asyncio
import json
import threading
import websockets
##
from message_base import MessageBase
class WebServer:
def __init__(self, host, port, message_base: MessageBase):
self.host = host
self.port = port
self.clients = []
self.message_base = message_base
async def echo(self, websocket, path):
self.clients.append(websocket)
client_ip, client_port = websocket.remote_address
print(f"连接到:{client_ip}:{client_port}")
while True:
try:
recv_text = await websocket.recv()
data = json.loads(recv_text)
device = data.get("device")
if device:
self.message_base.add(device, data)
else:
continue
except websockets.ConnectionClosed:
print("ConnectionClosed...") # 链接断开
self.clients.remove(websocket)
break
except websockets.InvalidState:
print("InvalidState...") # 无效状态
self.clients.remove(websocket)
break
except Exception as e:
print(e)
def connect(self):
print("连接成功!")
asyncio.set_event_loop(asyncio.new_event_loop())
start_server = websockets.serve(self.echo, self.host, self.port)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
def run(self):
t = threading.Thread(target=self.connect)
t.start()
print("已启动!")
if __name__ == '__main__':
mb = MessageBase()
ws = WebServer("192.168.6.28", 8001, mb)
ws.run()
三、设备功能
device_function.py
每个设备对应的线程功能,可以统一也可以写多个功能
class DeviceFunc:
def __init__(self, device_name, data):
self.device_name = device_name
self.data = data
def show_data(self):
if self.data:
print(self.device_name, "收到消息:", self.data.get("value"))
四、主线程
main.py
初始化所有功能模块,并运行主线程
from message_base import MessageBase
from websocket_server import WebServer
from device_function import DeviceFunc
class MainThread:
def __init__(self, message_base: MessageBase, websocket_server: WebServer, device_list):
self.message_base = message_base
self.websocket_server = websocket_server
self.device_list = device_list
def run_server(self):
self.websocket_server.run()
def run(self):
self.run_server()
while True:
for device in self.device_list:
try:
# 开始根据设备即功能处理消息
data = self.message_base.get(device)
if not data:
continue
df = DeviceFunc(device, data)
df.show_data()
except Exception as err:
pass
if __name__ == '__main__':
mb = MessageBase()
ws = WebServer("192.168.6.28", 8000, mb)
device_list = ["aa", "bb", "cc"]
mt = MainThread(mb, ws, device_list)
mt.run()
五、客户端
webscoket_client.py
给服务端发送消息,测试用
import json
import websocket
class WebClient:
def __init__(self, host, port):
self.host = host
self.port = port
self.conn = None
self.flag = False
def connect(self):
try:
url = f"ws://{self.host}:{self.port}"
self.conn = websocket.create_connection(url)
self.flag = True
print("连接成功")
except Exception as err:
self.flag = False
print("连接失败", err)
def close(self):
self.conn.close()
def recv(self):
data = self.conn.recv(1024)
print(data)
def send(self, data):
self.conn.send(data)
print("发送成功")
if __name__ == '__main__':
host = "192.168.6.28"
port = 8000
ws = WebClient(host, port)
if not ws.flag:
ws.connect()
devices = ["aa", "bb", "cc"]
while True:
device = random.choice(devices)
s = ""
for i in range(random.randint(0, 100)):
s += chr(random.randint(65, 122))
data = {"device": device, "value": s}
data = json.dumps(data)
ws.send(data)
time.sleep(1)
六、更新
队列在获取消息时有阻塞的现象,我们上面避免阻塞用了下面注释的内容,
def get(self, device): data_queue: Queue = self.data.get(device) if not data_queue: # 阻塞 return None # if not data_queue or data_queue.empty(): # return None data = data_queue.get() return data
我们如果不用注释的内容,让获取消息直接进入阻塞,但是阻塞是在设备线程里面,不影响主线程和其他线程。这样,我们需要把设备功能封装到线程里面。这样设备工作不会相互影响
DeviceFunc对象改为DeviceThread文章来源:https://www.toymoban.com/news/detail-857650.html
import threading
class DeviceThread(threading.Thread):
def __init__(self, device_name, message_base):
super().__init__(target=self.process)
self.device_name = device_name
self.message_base = message_base
def get_data(self):
data = self.message_base.get(self.device_name)
return data
def process(self):
while True:
data = self.get_data()
if data:
print(self.device_name, "收到消息:", data.get("value"))
主线程的run方法修改为:文章来源地址https://www.toymoban.com/news/detail-857650.html
def run(self): self.run_server() # while True: # for device in self.device_list: # try: # data = self.message_base.get(device) # if not data: # continue # df = DeviceFunc(device, data) # df.show_data() # except Exception as err: # pass for device in self.device_list: t = DeviceThread(device, self.message_base) t.start()
到了这里,关于Python中websockets服务端从客户端接收消息并发送给多线程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!