A Multi-Threaded Application for Decoding and Processing Video Streams
As video processing technology keeps advancing, more and more applications need to decode and process video streams. This article presents a Python-based multi-threaded application that decodes and processes several RTSP video streams at once, using GPU acceleration to speed up the work.
The application builds on a few key Python libraries and tools, including PyNvCodec (the Python bindings of NVIDIA's VideoProcessingFramework), OpenCV, and PyCUDA, and leans on the compute power of a modern GPU for efficient video decoding and processing.
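Before diving into the code, it can help to confirm that the GPU stack is actually reachable from Python. A minimal sanity check, assuming PyNvCodec and PyTorch are already installed:

import PyNvCodec as nvc
import torch
# True if a CUDA-capable GPU is visible to PyTorch
print(torch.cuda.is_available())
# Confirms the PyNvCodec bindings load and expose the enums used later in the sample
print(nvc.CudaVideoCodec.H264, nvc.PixelFormat.NV12)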
Multi-Threaded Decoding
The application uses Python's concurrent.futures library for multi-threaded decoding. Each video stream is decoded in its own thread, so several streams can be processed at the same time; since the actual decoding runs on the GPU and the threads spend most of their time blocked on pipe reads, plain Python threads are enough to keep every stream moving.
from concurrent.futures import ThreadPoolExecutor
# ...
# Create the thread pool, one worker per stream
pool = ThreadPoolExecutor(max_workers=len(urls))
futures = []
# Submit a decode task for each video stream
for url in urls:
    future = pool.submit(decode_rtsp_stream, index, url, gpuID)
    futures.append(future)
    index += 1
# Wait for all tasks to finish
pool.shutdown()
# Collect each task's result (this re-raises any exception from a worker)
for future in futures:
    future.result()
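One possible refinement, not part of the original sample, is to use concurrent.futures.as_completed so that a failure in any one stream is reported as soon as that worker stops, instead of only after pool.shutdown() has waited for every stream. A minimal sketch, assuming the same decode_rtsp_stream, urls, and gpuID names:

from concurrent.futures import ThreadPoolExecutor, as_completed
with ThreadPoolExecutor(max_workers=len(urls)) as pool:
    futures = {pool.submit(decode_rtsp_stream, i, url, gpuID): url
               for i, url in enumerate(urls)}
    for future in as_completed(futures):
        try:
            future.result()
        except Exception as exc:
            # Report which stream failed without waiting for the others
            print(futures[future], "failed:", exc)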
Video Decoding and Processing
Video decoding is the core of this application. We use the PyNvCodec library to decode on the GPU, which takes the heavy lifting off the CPU.
def decode_rtsp_stream(thread_index: int, url: str, gpu_id: int):
    # Probe the stream parameters (resolution, codec, pixel format, ...)
    params = get_stream_params(url)
    # ...
    # Create the NvDecoder instance
    nvdec = nvc.PyNvDecoder(w, h, f, c, g)
    # ...
    while True:
        # Read encoded data from the ffmpeg pipe
        bits = proc.stdout.read(read_size)
        # ...
        # Decode a video frame on the GPU
        surf = nvdec.DecodeSurfaceFromPacket(enc_packet, pkt_data)
        # ...
        # Convert the color space and download the surface to host memory
        cvtSurface = nv_cvt.Execute(surf, cc_ctx)
        success = nv_down.DownloadSingleSurface(cvtSurface, data)
        # ...
        # Display the decoded frame
        cv2.imshow(str(thread_index), new_data)
        cv2.waitKey(1)
        # ...
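The skeleton above only displays each frame; any real processing would operate on the downloaded array, which after the reshape is an h x w x 3 BGR image. As a purely hypothetical example of what such a processing step could look like (it is not part of the original sample), the display call could be swapped for a simple per-frame measurement:

# Hypothetical processing step on the downloaded BGR frame
frame = data.reshape((h, w, 3))
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
print("stream", thread_index, "mean brightness:", round(float(gray.mean()), 1))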
Complete Code
This kind of application fits video surveillance, real-time video analytics, transcoding, and similar scenarios. With multi-threaded decoding and GPU acceleration it can handle several high-resolution streams and still keep display and processing smooth under real-time constraints.
import os
import sys
import subprocess
import json
import PyNvCodec as nvc
import numpy as np
from io import BytesIO
from multiprocessing import Process
import uuid
import time
from concurrent.futures import ThreadPoolExecutor
import cv2
import pycuda.gpuarray as gpuarray
# import PytorchNvCodec as pnvc
import torch
import torchvision.transforms as T
def add_cuda_dll_directories():
    # On Windows, register the CUDA and PATH directories so the VPF DLLs can be found
    if os.name == "nt":
        cuda_path = os.environ.get("CUDA_PATH")
        if cuda_path:
            os.add_dll_directory(cuda_path)
        else:
            print("CUDA_PATH environment variable is not set.", file=sys.stderr)
            exit(1)
        sys_path = os.environ.get("PATH")
        if sys_path:
            paths = sys_path.split(";")
            for path in paths:
                if os.path.isdir(path) and path != '.':
                    os.add_dll_directory(path)
        else:
            print("PATH environment variable is not set.", file=sys.stderr)
            exit(1)
def surface_to_tensor(surface: nvc.Surface) -> torch.Tensor:
    """
    Converts planar RGB surface to CUDA float tensor.
    Note: this helper needs PytorchNvCodec (the commented-out pnvc import above)
    and is not called in this sample; it is kept for further GPU-side processing.
    """
    if surface.Format() != nvc.PixelFormat.RGB_PLANAR:
        raise RuntimeError("Surface shall be of RGB_PLANAR pixel format")
    surf_plane = surface.PlanePtr()
    img_tensor = pnvc.DptrToTensor(
        surf_plane.GpuMem(),
        surf_plane.Width(),
        surf_plane.Height(),
        surf_plane.Pitch(),
        surf_plane.ElemSize(),
    )
    if img_tensor is None:
        raise RuntimeError("Can not export to tensor.")
    img_tensor.resize_(3, int(surf_plane.Height() / 3), surf_plane.Width())
    img_tensor = img_tensor.type(dtype=torch.cuda.FloatTensor)
    img_tensor = torch.divide(img_tensor, 255.0)
    img_tensor = torch.clamp(img_tensor, 0.0, 1.0)
    return img_tensor
def get_stream_params(url: str):
    cmd = [
        "ffprobe",
        "-v",
        "quiet",
        "-print_format",
        "json",
        "-show_format",
        "-show_streams",
        url,
    ]
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    stdout = proc.communicate()[0]
    bio = BytesIO(stdout)
    json_out = json.load(bio)
    params = {}
    if "streams" not in json_out:
        return {}
    for stream in json_out["streams"]:
        if stream["codec_type"] == "video":
            params["width"] = stream["width"]
            params["height"] = stream["height"]
            # avg_frame_rate is a fraction string such as "30000/1001"
            params["framerate"] = float(eval(stream["avg_frame_rate"]))
            codec_name = stream["codec_name"]
            is_h264 = codec_name == "h264"
            is_hevc = codec_name == "hevc"
            if not is_h264 and not is_hevc:
                raise ValueError(
                    "Unsupported codec: "
                    + codec_name
                    + ". Only H.264 and HEVC are supported in this sample."
                )
            else:
                params["codec"] = (
                    nvc.CudaVideoCodec.H264 if is_h264 else nvc.CudaVideoCodec.HEVC
                )
            pix_fmt = stream["pix_fmt"]
            is_yuv420 = pix_fmt == "yuv420p"
            is_yuv444 = pix_fmt == "yuv444p"
            # YUVJ420P and YUVJ444P are deprecated but still widespread, so handle
            # them as well. They also indicate JPEG color range.
            is_yuvj420 = pix_fmt == "yuvj420p"
            is_yuvj444 = pix_fmt == "yuvj444p"
            if is_yuvj420:
                is_yuv420 = True
                params["color_range"] = nvc.ColorRange.JPEG
            if is_yuvj444:
                is_yuv444 = True
                params["color_range"] = nvc.ColorRange.JPEG
            if not is_yuv420 and not is_yuv444:
                raise ValueError(
                    "Unsupported pixel format: "
                    + pix_fmt
                    + ". Only YUV420 and YUV444 are supported in this sample."
                )
            else:
                params["format"] = (
                    nvc.PixelFormat.NV12 if is_yuv420 else nvc.PixelFormat.YUV444
                )
            # Color range default option. We may have set it when parsing the
            # pixel format, so check first.
            if "color_range" not in params:
                params["color_range"] = nvc.ColorRange.MPEG
            # Check the actual value.
            if "color_range" in stream:
                color_range = stream["color_range"]
                if color_range == "pc" or color_range == "jpeg":
                    params["color_range"] = nvc.ColorRange.JPEG
            # Color space default option:
            params["color_space"] = nvc.ColorSpace.BT_601
            # Check the actual value.
            if "color_space" in stream:
                color_space = stream["color_space"]
                if color_space == "bt709":
                    params["color_space"] = nvc.ColorSpace.BT_709
            return params
    return {}
def decode_rtsp_stream(thread_index: int, url: str, gpu_id: int):
    params = get_stream_params(url)
    if not len(params):
        raise ValueError("Can not get " + url + " streams params")
    w = params["width"]
    h = params["height"]
    f = params["format"]
    c = params["codec"]
    framerate = params["framerate"]
    g = gpu_id
    if nvc.CudaVideoCodec.H264 == c:
        codec_name = "h264"
    elif nvc.CudaVideoCodec.HEVC == c:
        codec_name = "hevc"
    bsf_name = codec_name + "_mp4toannexb,dump_extra=all"
    # Re-mux the RTSP stream into an Annex-B elementary stream and pipe it to stdout
    cmd = [
        "ffmpeg",
        "-hide_banner",
        "-i",
        url,
        "-c:v",
        "copy",
        "-bsf:v",
        bsf_name,
        "-f",
        codec_name,
        "pipe:1",
    ]
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    nvdec = nvc.PyNvDecoder(w, h, f, c, g)
    read_size = 4096
    rt = 0
    fd = 0
    t0 = time.time()
    print("running stream")
    # nv_cvt = nvc.PySurfaceConverter(
    #     w, h, self.nvYuv.Format(), nvc.PixelFormat.RGB, 0
    # )
    nv_cvt = nvc.PySurfaceConverter(w, h, nvc.PixelFormat.NV12, nvc.PixelFormat.BGR, g)
    cc_ctx = nvc.ColorspaceConversionContext(
        params["color_space"], params["color_range"]
    )
    nv_down = nvc.PySurfaceDownloader(w, h, nv_cvt.Format(), g)
    data = np.zeros((w * h, 3), np.uint8)
    empty_count = 0
    while True:
        t1 = time.time()
        # If the last packet size was zero, fall back to the average size seen so far
        if not read_size:
            read_size = int(rt / fd)
            rt = read_size
            fd = 1
        bits = proc.stdout.read(read_size)
        if not len(bits):
            print("Can't read data from pipe")
            break
        else:
            rt += len(bits)
        enc_packet = np.frombuffer(buffer=bits, dtype=np.uint8)
        pkt_data = nvc.PacketData()
        try:
            surf = nvdec.DecodeSurfaceFromPacket(enc_packet, pkt_data)
            if not surf.Empty():
                fd += 1
                if pkt_data.bsl < read_size:
                    read_size = pkt_data.bsl
                cvtSurface = nv_cvt.Execute(surf, cc_ctx)
                success = nv_down.DownloadSingleSurface(cvtSurface, data)
                if success:
                    new_data = data.reshape((h, w, 3))
                    cv2.imshow(str(thread_index), new_data)
                    cv2.waitKey(1)
            else:
                empty_count += 1
                # Too many empty surfaces in a row: recreate the decoder
                if empty_count > framerate * 30:
                    print("surf is Empty too many times > " + str(framerate * 30))
                    nvdec = nvc.PyNvDecoder(w, h, f, c, g)
                    empty_count = 0
        except nvc.HwResetException:
            # The decoder was reset by the hardware: recreate it and keep going
            nvdec = nvc.PyNvDecoder(w, h, f, c, g)
            empty_count = 0
            continue
        t2 = time.time()
        # print((t2 - t1) * 1000)
if __name__ == "__main__":
    add_cuda_dll_directories()
    print("This sample decodes multiple videos in parallel on given GPU.")
    print("It doesn't do anything beside decoding, output isn't saved.")
    print("Usage: SampleDecodeRTSP.py $gpu_id $url1 ... $urlN .")
    # Need at least a GPU ID and one URL
    if len(sys.argv) < 3:
        print("Provide gpu ID and input URL(s).")
        exit(1)
    gpuID = int(sys.argv[1])
    urls = sys.argv[2:]
    pool = ThreadPoolExecutor(max_workers=len(urls))
    futures = []
    index = 0
    for url in urls:
        future = pool.submit(decode_rtsp_stream, index, url, gpuID)
        futures.append(future)
        index += 1
    pool.shutdown()
    for future in futures:
        future.result()
Running the Script
python rtsp_decoder.py 0 rtsp://admin:a1234567@10.10.16.26:554/Streaming/Channels/101?transportmode=multicast
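To decode several streams in parallel, list additional URLs after the GPU ID; the camera addresses below are placeholders:

python rtsp_decoder.py 0 rtsp://camera-1.local:554/stream1 rtsp://camera-2.local:554/stream1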
Installing the VPF Library
See: "Compiling the VideoProcessingFramework library on Windows 11" (random_2011's blog on CSDN).
That wraps up this walkthrough of hardware-decoding RTSP streams in Python with NVIDIA's VPF library and displaying them with OpenCV.