一 单接口
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import json
import random
import datetime
import requests
import threading
import time
class Presstest(object):
headers = {
'Content-Type': 'application/json; charset=UTF-8'
}
def __init__(self, login_url, press_url, phone, password):
self.login_url = login_url
self.press_url = press_url
self.phone = phone
self.password = password
self.session = requests.Session()
self.session.headers = self.headers
def login(self):
'''登陆获取session'''
data = {"mobile": self.phone, "pwd": self.password, "type": "1"}
res = self.session.post(self.login_url, data=json.dumps(data))
XToken = res.json().get('data').get('token')
self.session.headers['token'] = XToken
def testinterface(self):
'''压测接口'''
# self.session.headers['X-UnionId'] = 'of6uw1CUVhP533sQok'
data = {"id": 418}
global ERROR_NUM
try:
print('开始调接口:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
response = self.session.post(self.press_url, data=json.dumps(data))
if response.json().get('code') != 0:
print(response.json())
ERROR_NUM += 1
except Exception as e:
print(e)
ERROR_NUM += 1
def testonework(self):
'''一次并发处理单个任务'''
i = 0
while i < ONE_WORKER_NUM:
i += 1
self.testinterface()
time.sleep(LOOP_SLEEP)
def run(self):
'''使用多线程进程并发测试'''
t1 = time.time()
Threads = []
for i in range(THREAD_NUM):
t = threading.Thread(target=self.testonework, name="T" + str(i))
t.setDaemon(True)
Threads.append(t)
for t in Threads:
t.start()
for t in Threads:
t.join()
t2 = time.time()
print("===============压测结果===================")
print("URL:", self.press_url)
print("任务数量:", THREAD_NUM, "*", ONE_WORKER_NUM, "=", THREAD_NUM * ONE_WORKER_NUM)
print("总耗时(秒):", t2 - t1)
print("每次请求耗时(秒):", (t2 - t1) / (THREAD_NUM * ONE_WORKER_NUM))
print("每秒承载请求数:", 1 / ((t2 - t1) / (THREAD_NUM * ONE_WORKER_NUM)))
print("错误数量:", ERROR_NUM)
if __name__ == '__main__':
login_url = 'http://test/sale/login/login'
press_url = 'http://test/sale/hsOrder/afterExamineAdopt'
phone = "150000000"
password = "123456"
THREAD_NUM = 1 # 并发线程总数
ONE_WORKER_NUM = 5 # 每个线程的循环次数
LOOP_SLEEP = 0.1 # 每次请求时间间隔(秒)
ERROR_NUM = 0 # 出错数
obj = Presstest(login_url=login_url, press_url=press_url, phone=phone, password=password)
obj.login()
obj.run()
二 多接口参数化(实现多接口参数化并发,data和url必须一一对应,且THREAD_NUM并发线程数不能大于url_list长度)
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import json
import random
import datetime
import requests
import threading
import time
class Presstest(object):
headers = {
'Content-Type': 'application/json; charset=UTF-8'
}
def __init__(self, login_url, press_url, phone, password):
self.login_url = login_url
self.press_url = press_url
self.phone = phone
self.password = password
self.session = requests.Session()
self.session.headers = self.headers
def login(self):
'''登陆获取session'''
data = {"mobile": self.phone, "pwd": self.password, "type": "1"}
res = self.session.post(self.login_url, data=json.dumps(data))
XToken = res.json().get('data').get('token')
self.session.headers['token'] = XToken
def testinterface(self):
'''压测接口'''
# self.session.headers['X-UnionId'] = 'of6uw1CUVhP533sQok'
data = {"id": 418}
global ERROR_NUM
try:
print('开始调接口111111:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
response = self.session.post(self.press_url, data=json.dumps(data))
if response.json().get('code') != 0:
print(response.json())
ERROR_NUM += 1
except Exception as e:
print(e)
ERROR_NUM += 1
def testinterface2(self, url, data):
'''压测接口'''
# self.session.headers['X-UnionId'] = 'of6uw1CUVhP533sQok'
# data = {"orderId": 1179, "cause": "让人"}
global ERROR_NUM
try:
print('开始调接口2222222:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
print("接口请求入参url,data==", url, data)
response = self.session.post(url,
data=json.dumps(data))
if response.json().get('code') != 0:
print(response.json())
ERROR_NUM += 1
except Exception as e:
print(e)
ERROR_NUM += 1
def testonework(self, url, data):
'''一次并发处理单个任务'''
i = 0
while i < ONE_WORKER_NUM:
i += 1
self.testinterface2(url, data)
self.testinterface()
time.sleep(LOOP_SLEEP)
def run(self):
'''使用多线程进程并发测试'''
t1 = time.time()
Threads = []
# 实现多接口参数化并发,data和url必须一一对应,且THREAD_NUM并发线程数不能大于url_list长度
data_list = [{"orderId": 1194, "cause": "让人"},
{"orderId": 1193, "cause": "让人"},
{"orderId": 1192, "cause": "让人"}]
url_list = ["http://test/api1",
"http://test/api2",
"http://test/api3"]
list_arr = list(range(0, len(data_list)))
print("index========", list_arr)
for i in range(THREAD_NUM):
index = random.choice(list_arr)
list_arr.remove(index)
t = threading.Thread(target=self.testonework(url_list[index], data_list[index]), name="T" + str(i))
t.setDaemon(True)
Threads.append(t)
for t in Threads:
t.start()
for t in Threads:
t.join()
t2 = time.time()
print("===============压测结果===================")
print("URL:", self.press_url)
print("任务数量:", THREAD_NUM, "*", ONE_WORKER_NUM, "=", THREAD_NUM * ONE_WORKER_NUM)
print("总耗时(秒):", t2 - t1)
print("每次请求耗时(秒):", (t2 - t1) / (THREAD_NUM * ONE_WORKER_NUM))
print("每秒承载请求数:", 1 / ((t2 - t1) / (THREAD_NUM * ONE_WORKER_NUM)))
print("错误数量:", ERROR_NUM)
if __name__ == '__main__':
login_url = 'http://test/login'
press_url = 'http://test/afterExamineAdopt'
phone = "1500000000"
password = "123456"
THREAD_NUM = 3 # 并发线程总数
ONE_WORKER_NUM = 1 # 每个线程的循环次数
LOOP_SLEEP = 0 # 每次请求时间间隔(秒)
ERROR_NUM = 0 # 出错数
obj = Presstest(login_url=login_url, press_url=press_url, phone=phone, password=password)
obj.login()
obj.run()
三 多接口并发调用方法二
import datetime
import json
import requests
import threading
import time
def post_request(url, data):
start_time = time.time()
print('开始调接口111111:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
response = requests.post(url, data=json.dumps(data), headers={"Content-Type": "application/json;charset=UTF-8"})
print('调用结束:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
end_time = time.time()
duration = end_time - start_time
print("Response from", url, ":", data, response.text)
print("Request duration:", duration * 1000, "ms")
data = [{"orderId": 1194, "cause": "让人"},
{"orderId": 1193, "cause": "让人"},
{"orderId": 1192, "cause": "让人"}]
urls = ["http://test/api1",
"http://test/api2",
"http://test/api3"]
threads = []
for i in range(len(urls)):
t = threading.Thread(target=post_request, args=(urls[i], data[i]))
threads.append(t)
t.start()
for t in threads:
t.join()
四 多接口同时并发(相当于集合点)(异步实现集合点)文章来源:https://www.toymoban.com/news/detail-738112.html
import asyncio
import datetime
import aiohttp
async def make_request(session, url, data):
print('开始调接口:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
async with session.post(url, json=data) as response:
result = await response.json(content_type='text/html', encoding='utf-8')
return result
async def run_concurrent_requests(urls, datas, max_concurrent_requests):
headers = {
"Content-Type": "application/json;charset=UTF-8"
}
tasks = []
async with aiohttp.ClientSession() as session:
session.headers = headers
session.headers["token"] = "BEE010F2A6D0696DD90A82FF28B21AF2"
sem = asyncio.Semaphore(max_concurrent_requests)
list_arr = list(range(0, len(datas)))
for index in list_arr:
await sem.acquire()
task = asyncio.ensure_future(make_request(session, urls[index], datas[index]))
task.add_done_callback(lambda t: sem.release())
tasks.append(task)
responses = await asyncio.gather(*tasks)
return responses
if __name__ == '__main__':
urls = ['http://test/SubmitSettlement'] * 5
datas = [{"idArr": [2086]}, {"idArr": [2090]}, {"idArr": [2089]}, {"idArr": [2086]}, {"idArr": [2090]}]
max_concurrent_requests = 5
loop = asyncio.get_event_loop()
responses = loop.run_until_complete(run_concurrent_requests(urls, datas, max_concurrent_requests))
print(responses)
文章来源地址https://www.toymoban.com/news/detail-738112.html
到了这里,关于Python接口并发压力测试(单接口,多接口参数化)+异步aiohttp的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!