Python-Flask:编写自动化连接demo脚本:v1.0.0

这篇具有很好参考价值的文章主要介绍了Python-Flask:编写自动化连接demo脚本:v1.0.0。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

主函数:

# _*_ Coding : UTF-8 _*_
# @Time :  13:14
# @Author : YYZ
# @File : Flask
# @Project : Python_Project_爬虫
import json

from flask import Flask,request,jsonify
import ssh

api = Flask(__name__)

# methods: 指定请求方式


'''
接口解析参数
        host = host_info["host"]
        port = host_info["port"]
        service = host_info["service"]
        user = host_info["user"]
        pwd = host_info["pwd"]
'''
@api.route('/',methods=['POST'])
def install():
    # 请求方式为post时,可以使用 request.get_json()接收到JSON数据
    try:
        #host_info = request.get_json()  # 获取 POST 请求中的 JSON 数据
        host_info = request.get_data()
        # 如果得到的data是字符串格式,则需要用json.loads来变换成python格式,看个人需求
        host_info = json.loads(host_info)
        print(host_info)

    except Exception as e:
        return jsonify({'error': '请求数据失败'}), 400
    # 处理数据
    # 调用do_something_with_data函数来处理接收到的数据。
    try:
        connect = ssh.Sshclass(host_info["host"], host_info["user"], host_info["port"])  # 端口,用户,ssh端口
        connect.conn_by_pwd(host_info["pwd"])  # 输入密码,进行登录
    except Exception as e:
        return jsonify({'error': '连接失败'}), 888

    if  host_info["cmd"] :
        try:
            command_res = str(connect.exec_commond(host_info["cmd"]))    #执行命令
            print(command_res)
        except Exception as e:
            return jsonify({'error': '执行失败'}), 888

    if host_info["file-determine"] == "yes":
        try:
            file_res = connect.upload_file(host_info["local_path"],host_info["remote_path"],host_info["file_name"])
            print(host_info["file_name"]+str(file_res))
        except Exception as e:
            return jsonify({'error': '上传失败'}), 888
    return  "操作完成"


if __name__ == '__main__':
    api.run(host='0.0.0.0', port=8080, debug=True)

Ssh连接部分:

import paramiko
import json
'''
ssh 连接对象
本对象提供密钥连接、命令执行、关闭连接
'''

class Sshclass(object):

    # ip = ''
    # port = 22
    # username = ''
    # timeout = 0
    # ssh = None

    def __init__(self,ip,username,port=22,timeout=30):
        '''
        初始化ssh对象
        :param ip: 主机IP
        :param username: 登录用户名
        :param port: ssh端口号
        :param timeout: 连接超时
        :return:
        '''

        self.ip = ip
        self.username = username
        self.timeout = timeout
        self.port = port

        '''
         SSHClient作用类似于Linux的ssh命令,是对SSH会话的封装,该类封装了传输(Transport),通道(Channel)及SFTPClient建立的方法(open_sftp),通常用于执行远程命令。
         Paramiko中的几个基础名词:
         1、Channel:是一种类Socket,一种安全的SSH传输通道;
         2、Transport:是一种加密的会话,使用时会同步创建了一个加密的Tunnels(通道),这个Tunnels叫做Channel;
         3、Session:是client与Server保持连接的对象,用connect()/start_client()/start_server()开始会话。
        '''
        ssh = paramiko.SSHClient()
        #远程主机没有本地主机密钥或HostKeys对象时的连接方法,需要配置
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh = ssh

    def conn_by_key(self,key):
        '''
        密钥连接
        :param key:  str rsa密钥路径
        :return:  ssh连接对象
        '''
        rsa_key = paramiko.RSAKey.from_private_key(key)
        self.ssh.connect(hostname=self.ip,port=self.port,username=self.username,pkey=rsa_key,timeout=self.timeout)
        if self.ssh:
            print('密钥连接成功')
        else:
            self.close()
            raise Exception('密钥连接失败')

    def conn_by_pwd(self,pwd):
        '''
        密码连接
        :param pwd: 登录密码
        :return:  ssh连接对象
        '''
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh.connect(hostname=self.ip,port=self.port,username=self.username,password=pwd)
        if self.ssh:
            print('密码连接成功')

        else:
            self.close()
            raise Exception('密码连接失败')

    def exec_commond(self,command):
        '''
        命令控制
        :param commond: 命令
        :return: 返回结构
        '''

        if command:
            stdin, stdout, stderr = self.ssh.exec_command(command)
            return {
                "stdin":command,
                "stdout":stdout.read(),
                "stderr":stderr.read()
            }

        else:
            self.close()
            raise Exception("命令不能为空")

    def close(self):
        '''
        关闭当前连接
        :return:
        '''
        if self.ssh:
            self.ssh.close()
        else:
            raise Exception("ssh关闭连接失败,当前对象没有ssh连接。")

    def upload_file(self,local_path,remote_path,file_name):
        # sftp_link = paramiko.Transport(self.ip,self.port)
        # sftp_link.connect(username=self.username,password=pwd)
        # sftp = paramiko.SFTPClient.from_transport(sftp_link)
        sftp = self.ssh.open_sftp()
        try:
            sftp.put(local_path+"\\"+file_name, remote_path+"/"+file_name)
            #print("上传成功")
            return ("上传成功"), 200
        except Exception as e:
            return {'error': '上传失败'}, 888
        finally:
            sftp.close()
            self.close()

if __name__ == '__main__':
    ssh = Sshclass('192.168.115.23','root', port=22)
    pwd = "123456"
    local_path = 'D:\PyChrom\Python_Flask\自动化接口--Flask'
    remote_path = "/opt"
    file_name = 'file.py'
    ssh.conn_by_pwd(pwd)
    #res = str(ssh.exec_commond("ls /"))
    #print(res)
    res = ssh.upload_file(local_path,remote_path,file_name)
    print(res)

接口调试

Python-Flask:编写自动化连接demo脚本:v1.0.0,python,flask,自动化,云原生,运维

后续优化思路:

目前只是远程连接+文件上传,后续会继续优化

弄个公共的nfs,平常一些脚本和包会放到这个nfs里,脚本或包自动从nfs里拉,然后执行脚本,即可部署,包括多机部署。文章来源地址https://www.toymoban.com/news/detail-728598.html

到了这里,关于Python-Flask:编写自动化连接demo脚本:v1.0.0的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Python连接打印机:实现自动化打印的利器

    引言: 在现代办公环境中,打印机已经成为不可或缺的设备之一。然而,手动操作打印机往往耗时耗力,而且容易出现错误。为了提高工作效率和准确性,我们可以通过编写Python程序来连接打印机,实现自动化打印。本文将介绍如何使用Python连接打印机,并提供一些实用的技

    2024年01月18日
    浏览(23)
  • 新手教程 | Python自动化测试Selenium+chrome连接HTTP代理(账密+白名单)

    Selenium 有很多功能, 但其核心是 web 浏览器自动化的一个工具集,它允许用户模拟终端用户执行的常见活动;将文本输入到字段中,选择下拉值和复选框,并单击文档中的链接。 它还提供许多其他控件,比如鼠标移动、任意 JavaScript 执行等等。 虽然 Selenium 主要用于网

    2024年02月06日
    浏览(29)
  • python接口自动化测试框架2.0,让你像Postman一样编写测试用例,支持多环境切换、多业务依赖、数据库断言等

    接口自动化测试项目2.0 软件架构 本框架主要是基于 Python + unittest + ddt + HTMLTestRunner + log + excel + mysql + 企业微信通知 + Jenkins 实现的接口自动化框架。 前言 公司突然要求你做自动化,但是没有代码基础不知道怎么做?或者有自动化基础,但是不知道如何系统性的做自动化,

    2024年02月13日
    浏览(29)
  • 【Rust】——编写自动化测试

    🎃个人专栏: 🐬 算法设计与分析:算法设计与分析_IT闫的博客-CSDN博客 🐳Java基础:Java基础_IT闫的博客-CSDN博客 🐋c语言:c语言_IT闫的博客-CSDN博客 🐟MySQL:数据结构_IT闫的博客-CSDN博客 🐠数据结构:​​​​​​数据结构_IT闫的博客-CSDN博客 💎C++:C++_IT闫的博客-CSDN博

    2024年04月11日
    浏览(17)
  • Python-Flask 快学

    flask是Python的一个轻型Web框架. 使用pycharm自动创建项目,也可以手动创建,以下是目录的结构: ├── app.py ├── static └── templates 其中,创建Flask实例时,可以自定配置主要有: 参数 说明 static_path 静态文件访问路径(不推荐使用,使用 static_url_path 代替) static_url_path 指定静

    2023年04月08日
    浏览(15)
  • 自动化测试脚本编写(超详细)

    🍅 视频学习: 文末有免费的配套视频可观看 🍅  关注公众号【互联网杂货铺】,回复 1  ,免费获取软件测试全套资料,资料在手,涨薪更快 什么是自动化测试? 自动化测试是验证和验证软件是否满足所有用户需求,并使用自动化工具按预期运行。它检查在产品开发阶段

    2024年04月26日
    浏览(19)
  • 【Rust】——编写自动化测试(一)

    🎃个人专栏: 🐬 算法设计与分析:算法设计与分析_IT闫的博客-CSDN博客 🐳Java基础:Java基础_IT闫的博客-CSDN博客 🐋c语言:c语言_IT闫的博客-CSDN博客 🐟MySQL:数据结构_IT闫的博客-CSDN博客 🐠数据结构:​​​​​​数据结构_IT闫的博客-CSDN博客 💎C++:C++_IT闫的博客-CSDN博

    2024年04月09日
    浏览(24)
  • Rust之编写自动化测试

    在最简单的情形下,Rust中的测试就是一个标注有test属性的函数。属性 (attribute)是一种用于修饰Rust代码的元数据。只需要将#[test]添加到fn的上一行便可以将函数转变为测试函数。当测试编写完成后,我们可以使用cargo test命令来运行测试。这个命令会构建并执行一个用于测

    2024年02月12日
    浏览(18)
  • ChatGPT辅助编写自动化测试

    大家好,我是洋子,ChatGPT已经越来越火爆,国内百度、阿里等互联网大厂也纷纷投入大模型研究,OpenAI官网中提供了许多ChatGPT应用场景,例如SQL翻译、语言翻译、代码解释等 作为一名QA,我更关注ChatGPT生成的自动化测试脚本质量如何,借助ChatGPT能否提升自动化测试编写效率

    2024年02月10日
    浏览(19)
  • react+unittest+flask 接口自动化测试平台

    目录 1 前言 2 框架 2-1 框架简介 2-2 框架介绍 2-3 框架结构 3 平台 3-1 平台组件图 1 新建用例 2 生成测试任务 3 执行并查看测试报告

    2024年02月17日
    浏览(25)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包