463 lines
17 KiB
Python
463 lines
17 KiB
Python
import cv2
|
||
import numpy as np
|
||
import torch
|
||
import torchvision
|
||
import torch.nn.functional as F
|
||
from utils.client import WebDevice, ModelConfig
|
||
from utils.quantize import Dtype
|
||
from utils.common import load_image_cv, load_image_pil, init_service
|
||
|
||
class Colors:
    """Color palette used for visualization and drawing.

    ``palette`` holds 20 RGB tuples that are indexed cyclically through
    ``__call__``; ``pose_palette`` is a separate 20x3 ``uint8`` array.
    """

    def __init__(self):
        """Build ``palette`` (from hex codes), ``n`` and ``pose_palette``."""
        hex_codes = (
            "042AFF", "0BDBEB", "F3F3F3", "00DFB7", "111F68",
            "FF6FDD", "FF444F", "CCED00", "00F344", "BD00FF",
            "00B4FF", "DD00BA", "00FFFF", "26C000", "01FFB3",
            "7D24FF", "7B0068", "FF1B6C", "FC6D2F", "A2FF0B",
        )
        self.palette = list(map(self.hex2rgb, ("#" + code for code in hex_codes)))
        self.n = len(self.palette)
        self.pose_palette = np.array(
            [
                [255, 128, 0], [255, 153, 51], [255, 178, 102], [230, 230, 0],
                [255, 153, 255], [153, 204, 255], [255, 102, 255], [255, 51, 255],
                [102, 178, 255], [51, 153, 255], [255, 153, 153], [255, 102, 102],
                [255, 51, 51], [153, 255, 153], [102, 255, 102], [51, 255, 51],
                [0, 255, 0], [0, 0, 255], [255, 0, 0], [255, 255, 255],
            ],
            dtype=np.uint8,
        )

    def __call__(self, i: int, bgr: bool = False) -> tuple:
        """Return palette color ``i`` (wrapping modulo the palette size).

        Args:
            i: Color index; converted with ``int()`` before use.
            bgr: When true, return the channels in (B, G, R) order.

        Returns:
            A 3-tuple of ints, RGB by default.
        """
        rgb = self.palette[int(i) % self.n]
        if bgr:
            return rgb[2], rgb[1], rgb[0]
        return rgb

    @staticmethod
    def hex2rgb(h: str) -> tuple:
        """Convert a ``"#RRGGBB"`` string into an ``(R, G, B)`` tuple of ints."""
        channel_pairs = (h[1:3], h[3:5], h[5:7])
        return tuple(int(pair, 16) for pair in channel_pairs)
|
||
|
||
|
||
# Class index -> human-readable label (80 entries, keys 0..79).
# post_process() looks labels up here when annotating detections.
names = dict(
    enumerate(
        (
            'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck',
            'boat', 'traffic light', 'fire hydrant', 'stop sign', 'parking meter', 'bench',
            'bird', 'cat', 'dog', 'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra',
            'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee',
            'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove',
            'skateboard', 'surfboard', 'tennis racket', 'bottle', 'wine glass', 'cup', 'fork',
            'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange', 'broccoli',
            'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch',
            'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop', 'mouse',
            'remote', 'keyboard', 'cell phone', 'microwave', 'oven', 'toaster', 'sink',
            'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier',
            'toothbrush',
        )
    )
)
|
||
|
||
|
||
|
||
def infer(img_data, web_service):
    """Run inference through the device service and decode both raw outputs.

    Args:
        img_data (np.array): Input data, forwarded unchanged to ``web_service.infer``.
        web_service: Service instance exposing ``infer``; its response must be
            indexable by ``'output_0.dat'`` and ``'output_1.dat'`` and hold
            float32 byte buffers.

    Returns:
        tuple: ``(pred, proto)`` torch tensors with shapes ``(1, 25200, 117)``
        and ``(1, 32, 160, 160)`` respectively.
    """
    response = web_service.infer(img_data)

    # Decode both buffers first; flatten() copies, so the arrays are writable.
    flat_pred, flat_proto = (
        np.frombuffer(response[key], dtype=np.float32).flatten()
        for key in ('output_0.dat', 'output_1.dat')
    )

    pred = torch.from_numpy(flat_pred.reshape(1, 25200, 117))
    proto = torch.from_numpy(flat_proto.reshape(1, 32, 160, 160))
    return pred, proto
|
||
|
||
|
||
def xywh2xyxy(x):
    """Convert boxes from ``[cx, cy, w, h]`` to ``[x1, y1, x2, y2]``.

    Args:
        x: Torch tensor or numpy array whose last axis starts with the box
            center ``(cx, cy)`` followed by width and height.

    Returns:
        A copy of ``x`` (same type) with the first four entries of the last
        axis replaced by the top-left and bottom-right corners. The input is
        not modified.
    """
    out = np.copy(x) if not isinstance(x, torch.Tensor) else x.clone()

    center_x, center_y = x[..., 0], x[..., 1]
    half_w, half_h = x[..., 2] / 2, x[..., 3] / 2

    out[..., 0] = center_x - half_w  # left
    out[..., 1] = center_y - half_h  # top
    out[..., 2] = center_x + half_w  # right
    out[..., 3] = center_y + half_h  # bottom
    return out
|
||
|
||
|
||
def box_iou(box1, box2, eps=1e-7):
    """Compute the pairwise intersection-over-union of two sets of boxes.

    Args:
        box1: Tensor of boxes in ``(x1, y1, x2, y2)`` form, one box per row.
        box2: Tensor of boxes in the same form.
        eps: Small constant added to the denominator to avoid division by zero.

    Returns:
        Tensor whose entry ``[i, j]`` is the IoU of ``box1[i]`` and ``box2[j]``.
    """
    # Split each box into its two corner points and broadcast N against M.
    corners_a, corners_b = box1.unsqueeze(1).chunk(2, 2), box2.unsqueeze(0).chunk(2, 2)
    min_a, max_a = corners_a
    min_b, max_b = corners_b

    overlap_wh = (torch.min(max_a, max_b) - torch.max(min_a, min_b)).clamp(0)
    inter = overlap_wh.prod(2)

    area_a = (max_a - min_a).prod(2)
    area_b = (max_b - min_b).prod(2)
    return inter / (area_a + area_b - inter + eps)
|
||
|
||
|
||
def non_max_suppression(
    prediction,conf_thres=0.25,iou_thres=0.45,classes=None,agnostic=False,multi_label=False,labels=(),max_det=300,nm=32,
):
    """Run non-maximum suppression (NMS) on raw inference output.

    Each prediction row is laid out as
    ``[cx, cy, w, h, obj_conf, <nc class scores>, <nm mask coefficients>]``.

    Args:
        prediction: Raw model output of shape ``(batch, boxes, 5 + nc + nm)``.
            A list/tuple is also accepted, in which case its first element is used.
        conf_thres: Confidence threshold in [0, 1]. Defaults to 0.25.
        iou_thres: IoU threshold in [0, 1] passed to NMS. Defaults to 0.45.
        classes: Optional collection of class indices to keep. Defaults to None.
        agnostic: If true, NMS is class-agnostic (no per-class box offset). Defaults to False.
        multi_label: Allow several class labels per box (only effective when
            there is more than one class). Defaults to False.
        labels: Optional per-image extra labels that are appended to the
            candidates before NMS. Defaults to ().
        max_det: Maximum number of detections kept per image. Defaults to 300.
        nm: Number of mask-coefficient columns at the end of each prediction
            row. Defaults to 32.

    Returns:
        List with one tensor per image, each of shape ``(n, 6 + nm)`` holding
        ``[x1, y1, x2, y2, conf, cls, <nm mask coefficients>]``.

    Raises:
        AssertionError: If ``conf_thres`` or ``iou_thres`` is outside [0, 1].
    """
    assert 0 <= conf_thres <= 1, f"Invalid Confidence threshold {conf_thres}, valid values are between 0.0 and 1.0"
    assert 0 <= iou_thres <= 1, f"Invalid IoU {iou_thres}, valid values are between 0.0 and 1.0"
    if isinstance(prediction, (list, tuple)):  # YOLOv5 model in validation mode, output = (inference_out, loss_out)
        prediction = prediction[0]  # select only inference output

    device = prediction.device
    mps = "mps" in device.type  # Apple MPS
    if mps:  # MPS not fully supported yet, convert tensors to CPU before NMS
        prediction = prediction.cpu()
    bs = prediction.shape[0]  # batch size
    nc = prediction.shape[2] - nm - 5  # number of classes
    xc = prediction[..., 4] > conf_thres  # candidates

    max_wh = 7680  # (pixels) maximum box width and height; also used as the per-class box offset
    max_nms = 30000  # maximum number of boxes into torchvision.ops.nms()
    redundant = True  # require redundant detections (only consulted inside the merge branch)
    multi_label &= nc > 1  # multiple labels per box (adds 0.5ms/img)
    merge = False  # use merge-NMS; hard-coded off, so the merge branch below never runs

    mi = 5 + nc  # mask start index
    # Default result for every image: an empty (0, 6 + nm) tensor. Entries are
    # replaced (never mutated in place), so sharing one tensor object is harmless.
    output = [torch.zeros((0, 6 + nm), device=prediction.device)] * bs
    for xi, x in enumerate(prediction):  # image index, image inference

        x = x[xc[xi]]  # confidence (boolean indexing yields a copy)
        # Append the caller-supplied labels for this image as extra candidates
        if labels and len(labels[xi]):
            lb = labels[xi]
            v = torch.zeros((len(lb), nc + nm + 5), device=x.device)
            v[:, :4] = lb[:, 1:5]  # box
            v[:, 4] = 1.0  # conf
            v[range(len(lb)), lb[:, 0].long() + 5] = 1.0  # cls
            x = torch.cat((x, v), 0)

        # If none remain process next image
        if not x.shape[0]:
            continue

        # Compute conf
        # NOTE(review): the slice 5: also covers the mask-coefficient columns, so
        # they are scaled by obj_conf as well — confirm this is intended.
        x[:, 5:] *= x[:, 4:5]  # conf = obj_conf * cls_conf

        # Box/Mask
        box = xywh2xyxy(x[:, :4])  # (center_x, center_y, width, height) to (x1, y1, x2, y2)
        mask = x[:, mi:]  # zero columns if no masks

        # Detections matrix nx6 (xyxy, conf, cls)
        if multi_label:
            i, j = (x[:, 5:mi] > conf_thres).nonzero(as_tuple=False).T
            x = torch.cat((box[i], x[i, 5 + j, None], j[:, None].float(), mask[i]), 1)
        else:  # best class only
            conf, j = x[:, 5:mi].max(1, keepdim=True)
            x = torch.cat((box, conf, j.float(), mask), 1)[conf.view(-1) > conf_thres]

        # Filter by class
        if classes is not None:
            x = x[(x[:, 5:6] == torch.tensor(classes, device=x.device)).any(1)]
        n = x.shape[0]  # number of boxes
        if not n:  # no boxes
            continue
        x = x[x[:, 4].argsort(descending=True)[:max_nms]]  # sort by confidence and remove excess boxes

        # Batched NMS
        c = x[:, 5:6] * (0 if agnostic else max_wh)  # classes
        boxes, scores = x[:, :4] + c, x[:, 4]  # boxes (offset by class), scores
        i = torchvision.ops.nms(boxes, scores, iou_thres)  # NMS
        i = i[:max_det]  # limit detections
        if merge and (1 < n < 3e3):  # Merge NMS (boxes merged using weighted mean)
            # update boxes as boxes(i,4) = weights(i,n) * boxes(n,4)
            iou = box_iou(boxes[i], boxes) > iou_thres  # iou matrix
            weights = iou * scores[None]  # box weights
            x[i, :4] = torch.mm(weights, x[:, :4]).float() / weights.sum(1, keepdim=True)  # merged boxes
            if redundant:
                i = i[iou.sum(1) > 1]  # require redundancy

        output[xi] = x[i]
        if mps:
            output[xi] = output[xi].to(device)  # move the result back to the original device

    return output
|
||
|
||
|
||
def crop_mask(masks, boxes):
    """Zero out every mask outside of its own bounding box.

    Args:
        masks: Tensor of shape ``(n, h, w)``.
        boxes: Tensor with one ``(x1, y1, x2, y2)`` row per mask, expressed in
            mask pixel coordinates (assumed shape ``(n, 4)``).

    Returns:
        Tensor of shape ``(n, h, w)``: ``masks`` multiplied by a per-mask
        window that is true where ``x1 <= col < x2`` and ``y1 <= row < y2``.
    """
    _, height, width = masks.shape
    x1, y1, x2, y2 = torch.chunk(boxes[:, :, None], 4, 1)  # each (n, 1, 1) for (n, 4) boxes

    col_idx = torch.arange(width, device=masks.device, dtype=x1.dtype)[None, None, :]  # (1, 1, w)
    row_idx = torch.arange(height, device=masks.device, dtype=x1.dtype)[None, :, None]  # (1, h, 1)

    inside_x = (col_idx >= x1) * (col_idx < x2)  # (n, 1, w)
    inside_y = (row_idx >= y1) * (row_idx < y2)  # (n, h, 1)
    return masks * (inside_x * inside_y)
|
||
|
||
|
||
|
||
def process_mask(protos, masks_in, bboxes, shape, upsample=False):
    """Build binary instance masks from prototypes and per-box coefficients.

    Args:
        protos: Prototype tensor of shape ``(c, mh, mw)``.
        masks_in: Mask coefficients, one row of ``c`` values per detection.
        bboxes: Detection boxes ``(x1, y1, x2, y2)`` in the coordinates of ``shape``.
        shape: ``(height, width)`` of the image the boxes refer to.
        upsample: If true, bilinearly resize the masks to ``shape``. Defaults to False.

    Returns:
        Thresholded masks (values above 0.5 become 1, others 0), at prototype
        resolution or at ``shape`` when ``upsample`` is set.
    """
    channels, mask_h, mask_w = protos.shape  # CHW
    img_h, img_w = shape

    # Linear combination of prototypes, squashed to (0, 1).
    flat_protos = protos.float().view(channels, -1)
    masks = (masks_in @ flat_protos).sigmoid().view(-1, mask_h, mask_w)

    # Bring the boxes down to prototype resolution; the clone keeps the
    # caller's tensor untouched.
    x_ratio, y_ratio = mask_w / img_w, mask_h / img_h
    boxes_at_mask_res = bboxes.clone()
    for column, ratio in ((0, x_ratio), (2, x_ratio), (3, y_ratio), (1, y_ratio)):
        boxes_at_mask_res[:, column] *= ratio

    masks = crop_mask(masks, boxes_at_mask_res)
    if upsample:
        masks = F.interpolate(masks[None], shape, mode="bilinear", align_corners=False)[0]
    return masks.gt_(0.5)
|
||
|
||
|
||
def scale_boxes(img1_shape, boxes, img0_shape, ratio_pad=None):
    """Map ``xyxy`` boxes from the resized/padded image back to the original image.

    Args:
        img1_shape: ``(height, width)`` of the image the boxes currently refer to.
        boxes: Boxes whose last axis starts with ``(x1, y1, x2, y2)``;
            modified in place.
        img0_shape: ``(height, width, ...)`` of the target image.
        ratio_pad: Optional ``((gain, ...), (pad_x, pad_y))``. When omitted,
            gain and padding are derived from the two shapes.

    Returns:
        The same ``boxes`` object with padding removed and coordinates divided
        by the gain.
    """
    if ratio_pad is not None:
        gain = ratio_pad[0][0]
        pad = ratio_pad[1]
    else:
        # gain = resized / original; the leftover on each axis is split evenly.
        gain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1])
        pad = (
            (img1_shape[1] - img0_shape[1] * gain) / 2,
            (img1_shape[0] - img0_shape[0] * gain) / 2,
        )

    # x columns use pad[0], y columns use pad[1].
    for columns, axis in (([0, 2], 0), ([1, 3], 1)):
        boxes[..., columns] -= pad[axis]
    boxes[..., :4] /= gain
    return boxes
|
||
|
||
def scale_image(masks, im0_shape, ratio_pad=None):
    """Crop the padding off an image/mask array and resize it to the original size.

    Args:
        masks: Array of shape ``(h, w)`` or ``(h, w, c)``.
        im0_shape: Shape of the original image; only ``[0]`` (height) and
            ``[1]`` (width) are used.
        ratio_pad: Optional ``(ratio, (pad_x, pad_y))``. When omitted, the
            padding is derived from the two shapes.

    Returns:
        ``masks`` itself when its first two dimensions already equal
        ``im0_shape[:2]``; otherwise a cropped and resized array with three
        dimensions ``(im0_shape[0], im0_shape[1], c)``.

    Raises:
        ValueError: If ``masks`` has fewer than two dimensions.
    """
    im1_shape = masks.shape
    if im1_shape[:2] == im0_shape[:2]:
        return masks

    # Validate before im1_shape[1] is indexed below. The check used to sit
    # after that code, so it could never fire and 1-D input failed with an
    # IndexError instead of this message.
    if len(im1_shape) < 2:
        raise ValueError(f'"len of masks shape" should be 2 or 3, but got {len(masks.shape)}')

    if ratio_pad is None:  # calculate from im0_shape
        gain = min(im1_shape[0] / im0_shape[0], im1_shape[1] / im0_shape[1])  # gain = old / new
        pad = (im1_shape[1] - im0_shape[1] * gain) / 2, (im1_shape[0] - im0_shape[0] * gain) / 2  # wh padding
    else:
        pad = ratio_pad[1]

    # The -0.1 / +0.1 nudges bias the rounding of x.5 paddings: down on the
    # top/left edge, up on the bottom/right edge.
    top, left = (int(round(pad[1] - 0.1)), int(round(pad[0] - 0.1)))
    bottom, right = (
        im1_shape[0] - int(round(pad[1] + 0.1)),
        im1_shape[1] - int(round(pad[0] + 0.1)),
    )

    masks = masks[top:bottom, left:right]
    masks = cv2.resize(masks, (im0_shape[1], im0_shape[0]))  # dsize is (width, height)
    if len(masks.shape) == 2:
        # Restore the channel axis when the resize result is 2-D.
        masks = masks[:, :, None]

    return masks
|
||
|
||
|
||
def masks_cal(im, masks, colors, im_gpu, alpha: float = 0.5, retina_masks: bool = False):
    """Blend predicted instance masks onto an image.

    Args:
        im: Destination image (numpy array, HWC); overwritten in place.
        masks: Tensor of shape ``(n, h, w)`` with one mask per instance.
        colors: Sequence of ``n`` color triples in the 0-255 range.
        im_gpu: Source image in CHW layout, scaled to [0, 1]. May be a numpy
            array or a torch tensor.
        alpha: Mask opacity. Defaults to 0.5.
        retina_masks: If true, the blended result is written to ``im``
            directly; otherwise it is first passed through ``scale_image`` to
            match ``im.shape``.

    Returns:
        ``im`` with the blended result written into it.
    """
    # Convert once, up front, so both branches below work on a tensor.
    # torch.as_tensor accepts numpy arrays and tensors alike.
    im_gpu = torch.as_tensor(im_gpu).to('cpu')

    if len(masks) == 0:
        # Nothing to blend: copy the source image and stop. Falling through
        # would fail on the reduction over an empty mask dimension below.
        # NOTE(review): this path does not flip the channel axis, unlike the
        # blending path — confirm which channel order callers expect.
        im[:] = im_gpu.permute(1, 2, 0).contiguous().cpu().numpy() * 255
        return im

    colors = torch.tensor(colors, device=masks.device, dtype=torch.float32) / 255.0  # shape(n,3)
    colors = colors[:, None, None]  # shape(n,1,1,3)
    masks = masks.unsqueeze(3)  # shape(n,h,w,1)
    masks_color = masks * (colors * alpha)  # shape(n,h,w,3)

    # Cumulative transparency: the last entry is the background weight left
    # after all masks have been applied.
    inv_alpha_masks = (1 - masks * alpha).cumprod(0)  # shape(n,h,w,1)
    mcs = masks_color.max(dim=0).values  # shape(h,w,3)

    im_gpu = im_gpu.flip(dims=[0])  # flip channel order
    im_gpu = im_gpu.permute(1, 2, 0).contiguous()  # shape(h,w,3)
    im_gpu = im_gpu * inv_alpha_masks[-1] + mcs
    im_mask = im_gpu * 255
    im_mask_np = im_mask.byte().cpu().numpy()
    im[:] = im_mask_np if retina_masks else scale_image(im_mask_np, im.shape)
    return im
|
||
|
||
|
||
def draw_box_with_text(image, xyxy, text, box_color=(255, 0, 0), text_color=(255, 255, 255)):
    """Draw a bounding box with a filled label tag above it.

    Args:
        image: Image to draw on; modified in place.
        xyxy: Box corners ``(x1, y1, x2, y2)``; each value is converted with ``int()``.
        text: Label text (class name and confidence).
        box_color: Color of the box outline and of the label background.
        text_color: Color of the label text.

    Returns:
        The annotated ``image``.
    """
    font, font_scale, text_thickness = cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1
    left, top, right, bottom = (int(value) for value in xyxy)

    # Box outline, 2 px thick.
    cv2.rectangle(image, (left, top), (right, bottom), box_color, 2)

    # Filled tag sized to the text, sitting directly above the box.
    text_size, _ = cv2.getTextSize(text, font, font_scale, text_thickness)
    text_w, text_h = text_size
    cv2.rectangle(image, (left, top - text_h - 5), (left + text_w, top), box_color, -1)
    cv2.putText(image, text, (left, top - 5), font, font_scale, text_color, text_thickness, cv2.LINE_AA)

    return image
|
||
|
||
def post_process(rsp, im0, im_i):
    """Decode the raw service response and render the segmentation result.

    Args:
        rsp: Service response holding float32 byte buffers under
            ``'output_0.dat'`` (predictions) and ``'output_1.dat'`` (mask prototypes).
        im0: Original image (numpy array, HWC); drawn on in place.
        im_i: Preprocessed image handed to ``masks_cal`` as the blending source.

    Returns:
        The annotated image. When nothing is detected, ``im0`` is returned
        unchanged.
    """
    input_size = [640, 640]  # size the boxes and masks are expressed in before rescaling

    flat_pred = np.frombuffer(rsp['output_0.dat'], dtype=np.float32).flatten()
    flat_proto = np.frombuffer(rsp['output_1.dat'], dtype=np.float32).flatten()
    preds = torch.from_numpy(flat_pred.reshape(1, 25200, 117))
    proto = torch.from_numpy(flat_proto.reshape(1, 32, 160, 160))

    palette = Colors()
    retina_masks = False

    # Start from the untouched image so the return value is defined even when
    # NMS yields no detections.
    result_image = im0
    for i, det in enumerate(non_max_suppression(preds)):
        if not len(det):
            continue

        # Columns 6+ of each detection are its mask coefficients.
        masks = process_mask(proto[i], det[:, 6:], det[:, :4], input_size, upsample=True)
        det[:, :4] = scale_boxes(input_size, det[:, :4], im0.shape).round()  # rescale boxes to im0 size

        if retina_masks:
            im_gpu = torch.as_tensor(im0, dtype=torch.float16).permute(2, 0, 1).flip(0).contiguous() / 255
        else:
            im_gpu = im_i

        result_image = masks_cal(
            im0,
            masks,
            colors=[palette(cls_idx, True) for cls_idx in det[:, 5]],
            im_gpu=im_gpu,
        )

        for *xyxy, conf, cls in reversed(det[:, :6]):
            label = f"{names[int(cls)]} {conf:.2f}"
            result_image = draw_box_with_text(result_image, xyxy, label)

    return result_image
|
||
|
||
|
||
# Script entry point: run one image through the device service and save the
# annotated result next to the model files.
if __name__ == "__main__":
    image_path = './models/yolov5s-seg/0.jpg'
    im0s = load_image_cv(image_path)
    im0s = cv2.cvtColor(im0s, cv2.COLOR_BGR2RGB)
    im0 = im0s.copy()  # canvas that post_process draws on
    # Normalized CHW float copy, used as the blending source for the masks.
    im_i = im0s.astype(np.float32) / 255.0
    im_i = np.transpose(im_i, (2, 0, 1))
    # The reshape only succeeds when the image holds exactly 640*640*3 values;
    # no resizing is done here.
    in_data = im0s.reshape(1, 1, 640, 640 * 3)
    web_service = init_service('./models/yolov5s-seg', Dtype.I16)
    rsp = web_service.infer(in_data)
    result_image = post_process(rsp, im0, im_i)
    cv2.imwrite('./models/yolov5s-seg/result.jpg', result_image)
|
||
|
||
|
||
|
||
|
||
|