一、FunASR
在我的另一个博客有介绍FunASR,并且进行了语者分离,不过最近FunASR自带了语者分离,挺好挺好,但是一直看社区就是大家都用python写,会出现线程不安全问题,群里有大佬说使用多台服务器,然后用nginx做代理,这不是妥妥土豪行为吗,感觉很浪费
vad出现的问题
方案解决:
图上部分是大佬给的解决方案
图下部分是我给的解决方案方案
二、我的方案:上代码(队列解决线程并发问题)
import os
import uuid
import copy
import json
import logging
import queue
import threading
import numpy as np
import torch
from flask import Flask
from flask import request, jsonify
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
app = Flask(__name__)
# 创建一个队列
pipeline_queue = queue.Queue()
# 实例对象的计数器来存储实例的数量
created_instances = 0
# logging.basicConfig(filename='app.log', level=logging.INFO)
# logger = logging.getLogger('info')
# # 再创建一个handler,用于输出到控制台
# ch = logging.StreamHandler()
# ch.setLevel(logging.INFO)
# # 定义handler的输出格式
# formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# ch.setFormatter(formatter)
# # 给logger添加handler
# logger.addHandler(ch)
def flask_content_type(req) -> dict:
"""根据不同的content_type来解析数据"""
if req.method == 'POST' or req.method == "PUT":
if 'application/x-www-form-urlencoded' == req.content_type or 'form-data' in req.content_type:
data = req.form
else: # 无法被解析出来的数据
if req.data:
data = json.loads(req.data)
else:
raise Exception('无法解析')
elif req.method == 'GET':
data = req.args
else:
raise Exception('不支持的请求方式')
return copy.copy(data)
def create_pipelines(num_pipelines):
"""
@Description:num_pipelines:创建pipeline实例的数量,根据你的显存大小创建,一个实例需要占用2GB
"""
for _ in range(num_pipelines):
global created_instances
inference_pipeline = pipeline(
task=Tasks.auto_speech_recognition,
model='/root/autodl-tmp/models_from_modelscope/damo/speech_paraformer-large-vad-punc-spk_asr_nat-zh-cn',
model_revision='v0.0.2',
vad_model='/root/autodl-tmp/models_from_modelscope/damo/speech_fsmn_vad_zh-cn-16k-common-pytorch',
punc_model='/root/autodl-tmp/models_from_modelscope/damo/punc_ct-transformer_cn-en-common-vocab471067-large')
pipeline_queue.put(inference_pipeline)
print("=========成功创建实例=========")
# 更新已创建实例的数量
created_instances += 1
print(f"=====队列现有空闲实例数量为{pipeline_queue.qsize()}========")
print(f"=====实例总数量数量为{created_instances}========")
print("============================")
def create_pipelines_thread(num_pipelines):
# 创建线程
thread = threading.Thread(target=create_pipelines, args=(num_pipelines,))
# 启动线程
thread.start()
return thread
def default_dump(obj):
"""Convert numpy classes to JSON serializable objects."""
if isinstance(obj, (np.integer, np.floating, np.bool_)):
return obj.item()
elif isinstance(obj, np.ndarray):
return obj.tolist()
else:
return obj
@app.route('/queue_size', methods=['GET'])
def get_queue_size():
# 获取队列数量
queue_size = pipeline_queue.qsize()
# 构建响应
response = {
'queue_size': queue_size
}
# 返回响应
return jsonify(response), 200
@app.route('/created_instances', methods=['GET'])
def get_created_instances():
# 获取已创建的实例数量
response = {
'created_instances': created_instances
}
# 返回响应
return jsonify(response), 200
@app.route('/add_pipeline/<int:num>', methods=['GET'])
def add_pipeline_queue(num):
global created_instances
if (created_instances >= 10):
return jsonify({'error': f"现有实例数量为{created_instances},无法再添加"})
print("=========开始创建实例=========")
print(f"=====队列现有空闲实例数量为{pipeline_queue.qsize()}========")
thread = create_pipelines_thread(num)
# 等待线程结束
thread.join()
print("=========实例创建结束=========")
print(pipeline_queue.qsize())
return jsonify({'success': f"队列现有空闲实例数量为{pipeline_queue.qsize()}!现有实例数量为{created_instances}"})
@app.route('/', methods=['POST'])
def result_test():
dates = flask_content_type(request).copy()
print(dates)
return jsonify({'success': dates})
@app.route('/transcribe', methods=['POST'])
def transcribe():
print("======队列剩余======")
print(pipeline_queue.qsize())
# 第一步,获取请求体
if 'audio_file' in request.files:
audio_file = request.files['audio_file']
file_ext = os.path.splitext(audio_file.filename)[1]
if file_ext.lower() not in ['.wav', '.mp3']:
return jsonify({'error': str('Error: Audio file must be either .wav or .mp3')}), 500
else:
try:
# 将音频文件保存到临时文件夹中
temp_dir_path = 'temp'
if not os.path.exists(temp_dir_path):
os.makedirs(temp_dir_path)
# 保存上传的临时文件
file_extension = os.path.splitext(audio_file.filename)[1]
unique_filename = str(uuid.uuid4()) + file_extension
temp_file_path = os.path.join(temp_dir_path, unique_filename)
audio_file.save(temp_file_path)
return start_asr(temp_file_path)
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
# 删除临时文件
os.remove(temp_file_path)
else:
dates = flask_content_type(request).copy()
return start_asr(dates['file_url'])
def start_asr(temp_file_path):
import time
# 记录开始时间
start_time = time.time()
inference_pipeline = pipeline_queue.get()
# 使用 inference pipeline 进行语音转写
asr_result = inference_pipeline(audio_in=temp_file_path, batch_size_token=5000,
batch_size_token_threshold_s=40,
max_single_segment_time=6000)
try:
transform = time.time() - start_time
asr_result["transform_time"] = transform
# 将语音识别结果转换为json格式
result = json.dumps(asr_result, ensure_ascii=False, default=default_dump)
return result
except Exception as e:
print(str(e))
# 返回错误信息
return jsonify({'error': str(e)}), 500
finally:
pipeline_queue.put(inference_pipeline)
def start_flask_app(port):
"""
启动 Flask 应用程序并运行在指定端口上
在调用 Flask 的 app.run 方法后,应用会进入监听状态,等待客户端发起请求。这意味着应用会一直停留在 app.run() 这一行,不会继续执行后续的 Python 代码。
启动 Flask 应用程序前,使用多线程的方式创建:我这里是32G显存所以默认创建10个实例
"""
print("============================")
print("======开始异步创建实例=========")
print("============================")
try:
# 清除显卡
torch.cuda.empty_cache()
except Exception as e:
# 返回错误信息
print(e)
create_pipelines_thread(5)
app.run(port=port)
if __name__ == '__main__':
start_flask_app(9501)
三、测试
二已经给你完整的示例了,所以我就不测了,我都上生产了,你们自己用postman或者代码试一下把!有问题再联系我18956043585(微信同号)
兄弟们,这是我的解决方案,欢迎交流,如果有帮助,或者有问题,欢迎回来留言,分享一下你的兴奋文章来源:https://www.toymoban.com/news/detail-794914.html
文章来源地址https://www.toymoban.com/news/detail-794914.html
到了这里,关于FunASR语音识别(解决-高并发线程问题)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!