基于python-socket构建任务服务器(基于socket发送指令创建、停止任务)

这篇具有很好参考价值的文章主要介绍了基于python-socket构建任务服务器(基于socket发送指令创建、停止任务)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在实现ia业务服务器时需要构建一个python-socket客户端,1、要求能与服务器保持心跳连接,每10秒钟发送一次心跳信号;2、要求能根据socket服务器发送的指令创建或终止一个定时任务。
为此以3个类实现该功能,分别为socket通信类(用于实现通信连接与任务创建)、任务池类(用于管理任务)、任务类(用于实现具体任务)。

1、socket通信客户端

这里定义的MySocket类主体结构如下图所示,共包含4个函数,2个线程(其本身继承Thread类实现主任务流程——run函数、接收服务器信息并创建任务添加到任务池;同时又在__init__函数中将self.thread_msg类封装为一个线程,每隔10秒钟向socket服务器发送一次心跳包)。check_connection函数用于检测socket是否与服务器断开连接,在send_msg函数中调用,当发现客户端掉线后则立刻进行重连。send_msg函数用于发送信息给服务器,因为run函数与thread_msg函数2个线程都需要调用连接与服务器发送数据,为避免冲突故而定义为函数在内部进行加锁。
基于python-socket构建任务服务器(基于socket发送指令创建、停止任务),项目实战记录,python,服务器,网络

#socket客户端
class MySocket(Thread):
    def __init__(self,config):
        super().__init__()
        # 1.创建套接字
        self.tcp_socket = socket(AF_INET,SOCK_STREAM)
        self.tcp_socket.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) #在客户端开启心跳维护
        # 2.准备连接服务器,建立连接
        self.serve_ip = config["serve_ip"]#当前"118.24.111,149"
        self.serve_port = config["serve_port"]  #端口当前7900
        self.sleep_time = config["sleep_time"]
        print("connect to : ",self.serve_ip,self.serve_port)
        self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
        self.lock = threading.RLock()
        
        self.taskpool=TaskPool()

        task_msg=threading.Thread(target=self.thread_msg)
        task_msg.daemon = True
        task_msg.start()
            #定时发送信息
    def run(self):
        while True:
            a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1k
            a=a.decode('utf-8')
            print("------主线程-----",a)
            jdata=json.loads(a)
            #jdata={"streamAddr":"rtmp://adasdasdxcvsdfj.sdfdsfsd","state":1,"count":5,"taskname":"aaa","jsonname":"a.json"}
            task=OCRTask(jdata)
            self.taskpool.append(task)
            
            json_data={  
                "type":"OCR_STATE_ACK",
                "timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致
                "streamAddr": jdata["streamAddr"]
            }
            #print( json_data)
            message = json.dumps(json_data)
            data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
            data=hex_to_bytes(data)
            self.send_msg(data)

    def check_connection(self):
        try:
            self.tcp_socket.getpeername()
            return True
        except socket.error:
            return False
    
    #定时发送心跳信息
    def thread_msg(self):
        while True:
            #message=input('You can say:')
            #json标注的模板
            json_data={  
                "timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致
                "type":"HEARBEAT"
            }
            #print( json_data)
            message = json.dumps(json_data)
            data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
            data=hex_to_bytes(data)

            #进行定时发送
            self.send_msg(data)
            # self.lock.acquire()
            # self.tcp_socket.send(data)#将发送的数据进行编码
            # self.lock.release()
            try:
                #进行定时发送
                self.lock.acquire()
                a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1k
                self.lock.release()
                time.sleep(self.sleep_time)
                print("ack: ",a.decode('utf-8'))
            except ConnectionRefusedError:
                print('服务器拒绝本次连接!!!!!')
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            except TimeoutError:
                print('连接超时!!!!!')
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            except OSError:
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
                print('智能终端无网络连接!!!!!')

    def send_msg(self,msg):
        if self.check_connection() is False:
            print('服务器掉线!!!!!')
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
        try:
            #进行定时发送
            self.lock.acquire()
            self.tcp_socket.send(msg)
            self.lock.release()
        except ConnectionRefusedError:
            print('服务器拒绝本次连接!!!!!')
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
        except TimeoutError:
            print('连接超时!!!!!')
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
        except OSError:
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            print('智能终端无网络连接!!!!!')

2、任务池实现

任务池的实现代码如下所示,主要包含3个函数(其中将remove_task封装为一个子线程,用于实时移除已经完成计算任务的线程),append函数用于将新创建的任务添加大任务池pool中,stop函数用于停止并移除正在运行中的任务。
基于python-socket构建任务服务器(基于socket发送指令创建、停止任务),项目实战记录,python,服务器,网络
其具体实现代码如下所示,其作为MySocket类中的一个成员属性,每当MySocket接收到服务器信息创建任务ocrtask后都调用TaskPool.append(ocrtask)将任务添加到任务池中。由任务池管理任务的声明周期,具体可见其append函数可以启动task或终止task。remove_task线程会自动将已经完成的任务移除。

#ocr任务线程池
class TaskPool:
    def __init__(self,sleep_time=0.5):
        self.pool=[]
        self.sleep_time=sleep_time
        task_msg=threading.Thread(target=self.remove_task)
        task_msg.daemon = True
        task_msg.start()

    #删除已经结束的任务
    def remove_task(self):
        while True:
            names=[]
            for task in self.pool:
                if task.get_count()==0: #生存时间为0,认为该任务已经完成需要被删除
                    task.stop()
                    self.pool.remove(task)
                else:
                    names.append(task.taskname)
            if len(names)>0:
                print(names)
            time.sleep(self.sleep_time)
            
    def append(self,ocrtask):
        if ocrtask.state==0:
            #终止任务
            self.stop(ocrtask)
        else:
            #启动任务
            ocrtask.start()
            self.pool.append(ocrtask)

    #终止任务
    def stop(self,ocrtask):
        for task in self.pool:
            if task.taskname==ocrtask.taskname:
                task.stop()
                self.pool.remove(task)

3、具体任务线程

任务的实现代码如下所示,其支持3中任务模式,使用state区分任务,state为0-停止识别,1-连续识别count张,2-持续识别(故而在state为2时将count设置的特别大)。这里以count控制任务的运行,任务每运行一次count减少1。当count小于等于0,则表示任务运行完成。在TaskPool的remove_task中检测到count为0时则会自动删除任务。

#ocr任务
class OCRTask(Thread):
    def __init__(self,json):
        super().__init__()
        self.streamAddr=json["streamAddr"]
        self.state=json["state"] # 0-停止识别,1-连续识别count张,2-持续识别
        if json["state"]==2:
            self.count=9999999999999999999999999
        else:
            self.count=json["count"]
        if "taskname" in json.keys():
            self.taskname=json["taskname"]
        else:
            self.taskname=json["streamAddr"]

        self.jsonname=json["jsonname"]
        self.lock = threading.RLock()

    def run(self):
        while self.get_count()>0:
            print('run %s'%self.taskname,end='*')
            time.sleep(2)
            self.lock.acquire()
            self.count-=1
            self.lock.release()
        print('%s finish!! '%self.taskname)

    #获取任务的生存时间
    def get_count(self):
        self.lock.acquire()
        now_count=self.count
        self.lock.release()
        #削减count
        return now_count

    #停止任务
    def stop(self):
        self.lock.acquire()
        self.count=-1
        self.lock.release()
        #停止任务
        pass

4、完整代码与使用效果

完整代码如下所示

from socket import *
import time,json
import yaml
import threading,struct
from threading import Thread
 
def hex_to_bytes(hex_str):
    """
    :param hex_str: 16进制字符串
    :return: byte_data 字节流数据
    """
    bytes_data = bytes()
    while hex_str :
        """16进制字符串转换为字节流"""
        temp = hex_str[0:2]
        s = int(temp, 16)
        bytes_data += struct.pack('B', s)
        hex_str = hex_str[2:]
    return bytes_data

# 读取Yaml文件方法
def read_yaml(yaml_path):
    with open(yaml_path, encoding="utf-8", mode="r") as f:
        result = yaml.load(stream=f,Loader=yaml.FullLoader)
        return result

#ocr任务
class OCRTask(Thread):
    def __init__(self,json):
        super().__init__()
        self.streamAddr=json["streamAddr"]
        self.state=json["state"] # 0-停止识别,1-连续识别count张,2-持续识别
        if json["state"]==2:
            self.count=9999999999999999999999999
        else:
            self.count=json["count"]
        if "taskname" in json.keys():
            self.taskname=json["taskname"]
        else:
            self.taskname=json["streamAddr"]

        self.jsonname=json["jsonname"]
        self.lock = threading.RLock()

    def run(self):
        while self.get_count()>0:
            print('run %s'%self.taskname,end='*')
            time.sleep(2)
            self.lock.acquire()
            self.count-=1
            self.lock.release()
        print('%s finish!! '%self.taskname)

    #获取任务的生存时间
    def get_count(self):
        self.lock.acquire()
        now_count=self.count
        self.lock.release()
        #削减count
        return now_count

    #停止任务
    def stop(self):
        self.lock.acquire()
        self.count=-1
        self.lock.release()
        #停止任务
        pass

#ocr任务线程池
class TaskPool:
    def __init__(self,sleep_time=0.5):
        self.pool=[]
        self.sleep_time=sleep_time
        task_msg=threading.Thread(target=self.remove_task)
        task_msg.daemon = True
        task_msg.start()

    #删除已经结束的任务
    def remove_task(self):
        while True:
            names=[]
            for task in self.pool:
                if task.get_count()==0:
                    task.stop()
                    self.pool.remove(task)
                else:
                    names.append(task.taskname)
            if len(names)>0:
                print(names)
            time.sleep(self.sleep_time)
            
    def append(self,ocrtask):
        if ocrtask.state==0:
            #终止任务
            self.stop(ocrtask)
        else:
            #启动任务
            ocrtask.start()
            self.pool.append(ocrtask)

    #终止任务
    def stop(self,ocrtask):
        for task in self.pool:
            if task.taskname==ocrtask.taskname:
                task.stop()
                self.pool.remove(task)

#socket客户端
class MySocket(Thread):
    def __init__(self,config):
        super().__init__()
        # 1.创建套接字
        self.tcp_socket = socket(AF_INET,SOCK_STREAM)
        self.tcp_socket.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) #在客户端开启心跳维护
        # 2.准备连接服务器,建立连接
        self.serve_ip = config["serve_ip"]#当前"118.24.111,149"
        self.serve_port = config["serve_port"]  #端口当前7900
        self.sleep_time = config["sleep_time"]
        print("connect to : ",self.serve_ip,self.serve_port)
        self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
        self.lock = threading.RLock()
        
        self.taskpool=TaskPool()

        task_msg=threading.Thread(target=self.thread_msg)
        task_msg.daemon = True
        task_msg.start()
            #定时发送信息
    
    #通信线程-用于接收服务器的指令
    def run(self):
        while True:
            a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1k
            a=a.decode('utf-8')
            print("------主线程-----",a)
            jdata=json.loads(a)
            #jdata={"streamAddr":"rtmp://adasdasdxcvsdfj.sdfdsfsd","state":1,"count":5,"taskname":"aaa","jsonname":"a.json"}
            task=OCRTask(jdata)
            self.taskpool.append(task)
            
            json_data={  
                "type":"OCR_STATE_ACK",
                "timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致
                "streamAddr": jdata["streamAddr"]
            }
            #print( json_data)
            message = json.dumps(json_data)
            data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
            data=hex_to_bytes(data)
            self.send_msg(data)

    #检测socket连接是否断开
    def check_connection(self):
        try:
            self.tcp_socket.getpeername()
            return True
        except socket.error:
            return False
    
    #定时发送心跳信息--子线程
    def thread_msg(self):
        while True:
            #message=input('You can say:')
            #json标注的模板
            json_data={  
                "timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致
                "type":"HEARBEAT"
            }
            #print( json_data)
            message = json.dumps(json_data)
            data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
            data=hex_to_bytes(data)

            #进行定时发送
            self.send_msg(data)
            # self.lock.acquire()
            # self.tcp_socket.send(data)#将发送的数据进行编码
            # self.lock.release()
            try:
                #进行定时发送
                self.lock.acquire()
                a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1k
                self.lock.release()
                time.sleep(self.sleep_time)
                print("ack: ",a.decode('utf-8'))
            except ConnectionRefusedError:
                print('服务器拒绝本次连接!!!!!')
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            except TimeoutError:
                print('连接超时!!!!!')
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            except OSError:
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
                print('智能终端无网络连接!!!!!')

    #发送信息
    def send_msg(self,msg):
        if self.check_connection() is False:
            print('服务器掉线!!!!!')
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
        try:
            #进行定时发送
            self.lock.acquire()
            self.tcp_socket.send(msg)
            self.lock.release()
        except ConnectionRefusedError:
            print('服务器拒绝本次连接!!!!!')
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
        except TimeoutError:
            print('连接超时!!!!!')
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
        except OSError:
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            print('智能终端无网络连接!!!!!')

if "__main__"==__name__:
    #进行定时通信测试
    config=read_yaml("config.yaml")
    socket_client=MySocket(config)
    socket_client.start()

使用效果如下所示,这里基于socket调试工具作为客户端

基于python-socket构建任务服务器(基于socket发送指令创建、停止任务),项目实战记录,python,服务器,网络
基于python-socket构建任务服务器(基于socket发送指令创建、停止任务),项目实战记录,python,服务器,网络
基于python-socket构建任务服务器(基于socket发送指令创建、停止任务),项目实战记录,python,服务器,网络文章来源地址https://www.toymoban.com/news/detail-835803.html

到了这里,关于基于python-socket构建任务服务器(基于socket发送指令创建、停止任务)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【python】TCP socket服务器 Demo

    目录 一、单线程服务器 二、多线程服务器 三、多线程服务器(发送和接收分离) 说明:只能连接一个客户端 客户端测试结果: 服务端测试结果:  说明:可以支持连接多个客户端 客户端测试结果:  服务端测试结果: 说明: 可以支持连接多个客户端,并且能够做到和多

    2024年02月09日
    浏览(38)
  • python socket监测服务器和客户端连接状态

    服务器端和客户端的连接状态,应该是没有单独的函数返回或是接口监测的,看网上很多资料说需要监测心跳,这确实是一个普遍解决监测状态的办法,但是对我的项目却显得有些被动,其实对一般的项目包括我的,用try…except就可以满足,里面设置标志位,if判断一下就可

    2024年02月13日
    浏览(43)
  • 【Java网络编程】基于UDP-Socket 实现客户端、服务器通信

    ​ 哈喽,大家好~我是你们的老朋友: 保护小周ღ   本期为大家带来的是网络编程的 UDP Socket 套接字,基于 UDP协议的 Socket 实现客户端服务器通信 ,Socket 套接字可以理解为是,传输层给应用层提供的一组 API,如此程序,确定不来看看嘛~~ 本期收录于博主的专栏 : JavaEE_保

    2024年02月02日
    浏览(70)
  • C#上位机基础学习_基于SOCKET实现与PLC服务器的TCP通信(一)

    测试软件: TIA PORTAL V15.1 S7-PLCSIM ADVANCED V3.0 Visual Studio 2019 如下图所示,打开S7-PLCSIM ADVANCED V3.0仿真软件,新键一个实例,设置仿真PLC的IP地址等参数,然后点击Start激活PLC, 如下图所示,激活PLC后,可以看到已经存在一个实例, 如下图所示,打开TIA PORTAL V15.1,新建一个项目,

    2023年04月15日
    浏览(45)
  • 基于PBS向超算服务器队列提交任务的脚本模板与常用命令

      本文介绍在 Linux 服务器中,通过 PBS (Portable Batch System)作业管理系统脚本的方式,提交任务到 服务器 队列,并执行任务的方法。   最近,需要在学校公用的超算中执行代码任务;而和多数超算设备一样,其也是需要通过作业队列的方式,来提交、管理、排序不同用

    2024年04月12日
    浏览(39)
  • 基于k8s的web服务器构建

    项目描述/项目功能: 模拟企业里的k8s生产环境,部署web,nfs,harbor,Prometheus,granfa等应用,构建一个高可用高性能的web系统,同时能监控整个k8s集群的使用。 CentOS 7.9,ansible 2.9.27,Docker 2.6.0.0,Docker Compose 2.18.1,Kubernetes 1.20.6,Harbor 2.1.0,nfs v4,metrics-server 0.6.0,ingress-ngi

    2024年04月11日
    浏览(46)
  • 基于k8s的综合的web服务器构建

    目录 项目架构图: 项目环境: 项目描述: 项目步骤: ip规划: 一.在三台k8s机器上安装部署好k8s,一台作为master,两台node 安装部署k8s node节点加入集群: master节点初始化:  安装Calico网络插件: 二,部署nfs服务,让所有的web业务pod都取访问,通过pv,pvc和卷挂载实现 1.搭建

    2024年04月13日
    浏览(50)
  • Python网络编程实战:构建TCP服务器与客户端

    Python网络编程实战:构建TCP服务器与客户端 在信息化时代,网络编程是软件开发中不可或缺的一部分。Python作为一种功能强大的编程语言,提供了丰富的网络编程库和工具,使得开发者能够轻松构建各种网络应用。本文将详细介绍如何在Python中进行网络编程,特别是如何使用

    2024年04月15日
    浏览(44)
  • 智能车上位机系统,pyqt下的socket通信,python实现服务器+客户端,文本+视频不定长字节传输,超详细,小白都能看懂

    目录 前言: 准备工作: 初级服务器端编写: 中级服务器端编写+客户端收数据函数实现: 数据包格式v1.0 客户端收数据函数V1.0 客户端分析1.0     本地测试:成功!      两台主机测试1.0:失败,视频解析失败,直接花屏了! 问题分析: 问题解决: 数据包格式V2.0 客户端接

    2024年04月17日
    浏览(46)
  • 【spring authorization server系列教程】(一)入门系列,spring authorization server简介。快速构建一个授权服务器(基于最新版本0.3.0)

    【spring authorization server系列教程】(一)入门系列,快速构建一个授权服务器 spring authorization server是spring团队最新的认证授权服务器,之前的oauth2后面会逐步弃用。不过到现在发文的时候,我看到官网已经把之前oauth2仓库废弃了。 现在spring authorization server已经到生产就绪阶段了

    2024年02月05日
    浏览(55)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包