import random
import logging
def get_logger(log_file):
# 日志记录器
logger = logging.getLogger()
# 设置日志级别,只有大于等于这个级别的日志才能输出
logger.setLevel(logging.INFO)
# 设置日志格式
formatter = logging.Formatter(
"%(asctime)s - %(module)s - %(funcName)s - line:%(lineno)d - %(levelname)s - %(message)s"
)
# 输出到控制台
to_console = logging.StreamHandler()
to_console.setFormatter(formatter)
logger.addHandler(to_console)
# 输出到文件中
to_file = logging.FileHandler(filename=log_file)
to_file.setFormatter(formatter)
logger.addHandler(to_file)
return logger
CONFIG = {"a": random.randint(1, 100), "b": random.randint(1, 100)}
core/worker.py
fast_api.py文章来源:https://www.toymoban.com/news/detail-729083.html
import os
import time
import json
import random
import datetime
from multiprocessing import Process
from fastapi import FastAPI
import uuid
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
def do_task(task_id):
try:
import sys
sys.path.append(f'{ROOT_DIR}/core')
from worker import CONFIG, get_logger
logger = get_logger(f'{ROOT_DIR}/logs/{task_id}.log')
start_time = datetime.datetime.now()
n = random.randint(1, 5)
while n > 0:
logger.info(f"task_{task_id} is doing ...")
time.sleep(1)
n -= 1
end_time = datetime.datetime.now()
CONFIG[str(time.time())] = str(end_time)
logger.info(f"task_{task_id} finished. config: {CONFIG}")
res = {"start": str(start_time), "end": str(end_time), 'config': CONFIG}
json.dump(res, open(f'{ROOT_DIR}/output/{task_id}.json', 'w'))
except Exception as e:
print('-'*50)
print(e)
print('-'*50)
app = FastAPI()
@app.get("/")
def read_root():
task_id = ''.join(str(uuid.uuid1()).split('-')[::-1])
print('task_id', task_id)
task = Process(target=do_task, args=(task_id,))
task.start()
print("task.is_alive()", task.is_alive())
while task.is_alive():
print(f"waiting task {task_id} ...")
time.sleep(1)
task.join()
return json.load(open(f'{ROOT_DIR}/output/{task_id}.json'))
@app.get("/items/{item_id}")
def read_item(item_id: int, q: str = None):
return {"item_id": item_id, "q": q}
requsts test文章来源地址https://www.toymoban.com/news/detail-729083.html
import requests
import random
import time
from multiprocessing import Pool
def f(x):
return x*x
def func(x):
return f"worker_{x}: hello world"
def download(x):
url = 'http://127.0.0.1:8000'
t = random.randint(1, 5)
time.sleep(t)
response = requests.get(url)
return f"worker_{x}({t}): {response.json()}"
if __name__ == '__main__':
with Pool(5) as p:
for res in p.map(download, [1,2,3,4,5]):
print(res)
# download(1)
到了这里,关于FastAPI 使用记录的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!