本帖子源于AidLux面向众多开发者的AIGC训练营,目的在于实现使用stablediffusion生成图片传输到AidLux端实现目标检测。分享部分传输检测代码及其实现视频如下:
AIGC与AidLux互联应用—Aidlux端AIGC测评
import socket
import cv2
import numpy as np
import time
import sys
### 本代码主要是客户端代码,aidlux上的Socket_fuwuduan.py是匹配的服务端代码,当服务端代码启动时,由本代码读取一张图片,推送过去
def recvall(sock, count):
buf = b'' # buf是一个byte类型
while count:
newbuf = sock.recv(count)
if not newbuf: return None
buf += newbuf
count -= len(newbuf)
return buf
def SendAIGC():
# 建立sock连接
# address要连接的aidlux服务器IP地址和端口号
address = ('192.168.137.116', 9023)
try:
# 建立socket对象
# socket.AF_INET:服务器之间网络通信
# socket.SOCK_STREAM:流式socket , for TCP
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 开启连接
sock.connect(address)
except socket.error as msg:
print(msg)
sys.exit(1)
###########传送AIGC图片#################
## 如果本地没有GPU
if 1:
frame = cv2.imread("car.png")
# # 压缩参数,后面cv2.imencode将会用到,对于jpeg来说,15代表图像质量,越高代表图像质量越好为 0-100,默认95
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 95]
# cv2.imencode将图片格式转换(编码)成流数据,赋值到内存缓存中;主要用于图像数据格式的压缩,方便网络传输
# '.jpg'表示将图片按照jpg格式编码。
result, imgencode = cv2.imencode('.jpg', frame, encode_param)
# 建立矩阵
data = np.array(imgencode)
# 将numpy矩阵转换成字符形式,以便在网络中传输
stringData = data.tostring()
# 先发送要发送的数据的长度
# ljust() 方法返回一个原字符串左对齐,并使用空格填充至指定长度的新字符串
sock.send(str.encode(str(len(stringData)).ljust(16)))
# 发送数据
sock.send(stringData)
### 如果本地有GPU
# if 0:
# ### 本地生成AIGC图片 ###
# ## 添加AIGC代码 ##
# #####################
# frame = cv2.imread("car.png")
# # # 压缩参数,后面cv2.imencode将会用到,对于jpeg来说,15代表图像质量,越高代表图像质量越好为 0-100,默认95
# encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 95]
# # cv2.imencode将图片格式转换(编码)成流数据,赋值到内存缓存中;主要用于图像数据格式的压缩,方便网络传输
# # '.jpg'表示将图片按照jpg格式编码。
# result, imgencode = cv2.imencode('.jpg', frame, encode_param)
# # 建立矩阵
# data = np.array(imgencode)
# # 将numpy矩阵转换成字符形式,以便在网络中传输
# stringData = data.tostring()
# # 先发送要发送的数据的长度
# # ljust() 方法返回一个原字符串左对齐,并使用空格填充至指定长度的新字符串
# sock.send(str.encode(str(len(stringData)).ljust(16)))
# # 发送数据
# sock.send(stringData)
# 读取服务器返回值
receive = sock.recv(1000)
receive1 = sock.recv(1000)
receive2 = sock.recv(1000)
i = 0
if len(receive):
print("图片发送成功")
print("检测类别及其数目:")
print(str(receive, encoding='utf-8'),str(receive1, encoding='utf-8'))
# print(str(receive2, encoding='utf-8'))
### 之前接受的帧率数据,现在换成image流数据
sock.close()
if __name__ == '__main__':
SendAIGC()
该部分代码用于传输生成的图片到Aidlux端,并且接受返回的回馈信息。文章来源:https://www.toymoban.com/news/detail-551399.html
import cv2
import numpy as np
import socket
import time
import numpy
from cvs import *
import aidlite_gpu
import copy
coco_class = ['person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck', 'boat', 'traffic light',
'fire hydrant', 'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow',
'elephant', 'bear', 'zebra', 'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee',
'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard',
'tennis racket', 'bottle', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple',
'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch',
'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone',
'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear',
'hair drier', 'toothbrush']
blank_dict = {cls: 0 for cls in coco_class}
def xywh2xyxy(x):
'''
Box (center x, center y, width, height) to (x1, y1, x2, y2)
'''
y = np.copy(x)
y[:, 0] = x[:, 0] - x[:, 2] / 2 # top left x
y[:, 1] = x[:, 1] - x[:, 3] / 2 # top left y
y[:, 2] = x[:, 0] + x[:, 2] / 2 # bottom right x
y[:, 3] = x[:, 1] + x[:, 3] / 2 # bottom right y
return y
def xyxy2xywh(box):
'''
Box (left_top x, left_top y, right_bottom x, right_bottom y) to (left_top x, left_top y, width, height)
'''
box[:, 2:] = box[:, 2:] - box[:, :2]
return box
def NMS(dets, thresh):
'''
单类NMS算法
dets.shape = (N, 5), (left_top x, left_top y, right_bottom x, right_bottom y, Scores)
'''
x1 = dets[:,0]
y1 = dets[:,1]
x2 = dets[:,2]
y2 = dets[:,3]
areas = (y2-y1+1) * (x2-x1+1)
scores = dets[:,4]
keep = []
index = scores.argsort()[::-1]
while index.size >0:
i = index[0] # every time the first is the biggst, and add it directly
keep.append(i)
x11 = np.maximum(x1[i], x1[index[1:]]) # calculate the points of overlap
y11 = np.maximum(y1[i], y1[index[1:]])
x22 = np.minimum(x2[i], x2[index[1:]])
y22 = np.minimum(y2[i], y2[index[1:]])
w = np.maximum(0, x22-x11+1) # the weights of overlap
h = np.maximum(0, y22-y11+1) # the height of overlap
overlaps = w*h
ious = overlaps / (areas[i]+areas[index[1:]] - overlaps)
idx = np.where(ious<=thresh)[0]
index = index[idx+1] # because index start from 1
return dets[keep]
def letterbox(img, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True, stride=32):
# Resize and pad image while meeting stride-multiple constraints
shape = img.shape[:2] # current shape [height, width]
if isinstance(new_shape, int):
new_shape = (new_shape, new_shape)
# Scale ratio (new / old)
r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
if not scaleup: # only scale down, do not scale up (for better test mAP)
r = min(r, 1.0)
# Compute padding
ratio = r, r # width, height ratios
new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1] # wh padding
if auto: # minimum rectangle
dw, dh = np.mod(dw, stride), np.mod(dh, stride) # wh padding
elif scaleFill: # stretch
dw, dh = 0.0, 0.0
new_unpad = (new_shape[1], new_shape[0])
ratio = new_shape[1] / shape[1], new_shape[0] / shape[0] # width, height ratios
dw /= 2 # divide padding into 2 sides
dh /= 2
if shape[::-1] != new_unpad: # resize
img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR)
top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color) # add border
return img, ratio, (dw, dh)
def preprocess_img(img, target_shape:tuple=None, div_num=255, means:list=[0.485, 0.456, 0.406], stds:list=[0.229, 0.224, 0.225]):
'''
图像预处理:
target_shape: 目标shape
div_num: 归一化除数
means: len(means)==图像通道数,通道均值, None不进行zscore
stds: len(stds)==图像通道数,通道方差, None不进行zscore
'''
img_processed = np.copy(img)
# resize
if target_shape:
# img_processed = cv2.resize(img_processed, target_shape)
img_processed = letterbox(img_processed, target_shape, stride=None, auto=False)[0]
img_processed = img_processed.astype(np.float32)
img_processed = img_processed/div_num
# z-score
if means is not None and stds is not None:
means = np.array(means).reshape(1, 1, -1)
stds = np.array(stds).reshape(1, 1, -1)
img_processed = (img_processed-means)/stds
# unsqueeze
img_processed = img_processed[None, :]
return img_processed.astype(np.float32)
def convert_shape(shapes:tuple or list, int8=False):
'''
转化为aidlite需要的格式
'''
if isinstance(shapes, tuple):
shapes = [shapes]
out = []
for shape in shapes:
nums = 1 if int8 else 4
for n in shape:
nums *= n
out.append(nums)
return out
def scale_coords(img1_shape, coords, img0_shape, ratio_pad=None):
# Rescale coords (xyxy) from img1_shape to img0_shape
if ratio_pad is None: # calculate from img0_shape
gain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1]) # gain = old / new
pad = (img1_shape[1] - img0_shape[1] * gain) / 2, (img1_shape[0] - img0_shape[0] * gain) / 2 # wh padding
else:
gain = ratio_pad[0][0]
pad = ratio_pad[1]
coords[:, [0, 2]] -= pad[0] # x padding
coords[:, [1, 3]] -= pad[1] # y padding
coords[:, :4] /= gain
clip_coords(coords, img0_shape)
return coords
def clip_coords(boxes, img_shape):
# Clip bounding xyxy bounding boxes to image shape (height, width)
boxes[:, 0].clip(0, img_shape[1], out=boxes[:, 0]) # x1
boxes[:, 1].clip(0, img_shape[0], out=boxes[:, 1]) # y1
boxes[:, 2].clip(0, img_shape[1], out=boxes[:, 2]) # x2
boxes[:, 3].clip(0, img_shape[0], out=boxes[:, 3]) # y2
def detect_postprocess(prediction, img0shape, img1shape, conf_thres=0.25, iou_thres=0.45):
'''
检测输出后处理
prediction: aidlite模型预测输出
img0shape: 原始图片shape
img1shape: 输入图片shape
conf_thres: 置信度阈值
iou_thres: IOU阈值
return: list[np.ndarray(N, 5)], 对应类别的坐标框信息, xywh、conf
'''
h, w, _ = img1shape
cls_num = prediction.shape[-1] - 5
valid_condidates = prediction[prediction[..., 4] > conf_thres]
valid_condidates[:, 0] *= w
valid_condidates[:, 1] *= h
valid_condidates[:, 2] *= w
valid_condidates[:, 3] *= h
valid_condidates[:, :4] = xywh2xyxy(valid_condidates[:, :4])
valid_condidates = valid_condidates[(valid_condidates[:, 0] > 0) & (valid_condidates[:, 1] > 0) & (valid_condidates[:, 2] > 0) & (valid_condidates[:, 3] > 0)]
box_cls = valid_condidates[:, 5:].argmax(1)
cls_box = []
for i in range(cls_num):
temp_boxes = valid_condidates[box_cls == i]
if(len(temp_boxes) == 0):
cls_box.append([])
continue
temp_boxes = NMS(temp_boxes, iou_thres)
temp_boxes[:, :4] = scale_coords([h, w], temp_boxes[:, :4] , img0shape).round()
temp_boxes[:, :4] = xyxy2xywh(temp_boxes[:, :4])
cls_box.append(temp_boxes[:, :5])
return cls_box
cnt_dict = blank_dict.copy()
def draw_detect_res(img, all_boxes):
'''
检测结果绘制
'''
global box_count
img = img.astype(np.uint8)
color_step = int(255/len(all_boxes))
box_count = 0
for bi in range(len(all_boxes)):
if len(all_boxes[bi]) == 0:
continue
for box in all_boxes[bi]:
cnt_dict[coco_class[bi]] += 1
x, y, w, h = [int(t) for t in box[:4]]
cv2.putText(img, f'{coco_class[bi]}', (x, y), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
cv2.rectangle(img, (x,y), (x+w, y+h),(0, bi*color_step, 255-bi*color_step),thickness = 2)
box_count += 1
a = box_count
print(f"检测目标数: {a}")
return img
def ReceiveVideo():
# IP地址'0.0.0.0'为等待客户端连接
address = ('0.0.0.0', 9023)
# 建立socket对象,参数意义见https://blog.csdn.net/rebelqsp/article/details/22109925
# socket.AF_INET:服务器之间网络通信
# socket.SOCK_STREAM:流式socket , for TCP
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 将套接字绑定到地址, 在AF_INET下,以元组(host,port)的形式表示地址.
s.bind(address)
# 开始监听TCP传入连接。参数指定在拒绝连接之前,操作系统可以挂起的最大连接数量。该值至少为1,大部分应用程序设为5就可以了。
s.listen(5)
def recvall(sock, count):
buf = b'' # buf是一个byte类型
while count:
# 接受TCP套接字的数据。数据以字符串形式返回,count指定要接收的最大数据量.
newbuf = sock.recv(count)
if not newbuf: return None
buf += newbuf
count -= len(newbuf)
return buf
# 接受TCP连接并返回(conn,address),其中conn是 新的套接字对象,可以用来接收和发送数据。addr是连接客户端的地址。
# 没有连接则等待有连接
conn, addr = s.accept()
print('connect from PC:' + str(addr))
if 1:
start = time.time() # 用于计算帧率信息
length = recvall(conn, 16) # 获得图片文件的长度,16代表获取长度
stringData = recvall(conn, int(length)) # 根据获得的文件长度,获取图片文件
data = numpy.frombuffer(stringData, numpy.uint8) # 将获取到的字符流数据转换成1维数组
decimg = cv2.imdecode(data, cv2.IMREAD_COLOR) # 将数组解码成图像
cv2.imwrite("car.jpg",decimg)
print("save image ")
# cv2.imshow('SERVER', decimg) # 显示图像
# cv2.waitKey(2000)
#
# # 进行下一步处理
# # 。
# # 。
# # 。
#
# AidLite初始化:调用AidLite进行AI模型的加载与推理,需导入aidlite
aidlite = aidlite_gpu.aidlite()
# Aidlite模型路径
model_path = '/home/AIGC_lesson3/aidlux_estimate/models/yolov5n-fp16.tflite'
# 定义输入输出shape
in_shape = [1 * 640 * 640 * 3 * 4]
out_shape = [1 * 25200 * 85 * 4]
# 加载Aidlite检测模型:支持tflite, tnn, mnn, ms, nb格式的模型加载
aidlite.ANNModel(model_path, in_shape, out_shape, 4, 0)
# 读取图片进行推理
# 设置测试集路径
source = "/home/AIGC_lesson3/aidlux_estimate/images/test1"
images_list = os.listdir(source)
print(images_list)
frame_id = 0
# 读取数据集
for image_name in images_list:
frame_id += 1
print("frame_id:", frame_id)
image_path = os.path.join(source, image_name)
frame = cvs.imread(image_path)
# 预处理
img = preprocess_img(frame, target_shape=(640, 640), div_num=255, means=None, stds=None)
# 数据转换:因为setTensor_Fp32()需要的是float32类型的数据,所以送入的input的数据需为float32,大多数的开发者都会忘记将图像的数据类型转换为float32
aidlite.setInput_Float32(img, 640, 640)
# 模型推理API
aidlite.invoke()
# 读取返回的结果
pred = aidlite.getOutput_Float32(0)
# 数据维度转换
pred = pred.reshape(1, 25200, 85)[0]
# 模型推理后处理
pred = detect_postprocess(pred, frame.shape, [640, 640, 3], conf_thres=0.25, iou_thres=0.45)
# 绘制推理结果
res_img = draw_detect_res(frame, pred)
for cls, cnt in cnt_dict.items():
# print(f'{cls}:{cnt}')
if cnt != 0:
fps = f'{cls}:{cnt}'
conn.send(bytes(str(fps), encoding='utf-8'))
print(fps)
cvs.imshow(res_img)
print("AIGC test!!!")
# 测试结果展示停顿
time.sleep(5)
# # 将帧率信息回传,主要目的是测试可以双向通信
end = time.time()
seconds = end - start
count = box_count
##返回已处理图像到客户端
# conn.send(bytes(str(res), encoding='utf-8'))
# conn.send(bytes(str(fps), encoding='utf-8'))
# image = copy.deepcopy(decimg)
# encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 100]
# result, imgencode = cv2.imencode('.jpg', image, encode_param)
# # 建立矩阵
# data = numpy.array(imgencode)
# # 将numpy矩阵转换成字符形式,以便在网络中传输
# img_Data = data.tostring()
# # 先发送要发送的数据的长度
# # ljust() 方法返回一个原字符串左对齐,并使用空格填充至指定长度的新字符串
# conn.send(str.encode(str(len(img_Data)).ljust(16)))
# # # print(img_Data)
# # # 发送数据
# conn.send(img_Data)
# if cv2.waitKey(10) & 0xff == 27:
# break
s.close()
# cv2.destroyAllWindows()
if __name__ == '__main__':
ReceiveVideo()
该代码用于接收传输的图片,并且对传输的图片做检测,最后将检测结果发送回pc具体实现效果可见于视频。文章来源地址https://www.toymoban.com/news/detail-551399.html
到了这里,关于AIGC与AidLux互联应用—Aidlux端AIGC测评的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!