Commit d7d0ffd6 by baihe

updated

parent 322601f1
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from .Utils import build_targets, to_cpu, parse_model_config
def create_modules(module_defs):
"""
Constructs module list of layer blocks from module configuration in module_defs
"""
hyperparams = module_defs.pop(0)
    output_filters = [int(hyperparams["channels"])]  # e.g. [3] for an RGB input
module_list = nn.ModuleList()
for module_i, module_def in enumerate(module_defs):
modules = nn.Sequential()
if module_def["type"] == "convolutional":
bn = int(module_def["batch_normalize"])
filters = int(module_def["filters"])
kernel_size = int(module_def["size"])
pad = (kernel_size - 1) // 2
modules.add_module(
f"conv_{module_i}",
nn.Conv2d(
in_channels=output_filters[-1],
out_channels=filters,
kernel_size=kernel_size,
stride=int(module_def["stride"]),
padding=pad,
bias=not bn,
),
)
if bn:
modules.add_module(f"batch_norm_{module_i}", nn.BatchNorm2d(filters, momentum=0.9, eps=1e-5))
if module_def["activation"] == "leaky":
modules.add_module(f"leaky_{module_i}", nn.LeakyReLU(0.1))
elif module_def["type"] == "maxpool":
kernel_size = int(module_def["size"])
stride = int(module_def["stride"])
if kernel_size == 2 and stride == 1:
modules.add_module(f"_debug_padding_{module_i}", nn.ZeroPad2d((0, 1, 0, 1)))
maxpool = nn.MaxPool2d(kernel_size=kernel_size, stride=stride, padding=int((kernel_size - 1) // 2))
modules.add_module(f"maxpool_{module_i}", maxpool)
elif module_def["type"] == "upsample":
upsample = Upsample(scale_factor=int(module_def["stride"]), mode="nearest")
modules.add_module(f"upsample_{module_i}", upsample)
elif module_def["type"] == "route":
layers = [int(x) for x in module_def["layers"].split(",")]
filters = sum([output_filters[1:][i] for i in layers])
modules.add_module(f"route_{module_i}", EmptyLayer())
elif module_def["type"] == "shortcut":
filters = output_filters[1:][int(module_def["from"])]
modules.add_module(f"shortcut_{module_i}", EmptyLayer())
elif module_def["type"] == "yolo":
anchor_idxs = [int(x) for x in module_def["mask"].split(",")]
# Extract anchors
anchors = [int(x) for x in module_def["anchors"].split(",")]
anchors = [(anchors[i], anchors[i + 1]) for i in range(0, len(anchors), 2)]
anchors = [anchors[i] for i in anchor_idxs]
num_classes = int(module_def["classes"])
img_size = int(hyperparams["height"])
# Define detection layer
yolo_layer = YOLOLayer(anchors, num_classes, img_size)
modules.add_module(f"yolo_{module_i}", yolo_layer)
# Register module list and number of output filters
module_list.append(modules)
output_filters.append(filters)
return hyperparams, module_list
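# Illustrative usage sketch (the cfg path is an assumption):
#   module_defs = parse_model_config("config/yolov3.cfg")
#   hyperparams, module_list = create_modules(module_defs)
# create_modules pops the leading [net] block into hyperparams, so the remaining
# module_defs align one-to-one with module_list (Darknet.forward relies on this).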
class Upsample(nn.Module):
""" nn.Upsample is deprecated """
def __init__(self, scale_factor, mode="nearest"):
super(Upsample, self).__init__()
self.scale_factor = scale_factor
self.mode = mode
def forward(self, x):
x = F.interpolate(x, scale_factor=self.scale_factor, mode=self.mode)
return x
class EmptyLayer(nn.Module):
"""Placeholder for 'route' and 'shortcut' layers"""
def __init__(self):
super(EmptyLayer, self).__init__()
class YOLOLayer(nn.Module):
"""Detection layer"""
def __init__(self, anchors, num_classes, img_dim=416):
super(YOLOLayer, self).__init__()
self.anchors = anchors
self.num_anchors = len(anchors)
self.num_classes = num_classes
self.ignore_thres = 0.5
self.mse_loss = nn.MSELoss()
self.bce_loss = nn.BCELoss()
self.obj_scale = 1
self.noobj_scale = 100
self.metrics = {}
self.img_dim = img_dim
self.grid_size = 0 # grid size
def compute_grid_offsets(self, grid_size, cuda=True):
self.grid_size = grid_size
g = self.grid_size
FloatTensor = torch.cuda.FloatTensor if cuda else torch.FloatTensor
self.stride = self.img_dim / self.grid_size
# Calculate offsets for each grid
self.grid_x = torch.arange(g).repeat(g, 1).view([1, 1, g, g]).type(FloatTensor)
self.grid_y = torch.arange(g).repeat(g, 1).t().view([1, 1, g, g]).type(FloatTensor)
self.scaled_anchors = FloatTensor([(a_w / self.stride, a_h / self.stride) for a_w, a_h in self.anchors])
self.anchor_w = self.scaled_anchors[:, 0:1].view((1, self.num_anchors, 1, 1))
self.anchor_h = self.scaled_anchors[:, 1:2].view((1, self.num_anchors, 1, 1))
def forward(self, x, targets=None, img_dim=None):
# Tensors for cuda support
FloatTensor = torch.cuda.FloatTensor if x.is_cuda else torch.FloatTensor
LongTensor = torch.cuda.LongTensor if x.is_cuda else torch.LongTensor
ByteTensor = torch.cuda.ByteTensor if x.is_cuda else torch.ByteTensor
        if img_dim is not None:  # keep the configured size if no override is passed
            self.img_dim = img_dim
num_samples = x.size(0)
grid_size = x.size(2)
prediction = (
x.view(num_samples, self.num_anchors, self.num_classes + 5, grid_size, grid_size)
.permute(0, 1, 3, 4, 2)
.contiguous()
)
# Get outputs
x = torch.sigmoid(prediction[..., 0]) # Center x
y = torch.sigmoid(prediction[..., 1]) # Center y
w = prediction[..., 2] # Width
h = prediction[..., 3] # Height
pred_conf = torch.sigmoid(prediction[..., 4]) # Conf
pred_cls = torch.sigmoid(prediction[..., 5:]) # Cls pred.
# If grid size does not match current we compute new offsets
if grid_size != self.grid_size:
self.compute_grid_offsets(grid_size, cuda=x.is_cuda)
# Add offset and scale with anchors
pred_boxes = FloatTensor(prediction[..., :4].shape)
pred_boxes[..., 0] = x.data + self.grid_x
pred_boxes[..., 1] = y.data + self.grid_y
pred_boxes[..., 2] = torch.exp(w.data) * self.anchor_w
pred_boxes[..., 3] = torch.exp(h.data) * self.anchor_h
output = torch.cat(
(
pred_boxes.view(num_samples, -1, 4) * self.stride,
pred_conf.view(num_samples, -1, 1),
pred_cls.view(num_samples, -1, self.num_classes),
),
-1,
)
if targets is None:
return output, 0
else:
iou_scores, class_mask, obj_mask, noobj_mask, tx, ty, tw, th, tcls, tconf = build_targets(
pred_boxes=pred_boxes,
pred_cls=pred_cls,
target=targets,
anchors=self.scaled_anchors,
ignore_thres=self.ignore_thres,
)
# Loss : Mask outputs to ignore non-existing objects (except with conf. loss)
loss_x = self.mse_loss(x[obj_mask.bool()], tx[obj_mask.bool()])
loss_y = self.mse_loss(y[obj_mask.bool()], ty[obj_mask.bool()])
loss_w = self.mse_loss(w[obj_mask.bool()], tw[obj_mask.bool()])
loss_h = self.mse_loss(h[obj_mask.bool()], th[obj_mask.bool()])
loss_conf_obj = self.bce_loss(pred_conf[obj_mask.bool()], tconf[obj_mask.bool()])
loss_conf_noobj = self.bce_loss(pred_conf[noobj_mask.bool()], tconf[noobj_mask.bool()])
loss_conf = self.obj_scale * loss_conf_obj + self.noobj_scale * loss_conf_noobj
loss_cls = self.bce_loss(pred_cls[obj_mask.bool()], tcls[obj_mask.bool()])
total_loss = loss_x + loss_y + loss_w + loss_h + loss_conf + loss_cls
# Metrics
cls_acc = 100 * class_mask[obj_mask.bool()].mean()
conf_obj = pred_conf[obj_mask.bool()].mean()
conf_noobj = pred_conf[noobj_mask.bool()].mean()
conf50 = (pred_conf > 0.5).float()
iou50 = (iou_scores > 0.5).float()
iou75 = (iou_scores > 0.75).float()
detected_mask = conf50 * class_mask * tconf
precision = torch.sum(iou50 * detected_mask) / (conf50.sum() + 1e-16)
recall50 = torch.sum(iou50 * detected_mask) / (obj_mask.sum() + 1e-16)
recall75 = torch.sum(iou75 * detected_mask) / (obj_mask.sum() + 1e-16)
self.metrics = {
"loss": to_cpu(total_loss).item(),
"x": to_cpu(loss_x).item(),
"y": to_cpu(loss_y).item(),
"w": to_cpu(loss_w).item(),
"h": to_cpu(loss_h).item(),
"conf": to_cpu(loss_conf).item(),
"cls": to_cpu(loss_cls).item(),
"cls_acc": to_cpu(cls_acc).item(),
"recall50": to_cpu(recall50).item(),
"recall75": to_cpu(recall75).item(),
"precision": to_cpu(precision).item(),
"conf_obj": to_cpu(conf_obj).item(),
"conf_noobj": to_cpu(conf_noobj).item(),
"grid_size": grid_size,
}
return output, total_loss
class Darknet(nn.Module):
"""YOLOv3 object detection model"""
def __init__(self, config_path, img_size=416):
super(Darknet, self).__init__()
self.module_defs = parse_model_config(config_path)
self.hyperparams, self.module_list = create_modules(self.module_defs)
self.yolo_layers = [layer[0] for layer in self.module_list if hasattr(layer[0], "metrics")]
self.img_size = img_size
self.seen = 0
self.header_info = np.array([0, 0, 0, self.seen, 0], dtype=np.int32)
def forward(self, x, targets=None):
img_dim = x.shape[2]
loss = 0
layer_outputs, yolo_outputs = [], []
for i, (module_def, module) in enumerate(zip(self.module_defs, self.module_list)):
if module_def["type"] in ["convolutional", "upsample", "maxpool"]:
x = module(x)
elif module_def["type"] == "route":
x = torch.cat([layer_outputs[int(layer_i)] for layer_i in module_def["layers"].split(",")], 1)
elif module_def["type"] == "shortcut":
layer_i = int(module_def["from"])
x = layer_outputs[-1] + layer_outputs[layer_i]
elif module_def["type"] == "yolo":
x, layer_loss = module[0](x, targets, img_dim)
loss += layer_loss
yolo_outputs.append(x)
layer_outputs.append(x)
yolo_outputs = to_cpu(torch.cat(yolo_outputs, 1))
return yolo_outputs if targets is None else (loss, yolo_outputs)
def load_darknet_weights(self, weights_path):
"""Parses and loads the weights stored in 'weights_path'"""
# Open the weights file
with open(weights_path, "rb") as f:
header = np.fromfile(f, dtype=np.int32, count=5) # First five are header values
self.header_info = header # Needed to write header when saving weights
self.seen = header[3] # number of images seen during training
weights = np.fromfile(f, dtype=np.float32) # The rest are weights
# Establish cutoff for loading backbone weights
cutoff = None
if "darknet53.conv.74" in weights_path:
cutoff = 75
ptr = 0
for i, (module_def, module) in enumerate(zip(self.module_defs, self.module_list)):
if i == cutoff:
break
if module_def["type"] == "convolutional":
conv_layer = module[0]
if module_def["batch_normalize"]:
# Load BN bias, weights, running mean and running variance
bn_layer = module[1]
num_b = bn_layer.bias.numel() # Number of biases
# Bias
bn_b = torch.from_numpy(weights[ptr: ptr + num_b]).view_as(bn_layer.bias)
bn_layer.bias.data.copy_(bn_b)
ptr += num_b
# Weight
bn_w = torch.from_numpy(weights[ptr: ptr + num_b]).view_as(bn_layer.weight)
bn_layer.weight.data.copy_(bn_w)
ptr += num_b
# Running Mean
bn_rm = torch.from_numpy(weights[ptr: ptr + num_b]).view_as(bn_layer.running_mean)
bn_layer.running_mean.data.copy_(bn_rm)
ptr += num_b
# Running Var
bn_rv = torch.from_numpy(weights[ptr: ptr + num_b]).view_as(bn_layer.running_var)
bn_layer.running_var.data.copy_(bn_rv)
ptr += num_b
else:
# Load conv. bias
num_b = conv_layer.bias.numel()
conv_b = torch.from_numpy(weights[ptr: ptr + num_b]).view_as(conv_layer.bias)
conv_layer.bias.data.copy_(conv_b)
ptr += num_b
# Load conv. weights
num_w = conv_layer.weight.numel()
conv_w = torch.from_numpy(weights[ptr: ptr + num_w]).view_as(conv_layer.weight)
conv_layer.weight.data.copy_(conv_w)
ptr += num_w
def save_darknet_weights(self, path, cutoff=-1):
"""
        :param path: path of the new weights file
        :param cutoff: save layers between 0 and cutoff (cutoff = -1 -> all are saved)
        """
        with open(path, "wb") as fp:
            self.header_info[3] = self.seen
            self.header_info.tofile(fp)
            # Iterate through layers
            for i, (module_def, module) in enumerate(zip(self.module_defs[:cutoff], self.module_list[:cutoff])):
                if module_def["type"] == "convolutional":
                    conv_layer = module[0]
                    # If batch norm, save bn parameters first
                    if module_def["batch_normalize"]:
                        bn_layer = module[1]
                        bn_layer.bias.data.cpu().numpy().tofile(fp)
                        bn_layer.weight.data.cpu().numpy().tofile(fp)
                        bn_layer.running_mean.data.cpu().numpy().tofile(fp)
                        bn_layer.running_var.data.cpu().numpy().tofile(fp)
                    # Otherwise save conv bias
                    else:
                        conv_layer.bias.data.cpu().numpy().tofile(fp)
                    # Save conv weights
                    conv_layer.weight.data.cpu().numpy().tofile(fp)
def load_pretrain_to_custom_class(self, weights_pth_path):
        state = torch.load(weights_pth_path, map_location=torch.device('cpu'))
        own_state = self.state_dict()
        for name, param in state.items():
            if name not in own_state:
                print(f'Model has no param named {name}; skipping.')
                continue
            if param.shape != own_state[name].shape:
                print(f'Skipping param {name}: shape mismatch '
                      f'({param.shape} vs {own_state[name].shape}).')
                continue
            own_state[name].copy_(param)
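# Illustrative usage sketch (file paths are assumptions):
#   model = Darknet("config/yolov3.cfg", img_size=416)
#   model.load_darknet_weights("weights/darknet53.conv.74")  # backbone only: cutoff=75
#   with torch.no_grad():
#       detections = model(torch.zeros(1, 3, 416, 416))  # (1, num_boxes, 5 + num_classes)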
import cv2
import math
import time
import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torch.utils.data import DataLoader
def to_cpu(tensor):
return tensor.detach().cpu()
def load_classes(path):
"""
Loads class labels at 'path'
"""
fp = open(path, "r")
names = fp.read().split("\n")[:-1]
return names
def weights_init_normal(m):
classname = m.__class__.__name__
if classname.find("Conv") != -1:
torch.nn.init.normal_(m.weight.data, 0.0, 0.02)
elif classname.find("BatchNorm2d") != -1:
torch.nn.init.normal_(m.weight.data, 1.0, 0.02)
torch.nn.init.constant_(m.bias.data, 0.0)
def rescale_boxes(boxes, current_dim, original_shape):
""" Rescales bounding boxes to the original shape """
orig_h, orig_w = original_shape
# The amount of padding that was added
pad_x = max(orig_h - orig_w, 0) * (current_dim / max(original_shape))
pad_y = max(orig_w - orig_h, 0) * (current_dim / max(original_shape))
# Image height and width after padding is removed
unpad_h = current_dim - pad_y
unpad_w = current_dim - pad_x
# Rescale bounding boxes to dimension of original image
boxes[:, 0] = ((boxes[:, 0] - pad_x // 2) / unpad_w) * orig_w
boxes[:, 1] = ((boxes[:, 1] - pad_y // 2) / unpad_h) * orig_h
boxes[:, 2] = ((boxes[:, 2] - pad_x // 2) / unpad_w) * orig_w
boxes[:, 3] = ((boxes[:, 3] - pad_y // 2) / unpad_h) * orig_h
return boxes
def xywh2xyxy(x):
y = x.new(x.shape)
y[..., 0] = x[..., 0] - x[..., 2] / 2
y[..., 1] = x[..., 1] - x[..., 3] / 2
y[..., 2] = x[..., 0] + x[..., 2] / 2
y[..., 3] = x[..., 1] + x[..., 3] / 2
return y
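# Illustrative sketch: a 100x60 box centered at (50, 40) becomes corner form:
#   xywh2xyxy(torch.tensor([[50., 40., 100., 60.]]))
#   # -> tensor([[  0.,  10., 100.,  70.]])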
def ap_per_class(tp, conf, pred_cls, target_cls):
""" Compute the average precision, given the recall and precision curves.
Source: https://github.com/rafaelpadilla/Object-Detection-Metrics.
# Arguments
tp: True positives (list).
conf: Objectness value from 0-1 (list).
pred_cls: Predicted object classes (list).
target_cls: True object classes (list).
# Returns
The average precision as computed in py-faster-rcnn.
"""
# Sort by objectness
i = np.argsort(-conf)
tp, conf, pred_cls = tp[i], conf[i], pred_cls[i]
# Find unique classes
unique_classes = np.unique(target_cls)
# Create Precision-Recall curve and compute AP for each class
ap, p, r = [], [], []
for c in tqdm.tqdm(unique_classes, desc="Computing AP"):
i = pred_cls == c
n_gt = (target_cls == c).sum() # Number of ground truth objects
n_p = i.sum() # Number of predicted objects
if n_p == 0 and n_gt == 0:
continue
elif n_p == 0 or n_gt == 0:
ap.append(0)
r.append(0)
p.append(0)
else:
# Accumulate FPs and TPs
fpc = (1 - tp[i]).cumsum()
tpc = (tp[i]).cumsum()
# Recall
recall_curve = tpc / (n_gt + 1e-16)
r.append(recall_curve[-1])
# Precision
precision_curve = tpc / (tpc + fpc)
p.append(precision_curve[-1])
# AP from recall-precision curve
ap.append(compute_ap(recall_curve, precision_curve))
# Compute F1 score (harmonic mean of precision and recall)
p, r, ap = np.array(p), np.array(r), np.array(ap)
f1 = 2 * p * r / (p + r + 1e-16)
return p, r, ap, f1, unique_classes.astype("int32")
def compute_ap(recall, precision):
""" Compute the average precision, given the recall and precision curves.
Code originally from https://github.com/rbgirshick/py-faster-rcnn.
# Arguments
recall: The recall curve (list).
precision: The precision curve (list).
# Returns
The average precision as computed in py-faster-rcnn.
"""
# correct AP calculation
# first append sentinel values at the end
mrec = np.concatenate(([0.0], recall, [1.0]))
mpre = np.concatenate(([0.0], precision, [0.0]))
# compute the precision envelope
for i in range(mpre.size - 1, 0, -1):
mpre[i - 1] = np.maximum(mpre[i - 1], mpre[i])
# to calculate area under PR curve, look for points
# where X axis (recall) changes value
i = np.where(mrec[1:] != mrec[:-1])[0]
# and sum (\Delta recall) * prec
ap = np.sum((mrec[i + 1] - mrec[i]) * mpre[i + 1])
return ap
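# Worked example: recall [0.5, 1.0] with precision [1.0, 0.5] gives
#   compute_ap(np.array([0.5, 1.0]), np.array([1.0, 0.5]))  # -> 0.75
# i.e. 0.5 * 1.0 for the first recall segment plus 0.5 * 0.5 for the second.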
def get_batch_statistics(outputs, targets, iou_threshold):
""" Compute true positives, predicted scores and predicted labels per sample """
batch_metrics = []
for sample_i in range(len(outputs)):
if outputs[sample_i] is None:
continue
output = outputs[sample_i]
pred_boxes = output[:, :4]
pred_scores = output[:, 4]
pred_labels = output[:, -1]
true_positives = np.zeros(pred_boxes.shape[0])
annotations = targets[targets[:, 0] == sample_i][:, 1:]
target_labels = annotations[:, 0] if len(annotations) else []
if len(annotations):
detected_boxes = []
target_boxes = annotations[:, 1:]
for pred_i, (pred_box, pred_label) in enumerate(zip(pred_boxes, pred_labels)):
                # Stop once every target has been matched
if len(detected_boxes) == len(annotations):
break
# Ignore if label is not one of the target labels
if pred_label not in target_labels:
continue
iou, box_index = bbox_iou(pred_box.unsqueeze(0), target_boxes).max(0)
if iou >= iou_threshold and box_index not in detected_boxes:
true_positives[pred_i] = 1
detected_boxes += [box_index]
batch_metrics.append([true_positives, pred_scores, pred_labels])
return batch_metrics
def bbox_wh_iou(wh1, wh2):
wh2 = wh2.t()
w1, h1 = wh1[0], wh1[1]
w2, h2 = wh2[0], wh2[1]
inter_area = torch.min(w1, w2) * torch.min(h1, h2)
union_area = (w1 * h1 + 1e-16) + w2 * h2 - inter_area
return inter_area / union_area
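# Illustrative sketch: width/height-only IoU, as used for anchor matching in build_targets:
#   bbox_wh_iou(torch.tensor([2., 2.]), torch.tensor([[2., 2.], [4., 4.]]))
#   # -> tensor([1.0000, 0.2500])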
def bbox_iou(box1, box2, x1y1x2y2=True):
"""
Returns the IoU of two bounding boxes
"""
if not x1y1x2y2:
# Transform from center and width to exact coordinates
b1_x1, b1_x2 = box1[:, 0] - box1[:, 2] / 2, box1[:, 0] + box1[:, 2] / 2
b1_y1, b1_y2 = box1[:, 1] - box1[:, 3] / 2, box1[:, 1] + box1[:, 3] / 2
b2_x1, b2_x2 = box2[:, 0] - box2[:, 2] / 2, box2[:, 0] + box2[:, 2] / 2
b2_y1, b2_y2 = box2[:, 1] - box2[:, 3] / 2, box2[:, 1] + box2[:, 3] / 2
else:
# Get the coordinates of bounding boxes
b1_x1, b1_y1, b1_x2, b1_y2 = box1[:, 0], box1[:, 1], box1[:, 2], box1[:, 3]
b2_x1, b2_y1, b2_x2, b2_y2 = box2[:, 0], box2[:, 1], box2[:, 2], box2[:, 3]
    # Get the coordinates of the intersection rectangle
inter_rect_x1 = torch.max(b1_x1, b2_x1)
inter_rect_y1 = torch.max(b1_y1, b2_y1)
inter_rect_x2 = torch.min(b1_x2, b2_x2)
inter_rect_y2 = torch.min(b1_y2, b2_y2)
# Intersection area
inter_area = torch.clamp(inter_rect_x2 - inter_rect_x1 + 1, min=0) * torch.clamp(
inter_rect_y2 - inter_rect_y1 + 1, min=0
)
# Union Area
b1_area = (b1_x2 - b1_x1 + 1) * (b1_y2 - b1_y1 + 1)
b2_area = (b2_x2 - b2_x1 + 1) * (b2_y2 - b2_y1 + 1)
iou = inter_area / (b1_area + b2_area - inter_area + 1e-16)
return iou
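# Illustrative sketch (note the +1 pixel convention used above):
#   a = torch.tensor([[0., 0., 10., 10.]])
#   b = torch.tensor([[5., 5., 15., 15.]])
#   bbox_iou(a, b)  # -> tensor([0.1748]); inter = 6*6 = 36, union = 121 + 121 - 36 = 206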
def non_max_suppression(prediction, conf_thres=0.5, nms_thres=0.4):
"""
Removes detections with lower object confidence score than 'conf_thres' and performs
Non-Maximum Suppression to further filter detections.
Returns detections with shape:
(x1, y1, x2, y2, object_conf, class_score, class_pred)
"""
# From (center x, center y, width, height) to (x1, y1, x2, y2)
prediction[..., :4] = xywh2xyxy(prediction[..., :4])
output = [None for _ in range(len(prediction))]
for image_i, image_pred in enumerate(prediction):
# Filter out confidence scores below threshold
image_pred = image_pred[image_pred[:, 4] >= conf_thres]
# If none are remaining => process next image
if not image_pred.size(0):
continue
# Object confidence times class confidence
score = image_pred[:, 4] * image_pred[:, 5:].max(1)[0]
# Sort by it
image_pred = image_pred[(-score).argsort()]
class_confs, class_preds = image_pred[:, 5:].max(1, keepdim=True)
detections = torch.cat((image_pred[:, :5], class_confs.float(), class_preds.float()), 1)
# Perform non-maximum suppression
keep_boxes = []
while detections.size(0):
large_overlap = bbox_iou(detections[0, :4].unsqueeze(0), detections[:, :4]) > nms_thres
label_match = detections[0, -1] == detections[:, -1]
# Indices of boxes with lower confidence scores, large IOUs and matching labels
invalid = large_overlap & label_match
weights = detections[invalid, 4:5]
# Merge overlapping bboxes by order of confidence
detections[0, :4] = (weights * detections[invalid, :4]).sum(0) / weights.sum()
keep_boxes += [detections[0]]
detections = detections[~invalid]
if keep_boxes:
output[image_i] = torch.stack(keep_boxes)
return output
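# Illustrative usage sketch, assuming `model` is a Darknet in eval mode:
#   with torch.no_grad():
#       preds = model(imgs)                               # (N, num_boxes, 5 + num_classes)
#   dets = non_max_suppression(preds, conf_thres=0.5, nms_thres=0.4)
#   # dets[i] is None or a tensor of rows (x1, y1, x2, y2, conf, class_score, class_pred)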
def build_targets(pred_boxes, pred_cls, target, anchors, ignore_thres):
ByteTensor = torch.cuda.ByteTensor if pred_boxes.is_cuda else torch.ByteTensor
FloatTensor = torch.cuda.FloatTensor if pred_boxes.is_cuda else torch.FloatTensor
nB = pred_boxes.size(0)
nA = pred_boxes.size(1)
nC = pred_cls.size(-1)
nG = pred_boxes.size(2)
# Output tensors
obj_mask = ByteTensor(nB, nA, nG, nG).fill_(0)
noobj_mask = ByteTensor(nB, nA, nG, nG).fill_(1)
class_mask = FloatTensor(nB, nA, nG, nG).fill_(0)
iou_scores = FloatTensor(nB, nA, nG, nG).fill_(0)
tx = FloatTensor(nB, nA, nG, nG).fill_(0)
ty = FloatTensor(nB, nA, nG, nG).fill_(0)
tw = FloatTensor(nB, nA, nG, nG).fill_(0)
th = FloatTensor(nB, nA, nG, nG).fill_(0)
tcls = FloatTensor(nB, nA, nG, nG, nC).fill_(0)
# Convert to position relative to box
target_boxes = target[:, 2:6] * nG
gxy = target_boxes[:, :2]
gwh = target_boxes[:, 2:]
# Get anchors with best iou
ious = torch.stack([bbox_wh_iou(anchor, gwh) for anchor in anchors])
best_ious, best_n = ious.max(0)
# Separate target values
b, target_labels = target[:, :2].long().t()
gx, gy = gxy.t()
gw, gh = gwh.t()
gi, gj = gxy.long().t()
# Set masks
obj_mask[b, best_n, gj, gi] = 1
noobj_mask[b, best_n, gj, gi] = 0
# Set noobj mask to zero where iou exceeds ignore threshold
for i, anchor_ious in enumerate(ious.t()):
noobj_mask[b[i], anchor_ious > ignore_thres, gj[i], gi[i]] = 0
# Coordinates
tx[b, best_n, gj, gi] = gx - gx.floor()
ty[b, best_n, gj, gi] = gy - gy.floor()
# Width and height
tw[b, best_n, gj, gi] = torch.log(gw / anchors[best_n][:, 0] + 1e-16)
th[b, best_n, gj, gi] = torch.log(gh / anchors[best_n][:, 1] + 1e-16)
# One-hot encoding of label
tcls[b, best_n, gj, gi, target_labels] = 1
# Compute label correctness and iou at best anchor
class_mask[b, best_n, gj, gi] = (pred_cls[b, best_n, gj, gi].argmax(-1) == target_labels).float()
iou_scores[b, best_n, gj, gi] = bbox_iou(pred_boxes[b, best_n, gj, gi], target_boxes, x1y1x2y2=False)
tconf = obj_mask.float()
return iou_scores, class_mask, obj_mask, noobj_mask, tx, ty, tw, th, tcls, tconf
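# Note: the returned masks/targets are all shaped (nB, nA, nG, nG) (tcls adds a
# trailing class dim), so they index the same grid as YOLOLayer's prediction tensor.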
def parse_model_config(path):
    """Parses the yolo-v3 layer configuration file and returns module definitions"""
    with open(path, 'r') as file:
        lines = file.read().split('\n')
    lines = [x for x in lines if x and not x.startswith('#')]
    lines = [x.strip() for x in lines]  # strip fringe whitespace
module_defs = []
for line in lines:
if line.startswith('['): # This marks the start of a new block
module_defs.append({})
module_defs[-1]['type'] = line[1:-1].rstrip()
if module_defs[-1]['type'] == 'convolutional':
module_defs[-1]['batch_normalize'] = 0
else:
key, value = line.split("=")
value = value.strip()
module_defs[-1][key.rstrip()] = value.strip()
return module_defs
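# Illustrative sketch: a cfg block such as
#   [convolutional]
#   batch_normalize=1
#   filters=32
#   size=3
# parses to {'type': 'convolutional', 'batch_normalize': '1', 'filters': '32', 'size': '3'}
# (values stay strings; create_modules casts them to int as needed).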
def parse_data_config(path):
"""Parses the data configuration file"""
options = dict()
options['gpus'] = '0,1,2,3'
options['num_workers'] = '10'
with open(path, 'r') as fp:
lines = fp.readlines()
for line in lines:
line = line.strip()
if line == '' or line.startswith('#'):
continue
key, value = line.split('=')
options[key.strip()] = value.strip()
return options
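# Illustrative sketch (key names follow the usual darknet .data layout, an assumption):
#   classes=80
#   train=data/train.txt
#   valid=data/valid.txt
#   names=data/coco.names
# parse_data_config returns these keys plus the 'gpus'/'num_workers' defaults above.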
def ResizePadding(height, width):
    desired_size = (height, width)
    def resizePadding(image, **kwargs):
        old_size = image.shape[:2]
        max_size_idx = old_size.index(max(old_size))
        ratio = float(desired_size[max_size_idx]) / max(old_size)
        new_size = tuple([int(x * ratio) for x in old_size])
        if new_size > desired_size:
            min_size_idx = old_size.index(min(old_size))
            ratio = float(desired_size[min_size_idx]) / min(old_size)
            new_size = tuple([int(x * ratio) for x in old_size])
        image = cv2.resize(image, (new_size[1], new_size[0]))
        delta_w = desired_size[1] - new_size[1]
        delta_h = desired_size[0] - new_size[0]
top, bottom = delta_h // 2, delta_h - (delta_h // 2)
left, right = delta_w // 2, delta_w - (delta_w // 2)
image = cv2.copyMakeBorder(image, top, bottom, left, right, cv2.BORDER_CONSTANT)
return image
return resizePadding
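# Illustrative usage sketch (the image path is an assumption):
#   resize = ResizePadding(416, 416)
#   padded = resize(cv2.imread("some.jpg"))
#   # padded.shape[:2] == (416, 416): the image is scaled to fit, then
#   # zero-padded symmetrically along the short side.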
class AverageValueMeter(object):
def __init__(self):
self.reset()
self.val = 0
def add(self, value, n=1):
self.val = value
self.sum += value
self.var += value * value
self.n += n
if self.n == 0:
self.mean, self.std = np.nan, np.nan
elif self.n == 1:
self.mean = 0.0 + self.sum # This is to force a copy in torch/numpy
self.std = np.inf
self.mean_old = self.mean
self.m_s = 0.0
else:
self.mean = self.mean_old + (value - n * self.mean_old) / float(self.n)
self.m_s += (value - self.mean_old) * (value - self.mean)
self.mean_old = self.mean
self.std = np.sqrt(self.m_s / (self.n - 1.0))
def value(self):
return self.mean, self.std
def reset(self):
self.n = 0
self.sum = 0.0
self.var = 0.0
self.val = 0.0
self.mean = np.nan
self.mean_old = 0.0
self.m_s = 0.0
self.std = np.nan
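# Illustrative usage sketch: running mean/std via incremental (Welford-style) updates:
#   m = AverageValueMeter()
#   m.add(1.0); m.add(3.0)
#   m.value()  # -> (2.0, 1.4142...)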
import cv2
import sys
import os
import torch
import numpy as np
import torch.utils.data
import myutils.img
class GenerateHeatmap():
def __init__(self, output_res, num_parts):
self.output_res = output_res
self.num_parts = num_parts
sigma = self.output_res/64
self.sigma = sigma
size = 6*sigma + 3
x = np.arange(0, size, 1, float)
y = x[:, np.newaxis]
x0, y0 = 3*sigma + 1, 3*sigma + 1
self.g = np.exp(- ((x - x0) ** 2 + (y - y0) ** 2) / (2 * sigma ** 2))
def __call__(self, keypoints):
hms = np.zeros(shape = (self.num_parts, self.output_res, self.output_res), dtype = np.float32)
sigma = self.sigma
for p in keypoints:
for idx, pt in enumerate(p):
if pt[0] > 0:
x, y = int(pt[0]), int(pt[1])
if x<0 or y<0 or x>=self.output_res or y>=self.output_res:
continue
ul = int(x - 3*sigma - 1), int(y - 3*sigma - 1)
br = int(x + 3*sigma + 2), int(y + 3*sigma + 2)
c,d = max(0, -ul[0]), min(br[0], self.output_res) - ul[0]
a,b = max(0, -ul[1]), min(br[1], self.output_res) - ul[1]
cc,dd = max(0, ul[0]), min(br[0], self.output_res)
aa,bb = max(0, ul[1]), min(br[1], self.output_res)
hms[idx, aa:bb,cc:dd] = np.maximum(hms[idx, aa:bb,cc:dd], self.g[a:b,c:d])
return hms
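# Illustrative usage sketch: 16 MPII joints on a 64x64 output grid:
#   gh = GenerateHeatmap(64, 16)
#   kps = np.zeros((1, 16, 3)); kps[0, 0] = (32, 32, 1)  # one visible joint
#   gh(kps).shape  # -> (16, 64, 64), with a Gaussian peak at (32, 32) in channel 0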
class Dataset(torch.utils.data.Dataset):
def __init__(self, config, ds, index):
self.input_res = config['train']['input_res']
self.output_res = config['train']['output_res']
self.generateHeatmap = GenerateHeatmap(self.output_res, config['inference']['num_parts'])
self.ds = ds
self.index = index
def __len__(self):
return len(self.index)
def __getitem__(self, idx):
return self.loadImage(self.index[idx % len(self.index)])
def loadImage(self, idx):
ds = self.ds
## load + crop
orig_img = ds.get_img(idx)
path = ds.get_path(idx)
orig_keypoints = ds.get_kps(idx)
kptmp = orig_keypoints.copy()
c = ds.get_center(idx)
s = ds.get_scale(idx)
normalize = ds.get_normalized(idx)
cropped = myutils.img.crop(orig_img, c, s, (self.input_res, self.input_res))
for i in range(np.shape(orig_keypoints)[1]):
if orig_keypoints[0,i,0] > 0:
orig_keypoints[0,i,:2] = myutils.img.transform(orig_keypoints[0,i,:2], c, s, (self.input_res, self.input_res))
keypoints = np.copy(orig_keypoints)
## augmentation -- to be done to cropped image
height, width = cropped.shape[0:2]
center = np.array((width/2, height/2))
scale = max(height, width)/200
        aug_rot = (np.random.random() * 2 - 1) * 30.  # random rotation in [-30, 30] degrees
aug_scale = np.random.random() * (1.25 - 0.75) + 0.75
scale *= aug_scale
mat_mask = myutils.img.get_transform(center, scale, (self.output_res, self.output_res), aug_rot)[:2]
mat = myutils.img.get_transform(center, scale, (self.input_res, self.input_res), aug_rot)[:2]
inp = cv2.warpAffine(cropped, mat, (self.input_res, self.input_res)).astype(np.float32)/255
keypoints[:,:,0:2] = myutils.img.kpt_affine(keypoints[:,:,0:2], mat_mask)
if np.random.randint(2) == 0:
inp = self.preprocess(inp)
inp = inp[:, ::-1]
keypoints = keypoints[:, ds.flipped_parts['mpii']]
keypoints[:, :, 0] = self.output_res - keypoints[:, :, 0]
orig_keypoints = orig_keypoints[:, ds.flipped_parts['mpii']]
orig_keypoints[:, :, 0] = self.input_res - orig_keypoints[:, :, 0]
        ## zero out keypoints that were not visible initially (so their heatmaps stay all zeros)
for i in range(np.shape(orig_keypoints)[1]):
if kptmp[0,i,0] == 0 and kptmp[0,i,1] == 0:
keypoints[0,i,0] = 0
keypoints[0,i,1] = 0
orig_keypoints[0,i,0] = 0
orig_keypoints[0,i,1] = 0
## generate heatmaps on outres
heatmaps = self.generateHeatmap(keypoints)
return inp.astype(np.float32), heatmaps.astype(np.float32)
def preprocess(self, data):
# random hue and saturation
        data = cv2.cvtColor(data, cv2.COLOR_RGB2HSV)
        delta = (np.random.random() * 2 - 1) * 0.2
        data[:, :, 0] = np.mod(data[:, :, 0] + (delta * 360 + 360.), 360.)
        delta_saturation = np.random.random() + 0.5
        data[:, :, 1] *= delta_saturation
        data[:, :, 1] = np.maximum(np.minimum(data[:, :, 1], 1), 0)
data = cv2.cvtColor(data, cv2.COLOR_HSV2RGB)
# adjust brightness
delta = (np.random.random() * 2 - 1) * 0.3
data += delta
# adjust contrast
mean = data.mean(axis=2, keepdims=True)
data = (data - mean) * (np.random.random() + 0.5) + mean
data = np.minimum(np.maximum(data, 0), 1)
return data
def init(config):
batchsize = config['train']['batchsize']
current_path = os.path.dirname(os.path.abspath(__file__))
sys.path.append(current_path)
import ref as ds
ds.init()
train, valid = ds.setup_val_split()
dataset = { key: Dataset(config, ds, data) for key, data in zip( ['train', 'valid'], [train, valid] ) }
use_data_loader = config['train']['use_data_loader']
loaders = {}
for key in dataset:
loaders[key] = torch.utils.data.DataLoader(dataset[key], batch_size=batchsize, shuffle=True, num_workers=config['train']['num_workers'], pin_memory=False)
def gen(phase):
batchsize = config['train']['batchsize']
batchnum = config['train']['{}_iters'.format(phase)]
loader = loaders[phase].__iter__()
for i in range(batchnum):
try:
imgs, heatmaps = next(loader)
except StopIteration:
                # restart the iterator once the dataloader is exhausted
loader = loaders[phase].__iter__()
imgs, heatmaps = next(loader)
yield {
                'imgs': imgs,          # cropped and augmented images
                'heatmaps': heatmaps,  # built from keypoints; zero where a joint is not in the image
}
return lambda key: gen(key)
import numpy as np
import h5py
from imageio import imread
import os
import time
def _isArrayLike(obj):
return hasattr(obj, '__iter__') and hasattr(obj, '__len__')
annot_dir = 'data/MPII/annot'
img_dir = 'data/MPII/images'
assert os.path.exists(img_dir)
mpii, num_examples_train, num_examples_val = None, None, None
import cv2
class MPII:
def __init__(self):
print('loading data...')
tic = time.time()
train_f = h5py.File(os.path.join(annot_dir, 'train.h5'), 'r')
val_f = h5py.File(os.path.join(annot_dir, 'valid.h5'), 'r')
self.t_center = train_f['center'][()]
t_scale = train_f['scale'][()]
t_part = train_f['part'][()]
t_visible = train_f['visible'][()]
t_normalize = train_f['normalize'][()]
t_imgname = [None] * len(self.t_center)
for i in range(len(self.t_center)):
t_imgname[i] = train_f['imgname'][i].decode('UTF-8')
self.v_center = val_f['center'][()]
v_scale = val_f['scale'][()]
v_part = val_f['part'][()]
v_visible = val_f['visible'][()]
v_normalize = val_f['normalize'][()]
v_imgname = [None] * len(self.v_center)
for i in range(len(self.v_center)):
v_imgname[i] = val_f['imgname'][i].decode('UTF-8')
self.center = np.append(self.t_center, self.v_center, axis=0)
self.scale = np.append(t_scale, v_scale)
self.part = np.append(t_part, v_part, axis=0)
self.visible = np.append(t_visible, v_visible, axis=0)
self.normalize = np.append(t_normalize, v_normalize)
self.imgname = t_imgname + v_imgname
print('Done (t={:0.2f}s)'.format(time.time()- tic))
def getAnnots(self, idx):
        '''
        returns the annotation tuple (imgname, part, visible, center, scale, normalize)
        for the given index (train and val annotations are concatenated)
        '''
return self.imgname[idx], self.part[idx], self.visible[idx], self.center[idx], self.scale[idx], self.normalize[idx]
def getLength(self):
return len(self.t_center), len(self.v_center)
def init():
global mpii, num_examples_train, num_examples_val
mpii = MPII()
num_examples_train, num_examples_val = mpii.getLength()
# Part reference
parts = {'mpii':['rank', 'rkne', 'rhip',
'lhip', 'lkne', 'lank',
'pelv', 'thrx', 'neck', 'head',
'rwri', 'relb', 'rsho',
'lsho', 'lelb', 'lwri']}
flipped_parts = {'mpii':[5, 4, 3, 2, 1, 0, 6, 7, 8, 9, 15, 14, 13, 12, 11, 10]}
part_pairs = {'mpii':[[0, 5], [1, 4], [2, 3], [6], [7], [8], [9], [10, 15], [11, 14], [12, 13]]}
pair_names = {'mpii':['ankle', 'knee', 'hip', 'pelvis', 'thorax', 'neck', 'head', 'wrist', 'elbow', 'shoulder']}
def setup_val_split():
'''
returns index for train and validation imgs
index for validation images starts after that of train images
so that loadImage can tell them apart
'''
valid = [i+num_examples_train for i in range(num_examples_val)]
train = [i for i in range(num_examples_train)]
return np.array(train), np.array(valid)
def get_img(idx):
imgname, __, __, __, __, __ = mpii.getAnnots(idx)
path = os.path.join(img_dir, imgname)
img = imread(path)
return img
def get_path(idx):
imgname, __, __, __, __, __ = mpii.getAnnots(idx)
path = os.path.join(img_dir, imgname)
return path
def get_kps(idx):
__, part, visible, __, __, __ = mpii.getAnnots(idx)
kp2 = np.insert(part, 2, visible, axis=1)
kps = np.zeros((1, 16, 3))
kps[0] = kp2
return kps
def get_normalized(idx):
__, __, __, __, __, n = mpii.getAnnots(idx)
return n
def get_center(idx):
__, __, __, c, __, __ = mpii.getAnnots(idx)
return c
def get_scale(idx):
__, __, __, __, s, __ = mpii.getAnnots(idx)
return s
import cv2
import torch
import data.MPII.ref as ds
import myutils.img
from myutils.group import HeatmapParser
from myutils.posture import *
@@ -11,6 +10,8 @@ CROOKED_HEAD_THRE=8
# Slope threshold for detecting oblique (uneven) shoulders
OBLIQUE_SHOULDER_THRE=2
_flipped_parts = {'mpii':[5, 4, 3, 2, 1, 0, 6, 7, 8, 9, 15, 14, 13, 12, 11, 10]}
# Index map of the human skeleton keypoints
class PersonPosture():
@@ -145,7 +146,7 @@ def inference(img, func, config, c, s):
for ii in tmp1:
tmp[ii] = np.concatenate((tmp1[ii], tmp2[ii]), axis=0)
-    det = tmp['det'][0, -1] + tmp['det'][1, -1, :, :, ::-1][ds.flipped_parts['mpii']]
+    det = tmp['det'][0, -1] + tmp['det'][1, -1, :, :, ::-1][_flipped_parts['mpii']]
if det is None:
return [], []
det = det / 2
@@ -170,8 +171,21 @@ def main():
for i in range(ans.shape[0]):
pred.append({'keypoints': ans[i,:,:]})
return pred
from urllib.request import urlopen
def url_to_image(url, readFlag=cv2.IMREAD_COLOR):
# download the image, convert it to a NumPy array, and then read
# it into OpenCV format
resp = urlopen(url)
image = np.asarray(bytearray(resp.read()), dtype="uint8")
image = cv2.imdecode(image, readFlag)
# return the image
return image
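# Illustrative usage sketch (the URL is an assumption):
#   img = url_to_image("https://example.com/pose.jpg")
#   # img is a BGR numpy array, the same layout cv2.imread produces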
if __name__ == '__main__':
image_path = "data/custom/O型腿3.jpeg"
# image_path = "data/custom/O型腿3.jpeg"
image_path = 'https://gimg2.baidu.com/image_search/src=http%3A%2F%2Fgss0.baidu.com%2F-4o3dSag_xI4khGko9WTAnF6hhy%2Fzhidao%2Fpic%2Fitem%2Fac345982b2b7d0a2c8b09418ccef76094b369a3e.jpg&refer=http%3A%2F%2Fgss0.baidu.com&app=2002&size=f9999,10000&q=a80&n=0&g=0n&fmt=jpeg?sec=1640224763&t=142191b9f29e133d7c8c0d7af15c7879'
from train import init
func, config = init()
@@ -188,8 +202,11 @@ if __name__ == '__main__':
return pred
input_res = 256
-    orig_img = cv2.imread(image_path)
-    orig_img_reverse = cv2.imread(image_path)[:,:,::-1]
+    # orig_img = cv2.imread(image_path)
+    # orig_img_reverse = cv2.imread(image_path)[:,:,::-1]
+    orig_img = url_to_image(image_path)
+    orig_img_reverse = url_to_image(image_path)[:,:,::-1]
shape = orig_img_reverse.shape[0:2]
......
# Constants
MIN_VISIBLE=0.3
@@ -106,14 +106,11 @@ def init():
task = importlib.import_module('task.pose')
exp_path = os.path.join('exp', opt.exp)
current_time = datetime.now().strftime('%b%d_%H-%M-%S')
config = task.__config__
    os.makedirs(exp_path, exist_ok=True)
config['opt'] = opt
config['data_provider'] = importlib.import_module(config['data_provider'])
    # Load the model; func is the model's prediction function
func = task.make_network(config)
......