很幸运能参加Aidlux平台的5月AI实战训练营,先聊聊感受吧 在第一次听课的时候说真的有点懵 感觉AI的专业属于很多 都很陌生,但是Rocky老师给我们这些AI小白举了很多实在的例子,然后带我们实操运用diffusers生成了图片以后感觉就完全不同了,从新看了第二次课程回放 才深刻的感受到老师讲的都是干货满满。第二堂课以后成功跑通了代码特别有成就感虽然只是修改了一下老师的源码哈哈。
Aidlux对于AIGC内容生成的测评非常简单容易上手,可以使用Aidlux平台的 aidlite_gpu 轻松的完成模型推理。对这次测评的体验就是整个流程都很轻松 容易上手 特别适合我这样的AI小白。希望可以和大家共同学习 共同进步,也希望Aidlux越来越好。
Aidlux端的的服务器源码
import socket
import struct
import time
import cv2
from datetime import datetime
import numpy
from cvs import *
import aidlite_gpu
from utils import detect_postprocess, preprocess_img, draw_detect_res
import copy
def ReceiveVideo():
# IP地址'0.0.0.0'为等待客户端连接
# address = (socket.INADDR_ANY, 9099)
address = (socket.inet_ntoa(struct.pack('!L', socket.INADDR_ANY)), 9998)
# 建立socket对象,参数意义见https://blog.csdn.net/rebelqsp/article/details/22109925
# socket.AF_INET:服务器之间网络通信
# socket.SOCK_STREAM:流式socket , for TCP
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 将套接字绑定到地址, 在AF_INET下,以元组(host,port)的形式表示地址.
s.bind(address)
# 开始监听TCP传入连接。参数指定在拒绝连接之前,操作系统可以挂起的最大连接数量。该值至少为1,大部分应用程序设为5就可以了。
s.listen(5)
def recvall(sock, count):
buf = b'' # buf是一个byte类型
while count:
# 接受TCP套接字的数据。数据以字符串形式返回,count指定要接收的最大数据量.
newbuf = sock.recv(count)
if not newbuf: return None
buf += newbuf
count -= len(newbuf)
return buf
# 接受TCP连接并返回(conn,address),其中conn是 新的套接字对象,可以用来接收和发送数据。addr是连接客户端的地址。
# 没有连接则等待有连接
while 1:
conn, addr = s.accept()
print('connect from PC:' + str(addr))
if 1:
start = time.time() # 用于计算帧率信息
length = recvall(conn, 16) # 获得图片文件的长度,16代表获取长度
print("length"+str(length)+"\n")
received_data = recvall(conn, int(length)) # 根据获得的文件长度,获取图片文件
# 解析JSON数据包
json_data = json.loads(received_data.decode('utf-8'))
# 从Python对象中提取图像数据
img_dict = json_data['images']
# 存储图像的路径
image_dir = r'/第三节课代码/aidlux_estimate/images/AIGC'
# 对每个图像数据进行逐一处理,将它们保存到本地文件系统中
for img_name, img_base64 in img_dict.items():
# 将base64格式的图像数据解码为二进制数据
img_data = base64.b64decode(img_base64)
# 拼接文件路径
img_path = os.path.join(image_dir, img_name)
# 将二进制数据写入本地文件系统中保存为图像
with open(img_path, 'wb') as f:
f.write(img_data)
#data = numpy.frombuffer(stringData, numpy.uint8) # 将获取到的字符流数据转换成1维数组
#decimg = cv2.imdecode(data, cv2.IMREAD_COLOR) # 将数组解码成图像
#path='/第三节课代码/aidlux_estimate/images/AIGC/'
#filename=datetime.now().strftime("%Y-%m-%d_%H-%M-%S.png")
#save_path=path+filename
#cv2.imwrite(save_path,decimg)
print("save image ")
# cv2.imshow('SERVER', decimg) # 显示图像
# cv2.waitKey(2000)
#
# # 进行下一步处理
# # 。
# # 。
# # 。
#
# AidLite初始化:调用AidLite进行AI模型的加载与推理,需导入aidlite
aidlite = aidlite_gpu.aidlite()
# Aidlite模型路径
model_path = '/第三节课代码/aidlux_estimate/models/yolov5n-fp16.tflite'
# 定义输入输出shape
in_shape = [1 * 640 * 640 * 3 * 4]
out_shape = [1 * 25200 * 85 * 4]
# 加载Aidlite检测模型:支持tflite, tnn, mnn, ms, nb格式的模型加载
aidlite.ANNModel(model_path, in_shape, out_shape, 4, 0)
# 读取图片进行推理
# 设置测试集路径
source = "/第三节课代码/aidlux_estimate/images/AIGC"
images_list = os.listdir(source)
print(images_list)
frame_id = 0
# 读取数据集
for image_name in images_list:
frame_id += 1
print("frame_id:", frame_id)
image_path = os.path.join(source, image_name)
frame = cvs.imread(image_path)
# 预处理
img = preprocess_img(frame, target_shape=(640, 640), div_num=255, means=None, stds=None)
# 数据转换:因为setTensor_Fp32()需要的是float32类型的数据,所以送入的input的数据需为float32,大多数的开发者都会忘记将图像的数据类型转换为float32
aidlite.setInput_Float32(img, 640, 640)
# 模型推理API
aidlite.invoke()
# 读取返回的结果
pred = aidlite.getOutput_Float32(0)
# 数据维度转换
pred = pred.reshape(1, 25200, 85)[0]
# 模型推理后处理
pred, num_boxes= detect_postprocess(pred, frame.shape, [640, 640, 3], conf_thres=0.25, iou_thres=0.45)
# 绘制推理结果
res_img = draw_detect_res(frame,pred)
cvs.imshow(res_img)
print("AIGC test!!!"+"测试完毕"+image_name+'中有'+str(num_boxes)+'个对象')
end = time.time()
seconds = end - start
fps = 1 / seconds
conn.send(bytes(str(int(fps)), encoding='utf-8'))
conn.send(str.encode("测试完毕"+image_name+'中有'+str(num_boxes)+'个对象',"utf-8"))
# 测试结果展示停顿
time.sleep(5)
# # 将帧率信息回传,主要目的是测试可以双向通信
# print("fps"+str(fps))
conn.send(str.encode("全部测试完毕","utf-8"))
##返回已处理图像到客户端
# image = copy.deepcopy(decimg)
# encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 100]
# result, imgencode = cv2.imencode('.jpg', image, encode_param)
# # 建立矩阵
# data = numpy.array(imgencode)
# # 将numpy矩阵转换成字符形式,以便在网络中传输
# img_Data = data.tostring()
# # 先发送要发送的数据的长度
# # ljust() 方法返回一个原字符串左对齐,并使用空格填充至指定长度的新字符串
# conn.send(str.encode(str(len(img_Data)).ljust(16)))
# # # print(img_Data)
# # # 发送数据
# conn.send(img_Data)
# if cv2.waitKey(10) & 0xff == 27:
# break
s.close()
# cv2.destroyAllWindows()
if __name__ == '__main__':
ReceiveVideo()
time.sleep(5)
Aidlux端依赖
import cv2
import numpy as np
coco_class = ['person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck', 'boat', 'traffic light',
'fire hydrant', 'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow',
'elephant', 'bear', 'zebra', 'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee',
'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard',
'tennis racket', 'bottle', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple',
'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch',
'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone',
'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear',
'hair drier', 'toothbrush']
def xywh2xyxy(x):
'''
Box (center x, center y, width, height) to (x1, y1, x2, y2)
'''
y = np.copy(x)
y[:, 0] = x[:, 0] - x[:, 2] / 2 # top left x
y[:, 1] = x[:, 1] - x[:, 3] / 2 # top left y
y[:, 2] = x[:, 0] + x[:, 2] / 2 # bottom right x
y[:, 3] = x[:, 1] + x[:, 3] / 2 # bottom right y
return y
def xyxy2xywh(box):
'''
Box (left_top x, left_top y, right_bottom x, right_bottom y) to (left_top x, left_top y, width, height)
'''
box[:, 2:] = box[:, 2:] - box[:, :2]
return box
def NMS(dets, thresh):
'''
单类NMS算法
dets.shape = (N, 5), (left_top x, left_top y, right_bottom x, right_bottom y, Scores)
'''
x1 = dets[:,0]
y1 = dets[:,1]
x2 = dets[:,2]
y2 = dets[:,3]
areas = (y2-y1+1) * (x2-x1+1)
scores = dets[:,4]
keep = []
index = scores.argsort()[::-1]
while index.size >0:
i = index[0] # every time the first is the biggst, and add it directly
keep.append(i)
x11 = np.maximum(x1[i], x1[index[1:]]) # calculate the points of overlap
y11 = np.maximum(y1[i], y1[index[1:]])
x22 = np.minimum(x2[i], x2[index[1:]])
y22 = np.minimum(y2[i], y2[index[1:]])
w = np.maximum(0, x22-x11+1) # the weights of overlap
h = np.maximum(0, y22-y11+1) # the height of overlap
overlaps = w*h
ious = overlaps / (areas[i]+areas[index[1:]] - overlaps)
idx = np.where(ious<=thresh)[0]
index = index[idx+1] # because index start from 1
return dets[keep]
def letterbox(img, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True, stride=32):
# Resize and pad image while meeting stride-multiple constraints
shape = img.shape[:2] # current shape [height, width]
if isinstance(new_shape, int):
new_shape = (new_shape, new_shape)
# Scale ratio (new / old)
r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
if not scaleup: # only scale down, do not scale up (for better test mAP)
r = min(r, 1.0)
# Compute padding
ratio = r, r # width, height ratios
new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1] # wh padding
if auto: # minimum rectangle
dw, dh = np.mod(dw, stride), np.mod(dh, stride) # wh padding
elif scaleFill: # stretch
dw, dh = 0.0, 0.0
new_unpad = (new_shape[1], new_shape[0])
ratio = new_shape[1] / shape[1], new_shape[0] / shape[0] # width, height ratios
dw /= 2 # divide padding into 2 sides
dh /= 2
if shape[::-1] != new_unpad: # resize
img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR)
top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color) # add border
return img, ratio, (dw, dh)
def preprocess_img(img, target_shape:tuple=None, div_num=255, means:list=[0.485, 0.456, 0.406], stds:list=[0.229, 0.224, 0.225]):
'''
图像预处理:
target_shape: 目标shape
div_num: 归一化除数
means: len(means)==图像通道数,通道均值, None不进行zscore
stds: len(stds)==图像通道数,通道方差, None不进行zscore
'''
img_processed = np.copy(img)
# resize
if target_shape:
# img_processed = cv2.resize(img_processed, target_shape)
img_processed = letterbox(img_processed, target_shape, stride=None, auto=False)[0]
img_processed = img_processed.astype(np.float32)
img_processed = img_processed/div_num
# z-score
if means is not None and stds is not None:
means = np.array(means).reshape(1, 1, -1)
stds = np.array(stds).reshape(1, 1, -1)
img_processed = (img_processed-means)/stds
# unsqueeze
img_processed = img_processed[None, :]
return img_processed.astype(np.float32)
def convert_shape(shapes:tuple or list, int8=False):
'''
转化为aidlite需要的格式
'''
if isinstance(shapes, tuple):
shapes = [shapes]
out = []
for shape in shapes:
nums = 1 if int8 else 4
for n in shape:
nums *= n
out.append(nums)
return out
def scale_coords(img1_shape, coords, img0_shape, ratio_pad=None):
# Rescale coords (xyxy) from img1_shape to img0_shape
if ratio_pad is None: # calculate from img0_shape
gain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1]) # gain = old / new
pad = (img1_shape[1] - img0_shape[1] * gain) / 2, (img1_shape[0] - img0_shape[0] * gain) / 2 # wh padding
else:
gain = ratio_pad[0][0]
pad = ratio_pad[1]
coords[:, [0, 2]] -= pad[0] # x padding
coords[:, [1, 3]] -= pad[1] # y padding
coords[:, :4] /= gain
clip_coords(coords, img0_shape)
return coords
def clip_coords(boxes, img_shape):
# Clip bounding xyxy bounding boxes to image shape (height, width)
boxes[:, 0].clip(0, img_shape[1], out=boxes[:, 0]) # x1
boxes[:, 1].clip(0, img_shape[0], out=boxes[:, 1]) # y1
boxes[:, 2].clip(0, img_shape[1], out=boxes[:, 2]) # x2
boxes[:, 3].clip(0, img_shape[0], out=boxes[:, 3]) # y2
def detect_postprocess(prediction, img0shape, img1shape, conf_thres=0.25, iou_thres=0.45):
'''
检测输出后处理
prediction: aidlite模型预测输出
img0shape: 原始图片shape
img1shape: 输入图片shape
conf_thres: 置信度阈值
iou_thres: IOU阈值
return: list[np.ndarray(N, 5)], 对应类别的坐标框信息, xywh、conf
'''
h, w, _ = img1shape
cls_num = prediction.shape[-1] - 5
valid_condidates = prediction[prediction[..., 4] > conf_thres]
valid_condidates[:, 0] *= w
valid_condidates[:, 1] *= h
valid_condidates[:, 2] *= w
valid_condidates[:, 3] *= h
valid_condidates[:, :4] = xywh2xyxy(valid_condidates[:, :4])
valid_condidates = valid_condidates[(valid_condidates[:, 0] > 0) & (valid_condidates[:, 1] > 0) & (valid_condidates[:, 2] > 0) & (valid_condidates[:, 3] > 0)]
box_cls = valid_condidates[:, 5:].argmax(1)
cls_box = []
num_detections = 0 # 初始化边框数量为0
for i in range(cls_num):
temp_boxes = valid_condidates[box_cls == i]
if(len(temp_boxes) == 0):
cls_box.append([])
continue
temp_boxes = NMS(temp_boxes, iou_thres)
temp_boxes[:, :4] = scale_coords([h, w], temp_boxes[:, :4] , img0shape).round()
temp_boxes[:, :4] = xyxy2xywh(temp_boxes[:, :4])
cls_box.append(temp_boxes[:, :5])
num_detections += len(temp_boxes) # 每处理完一个类别后累加边框数量
return cls_box, num_detections
def draw_detect_res(img, all_boxes):
'''
检测结果绘制
'''
img = img.astype(np.uint8)
color_step = int(255/len(all_boxes))
for bi in range(len(all_boxes)):
if len(all_boxes[bi]) == 0:
continue
for box in all_boxes[bi]:
x, y, w, h = [int(t) for t in box[:4]]
cv2.putText(img, f'{coco_class[bi]}', (x, y), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
cv2.rectangle(img, (x,y), (x+w, y+h),(0, bi*color_step, 255-bi*color_step),thickness = 2)
return img
PC端的客户端源码
import socket
import cv2
import numpy as np
import time
import os
import json
import base64
import sys
### 本代码主要是客户端代码,aidlux上的Socket_fuwuduan.py是匹配的服务端代码,当服务端代码启动时,由本代码读取一张图片,推送过去
def recvall(sock, count):
buf = b'' # buf是一个byte类型
while count:
newbuf = sock.recv(count)
if not newbuf: return None
buf += newbuf
count -= len(newbuf)
return buf
def SendAIGC():
# 建立sock连接
# address要连接的aidlux服务器IP地址和端口号
address = ('192.168.0.102', 9998)
try:
# 建立socket对象
# socket.AF_INET:服务器之间网络通信
# socket.SOCK_STREAM:流式socket , for TCP
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 开启连接
sock.connect(address)
except socket.error as msg:
print(msg)
sys.exit(1)
###########传送AIGC图片#################
## 如果本地没有GPU
if 1:
#frame = cv2.imread("ca.png")
#cv2.imshow("show",frame)
# # 压缩参数,后面cv2.imencode将会用到,对于jpeg来说,15代表图像质量,越高代表图像质量越好为 0-100,默认95
#encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 95]
# cv2.imencode将图片格式转换(编码)成流数据,赋值到内存缓存中;主要用于图像数据格式的压缩,方便网络传输
# '.jpg'表示将图片按照jpg格式编码。
#result, imgencode = cv2.imencode('.jpg', frame, encode_param)
# 建立矩阵
#data = np.array(imgencode)
# 将numpy矩阵转换成字符形式,以便在网络中传输
#stringData = data.tostring()
# 先发送要发送的数据的长度
# ljust() 方法返回一个原字符串左对齐,并使用空格填充至指定长度的新字符串
#初始化空图片字典
img_dict = {}
#获取所有图片的路径
#image_dir='D:\Aidlux_File\STABLE DIFFUSION\第三节课代码\aidlux_estimate\images\AIGC'
image_dir=r'D:\Aidlux_File\STABLE DIFFUSION\第三节课代码\aidlux_estimate\images\AIGC'
image_dir_list=os.listdir(image_dir)
#遍历图片获取图片数据
for img_name in image_dir_list:
#将文件名拼接为完整的路径
img_path=os.path.join(image_dir,img_name)
#在with中以 二进制只读方式 打开图片文件(防止忘记关闭文件)
with open(img_path,'rb') as f:
#将读取到的信息存入img_data中
img_data = f.read()
#将信息转码为base64格式的文本并使用decode()方法转换为二进制文件方便传送
img_base64=base64.b64encode(img_data).decode()
#将转换后的文件存入图片字典中
img_dict[img_name]=img_base64
#将图片字典存入json对象中
json_data={'images': img_dict}
#将json对象序列化为二进制数据
json_data_body=json.dumps(json_data).encode()
#将head和json_data_body拼接后转化为json对象后在转换为utf-8格式的编码发送给给服务器
sock.send(str.encode(str(len(json_data_body)).ljust(16),'utf-8'))
sock.send(json.dumps(json_data).encode('utf-8'))
# 发送数据
#sock.send(stringData)
### 如果本地有GPU
if 0:
### 本地生成AIGC图片 ###
## 添加AIGC代码 ##
#####################
frame = cv2.imread("car.png")
# # 压缩参数,后面cv2.imencode将会用到,对于jpeg来说,15代表图像质量,越高代表图像质量越好为 0-100,默认95
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 95]
# cv2.imencode将图片格式转换(编码)成流数据,赋值到内存缓存中;主要用于图像数据格式的压缩,方便网络传输
# '.jpg'表示将图片按照jpg格式编码。
result, imgencode = cv2.imencode('.jpg', frame, encode_param)
# 建立矩阵
data = np.array(imgencode)
# 将numpy矩阵转换成字符形式,以便在网络中传输
stringData = data.tostring()
# 先发送要发送的数据的长度
# ljust() 方法返回一个原字符串左对齐,并使用空格填充至指定长度的新字符串
sock.send(str.encode(str(len(stringData)).ljust(16)))
# 发送数据
sock.send(stringData)
while 1:
# 读取服务器返回值
receive = sock.recv(1024)
if len(receive):
print("图片发送成功")
print(str(receive, encoding='utf-8')) ### 之前接受的帧率数据,现在换成image流数据
if str(receive, encoding='utf-8') is "全部测试完毕":
break;
sock.close()
print("数据发送完毕")
exit()文章来源地址https://www.toymoban.com/news/detail-602435.html
if __name__ == '__main__':
SendAIGC()文章来源:https://www.toymoban.com/news/detail-602435.html
exit()
到了这里,关于《首次体验AIGC与AidLux互联应用AidLux端AIGC测评》的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!