Commit b10cddf0 by wudiao

Initial commit

exp
*.pyc
*__pycache__*
*core.*
*.jpg
*.png
*.pkl
*.txt
*.json
_ext
tmp
*.o*
*~
# Default ignored files
/shelf/
/workspace.xml
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyCompatibilityInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ourVersions">
<value>
<list size="2">
<item index="0" class="java.lang.String" itemvalue="2.7" />
<item index="1" class="java.lang.String" itemvalue="3.10" />
</list>
</value>
</option>
</inspection_tool>
<inspection_tool class="PyPackageRequirementsInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredPackages">
<value>
<list size="44">
<item index="0" class="java.lang.String" itemvalue="six" />
<item index="1" class="java.lang.String" itemvalue="Flask-Cors" />
<item index="2" class="java.lang.String" itemvalue="Werkzeug" />
<item index="3" class="java.lang.String" itemvalue="aniso8601" />
<item index="4" class="java.lang.String" itemvalue="PyYAML" />
<item index="5" class="java.lang.String" itemvalue="python-dateutil" />
<item index="6" class="java.lang.String" itemvalue="SQLAlchemy" />
<item index="7" class="java.lang.String" itemvalue="MarkupSafe" />
<item index="8" class="java.lang.String" itemvalue="requests" />
<item index="9" class="java.lang.String" itemvalue="click" />
<item index="10" class="java.lang.String" itemvalue="Jinja2" />
<item index="11" class="java.lang.String" itemvalue="XlsxWriter" />
<item index="12" class="java.lang.String" itemvalue="PyGithub" />
<item index="13" class="java.lang.String" itemvalue="pytz" />
<item index="14" class="java.lang.String" itemvalue="urllib3" />
<item index="15" class="java.lang.String" itemvalue="itsdangerous" />
<item index="16" class="java.lang.String" itemvalue="Flask-RESTful" />
<item index="17" class="java.lang.String" itemvalue="notebook" />
<item index="18" class="java.lang.String" itemvalue="Flask" />
<item index="19" class="java.lang.String" itemvalue="tqdm" />
<item index="20" class="java.lang.String" itemvalue="opencv-python" />
<item index="21" class="java.lang.String" itemvalue="sklearn2" />
<item index="22" class="java.lang.String" itemvalue="tensorflow-gpu" />
<item index="23" class="java.lang.String" itemvalue="bcolz" />
<item index="24" class="java.lang.String" itemvalue="scipy" />
<item index="25" class="java.lang.String" itemvalue="scikit_learn" />
<item index="26" class="java.lang.String" itemvalue="torch" />
<item index="27" class="java.lang.String" itemvalue="mxnet" />
<item index="28" class="java.lang.String" itemvalue="numpy" />
<item index="29" class="java.lang.String" itemvalue="torchvision" />
<item index="30" class="java.lang.String" itemvalue="easydict" />
<item index="31" class="java.lang.String" itemvalue="matplotlib" />
<item index="32" class="java.lang.String" itemvalue="tensorboardX" />
<item index="33" class="java.lang.String" itemvalue="mxnet_cu90" />
<item index="34" class="java.lang.String" itemvalue="opencv_python" />
<item index="35" class="java.lang.String" itemvalue="Pillow" />
<item index="36" class="java.lang.String" itemvalue="opencv-contrib-python" />
<item index="37" class="java.lang.String" itemvalue="onnxruntime" />
<item index="38" class="java.lang.String" itemvalue="onnx" />
<item index="39" class="java.lang.String" itemvalue="tensorflow" />
<item index="40" class="java.lang.String" itemvalue="seaborn" />
<item index="41" class="java.lang.String" itemvalue="keras" />
<item index="42" class="java.lang.String" itemvalue="mtcnn" />
<item index="43" class="java.lang.String" itemvalue="opencv-python-headless" />
</list>
</value>
</option>
</inspection_tool>
</profile>
</component>
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9 (pytorch)" project-jdk-type="Python SDK" />
<component name="PyCharmProfessionalAdvertiser">
<option name="shown" value="true" />
</component>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/pytorch_stacked_hourglass-master.iml" filepath="$PROJECT_DIR$/.idea/pytorch_stacked_hourglass-master.iml" />
</modules>
</component>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$" isTestSource="false" />
</content>
<orderEntry type="jdk" jdkName="Python 3.9 (pytorch)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
<component name="PyNamespacePackagesService">
<option name="namespacePackageFolders">
<list>
<option value="$MODULE_DIR$/models" />
</list>
</option>
</component>
</module>
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
import time
import torch
import numpy as np
import torchvision.transforms as transforms
from queue import Queue
from threading import Thread
from Detection.Models import Darknet
from Detection.Utils import non_max_suppression, ResizePadding
class TinyYOLOv3_onecls(object):
"""Load trained Tiny-YOLOv3 one class (person) detection model.
Args:
input_size: (int) Size of input image must be divisible by 32. Default: 416,
config_file: (str) Path to Yolo model structure config file.,
weight_file: (str) Path to trained weights file.,
nms: (float) Non-Maximum Suppression overlap threshold.,
conf_thres: (float) Minimum Confidence threshold of predicted bboxs to cut off.,
device: (str) Device to load the model on 'cpu' or 'cuda'.
"""
def __init__(self,
input_size=416,
config_file='models/yolo-tiny-onecls/yolov3-tiny-onecls.cfg',
weight_file='models/yolo-tiny-onecls/best-model.pth',
nms=0.2,
conf_thres=0.45,
device='cuda'):
self.input_size = input_size
self.model = Darknet(config_file).to(device)
self.model.load_state_dict(torch.load(weight_file,map_location=torch.device('cpu')))
self.model.eval()
self.device = device
self.nms = nms
self.conf_thres = conf_thres
self.resize_fn = ResizePadding(input_size, input_size)
self.transf_fn = transforms.ToTensor()
def detect(self, image, need_resize=True, expand_bb=5):
"""Feed forward to the model.
Args:
image: (numpy array) Single RGB image to detect.,
need_resize: (bool) Resize to input_size before feed and will return bboxs
with scale to image original size.,
expand_bb: (int) Expand boundary of the boxs.
Returns:
(torch.float32) Of each detected object contain a
[top, left, bottom, right, bbox_score, class_score, class]
return `None` if no detected.
"""
image_size = (self.input_size, self.input_size)
if need_resize:
image_size = image.shape[:2]
image = self.resize_fn(image)
image = self.transf_fn(image)[None, ...]
scf = torch.min(self.input_size / torch.FloatTensor([image_size]), 1)[0]
detected = self.model(image.to(self.device))
detected = non_max_suppression(detected, self.conf_thres, self.nms)[0]
if detected is not None:
detected[:, [0, 2]] -= (self.input_size - scf * image_size[1]) / 2
detected[:, [1, 3]] -= (self.input_size - scf * image_size[0]) / 2
detected[:, 0:4] /= scf
detected[:, 0:2] = np.maximum(0, detected[:, 0:2] - expand_bb)
detected[:, 2:4] = np.minimum(image_size[::-1], detected[:, 2:4] + expand_bb)
return detected
class ThreadDetection(object):
def __init__(self,
dataloader,
model,
queue_size=256):
self.model = model
self.dataloader = dataloader
self.stopped = False
self.Q = Queue(maxsize=queue_size)
def start(self):
t = Thread(target=self.update, args=(), daemon=True)
t.start()
return self
def update(self):
while True:
if self.stopped:
return
images = self.dataloader.getitem()
outputs = self.model.detect(images)
if self.Q.full():
time.sleep(2)
self.Q.put((images, outputs))
def getitem(self):
return self.Q.get()
def stop(self):
self.stopped = True
def __len__(self):
return self.Q.qsize()
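A minimal usage sketch for the detector above (not part of the original commit; the image path is hypothetical and the model paths are the constructor defaults):
import cv2

if __name__ == '__main__':
    detector = TinyYOLOv3_onecls(device='cpu')  # switch to 'cuda' if a GPU is available
    # detect() expects a single RGB image; OpenCV loads BGR, so convert first
    frame = cv2.cvtColor(cv2.imread('frame.jpg'), cv2.COLOR_BGR2RGB)  # hypothetical path
    bboxes = detector.detect(frame)  # tensor of detections, or None if nothing found
    if bboxes is not None:
        print(bboxes[:, :4])  # box coordinates, rescaled to the original image size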
BSD 3-Clause License
Copyright (c) 2019, princeton-vl
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import cv2
import sys
import os
import torch
import numpy as np
import torch.utils.data
import utils.img
class GenerateHeatmap():
def __init__(self, output_res, num_parts):
self.output_res = output_res
self.num_parts = num_parts
sigma = self.output_res/64
self.sigma = sigma
size = 6*sigma + 3
x = np.arange(0, size, 1, float)
y = x[:, np.newaxis]
x0, y0 = 3*sigma + 1, 3*sigma + 1
self.g = np.exp(- ((x - x0) ** 2 + (y - y0) ** 2) / (2 * sigma ** 2))
def __call__(self, keypoints):
hms = np.zeros(shape = (self.num_parts, self.output_res, self.output_res), dtype = np.float32)
sigma = self.sigma
for p in keypoints:
for idx, pt in enumerate(p):
if pt[0] > 0:
x, y = int(pt[0]), int(pt[1])
if x<0 or y<0 or x>=self.output_res or y>=self.output_res:
continue
ul = int(x - 3*sigma - 1), int(y - 3*sigma - 1)
br = int(x + 3*sigma + 2), int(y + 3*sigma + 2)
c,d = max(0, -ul[0]), min(br[0], self.output_res) - ul[0]
a,b = max(0, -ul[1]), min(br[1], self.output_res) - ul[1]
cc,dd = max(0, ul[0]), min(br[0], self.output_res)
aa,bb = max(0, ul[1]), min(br[1], self.output_res)
hms[idx, aa:bb,cc:dd] = np.maximum(hms[idx, aa:bb,cc:dd], self.g[a:b,c:d])
return hms
class Dataset(torch.utils.data.Dataset):
def __init__(self, config, ds, index):
self.input_res = config['train']['input_res']
self.output_res = config['train']['output_res']
self.generateHeatmap = GenerateHeatmap(self.output_res, config['inference']['num_parts'])
self.ds = ds
self.index = index
def __len__(self):
return len(self.index)
def __getitem__(self, idx):
return self.loadImage(self.index[idx % len(self.index)])
def loadImage(self, idx):
ds = self.ds
## load + crop
orig_img = ds.get_img(idx)
path = ds.get_path(idx)
orig_keypoints = ds.get_kps(idx)
kptmp = orig_keypoints.copy()
c = ds.get_center(idx)
s = ds.get_scale(idx)
normalize = ds.get_normalized(idx)
cropped = utils.img.crop(orig_img, c, s, (self.input_res, self.input_res))
for i in range(np.shape(orig_keypoints)[1]):
if orig_keypoints[0,i,0] > 0:
orig_keypoints[0,i,:2] = utils.img.transform(orig_keypoints[0,i,:2], c, s, (self.input_res, self.input_res))
keypoints = np.copy(orig_keypoints)
## augmentation -- to be done to cropped image
height, width = cropped.shape[0:2]
center = np.array((width/2, height/2))
scale = max(height, width)/200
aug_rot = (np.random.random() * 2 - 1) * 30.  # random rotation in [-30, 30] degrees
aug_scale = np.random.random() * (1.25 - 0.75) + 0.75
scale *= aug_scale
mat_mask = utils.img.get_transform(center, scale, (self.output_res, self.output_res), aug_rot)[:2]
mat = utils.img.get_transform(center, scale, (self.input_res, self.input_res), aug_rot)[:2]
inp = cv2.warpAffine(cropped, mat, (self.input_res, self.input_res)).astype(np.float32)/255
keypoints[:,:,0:2] = utils.img.kpt_affine(keypoints[:,:,0:2], mat_mask)
if np.random.randint(2) == 0:
inp = self.preprocess(inp)
inp = inp[:, ::-1]
keypoints = keypoints[:, ds.flipped_parts['mpii']]
keypoints[:, :, 0] = self.output_res - keypoints[:, :, 0]
orig_keypoints = orig_keypoints[:, ds.flipped_parts['mpii']]
orig_keypoints[:, :, 0] = self.input_res - orig_keypoints[:, :, 0]
## set keypoints to 0 when were not visible initially (so heatmap all 0s)
for i in range(np.shape(orig_keypoints)[1]):
if kptmp[0,i,0] == 0 and kptmp[0,i,1] == 0:
keypoints[0,i,0] = 0
keypoints[0,i,1] = 0
orig_keypoints[0,i,0] = 0
orig_keypoints[0,i,1] = 0
## generate heatmaps on outres
heatmaps = self.generateHeatmap(keypoints)
return inp.astype(np.float32), heatmaps.astype(np.float32)
def preprocess(self, data):
# random hue and saturation
data = cv2.cvtColor(data, cv2.COLOR_RGB2HSV)
delta = (np.random.random() * 2 - 1) * 0.2
data[:, :, 0] = np.mod(data[:,:,0] + (delta * 360 + 360.), 360.)
delta_sature = np.random.random() + 0.5
data[:, :, 1] *= delta_sature
data[:,:, 1] = np.maximum( np.minimum(data[:,:,1], 1), 0 )
data = cv2.cvtColor(data, cv2.COLOR_HSV2RGB)
# adjust brightness
delta = (np.random.random() * 2 - 1) * 0.3
data += delta
# adjust contrast
mean = data.mean(axis=2, keepdims=True)
data = (data - mean) * (np.random.random() + 0.5) + mean
data = np.minimum(np.maximum(data, 0), 1)
return data
def init(config):
batchsize = config['train']['batchsize']
current_path = os.path.dirname(os.path.abspath(__file__))
sys.path.append(current_path)
import ref as ds
ds.init()
train, valid = ds.setup_val_split()
dataset = { key: Dataset(config, ds, data) for key, data in zip( ['train', 'valid'], [train, valid] ) }
use_data_loader = config['train']['use_data_loader']
loaders = {}
for key in dataset:
loaders[key] = torch.utils.data.DataLoader(dataset[key], batch_size=batchsize, shuffle=True, num_workers=config['train']['num_workers'], pin_memory=False)
def gen(phase):
batchsize = config['train']['batchsize']
batchnum = config['train']['{}_iters'.format(phase)]
loader = loaders[phase].__iter__()
for i in range(batchnum):
try:
imgs, heatmaps = next(loader)
except StopIteration:
# to avoid no data provided by dataloader
loader = loaders[phase].__iter__()
imgs, heatmaps = next(loader)
yield {
'imgs': imgs, #cropped and augmented
'heatmaps': heatmaps, #based on keypoints. 0 if not in img for joint
}
return lambda key: gen(key)
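A small sketch (not in the original file) showing what GenerateHeatmap above produces: one Gaussian peak per visible keypoint, rendered on that part's channel.
import numpy as np

if __name__ == '__main__':
    gen = GenerateHeatmap(output_res=64, num_parts=16)
    kps = np.zeros((1, 16, 3))              # one person, 16 joints, (x, y, visible)
    kps[0, 9] = [32, 20, 1]                 # put the 'head' joint at (x=32, y=20)
    hms = gen(kps)
    print(hms.shape)                        # (16, 64, 64)
    print(np.unravel_index(hms[9].argmax(), hms[9].shape))  # peak at (row=20, col=32)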
import numpy as np
import h5py
from imageio import imread
import os
import time
def _isArrayLike(obj):
return hasattr(obj, '__iter__') and hasattr(obj, '__len__')
annot_dir = 'data/MPII/annot'
img_dir = 'data/MPII/images'
assert os.path.exists(img_dir)
mpii, num_examples_train, num_examples_val = None, None, None
import cv2
class MPII:
def __init__(self):
print('loading data...')
tic = time.time()
train_f = h5py.File(os.path.join(annot_dir, 'train.h5'), 'r')
val_f = h5py.File(os.path.join(annot_dir, 'valid.h5'), 'r')
self.t_center = train_f['center'][()]
t_scale = train_f['scale'][()]
t_part = train_f['part'][()]
t_visible = train_f['visible'][()]
t_normalize = train_f['normalize'][()]
t_imgname = [None] * len(self.t_center)
for i in range(len(self.t_center)):
t_imgname[i] = train_f['imgname'][i].decode('UTF-8')
self.v_center = val_f['center'][()]
v_scale = val_f['scale'][()]
v_part = val_f['part'][()]
v_visible = val_f['visible'][()]
v_normalize = val_f['normalize'][()]
v_imgname = [None] * len(self.v_center)
for i in range(len(self.v_center)):
v_imgname[i] = val_f['imgname'][i].decode('UTF-8')
self.center = np.append(self.t_center, self.v_center, axis=0)
self.scale = np.append(t_scale, v_scale)
self.part = np.append(t_part, v_part, axis=0)
self.visible = np.append(t_visible, v_visible, axis=0)
self.normalize = np.append(t_normalize, v_normalize)
self.imgname = t_imgname + v_imgname
print('Done (t={:0.2f}s)'.format(time.time()- tic))
def getAnnots(self, idx):
'''
returns the annotation fields (imgname, part, visible, center, scale, normalize)
for the given index; train entries come first, then val entries
'''
return self.imgname[idx], self.part[idx], self.visible[idx], self.center[idx], self.scale[idx], self.normalize[idx]
def getLength(self):
return len(self.t_center), len(self.v_center)
def init():
global mpii, num_examples_train, num_examples_val
mpii = MPII()
num_examples_train, num_examples_val = mpii.getLength()
# Part reference
parts = {'mpii':['rank', 'rkne', 'rhip',
'lhip', 'lkne', 'lank',
'pelv', 'thrx', 'neck', 'head',
'rwri', 'relb', 'rsho',
'lsho', 'lelb', 'lwri']}
flipped_parts = {'mpii':[5, 4, 3, 2, 1, 0, 6, 7, 8, 9, 15, 14, 13, 12, 11, 10]}
part_pairs = {'mpii':[[0, 5], [1, 4], [2, 3], [6], [7], [8], [9], [10, 15], [11, 14], [12, 13]]}
pair_names = {'mpii':['ankle', 'knee', 'hip', 'pelvis', 'thorax', 'neck', 'head', 'wrist', 'elbow', 'shoulder']}
def setup_val_split():
'''
returns index for train and validation imgs
index for validation images starts after that of train images
so that loadImage can tell them apart
'''
valid = [i+num_examples_train for i in range(num_examples_val)]
train = [i for i in range(num_examples_train)]
return np.array(train), np.array(valid)
def get_img(idx):
imgname, __, __, __, __, __ = mpii.getAnnots(idx)
path = os.path.join(img_dir, imgname)
img = imread(path)
return img
def get_path(idx):
imgname, __, __, __, __, __ = mpii.getAnnots(idx)
path = os.path.join(img_dir, imgname)
return path
def get_kps(idx):
__, part, visible, __, __, __ = mpii.getAnnots(idx)
kp2 = np.insert(part, 2, visible, axis=1)
kps = np.zeros((1, 16, 3))
kps[0] = kp2
return kps
def get_normalized(idx):
__, __, __, __, __, n = mpii.getAnnots(idx)
return n
def get_center(idx):
__, __, __, c, __, __ = mpii.getAnnots(idx)
return c
def get_scale(idx):
__, __, __, __, s, __ = mpii.getAnnots(idx)
return s
import cv2
import utils.img
import torch
import numpy as np
from utils.group import HeatmapParser
import data.MPII.ref as ds
from utils.posture import *
# slope threshold for detecting a tilted head
CROOKED_HEAD_THRE=8
# slope threshold for detecting uneven (oblique) shoulders
OBLIQUE_SHOULDER_THRE=2
# for the keypoint index layout, see parts['mpii'] in data/MPII/ref.py
class PersonPosture():
def __init__(self, pred,orig_img):
self.pred = pred[0]['keypoints']
self.orin_img = orig_img
def analysis(self, visual_angle=True):
# shoulder slope
Shoulder_slope = segment_slope(self.pred[12], self.pred[13])
head_slope = segment_slope(self.pred[8], self.pred[9])
# left leg: slope between hip and knee
left_thigh_knee = segment_slope(self.pred[1], self.pred[2])
# left leg: slope between knee and ankle
left_calf_knee = segment_slope(self.pred[1], self.pred[0])
# left leg: slope between hip and ankle
left_lag_slope = segment_slope(self.pred[2], self.pred[0])
# right leg: slope between hip and ankle
right_lag_slope = segment_slope(self.pred[3], self.pred[5])
# right leg: slope between hip and knee
right_thigh_knee = segment_slope(self.pred[3], self.pred[4])
# right leg: slope between knee and ankle
right_calf_knee = segment_slope(self.pred[4], self.pred[5])
# frontal view
if visual_angle:
# distance between the two knees
knee_distance = absolute_distance(self.pred[4],self.pred[1])
# distance between the two feet
feet_distance = absolute_distance(self.pred[0],self.pred[5])
# check for uneven shoulders
if Shoulder_slope is not None and abs(Shoulder_slope) > OBLIQUE_SHOULDER_THRE:
cv2.line(self.orin_img,tuple(map(int,self.pred[12][:2])),tuple(map(int,self.pred[13][:2])),(0,0,255),thickness=3)
print("Uneven shoulders")
# check for bow legs (O-shaped)
if feet_distance is not None and knee_distance is not None:
if feet_distance / knee_distance < 0.8:
cv2.line(self.orin_img, tuple(map(int,self.pred[2][:2])), tuple(map(int,self.pred[1][:2])), (0, 0, 255), thickness=3)
cv2.line(self.orin_img, tuple(map(int,self.pred[1][:2])), tuple(map(int,self.pred[0][:2])), (0, 0, 255), thickness=3)
cv2.line(self.orin_img, tuple(map(int,self.pred[3][:2])), tuple(map(int,self.pred[4][:2])), (0, 0, 255), thickness=3)
cv2.line(self.orin_img, tuple(map(int,self.pred[4][:2])), tuple(map(int,self.pred[5][:2])), (0, 0, 255), thickness=3)
print("Knees far apart: bow legs (O-shaped)")
# check for knock knees (X-shaped)
# only if both knees and both feet were all detected
if feet_distance is not None and knee_distance is not None:
# if the feet are more than 1.3x as far apart as the knees (assuming the user
# stands naturally upright rather than with feet deliberately spread), flag knock knees
if feet_distance/knee_distance>1.3:
cv2.line(self.orin_img, tuple(map(int,self.pred[2][:2])), tuple(map(int,self.pred[1][:2])), (0, 0, 255), thickness=3)
cv2.line(self.orin_img, tuple(map(int,self.pred[1][:2])), tuple(map(int,self.pred[0][:2])), (0, 0, 255), thickness=3)
cv2.line(self.orin_img, tuple(map(int,self.pred[3][:2])), tuple(map(int,self.pred[4][:2])), (0, 0, 255), thickness=3)
cv2.line(self.orin_img, tuple(map(int,self.pred[4][:2])), tuple(map(int,self.pred[5][:2])), (0, 0, 255), thickness=3)
print("Feet far apart: knock knees (X-shaped)")
# check for a tilted head
if head_slope is not None and abs(head_slope)<CROOKED_HEAD_THRE:
cv2.line(self.orin_img, tuple(map(int,self.pred[8][:2])), tuple(map(int,self.pred[9][:2])), (0, 0, 255), thickness=3)
print("Tilted head")
else:
print("Head posture normal, no obvious tilt")
# side view
else:
# check for forward head posture
if head_slope is not None and head_slope != np.Inf and 0 < head_slope < CROOKED_HEAD_THRE:
cv2.line(self.orin_img, tuple(map(int,self.pred[8][:2])), tuple(map(int,self.pred[9][:2])), (0, 0, 255), thickness=3)
print("Forward head posture")
else:
print("Head posture normal, no obvious forward lean")
# check for knee hyperextension
if ((left_lag_slope!=np.Inf or left_calf_knee!=np.Inf or left_thigh_knee!=np.Inf)
or (right_lag_slope!=np.Inf or right_calf_knee!=np.Inf or right_thigh_knee!=np.Inf)):
cv2.line(self.orin_img, tuple(map(int,self.pred[2][:2])), tuple(map(int,self.pred[1][:2])), (0, 0, 255), thickness=3)
cv2.line(self.orin_img, tuple(map(int,self.pred[1][:2])), tuple(map(int,self.pred[0][:2])), (0, 0, 255), thickness=3)
cv2.line(self.orin_img, tuple(map(int,self.pred[3][:2])), tuple(map(int,self.pred[4][:2])), (0, 0, 255), thickness=3)
cv2.line(self.orin_img, tuple(map(int,self.pred[4][:2])), tuple(map(int,self.pred[5][:2])), (0, 0, 255), thickness=3)
print("Knee hyperextension")
else:
print("Knees normal, no obvious hyperextension")
return self.orin_img
parser = HeatmapParser()
def post_process(det, mat_, trainval, c=None, s=None, resolution=None):
mat = np.linalg.pinv(np.array(mat_).tolist() + [[0, 0, 1]])[:2]
res = det.shape[1:3]
cropped_preds = parser.parse(np.float32([det]))[0]
if len(cropped_preds) > 0:
cropped_preds[:, :, :2] = utils.img.kpt_affine(cropped_preds[:, :, :2] * 4, mat) # size 1x16x3
preds = np.copy(cropped_preds)
## invert predictions from the cropped input resolution back to the original image
if trainval != 'cropped':
for j in range(preds.shape[1]):
preds[0, j, :2] = utils.img.transform(preds[0, j, :2], c, s, resolution, invert=1)
return preds
def inference(img, func, config, c, s):
"""
forward pass at test time
calls post_process to post process results
"""
# example values for a 256x256 input: center (128.0, 128.0), scale 1.28, res (256, 256)
height, width = img.shape[0:2]
center = (width / 2, height / 2)
scale = max(height, width) / 200
res = (config['train']['input_res'], config['train']['input_res'])
# 2x3 affine matrix from get_transform, e.g. [[1,0,0],[0,1,0]] for identity
mat_ = utils.img.get_transform(center, scale, res)[:2]
inp = img / 255
def array2dict(tmp):
return {
'det': tmp[0][:, :, :16],
}
tmp1 = array2dict(func([inp]))
tmp2 = array2dict(func([inp[:, ::-1]]))
tmp = {}
for ii in tmp1:
tmp[ii] = np.concatenate((tmp1[ii], tmp2[ii]), axis=0)
det = tmp['det'][0, -1] + tmp['det'][1, -1, :, :, ::-1][ds.flipped_parts['mpii']]
if det is None:
return [], []
det = det / 2
det = np.minimum(det, 1)
return post_process(det, mat_, 'valid', c, s, res)
def main():
from train import init
func, config = init()
def runner(imgs):
return func(0, config, 'inference', imgs=torch.Tensor(np.float32(imgs)))['preds']
def do(img, c, s):
import time
ans = inference(img, runner, config, c, s)
if len(ans) > 0:
ans = ans[:,:,:3]
## ans has shape N,16,3 (num preds, joints, x/y/visible)
pred = []
for i in range(ans.shape[0]):
pred.append({'keypoints': ans[i,:,:]})
return pred
if __name__ == '__main__':
image_path = "data/custom/膝盖超伸2.png"
from train import init
func, config = init()
def runner(imgs):
return func(0, config, 'inference', imgs=torch.Tensor(np.float32(imgs)))['preds']
def do(img, c, s):
ans = inference(img, runner, config, c, s)
if len(ans) > 0:
ans = ans[:,:,:3]
## ans has shape N,16,3 (num preds, joints, x/y/visible)
pred = []
for i in range(ans.shape[0]):
pred.append({'keypoints': ans[i,:,:]})
return pred
input_res = 256
orig_img = cv2.imread(image_path)
orig_img_reverse = cv2.imread(image_path)[:,:,::-1]
shape = orig_img_reverse.shape[0:2]
# Use YOLO here to detect the person bounding box in the image, returning the box center and (longer side)/200 as the scale
# A YOLOv5 service must be running: the YOLOv5 model's relative paths are hard-coded, so it cannot be imported into this project directly (short of invoking it via the command line)
import requests
DETECTION_URL = "http://localhost:5000/v1/object-detection/yolov5s"
image_data = open(image_path, "rb").read()
response = requests.post(DETECTION_URL, files={"image": image_data}).json()
for box in response:
if box['class']!=0:
continue
else:
c = [(box['xmax']+box['xmin'])/2,(box['ymax']+box['ymin'])/2]
s = max((box['xmax']-box['xmin'])/200,(box['ymax']-box['ymin'])/200)+1
# # center point of the detected box
# # scale factor; empirically, for a 720*1280 image where the person fills most of the frame, 7 works well
# s = 7
im = utils.img.crop(orig_img_reverse,c,s,(input_res,input_res))
pred = do(im,c,s)
points = pred[0]['keypoints']
for index,point in enumerate(points):
print(point[2])
if point[2]>0.3:
cv2.circle(orig_img, (int(point[0]), int(point[1])), 5, color=(0, 0, 255), thickness=-1)
cv2.putText(orig_img,str(index),(int(point[0]), int(point[1])),cv2.FONT_ITALIC, 1, (0,255,0), 1)
person = PersonPosture(pred,orig_img)
orig_img = person.analysis(False)
cv2.imshow("test", orig_img)
cv2.waitKey()
from torch import nn
Pool = nn.MaxPool2d
def batchnorm(x):
# note: builds a fresh (untrained) BatchNorm2d on every call; this helper is unused by the model below
return nn.BatchNorm2d(x.size()[1])(x)
class Conv(nn.Module):
def __init__(self, inp_dim, out_dim, kernel_size=3, stride = 1, bn = False, relu = True):
super(Conv, self).__init__()
self.inp_dim = inp_dim
self.conv = nn.Conv2d(inp_dim, out_dim, kernel_size, stride, padding=(kernel_size-1)//2, bias=True)
self.relu = None
self.bn = None
if relu:
self.relu = nn.ReLU()
if bn:
self.bn = nn.BatchNorm2d(out_dim)
def forward(self, x):
assert x.size()[1] == self.inp_dim, "{} {}".format(x.size()[1], self.inp_dim)
x = self.conv(x)
if self.bn is not None:
x = self.bn(x)
if self.relu is not None:
x = self.relu(x)
return x
class Residual(nn.Module):
def __init__(self, inp_dim, out_dim):
super(Residual, self).__init__()
self.relu = nn.ReLU()
self.bn1 = nn.BatchNorm2d(inp_dim)
self.conv1 = Conv(inp_dim, int(out_dim/2), 1, relu=False)
self.bn2 = nn.BatchNorm2d(int(out_dim/2))
self.conv2 = Conv(int(out_dim/2), int(out_dim/2), 3, relu=False)
self.bn3 = nn.BatchNorm2d(int(out_dim/2))
self.conv3 = Conv(int(out_dim/2), out_dim, 1, relu=False)
self.skip_layer = Conv(inp_dim, out_dim, 1, relu=False)
if inp_dim == out_dim:
self.need_skip = False
else:
self.need_skip = True
def forward(self, x):
if self.need_skip:
residual = self.skip_layer(x)
else:
residual = x
out = self.bn1(x)
out = self.relu(out)
out = self.conv1(out)
out = self.bn2(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn3(out)
out = self.relu(out)
out = self.conv3(out)
out += residual
return out
class Hourglass(nn.Module):
def __init__(self, n, f, bn=None, increase=0):
super(Hourglass, self).__init__()
nf = f + increase
self.up1 = Residual(f, f)
# Lower branch
self.pool1 = Pool(2, 2)
self.low1 = Residual(f, nf)
self.n = n
# Recursive hourglass
if self.n > 1:
self.low2 = Hourglass(n-1, nf, bn=bn)
else:
self.low2 = Residual(nf, nf)
self.low3 = Residual(nf, f)
self.up2 = nn.Upsample(scale_factor=2, mode='nearest')
def forward(self, x):
up1 = self.up1(x)
pool1 = self.pool1(x)
low1 = self.low1(pool1)
low2 = self.low2(low1)
low3 = self.low3(low2)
up2 = self.up2(low3)
return up1 + up2
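A shape sanity check for the blocks above (a sketch, not in the original commit): an Hourglass returns a feature map with the same resolution and channel count, which is what lets stacks be chained.
import torch

if __name__ == '__main__':
    hg = Hourglass(n=4, f=256)
    x = torch.randn(1, 256, 64, 64)     # NCHW; the spatial size must be divisible by 2**n
    with torch.no_grad():
        y = hg(x)
    print(y.shape)                      # torch.Size([1, 256, 64, 64])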
import torch
from torch import nn
from models.layers import Conv, Hourglass, Pool, Residual
from task.loss import HeatmapLoss
class UnFlatten(nn.Module):
def forward(self, input):
return input.view(-1, 256, 4, 4)
class Merge(nn.Module):
def __init__(self, x_dim, y_dim):
super(Merge, self).__init__()
self.conv = Conv(x_dim, y_dim, 1, relu=False, bn=False)
def forward(self, x):
return self.conv(x)
class PoseNet(nn.Module):
def __init__(self, nstack, inp_dim, oup_dim, bn=False, increase=0, **kwargs):
super(PoseNet, self).__init__()
self.nstack = nstack
self.pre = nn.Sequential(
Conv(3, 64, 7, 2, bn=True, relu=True),
Residual(64, 128),
Pool(2, 2),
Residual(128, 128),
Residual(128, inp_dim)
)
self.hgs = nn.ModuleList( [
nn.Sequential(
Hourglass(4, inp_dim, bn, increase),
) for i in range(nstack)] )
self.features = nn.ModuleList( [
nn.Sequential(
Residual(inp_dim, inp_dim),
Conv(inp_dim, inp_dim, 1, bn=True, relu=True)
) for i in range(nstack)] )
self.outs = nn.ModuleList( [Conv(inp_dim, oup_dim, 1, relu=False, bn=False) for i in range(nstack)] )
self.merge_features = nn.ModuleList( [Merge(inp_dim, inp_dim) for i in range(nstack-1)] )
self.merge_preds = nn.ModuleList( [Merge(oup_dim, inp_dim) for i in range(nstack-1)] )
self.nstack = nstack
self.heatmapLoss = HeatmapLoss()
def forward(self, imgs):
## our posenet
x = imgs.permute(0, 3, 1, 2) #x of size 1,3,inpdim,inpdim
x = self.pre(x)
combined_hm_preds = []
for i in range(self.nstack):
hg = self.hgs[i](x)
feature = self.features[i](hg)
preds = self.outs[i](feature)
combined_hm_preds.append(preds)
if i < self.nstack - 1:
x = x + self.merge_preds[i](preds) + self.merge_features[i](feature)
return torch.stack(combined_hm_preds, 1)
def calc_loss(self, combined_hm_preds, heatmaps):
combined_loss = []
for i in range(self.nstack):
combined_loss.append(self.heatmapLoss(combined_hm_preds[0][:,i], heatmaps))
combined_loss = torch.stack(combined_loss, dim=1)
return combined_loss
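A forward-pass sketch for PoseNet (not in the original commit); note that forward() takes NHWC input and permutes to NCHW internally, and the output heatmaps are at 1/4 of the input resolution.
import torch

if __name__ == '__main__':
    net = PoseNet(nstack=2, inp_dim=256, oup_dim=16)  # 2 stacks keeps the sketch light
    imgs = torch.randn(1, 256, 256, 3)                # NHWC image batch
    with torch.no_grad():
        preds = net(imgs)
    print(preds.shape)                                # torch.Size([1, 2, 16, 64, 64])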
[net]
# Testing
batch=1
subdivisions=1
# Training
# batch=64
# subdivisions=2
width=416
height=416
channels=3
momentum=0.9
decay=0.0005
angle=0
saturation = 1.5
exposure = 1.5
hue=.1
learning_rate=0.001
burn_in=1000
max_batches = 500200
policy=steps
steps=400000,450000
scales=.1,.1
# 0
[convolutional]
batch_normalize=1
filters=16
size=3
stride=1
pad=1
activation=leaky
# 1
[maxpool]
size=2
stride=2
# 2
[convolutional]
batch_normalize=1
filters=32
size=3
stride=1
pad=1
activation=leaky
# 3
[maxpool]
size=2
stride=2
# 4
[convolutional]
batch_normalize=1
filters=64
size=3
stride=1
pad=1
activation=leaky
# 5
[maxpool]
size=2
stride=2
# 6
[convolutional]
batch_normalize=1
filters=128
size=3
stride=1
pad=1
activation=leaky
# 7
[maxpool]
size=2
stride=2
# 8
[convolutional]
batch_normalize=1
filters=256
size=3
stride=1
pad=1
activation=leaky
# 9
[maxpool]
size=2
stride=2
# 10
[convolutional]
batch_normalize=1
filters=512
size=3
stride=1
pad=1
activation=leaky
# 11
[maxpool]
size=2
stride=1
# 12
[convolutional]
batch_normalize=1
filters=1024
size=3
stride=1
pad=1
activation=leaky
###########
# 13
[convolutional]
batch_normalize=1
filters=256
size=1
stride=1
pad=1
activation=leaky
# 14
[convolutional]
batch_normalize=1
filters=512
size=3
stride=1
pad=1
activation=leaky
# 15
[convolutional]
size=1
stride=1
pad=1
filters=18
activation=linear
# 16
[yolo]
mask = 3,4,5
anchors = 10,14, 23,27, 37,58, 81,82, 135,169, 344,319
classes=1
num=6
jitter=.3
ignore_thresh = .7
truth_thresh = 1
random=1
# 17
[route]
layers = -4
# 18
[convolutional]
batch_normalize=1
filters=128
size=1
stride=1
pad=1
activation=leaky
# 19
[upsample]
stride=2
# 20
[route]
layers = -1, 8
# 21
[convolutional]
batch_normalize=1
filters=256
size=3
stride=1
pad=1
activation=leaky
# 22
[convolutional]
size=1
stride=1
pad=1
filters=18
activation=linear
# 23
[yolo]
mask = 1,2,3
anchors = 10,14, 23,27, 37,58, 81,82, 135,169, 344,319
classes=1
num=6
jitter=.3
ignore_thresh = .7
truth_thresh = 1
random=1
# Main pipeline
python handler.py -c pose
1. Use a YOLOv5 model to detect the person bounding box in the image and, from the box corners, compute the person's center coordinate c and scale factor s.
2. Crop the original image to 256*256 based on c and s.
3. Load the stacked hourglass model (the checkpoint.pt weights) and feed the 256*256 image through it for prediction.
4. The model output has shape N*16*3: N is the number of boxes, 16 the number of body keypoints, and 3 the values x, y, visible (the point's coordinates and visibility score).
5. Judge body posture from the 16 keypoints using the custom detection rules (see the sketch after the notes).
# Notes
The YOLOv5 model is currently reached through the YOLOv5 project's REST API; the YOLO code bundled under models/ is not used.
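A condensed sketch of the pipeline above (hedged: the detection URL mirrors the YOLOv5 REST example in handler.py, and the image path is hypothetical):
import cv2
import requests
import utils.img
from train import init

func, config = init()                       # step 3: builds the net; loads checkpoint.pt when run with '-c pose'
img_path = 'person.jpg'                     # hypothetical input image
orig = cv2.imread(img_path)[:, :, ::-1]     # BGR -> RGB
resp = requests.post('http://localhost:5000/v1/object-detection/yolov5s',
                     files={'image': open(img_path, 'rb').read()}).json()
box = next(b for b in resp if b['class'] == 0)   # step 1: first person box
c = [(box['xmax'] + box['xmin']) / 2, (box['ymax'] + box['ymin']) / 2]
s = max(box['xmax'] - box['xmin'], box['ymax'] - box['ymin']) / 200 + 1
crop = utils.img.crop(orig, c, s, (256, 256))    # step 2: crop to 256*256
# steps 4-5: run inference() from handler.py on `crop`, then apply the PersonPosture rules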
import cv2
x = [(620,393),
(620,269),
(579,181),
(644,193),
(661,222),
(673,281),
(614,187),
(650,163),
(632,181),
(709,116),
(614,210),
(555,157),
(608,145),
(697,175),
(691,240),
(685,305)]
img = cv2.imread("data/MPII/images/015601864.jpg")
for point in x:
cv2.circle(img, point, 10, color=(0, 0, 255), thickness=2)  # thickness=0 renders a hairline at best; use a positive outline width
cv2.imshow("ts",img)
cv2.waitKey()
import torch
class HeatmapLoss(torch.nn.Module):
"""
loss for detection heatmap
"""
def __init__(self):
super(HeatmapLoss, self).__init__()
def forward(self, pred, gt):
l = ((pred - gt)**2)
l = l.mean(dim=3).mean(dim=2).mean(dim=1)
return l ## l of dim bsize
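A one-line usage sketch (not in the original file): the loss reduces over the heatmap and joint dimensions and returns one value per sample.
import torch

if __name__ == '__main__':
    loss_fn = HeatmapLoss()
    pred = torch.rand(4, 16, 64, 64)   # (batch, parts, H, W)
    gt = torch.rand(4, 16, 64, 64)
    print(loss_fn(pred, gt).shape)     # torch.Size([4]): one loss per sample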
"""
__config__ contains the options for training and testing
Basically all of the variables related to training are put in __config__['train']
"""
import torch
import numpy as np
from torch import nn
import os
from torch.nn import DataParallel
from utils.misc import make_input, make_output, importNet
__config__ = {
'data_provider': 'data.MPII.dp',
'network': 'models.posenet.PoseNet',
'inference': {
'nstack': 8,
'inp_dim': 256,
'oup_dim': 16,
'num_parts': 16,
'increase': 0,
'keys': ['imgs'],
'num_eval': 2958, ## number of val examples used. entire set is 2958
'train_num_eval': 300, ## number of train examples tested at test time
},
'train': {
'batchsize': 16,
'input_res': 256,
'output_res': 64,
'train_iters': 1000,
'valid_iters': 10,
'learning_rate': 1e-3,
'max_num_people' : 1,
'loss': [
['combined_hm_loss', 1],
],
'decay_iters': 100000,
'decay_lr': 2e-4,
'num_workers': 2,
'use_data_loader': True,
},
}
class Trainer(nn.Module):
"""
The wrapper module that will behave differetly for training or testing
inference_keys specify the inputs for inference
"""
def __init__(self, model, inference_keys, calc_loss=None):
super(Trainer, self).__init__()
self.model = model
self.keys = inference_keys
self.calc_loss = calc_loss
def forward(self, imgs, **inputs):
inps = {}
labels = {}
for i in inputs:
if i in self.keys:
inps[i] = inputs[i]
else:
labels[i] = inputs[i]
if not self.training:
return self.model(imgs, **inps)
else:
combined_hm_preds = self.model(imgs, **inps)
if type(combined_hm_preds)!=list and type(combined_hm_preds)!=tuple:
combined_hm_preds = [combined_hm_preds]
loss = self.calc_loss(**labels, combined_hm_preds=combined_hm_preds)
return list(combined_hm_preds) + list([loss])
def make_network(configs):
train_cfg = configs['train']
config = configs['inference']
def calc_loss(*args, **kwargs):
return poseNet.calc_loss(*args, **kwargs)
## creating new posenet
print(configs['network'])
configs['network'] = "models.posenet.PoseNet"
PoseNet = importNet(configs['network'])
poseNet = PoseNet(**config)
# forward_net = DataParallel(poseNet.cuda())
forward_net = DataParallel(poseNet)
config['net'] = Trainer(forward_net, configs['inference']['keys'], calc_loss)
## optimizer, experiment setup
train_cfg['optimizer'] = torch.optim.Adam(filter(lambda p: p.requires_grad,config['net'].parameters()), train_cfg['learning_rate'])
exp_path = os.path.join('exp', configs['opt'].exp)
if configs['opt'].exp=='pose' and configs['opt'].continue_exp is not None:
exp_path = os.path.join('exp', configs['opt'].continue_exp)
if not os.path.exists(exp_path):
os.mkdir(exp_path)
logger = open(os.path.join(exp_path, 'log'), 'a+')
def make_train(batch_id, config, phase, **inputs):
for i in inputs:
try:
inputs[i] = make_input(inputs[i])
except:
pass #for last input, which is a string (id_)
net = config['inference']['net']
config['batch_id'] = batch_id
net = net.train()
if phase != 'inference':
result = net(inputs['imgs'], **{i:inputs[i] for i in inputs if i!='imgs'})
num_loss = len(config['train']['loss'])
losses = {i[0]: result[-num_loss + idx]*i[1] for idx, i in enumerate(config['train']['loss'])}
loss = 0
toprint = '\n{}: '.format(batch_id)
for i in losses:
loss = loss + torch.mean(losses[i])
my_loss = make_output( losses[i] )
my_loss = my_loss.mean()
if my_loss.size == 1:
toprint += ' {}: {}'.format(i, format(my_loss.mean(), '.8f'))
else:
toprint += '\n{}'.format(i)
for j in my_loss:
toprint += ' {}'.format(format(j.mean(), '.8f'))
logger.write(toprint)
logger.flush()
if phase == 'train':
optimizer = train_cfg['optimizer']
optimizer.zero_grad()
loss.backward()
optimizer.step()
if batch_id == config['train']['decay_iters']:
## decrease the learning rate after decay # iterations
for param_group in optimizer.param_groups:
param_group['lr'] = config['train']['decay_lr']
return None
else:
out = {}
net = net.eval()
result = net(**inputs)
if type(result)!=list and type(result)!=tuple:
result = [result]
out['preds'] = [make_output(i) for i in result]
return out
return make_train
import cv2
import torch
import tqdm
import os
import numpy as np
import h5py
import copy
from utils.group import HeatmapParser
import utils.img
import data.MPII.ref as ds
parser = HeatmapParser()
def post_process(det, mat_, trainval, c=None, s=None, resolution=None):
mat = np.linalg.pinv(np.array(mat_).tolist() + [[0,0,1]])[:2]
res = det.shape[1:3]
cropped_preds = parser.parse(np.float32([det]))[0]
if len(cropped_preds) > 0:
cropped_preds[:,:,:2] = utils.img.kpt_affine(cropped_preds[:,:,:2] * 4, mat) #size 1x16x3
preds = np.copy(cropped_preds)
## invert predictions from the cropped input resolution back to the original image
if trainval != 'cropped':
for j in range(preds.shape[1]):
preds[0,j,:2] = utils.img.transform(preds[0,j,:2], c, s, resolution, invert=1)
return preds
def inference(img, func, config, c, s):
"""
forward pass at test time
calls post_process to post process results
"""
height, width = img.shape[0:2]
center = (width/2, height/2)
scale = max(height, width)/200
res = (config['train']['input_res'], config['train']['input_res'])
mat_ = utils.img.get_transform(center, scale, res)[:2]
inp = img/255
def array2dict(tmp):
return {
'det': tmp[0][:,:,:16],
}
tmp1 = array2dict(func([inp]))
tmp2 = array2dict(func([inp[:,::-1]]))
tmp = {}
for ii in tmp1:
tmp[ii] = np.concatenate((tmp1[ii], tmp2[ii]),axis=0)
det = tmp['det'][0, -1] + tmp['det'][1, -1, :, :, ::-1][ds.flipped_parts['mpii']]
if det is None:
return [], []
det = det/2
det = np.minimum(det, 1)
return post_process(det, mat_, 'valid', c, s, res)
def mpii_eval(pred, gt, normalizing, num_train, bound=0.5):
"""
Use PCK with threshold of .5 of normalized distance (presumably head size)
"""
correct = {'all': {'total': 0, 'ankle': 0, 'knee': 0, 'hip': 0, 'pelvis': 0,
'thorax': 0, 'neck': 0, 'head': 0, 'wrist': 0, 'elbow': 0,
'shoulder': 0},
'visible': {'total': 0, 'ankle': 0, 'knee': 0, 'hip': 0, 'pelvis': 0,
'thorax': 0, 'neck': 0, 'head': 0, 'wrist': 0, 'elbow': 0,
'shoulder': 0},
'not visible': {'total': 0, 'ankle': 0, 'knee': 0, 'hip': 0, 'pelvis': 0,
'thorax': 0, 'neck': 0, 'head': 0, 'wrist': 0, 'elbow': 0,
'shoulder': 0}}
count = copy.deepcopy(correct)
correct_train = copy.deepcopy(correct)
count_train = copy.deepcopy(correct)
idx = 0
for p, g, normalize in zip(pred, gt, normalizing):
for j in range(g.shape[1]):
vis = 'visible'
if g[0,j,0] == 0: ## not in picture!
continue
if g[0,j,2] == 0:
vis = 'not visible'
joint = 'ankle'
if j==1 or j==4:
joint = 'knee'
elif j==2 or j==3:
joint = 'hip'
elif j==6:
joint = 'pelvis'
elif j==7:
joint = 'thorax'
elif j==8:
joint = 'neck'
elif j==9:
joint = 'head'
elif j==10 or j==15:
joint = 'wrist'
elif j==11 or j==14:
joint = 'elbow'
elif j==12 or j==13:
joint = 'shoulder'
if idx >= num_train:
count['all']['total'] += 1
count['all'][joint] += 1
count[vis]['total'] += 1
count[vis][joint] += 1
else:
count_train['all']['total'] += 1
count_train['all'][joint] += 1
count_train[vis]['total'] += 1
count_train[vis][joint] += 1
error = np.linalg.norm(p[0]['keypoints'][j,:2]-g[0,j,:2]) / normalize
if idx >= num_train:
if bound > error:
correct['all']['total'] += 1
correct['all'][joint] += 1
correct[vis]['total'] += 1
correct[vis][joint] += 1
else:
if bound > error:
correct_train['all']['total'] += 1
correct_train['all'][joint] += 1
correct_train[vis]['total'] += 1
correct_train[vis][joint] += 1
idx += 1
## breakdown by validation set / training set
for k in correct:
print(k, ':')
for key in correct[k]:
print('Val PCK @,', bound, ',', key, ':', round(correct[k][key] / max(count[k][key],1), 3), ', count:', count[k][key])
print('Tra PCK @,', bound, ',', key, ':', round(correct_train[k][key] / max(count_train[k][key],1), 3), ', count:', count_train[k][key])
print('\n')
def get_img(config, num_eval=2958, num_train=300):
'''
Load validation and training images
'''
input_res = config['train']['input_res']
output_res = config['train']['output_res']
val_f = h5py.File(os.path.join(ds.annot_dir, 'valid.h5'), 'r')
tr = tqdm.tqdm( range(0, num_train), total = num_train )
## training
train_f = h5py.File(os.path.join(ds.annot_dir, 'train.h5') ,'r')
for i in tr:
path_t = '%s/%s' % (ds.img_dir, train_f['imgname'][i].decode('UTF-8'))
## img
orig_img = cv2.imread(path_t)[:,:,::-1]
c = train_f['center'][i]
s = train_f['scale'][i]
im = utils.img.crop(orig_img, c, s, (input_res, input_res))
## kp
kp = train_f['part'][i]
vis = train_f['visible'][i]
kp2 = np.insert(kp, 2, vis, axis=1)
kps = np.zeros((1, 16, 3))
kps[0] = kp2
## normalize (to make errors more fair on high pixel imgs)
n = train_f['normalize'][i]
yield kps, im, c, s, n
tr2 = tqdm.tqdm( range(0, num_eval), total = num_eval )
## validation
for i in tr2:
path_t = '%s/%s' % (ds.img_dir, val_f['imgname'][i].decode('UTF-8'))
## img
orig_img = cv2.imread(path_t)[:,:,::-1]
c = val_f['center'][i]
s = val_f['scale'][i]
im = utils.img.crop(orig_img, c, s, (input_res, input_res))
## kp
kp = val_f['part'][i]
vis = val_f['visible'][i]
kp2 = np.insert(kp, 2, vis, axis=1)
kps = np.zeros((1, 16, 3))
kps[0] = kp2
## normalize (to make errors more fair on high pixel imgs)
n = val_f['normalize'][i]
yield kps, im, c, s, n
def main():
from train import init
func, config = init()
def runner(imgs):
return func(0, config, 'inference', imgs=torch.Tensor(np.float32(imgs)))['preds']
def do(img, c, s):
ans = inference(img, runner, config, c, s)
if len(ans) > 0:
ans = ans[:,:,:3]
## ans has shape N,16,3 (num preds, joints, x/y/visible)
pred = []
for i in range(ans.shape[0]):
pred.append({'keypoints': ans[i,:,:]})
return pred
gts = []
preds = []
normalizing = []
num_eval = config['inference']['num_eval']
num_train = config['inference']['train_num_eval']
for anns, img, c, s, n in get_img(config, num_eval, num_train):
gts.append(anns)
pred = do(img, c, s)
preds.append(pred)
normalizing.append(n)
mpii_eval(preds, gts, normalizing, num_train)
if __name__ == '__main__':
main()
import os
import tqdm
from os.path import dirname
import torch.backends.cudnn as cudnn
cudnn.benchmark = True
cudnn.enabled = True
import torch
import importlib
import argparse
from datetime import datetime
from pytz import timezone
import shutil
def parse_command_line():
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--continue_exp', type=str, help='continue exp')
parser.add_argument('-e', '--exp', type=str, default='pose', help='experiments name')
parser.add_argument('-m', '--max_iters', type=int, default=250, help='max number of iterations (thousands)')
args = parser.parse_args()
return args
def reload(config):
"""
load or initialize model's parameters by config from config['opt'].continue_exp
config['train']['epoch'] records the epoch num
config['inference']['net'] is the model
"""
opt = config['opt']
if opt.continue_exp:
resume = os.path.join('exp', opt.continue_exp)
resume_file = os.path.join(resume, 'checkpoint.pt')
if os.path.isfile(resume_file):
print("=> loading checkpoint '{}'".format(resume))
checkpoint = torch.load(resume_file,map_location='cpu')
config['inference']['net'].load_state_dict(checkpoint['state_dict'])
config['train']['optimizer'].load_state_dict(checkpoint['optimizer'])
config['train']['epoch'] = checkpoint['epoch']
print("=> loaded checkpoint '{}' (epoch {})"
.format(resume, checkpoint['epoch']))
else:
print("=> no checkpoint found at '{}'".format(resume))
exit(0)
if 'epoch' not in config['train']:
config['train']['epoch'] = 0
def save_checkpoint(state, is_best, filename='checkpoint.pt'):
"""
from pytorch/examples
"""
basename = dirname(filename)
if not os.path.exists(basename):
os.makedirs(basename)
torch.save(state, filename)
if is_best:
shutil.copyfile(filename, 'model_best.pt')
def save(config):
resume = os.path.join('exp', config['opt'].exp)
if config['opt'].exp=='pose' and config['opt'].continue_exp is not None:
resume = os.path.join('exp', config['opt'].continue_exp)
resume_file = os.path.join(resume, 'checkpoint.pt')
save_checkpoint({
'state_dict': config['inference']['net'].state_dict(),
'optimizer' : config['train']['optimizer'].state_dict(),
'epoch': config['train']['epoch'],
}, False, filename=resume_file)
print('=> save checkpoint')
def train(train_func, data_func, config, post_epoch=None):
while True:
fails = 0
print('epoch: ', config['train']['epoch'])
if 'epoch_num' in config['train']:
if config['train']['epoch'] > config['train']['epoch_num']:
break
for phase in ['train', 'valid']:
num_step = config['train']['{}_iters'.format(phase)]
generator = data_func(phase)
print('start', phase, config['opt'].exp)
show_range = range(num_step)
show_range = tqdm.tqdm(show_range, total = num_step, ascii=True)
batch_id = num_step * config['train']['epoch']
if batch_id > config['opt'].max_iters * 1000:
return
for i in show_range:
datas = next(generator)
outs = train_func(batch_id + i, config, phase, **datas)
config['train']['epoch'] += 1
save(config)
def init():
"""
task.__config__ contains the variables that control the training and testing
make_network builds a function which can do forward and backward propagation
"""
opt = parse_command_line()
task = importlib.import_module('task.pose')
exp_path = os.path.join('exp', opt.exp)
current_time = datetime.now().strftime('%b%d_%H-%M-%S')
config = task.__config__
try: os.makedirs(exp_path)
except FileExistsError: pass
config['opt'] = opt
config['data_provider'] = importlib.import_module(config['data_provider'])
# build the network; func is the training/inference step function
func = task.make_network(config)
reload(config)
return func, config
def main():
func, config = init()
data_func = config['data_provider'].init(config)
train(func, data_func, config)
print(datetime.now(timezone('EST')))
if __name__ == '__main__':
main()
# constants
MIN_VISIBLE=0.3
import numpy as np
import torch
def match_format(dic):
loc = dic['loc_k'][0,:,0,:]
val = dic['val_k'][0,:,:]
ans = np.hstack((loc, val))
ans = np.expand_dims(ans, axis = 0)
ret = []
ret.append(ans)
return ret
class HeatmapParser:
def __init__(self):
from torch import nn
self.pool = nn.MaxPool2d(3, 1, 1)
def nms(self, det):
maxm = self.pool(det)
maxm = torch.eq(maxm, det).float()
det = det * maxm
return det
def calc(self, det):
with torch.no_grad():
det = torch.Tensor(det)  # autograd.Variable is deprecated; a plain tensor suffices in modern PyTorch
det = self.nms(det)
h = det.size()[2]
w = det.size()[3]
det = det.view(det.size()[0], det.size()[1], -1)
val_k, ind = det.topk(1, dim=2)
x = ind % w
y = (ind // w).long()  # floor division for the row index (true division on tensors is deprecated here)
ind_k = torch.stack((x, y), dim=3)
ans = {'loc_k': ind_k, 'val_k': val_k}
return {key:ans[key].cpu().data.numpy() for key in ans}
def adjust(self, ans, det):
for batch_id, people in enumerate(ans):
for people_id, i in enumerate(people):
for joint_id, joint in enumerate(i):
if joint[2]>0:
y, x = joint[0:2]
xx, yy = int(x), int(y)
tmp = det[0][joint_id]
if tmp[xx, min(yy+1, tmp.shape[1]-1)]>tmp[xx, max(yy-1, 0)]:
y+=0.25
else:
y-=0.25
if tmp[min(xx+1, tmp.shape[0]-1), yy]>tmp[max(0, xx-1), yy]:
x+=0.25
else:
x-=0.25
ans[0][0, joint_id, 0:2] = (y+0.5, x+0.5)
return ans
def parse(self, det, adjust=True):
ans = match_format(self.calc(det))
if adjust:
ans = self.adjust(ans, det)
return ans
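A parsing sketch (not in the original file): HeatmapParser takes a (1, parts, H, W) stack of heatmaps and returns per-joint (x, y, score) peaks.
import numpy as np

if __name__ == '__main__':
    parser = HeatmapParser()
    det = np.random.rand(16, 64, 64).astype(np.float32)  # one set of part heatmaps
    preds = parser.parse(np.float32([det]))[0]           # wrap to (1, 16, 64, 64)
    print(preds.shape)                                   # (1, 16, 3): (x, y, score) per joint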
import numpy as np
import cv2
# =============================================================================
# General image processing functions
# =============================================================================
def get_transform(center, scale, res, rot=0):
# Generate transformation matrix
h = 200 * scale
t = np.zeros((3, 3))
t[0, 0] = float(res[1]) / h
t[1, 1] = float(res[0]) / h
t[0, 2] = res[1] * (-float(center[0]) / h + .5)
t[1, 2] = res[0] * (-float(center[1]) / h + .5)
t[2, 2] = 1
if not rot == 0:
rot = -rot # To match direction of rotation from cropping
rot_mat = np.zeros((3,3))
rot_rad = rot * np.pi / 180
sn,cs = np.sin(rot_rad), np.cos(rot_rad)
rot_mat[0,:2] = [cs, -sn]
rot_mat[1,:2] = [sn, cs]
rot_mat[2,2] = 1
# Need to rotate around center
t_mat = np.eye(3)
t_mat[0,2] = -res[1]/2
t_mat[1,2] = -res[0]/2
t_inv = t_mat.copy()
t_inv[:2,2] *= -1
t = np.dot(t_inv,np.dot(rot_mat,np.dot(t_mat,t)))
return t
def transform(pt, center, scale, res, invert=0, rot=0):
# Transform pixel location to different reference
t = get_transform(center, scale, res, rot=rot)
if invert:
t = np.linalg.inv(t)
new_pt = np.array([pt[0], pt[1], 1.]).T
new_pt = np.dot(t, new_pt)
return new_pt[:2].astype(int)
def crop(img, center, scale, res, rot=0):
# Upper left point
ul = np.array(transform([0, 0], center, scale, res, invert=1))
# Bottom right point
br = np.array(transform(res, center, scale, res, invert=1))
new_shape = [br[1] - ul[1], br[0] - ul[0]]
if len(img.shape) > 2:
new_shape += [img.shape[2]]
new_img = np.zeros(new_shape)
# Range to fill new array
new_x = max(0, -ul[0]), min(br[0], len(img[0])) - ul[0]
new_y = max(0, -ul[1]), min(br[1], len(img)) - ul[1]
# Range to sample from original image
old_x = max(0, ul[0]), min(len(img[0]), br[0])
old_y = max(0, ul[1]), min(len(img), br[1])
new_img[new_y[0]:new_y[1], new_x[0]:new_x[1]] = img[old_y[0]:old_y[1], old_x[0]:old_x[1]]
return cv2.resize(new_img, res)
def inv_mat(mat):
ans = np.linalg.pinv(np.array(mat).tolist() + [[0,0,1]])
return ans[:2]
def kpt_affine(kpt, mat):
kpt = np.array(kpt)
shape = kpt.shape
kpt = kpt.reshape(-1, 2)
return np.dot( np.concatenate((kpt, kpt[:, 0:1]*0+1), axis = 1), mat.T ).reshape(shape)
def resize(im, res):
return np.array([cv2.resize(im[i],res) for i in range(im.shape[0])])
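A worked round trip for the transform above (a sketch, not in the original file): mapping a point into crop coordinates and inverting recovers it up to integer rounding.
import numpy as np

if __name__ == '__main__':
    c, s, res = (320, 240), 2.0, (256, 256)   # center, scale (box side ~ 200*s px), crop size
    p = np.array([350, 260])
    q = transform(p, c, s, res)               # original image -> crop coordinates
    back = transform(q, c, s, res, invert=1)  # crop -> original image
    print(q, back)                            # back is approximately p, up to rounding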
import torch
import numpy as np
import importlib
# Helpers when setting up training
def importNet(net):
t = net.split('.')
path, name = '.'.join(t[:-1]), t[-1]
module = importlib.import_module(path)
return getattr(module, name)
def make_input(t, requires_grad=False, need_cuda=True):
inp = torch.autograd.Variable(t, requires_grad=requires_grad)
if need_cuda and torch.cuda.is_available():
inp = inp.cuda()
return inp
def make_output(x):
if not (type(x) is list):
return x.cpu().data.numpy()
else:
return [make_output(i) for i in x]
import numpy as np
import math
MIN_VISIBLE=0.3
def segment_slope(pointa,pointb):
# if either endpoint's visibility score is below 0.3, do not compute the segment slope
if pointb[2]<MIN_VISIBLE or pointa[2]<MIN_VISIBLE:
return None
elif pointb[0]==pointa[0]:
# vertical segment: return infinity
return np.Inf
return (pointb[1]-pointa[1])/(pointb[0]-pointa[0])
def absolute_distance(pointa,pointb):
# if either endpoint's visibility score is below 0.3, do not compute the distance between the two points
if pointb[2]<MIN_VISIBLE or pointa[2]<MIN_VISIBLE:
return None
return math.sqrt(((pointa[1]-pointb[1])**2+((pointa[0]-pointb[0])**2)))
def midpoint(pointa,pointb):
if pointb[2]<MIN_VISIBLE or pointa[2]<MIN_VISIBLE:
return None
return ((pointa[0]+pointb[0])/2,(pointa[1]+pointb[1])/2)
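For illustration (not in the original file), the visibility gating above in action:
if __name__ == '__main__':
    a = (100, 200, 0.9)                    # (x, y, visibility)
    b = (140, 260, 0.8)
    print(segment_slope(a, b))             # 1.5
    print(absolute_distance(a, b))         # ~72.11
    print(segment_slope(a, (0, 0, 0.1)))   # None: low-visibility point is ignored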
# coding: utf-8
import cv2
import numpy as np
img = cv2.imread("data/custom/wenjin侧脸.jpg")
def on_EVENT_LBUTTONDOWN(event, x, y, flags, param):
if event == cv2.EVENT_LBUTTONDOWN:
xy = "%d,%d" % (x, y)
print(xy)
cv2.circle(img, (x, y), 1, (255, 0, 0), thickness=-1)
cv2.putText(img, xy, (x, y), cv2.FONT_HERSHEY_PLAIN,
1.0, (0, 0, 0), thickness=1)
cv2.imshow("image", img)
cv2.namedWindow("image")
cv2.setMouseCallback("image", on_EVENT_LBUTTONDOWN)
cv2.imshow("image", img)
while (True):
try:
cv2.waitKey(100)
except Exception:
cv2.destroyWindow("image")
break
cv2.waitKey(0)
cv2.destroyAllWindows()