Python接口并发压力测试(单接口,多接口参数化)+异步aiohttp

这篇具有很好参考价值的文章主要介绍了Python接口并发压力测试(单接口,多接口参数化)+异步aiohttp。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一 单接口

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import json
import random
import datetime

import requests
import threading
import time


class Presstest(object):
    headers = {
        'Content-Type': 'application/json; charset=UTF-8'
    }

    def __init__(self, login_url, press_url, phone, password):
        self.login_url = login_url
        self.press_url = press_url
        self.phone = phone
        self.password = password
        self.session = requests.Session()
        self.session.headers = self.headers

    def login(self):
        '''登陆获取session'''
        data = {"mobile": self.phone, "pwd": self.password, "type": "1"}
        res = self.session.post(self.login_url, data=json.dumps(data))
        XToken = res.json().get('data').get('token')
        self.session.headers['token'] = XToken

    def testinterface(self):
        '''压测接口'''
        # self.session.headers['X-UnionId'] = 'of6uw1CUVhP533sQok'
        data = {"id": 418}
        global ERROR_NUM
        try:
            print('开始调接口:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
            response = self.session.post(self.press_url, data=json.dumps(data))
            if response.json().get('code') != 0:
                print(response.json())
                ERROR_NUM += 1
        except Exception as e:
            print(e)
            ERROR_NUM += 1

    def testonework(self):
        '''一次并发处理单个任务'''
        i = 0
        while i < ONE_WORKER_NUM:
            i += 1
            self.testinterface()
        time.sleep(LOOP_SLEEP)

    def run(self):
        '''使用多线程进程并发测试'''
        t1 = time.time()
        Threads = []

        for i in range(THREAD_NUM):
            t = threading.Thread(target=self.testonework, name="T" + str(i))
            t.setDaemon(True)
            Threads.append(t)

        for t in Threads:
            t.start()
        for t in Threads:
            t.join()
        t2 = time.time()

        print("===============压测结果===================")
        print("URL:", self.press_url)
        print("任务数量:", THREAD_NUM, "*", ONE_WORKER_NUM, "=", THREAD_NUM * ONE_WORKER_NUM)
        print("总耗时(秒):", t2 - t1)
        print("每次请求耗时(秒):", (t2 - t1) / (THREAD_NUM * ONE_WORKER_NUM))
        print("每秒承载请求数:", 1 / ((t2 - t1) / (THREAD_NUM * ONE_WORKER_NUM)))
        print("错误数量:", ERROR_NUM)


if __name__ == '__main__':
    login_url = 'http://test/sale/login/login'
    press_url = 'http://test/sale/hsOrder/afterExamineAdopt'
    phone = "150000000"
    password = "123456"

    THREAD_NUM = 1  # 并发线程总数
    ONE_WORKER_NUM = 5  # 每个线程的循环次数
    LOOP_SLEEP = 0.1  # 每次请求时间间隔(秒)
    ERROR_NUM = 0  # 出错数

    obj = Presstest(login_url=login_url, press_url=press_url, phone=phone, password=password)
    obj.login()
    obj.run()

二 多接口参数化(实现多接口参数化并发,data和url必须一一对应,且THREAD_NUM并发线程数不能大于url_list长度)

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import json
import random
import datetime

import requests
import threading
import time


class Presstest(object):
    headers = {
        'Content-Type': 'application/json; charset=UTF-8'
    }

    def __init__(self, login_url, press_url, phone, password):
        self.login_url = login_url
        self.press_url = press_url
        self.phone = phone
        self.password = password
        self.session = requests.Session()
        self.session.headers = self.headers

    def login(self):
        '''登陆获取session'''
        data = {"mobile": self.phone, "pwd": self.password, "type": "1"}
        res = self.session.post(self.login_url, data=json.dumps(data))
        XToken = res.json().get('data').get('token')
        self.session.headers['token'] = XToken

    def testinterface(self):
        '''压测接口'''
        # self.session.headers['X-UnionId'] = 'of6uw1CUVhP533sQok'
        data = {"id": 418}
        global ERROR_NUM
        try:
            print('开始调接口111111:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
            response = self.session.post(self.press_url, data=json.dumps(data))
            if response.json().get('code') != 0:
                print(response.json())
                ERROR_NUM += 1
        except Exception as e:
            print(e)
            ERROR_NUM += 1

    def testinterface2(self, url, data):
        '''压测接口'''
        # self.session.headers['X-UnionId'] = 'of6uw1CUVhP533sQok'
        # data = {"orderId": 1179, "cause": "让人"}
        global ERROR_NUM
        try:
            print('开始调接口2222222:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
            print("接口请求入参url,data==", url, data)
            response = self.session.post(url,
                                         data=json.dumps(data))
            if response.json().get('code') != 0:
                print(response.json())
                ERROR_NUM += 1
        except Exception as e:
            print(e)
            ERROR_NUM += 1

    def testonework(self, url, data):
        '''一次并发处理单个任务'''
        i = 0
        while i < ONE_WORKER_NUM:
            i += 1
            self.testinterface2(url, data)
            self.testinterface()
        time.sleep(LOOP_SLEEP)

    def run(self):
        '''使用多线程进程并发测试'''
        t1 = time.time()
        Threads = []
        # 实现多接口参数化并发,data和url必须一一对应,且THREAD_NUM并发线程数不能大于url_list长度
        data_list = [{"orderId": 1194, "cause": "让人"},
                     {"orderId": 1193, "cause": "让人"},
                     {"orderId": 1192, "cause": "让人"}]
        url_list = ["http://test/api1",
                    "http://test/api2",
                    "http://test/api3"]
        list_arr = list(range(0, len(data_list)))
        print("index========", list_arr)
        for i in range(THREAD_NUM):
            index = random.choice(list_arr)
            list_arr.remove(index)
            t = threading.Thread(target=self.testonework(url_list[index], data_list[index]), name="T" + str(i))
            t.setDaemon(True)
            Threads.append(t)

        for t in Threads:
            t.start()
        for t in Threads:
            t.join()
        t2 = time.time()

        print("===============压测结果===================")
        print("URL:", self.press_url)
        print("任务数量:", THREAD_NUM, "*", ONE_WORKER_NUM, "=", THREAD_NUM * ONE_WORKER_NUM)
        print("总耗时(秒):", t2 - t1)
        print("每次请求耗时(秒):", (t2 - t1) / (THREAD_NUM * ONE_WORKER_NUM))
        print("每秒承载请求数:", 1 / ((t2 - t1) / (THREAD_NUM * ONE_WORKER_NUM)))
        print("错误数量:", ERROR_NUM)


if __name__ == '__main__':
    login_url = 'http://test/login'
    press_url = 'http://test/afterExamineAdopt'
    phone = "1500000000"
    password = "123456"

    THREAD_NUM = 3  # 并发线程总数
    ONE_WORKER_NUM = 1  # 每个线程的循环次数
    LOOP_SLEEP = 0  # 每次请求时间间隔(秒)
    ERROR_NUM = 0  # 出错数

    obj = Presstest(login_url=login_url, press_url=press_url, phone=phone, password=password)
    obj.login()
    obj.run()

三 多接口并发调用方法二

import datetime
import json

import requests
import threading
import time


def post_request(url, data):
    start_time = time.time()
    print('开始调接口111111:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
    response = requests.post(url, data=json.dumps(data), headers={"Content-Type": "application/json;charset=UTF-8"})
    print('调用结束:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
    end_time = time.time()
    duration = end_time - start_time
    print("Response from", url, ":", data, response.text)
    print("Request duration:", duration * 1000, "ms")


data = [{"orderId": 1194, "cause": "让人"},
        {"orderId": 1193, "cause": "让人"},
        {"orderId": 1192, "cause": "让人"}]
urls = ["http://test/api1",
        "http://test/api2",
        "http://test/api3"]

threads = []
for i in range(len(urls)):
    t = threading.Thread(target=post_request, args=(urls[i], data[i]))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

四 多接口同时并发(相当于集合点)(异步实现集合点)

import asyncio
import datetime
import aiohttp
async def make_request(session, url, data):
    print('开始调接口:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
    async with session.post(url, json=data) as response:
        result = await response.json(content_type='text/html', encoding='utf-8')
        return result


async def run_concurrent_requests(urls, datas, max_concurrent_requests):
    headers = {
        "Content-Type": "application/json;charset=UTF-8"
    }
    tasks = []
    async with aiohttp.ClientSession() as session:
        session.headers = headers
        session.headers["token"] = "BEE010F2A6D0696DD90A82FF28B21AF2"
        sem = asyncio.Semaphore(max_concurrent_requests)
        list_arr = list(range(0, len(datas)))
        for index in list_arr:
            await sem.acquire()
            task = asyncio.ensure_future(make_request(session, urls[index], datas[index]))
            task.add_done_callback(lambda t: sem.release())
            tasks.append(task)
        responses = await asyncio.gather(*tasks)
        return responses


if __name__ == '__main__':
    urls = ['http://test/SubmitSettlement'] * 5
    datas = [{"idArr": [2086]}, {"idArr": [2090]}, {"idArr": [2089]}, {"idArr": [2086]}, {"idArr": [2090]}]
    max_concurrent_requests = 5
    loop = asyncio.get_event_loop()
    responses = loop.run_until_complete(run_concurrent_requests(urls, datas, max_concurrent_requests))
    print(responses)

异步接口怎么压测,python,python,压力测试文章来源地址https://www.toymoban.com/news/detail-738112.html

到了这里,关于Python接口并发压力测试(单接口,多接口参数化)+异步aiohttp的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • python实现接口压力测试

    直接上代码: 输出20次压测结果如下:

    2024年02月17日
    浏览(38)
  • 接口自动化测试框架(pytest+allure+aiohttp+ 用例自动生成)

    近期准备优先做接口测试的覆盖,为此需要开发一个测试框架,经过思考,这次依然想做点儿不一样的东西。 接口测试是比较讲究效率的,测试人员会希望很快能得到结果反馈,然而接口的数量一般都很多,而且会越来越多,所以提高执行效率很有必要 接口测试的用例其实

    2024年02月07日
    浏览(62)
  • 接口自动化测试框架开发(pytest+allure+aiohttp+ 用例自动生成)

    近期准备优先做接口测试的覆盖,为此需要开发一个测试框架,经过思考,这次依然想做点儿不一样的东西。 接口测试是比较讲究效率的,测试人员会希望很快能得到结果反馈,然而接口的数量一般都很多,而且会越来越多,所以提高执行效率很有必要 接口测试的用例其实

    2024年01月23日
    浏览(56)
  • 接口自动化测试框架开发 (pytest+allure+aiohttp+ 用例自动生成)

    目录 前言: 第一部分(整个过程都要求是异步非阻塞的) 读取 yaml 测试用例 http 请求测试接口 收集测试数据 第二部分 动态生成 pytest 认可的测试用例 后续(yml 测试文件自动生成) 前言: 开发一个接口自动化测试框架是一个很好的方式,可以提高测试效率和准确性。在这

    2024年02月16日
    浏览(62)
  • 性能测试----负载测试、压力测试、并发测试

    性能测试:检测一个软件的性能。 性能测试的指标: 响应时间:用户从请求到服务器响应的时间 吞吐量:单位时间内成功地传送数据的数量 并发数:在线并且在操作的用户数 负载测试:加负载,找到让系统崩溃的边界点。 压力测试:高负载的情况下跑,看系统稳定性。

    2024年02月15日
    浏览(43)
  • 【性能测试】稳定性/并发压力测试的TPS计算+5W并发场景设计...

    1、稳定性测试TPS计算 ①普通计算公式:TPS=总请求数 / 总时间按照需求得到基础数据。 在去年第20周,某平台有5万的浏览量 那么总请求数我们可以估算为5万(1次浏览都至少对应1个请求) 总请求数 = 50000请求数 总时间:由于不知道每个请求的具体时间,我们按照普通方法,

    2024年02月04日
    浏览(56)
  • Python向带有SSL/TSL认证服务器发送网络请求小实践(附并发http请求实现asyncio+aiohttp)

    最近工作中遇到这样的一个场景:给客户发送文件的时候,为保证整个过程中,文件不会被篡改,需要在发送文件之间, 对发送的文件进行签名, 而整个签名系统是另外一个团队做的, 提供了一个接口服务完成签名,但访问这个接口需要提供他们团队提供的证书链先进行认

    2024年04月16日
    浏览(45)
  • JMeter之压力测试——混合场景并发

    在实际的压力测试场景中,有时会遇到多个场景混合并发的情况,这时就需要设置不同的并发比例对不同场景请求数量的控制,下面提供两种方案。 一、多线程组方案 1.业务场景设计如下:场景A、场景B、场景C,三个场景按照并发比例要求进行100个用户并发的压力测试 业务

    2024年02月02日
    浏览(39)
  • aiohttp 异步爬虫实战

    之前介绍的 asyncio 模块内部 实现了对 TCP、UDP、SSL 协议的异步操作,但是对于 HTTP 请求的异步操作来说,我们就需要用到 aiohttp 来实现了 。aiohttp 是 一个基于 asyncio 的异步 HTTP 网络模块,它既提供了服务端,又提供了客户端 。其中我们用服务端可以搭建一个支持异步处理的

    2023年04月22日
    浏览(37)
  • tomcat maxThread 压力测试 700 并发以内最好

    以前一直简单的认为多线程=高效率。其实多线程本身并不能提高 cpu 效率,线程过多反而会降低 cpu 效率。当 cpu 核心数线程数时,cpu 就需要在多个线程直接来回切换,以保证每个线程都会获得 cpu 时间,即通常我们说的并发执行。所以  max-threads  的配置绝对不是越大越好。

    2024年02月12日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包