Commit 3571f887 by wudiao

Init

FROM registry.cn-beijing.aliyuncs.com/aliyunfc/runtime-python3.6:build-1.9.13 as python3
RUN fun-install pip install torch
RUN fun-install pip install torchvision
RUN fun-install pip install easydict
RUN fun-install pip install pillow
RUN fun-install pip install requests
.env
template.yml
.funignore
.fun
.idea
nasMappings:
faces:
- localNasDir: .fun
remoteNasDir: /mnt/nas
- localNasDir: work_space/save
remoteNasDir: /mnt/nas
RUNTIME python3
RUN fun-install pip install torch
RUN fun-install pip install torchvision
RUN fun-install pip install easydict
RUN fun-install pip install pillow
RUN fun-install pip install requests
from model import Backbone,MobileFaceNet, l2_norm
import torch
from tqdm import tqdm
from matplotlib import pyplot as plt
plt.switch_backend('agg')
from utils import get_time,search_similer_face
from torchvision import transforms as trans
import math
from elasticsearch7 import Elasticsearch
from utils import insert_feature_to_es,update_feature_to_es
class face_learner(object):
def __init__(self, conf, inference=False):
if conf.use_mobilfacenet:
self.model = MobileFaceNet(conf.embedding_size).to(conf.device)
print('MobileFaceNet model generated')
else:
self.model = Backbone(conf.net_depth, conf.drop_ratio, conf.net_mode).to(conf.device)
print('{}_{} model generated'.format(conf.net_mode, conf.net_depth))
host = "es-cn-7mz28jd5z001oqrc9.elasticsearch.aliyuncs.com"
port = 9200
user = "elastic"
password = "4rfv%TGB"
authen = None
uripart = host + ':' + str(port)
if user is not None:
authen = user
if authen is not None and password is not None:
authen += ':' + password
if authen is not None:
uripart = authen + '@' + uripart
protocol = 'http'
uri = protocol + '://' + uripart
self.es = Elasticsearch(uri)
def save_state(self, conf, accuracy, to_save_folder=False, extra=None, model_only=False):
if to_save_folder:
save_path = conf.save_path
else:
save_path = conf.model_path
torch.save(
self.model.state_dict(), save_path /
('model_{}_accuracy:{}_step:{}_{}.pth'.format(get_time(), accuracy, self.step, extra)))
if not model_only:
torch.save(
self.head.state_dict(), save_path /
('head_{}_accuracy:{}_step:{}_{}.pth'.format(get_time(), accuracy, self.step, extra)))
torch.save(
self.optimizer.state_dict(), save_path /
('optimizer_{}_accuracy:{}_step:{}_{}.pth'.format(get_time(), accuracy, self.step, extra)))
def load_state(self, conf, fixed_str, from_save_folder=False, model_only=False):
if from_save_folder:
save_path = conf.save_path
else:
save_path = conf.model_path
self.model.load_state_dict(torch.load('/mnt/nas/model_{}'.format(fixed_str),map_location=torch.device('cpu')))
# self.model.load_state_dict(torch.load('/Users/wda/PycharmProjects/faces/work_space/save/model_{}'.format(fixed_str), map_location=torch.device('cpu')))
if not model_only:
self.head.load_state_dict(torch.load(save_path/'head_{}'.format(fixed_str)))
self.optimizer.load_state_dict(torch.load(save_path/'optimizer_{}'.format(fixed_str)))
def board_val(self, db_name, accuracy, best_threshold, roc_curve_tensor):
self.writer.add_scalar('{}_accuracy'.format(db_name), accuracy, self.step)
self.writer.add_scalar('{}_best_threshold'.format(db_name), best_threshold, self.step)
self.writer.add_image('{}_roc_curve'.format(db_name), roc_curve_tensor, self.step)
# self.writer.add_scalar('{}_val:true accept ratio'.format(db_name), val, self.step)
# self.writer.add_scalar('{}_val_std'.format(db_name), val_std, self.step)
# self.writer.add_scalar('{}_far:False Acceptance Ratio'.format(db_name), far, self.step)
def find_lr(self,
conf,
init_value=1e-8,
final_value=10.,
beta=0.98,
bloding_scale=3.,
num=None):
if not num:
num = len(self.loader)
mult = (final_value / init_value)**(1 / num)
lr = init_value
for params in self.optimizer.param_groups:
params['lr'] = lr
self.model.train()
avg_loss = 0.
best_loss = 0.
batch_num = 0
losses = []
log_lrs = []
for i, (imgs, labels) in tqdm(enumerate(self.loader), total=num):
imgs = imgs.to(conf.device)
labels = labels.to(conf.device)
batch_num += 1
self.optimizer.zero_grad()
embeddings = self.model(imgs)
thetas = self.head(embeddings, labels)
loss = conf.ce_loss(thetas, labels)
#Compute the smoothed loss
avg_loss = beta * avg_loss + (1 - beta) * loss.item()
self.writer.add_scalar('avg_loss', avg_loss, batch_num)
smoothed_loss = avg_loss / (1 - beta**batch_num)
self.writer.add_scalar('smoothed_loss', smoothed_loss,batch_num)
#Stop if the loss is exploding
if batch_num > 1 and smoothed_loss > bloding_scale * best_loss:
print('exited with best_loss at {}'.format(best_loss))
plt.plot(log_lrs[10:-5], losses[10:-5])
return log_lrs, losses
#Record the best loss
if smoothed_loss < best_loss or batch_num == 1:
best_loss = smoothed_loss
#Store the values
losses.append(smoothed_loss)
log_lrs.append(math.log10(lr))
self.writer.add_scalar('log_lr', math.log10(lr), batch_num)
#Do the SGD step
#Update the lr for the next step
loss.backward()
self.optimizer.step()
lr *= mult
for params in self.optimizer.param_groups:
params['lr'] = lr
if batch_num > num:
plt.plot(log_lrs[10:-5], losses[10:-5])
return log_lrs, losses
def train(self, conf, epochs):
self.model.train()
running_loss = 0.
for e in range(epochs):
print('epoch {} started'.format(e))
if e == self.milestones[0]:
self.schedule_lr()
if e == self.milestones[1]:
self.schedule_lr()
if e == self.milestones[2]:
self.schedule_lr()
for imgs, labels in tqdm(iter(self.loader)):
imgs = imgs.to(conf.device)
labels = labels.to(conf.device)
self.optimizer.zero_grad()
embeddings = self.model(imgs)
thetas = self.head(embeddings, labels)
loss = conf.ce_loss(thetas, labels)
loss.backward()
running_loss += loss.item()
self.optimizer.step()
if self.step % self.board_loss_every == 0 and self.step != 0:
loss_board = running_loss / self.board_loss_every
self.writer.add_scalar('train_loss', loss_board, self.step)
running_loss = 0.
if self.step % self.evaluate_every == 0 and self.step != 0:
accuracy, best_threshold, roc_curve_tensor = self.evaluate(conf, self.agedb_30, self.agedb_30_issame)
self.board_val('agedb_30', accuracy, best_threshold, roc_curve_tensor)
accuracy, best_threshold, roc_curve_tensor = self.evaluate(conf, self.lfw, self.lfw_issame)
self.board_val('lfw', accuracy, best_threshold, roc_curve_tensor)
accuracy, best_threshold, roc_curve_tensor = self.evaluate(conf, self.cfp_fp, self.cfp_fp_issame)
self.board_val('cfp_fp', accuracy, best_threshold, roc_curve_tensor)
self.model.train()
if self.step % self.save_every == 0 and self.step != 0:
self.save_state(conf, accuracy)
self.step += 1
self.save_state(conf, accuracy, to_save_folder=True, extra='final')
def schedule_lr(self):
for params in self.optimizer.param_groups:
params['lr'] /= 10
print(self.optimizer)
def infer(self, conf, faces, target_embs, tta=False):
'''
faces : list of PIL Image
target_embs : [n, 512] computed embeddings of faces in facebank
names : recorded names of faces in facebank
tta : test time augmentation (hflip, that's all)
'''
embs = []
for img in faces:
if tta:
mirror = trans.functional.hflip(img)
emb = self.model(conf.test_transform(img).to(conf.device).unsqueeze(0))
emb_mirror = self.model(conf.test_transform(mirror).to(conf.device).unsqueeze(0))
embs.append(l2_norm(emb + emb_mirror))
else:
# print(conf.test_transform(img).to(conf.device).unsqueeze(0).shape)
emb = self.model(conf.test_transform(img).to(conf.device).unsqueeze(0))
embs.append(emb)
source_embs = torch.cat(embs)
diff = source_embs.unsqueeze(-1) - target_embs.transpose(1,0).unsqueeze(0)
dist = torch.sum(torch.pow(diff, 2), dim=1)
minimum, min_idx = torch.min(dist, dim=1)
min_idx[minimum > self.threshold] = -1 # if no match, set idx to -1
return min_idx, minimum
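# Added note on the distance computation above: source_embs has shape [k, 512] and
# target_embs has shape [n, 512]; the unsqueeze/transpose broadcast them to [k, 512, n],
# so dist is a [k, n] matrix of squared L2 distances between every detected face and
# every facebank embedding. torch.min over dim=1 picks the closest facebank entry per
# face, and any face whose minimum distance exceeds self.threshold is marked -1 (no match).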
def infer_es(self, conf, face,inst_id,env,threshold=0.5):
emb = self.model(conf.test_transform(face).to(conf.device).unsqueeze(0))
features = list(list(emb.detach().numpy())[0])
l2_distance,student_id = search_similer_face(self.es,features,inst_id,env)
if l2_distance>threshold and student_id !=-1:
return student_id
return "unknow"
def update_student_feature(self,conf,faces,name,inst_id,env):
# faces is assumed to contain exactly one image; the caller has already checked this
img = faces[0]
emb = self.model(conf.test_transform(img).to(conf.device).unsqueeze(0))
features = list(list(emb.detach().numpy())[0])
# Write this student's face feature for the given campus into ES; the return value reflects the update result
success = update_feature_to_es(self.es,features,name,inst_id,env)
return success
def insert_student_feature(self,conf,faces,name,inst_id,env):
# faces is assumed to contain exactly one image; the caller has already checked this
img = faces[0]
emb = self.model(conf.test_transform(img).to(conf.device).unsqueeze(0))
features = list(list(emb.detach().numpy())[0])
# Write this student's face feature for the given campus into ES (new insert); the return value reflects whether it succeeded
success = insert_feature_to_es(self.es,features,name,inst_id,env)
return success
from easydict import EasyDict as edict
from pathlib import Path
import torch
from torch.nn import CrossEntropyLoss
from torchvision import transforms as trans
def get_config(training = True):
conf = edict()
conf.data_path = Path('data')
conf.work_path = Path('work_space/')
conf.model_path = conf.work_path/'models'
conf.log_path = conf.work_path/'log'
conf.save_path = conf.work_path/'save'
conf.input_size = [112,112]
conf.embedding_size = 512
# Set this to True to use MobileFaceNet
conf.use_mobilfacenet = False
conf.net_depth = 50
conf.drop_ratio = 0.6
conf.net_mode = 'ir_se' # or 'ir'
conf.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
conf.test_transform = trans.Compose([
trans.ToTensor(),
trans.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])
])
conf.data_mode = 'emore'
conf.vgg_folder = conf.data_path/'faces_vgg_112x112'
conf.ms1m_folder = conf.data_path/'faces_ms1m_112x112'
conf.emore_folder = conf.data_path/'faces_emore'
conf.batch_size = 100 # irse net depth 50
# conf.batch_size = 200 # mobilefacenet
#--------------------Training Config ------------------------
if training:
conf.log_path = conf.work_path/'log'
conf.save_path = conf.work_path/'save'
# conf.weight_decay = 5e-4
conf.lr = 1e-3
conf.milestones = [12,15,18]
conf.momentum = 0.9
conf.pin_memory = True
conf.num_workers = 3
conf.ce_loss = CrossEntropyLoss()
#--------------------Inference Config ------------------------
else:
# conf.facebank_path = conf.data_path/'facebank'
conf.facebank_path = Path('/Users/wda/IdeaProjects/oss/src/main/resources/1315584165094813698')
conf.threshold = 1.5
conf.face_limit = 10
# The larger this value, the faster detection runs
conf.min_face_size = 30
return conf
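# Minimal usage sketch (kept as comments; it mirrors the initializer of the function
# handler further below and assumes the pretrained 'ir_se50.pth' weights are on the NAS):
#
#   conf = get_config(training=False)            # inference config
#   learner = face_learner(conf, inference=True)
#   learner.threshold = 1.44
#   learner.load_state(conf, 'ir_se50.pth', from_save_folder=True, model_only=True)
#   learner.model.eval()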
cfg_mnet = {
'name': 'mobilenet0.25',
'min_sizes': [[16, 32], [64, 128], [256, 512]],
'steps': [8, 16, 32],
'variance': [0.1, 0.2],
'clip': False,
'loc_weight': 2.0,
#------------------------------------------------------------------#
#   The training images shown in the video are 640px; to improve recognition of
#   hard samples in large images, the training image size is increased here
#------------------------------------------------------------------#
'train_image_size': 840,
'return_layers': {'stage1': 1, 'stage2': 2, 'stage3': 3},
'in_channel': 32,
'out_channel': 64
}
def de_preprocess(tensor):
return tensor*0.5 + 0.5
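# de_preprocess is the inverse of the Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5]) step
# in conf.test_transform: it maps a tensor from roughly [-1, 1] back to [0, 1] for display.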
# -*- coding: utf-8 -*-
import logging
import json
from Learner import face_learner
from config import get_config
from PIL import Image
from mtcnn import MTCNN
from io import BytesIO
import requests
import oss2
learner = None
conf = None
mtcnn = None
bucket = None
auth = None
index_map = {}
bucket_map = {}
def initializer(context):
global mtcnn
mtcnn = MTCNN()
global conf
conf = get_config(False)
global learner
learner = face_learner(conf, True)
learner.threshold = 1.44
learner.load_state(conf, 'ir_se50.pth', True, True)
learner.model.eval()
global auth
# The key LTAI5tFb87uVi1B4BcW6SJfH can only access dev and rc
# auth = oss2.Auth("LTAI5t7B2jc6QFnZRi3kC3rd", "m127LkW55ha0LAWZThid94ojtAOQ2j")
auth = oss2.Auth("LTAI5tFb87uVi1B4BcW6SJfH", "3PdUomiQg6yAduq97KX4rBNpkhzdml")
global index_map
index_map = {"prod": "faces", "dev": "faces_dev", "rc": "faces_rc"}
global bucket_map
bucket_map = {"prod": "xm-prod-resource", "dev": "xmdev-resource", "rc": "xmrc-resource"}
def handler(environ, start_response):
request_method = environ['REQUEST_METHOD']
recognized = {}
unknow = {}
bboxs_map = {}
message = "complete"
if request_method == 'POST':
try:
request_body_size = int(environ.get('CONTENT_LENGTH', 0))
except (ValueError):
request_body_size = 0
request_body = environ['wsgi.input'].read(request_body_size)
req = json.loads(request_body)
try:
threshold = req['threshold']
except KeyError:
threshold = 0.5
# Register a face
if 'student_id' in req and 'inst_id' in req and 'img_url' in req and 'env' in req:
message = ""
faces = None
bboxes = None
env_param = req['env']
# Decide which ES index to use based on the environment
env = index_map[env_param]
# Request carries a face (student) ID
# Insert the new face feature into ES: update if it already exists, otherwise insert, keeping the call idempotent
try:
# Decide which OSS bucket to query based on the environment
bucket_name = bucket_map[env_param]
bucket = oss2.Bucket(auth, "https://oss-cn-hangzhou.aliyuncs.com", bucket_name)
url = bucket.sign_url('GET', req['img_url'], 60) if not str(req['img_url']).startswith("http") else req['img_url']
print(url)
response = requests.get(url)
image = Image.open(BytesIO(response.content))
bboxes, faces = mtcnn.align_multi(image, conf.face_limit, conf.min_face_size)
except Exception as e:
message = "detect face error " + str(e)
if faces is not None and len(faces) > 1:
message = "find multi faces"
elif message != "" and faces is None:
pass  # keep the error message from face detection
elif message == "" and len(faces)==0:
message = 'find no face'
elif len(faces) == 1 and len(bboxes) == 1:
success = learner.update_student_feature(conf, faces, req['student_id'], req['inst_id'], env)
if success != 0:
# A face feature already existed, so the update succeeded
message = "update face feature successful"
else:
# Check whether a similar face is already registered under a different student_id
similar_student_id = has_similar_face(env, faces[0], req, threshold)
if similar_student_id!='unknow':
message = 'detect similar face with different student_id {0}'.format(similar_student_id)
else:
# No existing face feature, so insert a new one
success = learner.insert_student_feature(conf, faces, req['student_id'], req['inst_id'], env)
if success == 1:
message = "insert new face feature successful"
else:
message = "insert new face feature failed"
# If the update or insert succeeded, return the detected box, the requested student ID and the message; otherwise everything except message stays {}
if message == "insert new face feature successful" or message == "update face feature successful":
bboxs_map[0] = {"left": list(bboxes[0])[0], "top": list(bboxes[0])[1], "right": list(bboxes[0])[2],
"bottom": list(bboxes[0])[3], "confidence": list(bboxes[0])[4]}
recognized[0] = req['student_id']
# Recognize faces (query)
elif 'img_url' in req and 'env' in req and 'inst_id' in req and 'student_id' not in req:
inst_id = req['inst_id']
env_param = req['env']
env = index_map[env_param]
bucket_name = bucket_map[env_param]
bucket = oss2.Bucket(auth, "https://oss-cn-hangzhou.aliyuncs.com", bucket_name)
url = bucket.sign_url('GET', req['img_url'], 60) if not str(req['img_url']).startswith("http") else req['img_url']
response = requests.get(url)
image = Image.open(BytesIO(response.content))
bboxes, faces = [], []
try:
bboxes, faces = mtcnn.align_multi(image, conf.face_limit, conf.min_face_size)
assert len(bboxes) == len(faces)
except Exception as e:
message = "face detect failed, no faces find"
if (len(bboxes)) != 0:
for i in range(len(bboxes)):
try:
bboxs_map[i] = {"left": list(bboxes[i])[0], "top": list(bboxes[i])[1],
"right": list(bboxes[i])[2],
"bottom": list(bboxes[i])[3], "confidence": list(bboxes[i])[4]}
face = faces[i]
result = learner.infer_es(conf, face, inst_id, env,threshold=threshold)
if result != 'unknow':
recognized[i] = result
else:
unknow[i] = 'unknow'
except Exception as e:
unknow[i] = "failed" + str(e)
continue
# # do something here
status = '200 OK'
response_headers = [('Content-type', 'application/json')]
start_response(status, response_headers)
results = json.dumps({"faces": bboxs_map, "recognized": recognized, "unknown": unknow, "message": message})
return [bytes(results, encoding='utf-8')]
def has_similar_face(env, face, req, threshold):
try:
result = learner.infer_es(conf, face, req['inst_id'], env, threshold=threshold)
return result
except Exception as e:
return "unknow"
from torch.nn import Linear, Conv2d, BatchNorm1d, BatchNorm2d, PReLU, ReLU, Sigmoid, Dropout2d, Dropout, AvgPool2d, MaxPool2d, AdaptiveAvgPool2d, Sequential, Module, Parameter
import torch.nn.functional as F
import torch
from collections import namedtuple
import math
import pdb
################################## Original Arcface Model #############################################################
class Flatten(Module):
def forward(self, input):
return input.view(input.size(0), -1)
def l2_norm(input,axis=1):
norm = torch.norm(input,2,axis,True)
output = torch.div(input, norm)
return output
class SEModule(Module):
def __init__(self, channels, reduction):
super(SEModule, self).__init__()
self.avg_pool = AdaptiveAvgPool2d(1)
self.fc1 = Conv2d(
channels, channels // reduction, kernel_size=1, padding=0 ,bias=False)
self.relu = ReLU(inplace=True)
self.fc2 = Conv2d(
channels // reduction, channels, kernel_size=1, padding=0 ,bias=False)
self.sigmoid = Sigmoid()
def forward(self, x):
module_input = x
x = self.avg_pool(x)
x = self.fc1(x)
x = self.relu(x)
x = self.fc2(x)
x = self.sigmoid(x)
return module_input * x
class bottleneck_IR(Module):
def __init__(self, in_channel, depth, stride):
super(bottleneck_IR, self).__init__()
if in_channel == depth:
self.shortcut_layer = MaxPool2d(1, stride)
else:
self.shortcut_layer = Sequential(
Conv2d(in_channel, depth, (1, 1), stride ,bias=False), BatchNorm2d(depth))
self.res_layer = Sequential(
BatchNorm2d(in_channel),
Conv2d(in_channel, depth, (3, 3), (1, 1), 1 ,bias=False), PReLU(depth),
Conv2d(depth, depth, (3, 3), stride, 1 ,bias=False), BatchNorm2d(depth))
def forward(self, x):
shortcut = self.shortcut_layer(x)
res = self.res_layer(x)
return res + shortcut
class bottleneck_IR_SE(Module):
def __init__(self, in_channel, depth, stride):
super(bottleneck_IR_SE, self).__init__()
if in_channel == depth:
self.shortcut_layer = MaxPool2d(1, stride)
else:
self.shortcut_layer = Sequential(
Conv2d(in_channel, depth, (1, 1), stride ,bias=False),
BatchNorm2d(depth))
self.res_layer = Sequential(
BatchNorm2d(in_channel),
Conv2d(in_channel, depth, (3,3), (1,1),1 ,bias=False),
PReLU(depth),
Conv2d(depth, depth, (3,3), stride, 1 ,bias=False),
BatchNorm2d(depth),
SEModule(depth,16)
)
def forward(self,x):
shortcut = self.shortcut_layer(x)
res = self.res_layer(x)
return res + shortcut
class Bottleneck(namedtuple('Block', ['in_channel', 'depth', 'stride'])):
'''A named tuple describing a ResNet block.'''
def get_block(in_channel, depth, num_units, stride = 2):
return [Bottleneck(in_channel, depth, stride)] + [Bottleneck(depth, depth, 1) for i in range(num_units-1)]
def get_blocks(num_layers):
if num_layers == 50:
blocks = [
get_block(in_channel=64, depth=64, num_units = 3),
get_block(in_channel=64, depth=128, num_units=4),
get_block(in_channel=128, depth=256, num_units=14),
get_block(in_channel=256, depth=512, num_units=3)
]
elif num_layers == 100:
blocks = [
get_block(in_channel=64, depth=64, num_units=3),
get_block(in_channel=64, depth=128, num_units=13),
get_block(in_channel=128, depth=256, num_units=30),
get_block(in_channel=256, depth=512, num_units=3)
]
elif num_layers == 152:
blocks = [
get_block(in_channel=64, depth=64, num_units=3),
get_block(in_channel=64, depth=128, num_units=8),
get_block(in_channel=128, depth=256, num_units=36),
get_block(in_channel=256, depth=512, num_units=3)
]
return blocks
class Backbone(Module):
def __init__(self, num_layers, drop_ratio, mode='ir'):
super(Backbone, self).__init__()
assert num_layers in [50, 100, 152], 'num_layers should be 50,100, or 152'
assert mode in ['ir', 'ir_se'], 'mode should be ir or ir_se'
blocks = get_blocks(num_layers)
if mode == 'ir':
unit_module = bottleneck_IR
elif mode == 'ir_se':
unit_module = bottleneck_IR_SE
self.input_layer = Sequential(Conv2d(3, 64, (3, 3), 1, 1 ,bias=False),
BatchNorm2d(64),
PReLU(64))
self.output_layer = Sequential(BatchNorm2d(512),
Dropout(drop_ratio),
Flatten(),
Linear(512 * 7 * 7, 512),
BatchNorm1d(512))
modules = []
for block in blocks:
for bottleneck in block:
modules.append(
unit_module(bottleneck.in_channel,
bottleneck.depth,
bottleneck.stride))
self.body = Sequential(*modules)
def forward(self,x):
x = self.input_layer(x)
x = self.body(x)
x = self.output_layer(x)
return l2_norm(x)
################################## MobileFaceNet #############################################################
class Conv_block(Module):
def __init__(self, in_c, out_c, kernel=(1, 1), stride=(1, 1), padding=(0, 0), groups=1):
super(Conv_block, self).__init__()
self.conv = Conv2d(in_c, out_channels=out_c, kernel_size=kernel, groups=groups, stride=stride, padding=padding, bias=False)
self.bn = BatchNorm2d(out_c)
self.prelu = PReLU(out_c)
def forward(self, x):
x = self.conv(x)
x = self.bn(x)
x = self.prelu(x)
return x
class Linear_block(Module):
def __init__(self, in_c, out_c, kernel=(1, 1), stride=(1, 1), padding=(0, 0), groups=1):
super(Linear_block, self).__init__()
self.conv = Conv2d(in_c, out_channels=out_c, kernel_size=kernel, groups=groups, stride=stride, padding=padding, bias=False)
self.bn = BatchNorm2d(out_c)
def forward(self, x):
x = self.conv(x)
x = self.bn(x)
return x
class Depth_Wise(Module):
def __init__(self, in_c, out_c, residual = False, kernel=(3, 3), stride=(2, 2), padding=(1, 1), groups=1):
super(Depth_Wise, self).__init__()
self.conv = Conv_block(in_c, out_c=groups, kernel=(1, 1), padding=(0, 0), stride=(1, 1))
self.conv_dw = Conv_block(groups, groups, groups=groups, kernel=kernel, padding=padding, stride=stride)
self.project = Linear_block(groups, out_c, kernel=(1, 1), padding=(0, 0), stride=(1, 1))
self.residual = residual
def forward(self, x):
if self.residual:
short_cut = x
x = self.conv(x)
x = self.conv_dw(x)
x = self.project(x)
if self.residual:
output = short_cut + x
else:
output = x
return output
class Residual(Module):
def __init__(self, c, num_block, groups, kernel=(3, 3), stride=(1, 1), padding=(1, 1)):
super(Residual, self).__init__()
modules = []
for _ in range(num_block):
modules.append(Depth_Wise(c, c, residual=True, kernel=kernel, padding=padding, stride=stride, groups=groups))
self.model = Sequential(*modules)
def forward(self, x):
return self.model(x)
class MobileFaceNet(Module):
def __init__(self, embedding_size):
super(MobileFaceNet, self).__init__()
self.conv1 = Conv_block(3, 64, kernel=(3, 3), stride=(2, 2), padding=(1, 1))
self.conv2_dw = Conv_block(64, 64, kernel=(3, 3), stride=(1, 1), padding=(1, 1), groups=64)
self.conv_23 = Depth_Wise(64, 64, kernel=(3, 3), stride=(2, 2), padding=(1, 1), groups=128)
self.conv_3 = Residual(64, num_block=4, groups=128, kernel=(3, 3), stride=(1, 1), padding=(1, 1))
self.conv_34 = Depth_Wise(64, 128, kernel=(3, 3), stride=(2, 2), padding=(1, 1), groups=256)
self.conv_4 = Residual(128, num_block=6, groups=256, kernel=(3, 3), stride=(1, 1), padding=(1, 1))
self.conv_45 = Depth_Wise(128, 128, kernel=(3, 3), stride=(2, 2), padding=(1, 1), groups=512)
self.conv_5 = Residual(128, num_block=2, groups=256, kernel=(3, 3), stride=(1, 1), padding=(1, 1))
self.conv_6_sep = Conv_block(128, 512, kernel=(1, 1), stride=(1, 1), padding=(0, 0))
self.conv_6_dw = Linear_block(512, 512, groups=512, kernel=(7,7), stride=(1, 1), padding=(0, 0))
self.conv_6_flatten = Flatten()
self.linear = Linear(512, embedding_size, bias=False)
self.bn = BatchNorm1d(embedding_size)
def forward(self, x):
out = self.conv1(x)
out = self.conv2_dw(out)
out = self.conv_23(out)
out = self.conv_3(out)
out = self.conv_34(out)
out = self.conv_4(out)
out = self.conv_45(out)
out = self.conv_5(out)
out = self.conv_6_sep(out)
out = self.conv_6_dw(out)
out = self.conv_6_flatten(out)
out = self.linear(out)
out = self.bn(out)
return l2_norm(out)
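# Spatial sizes for a 112x112 input, for reference: conv1 halves it to 56x56, conv_23 to
# 28x28, conv_34 to 14x14 and conv_45 to 7x7; conv_6_dw then applies a 7x7 depth-wise
# kernel that collapses the map to 1x1x512, which is flattened and projected to the
# embedding_size-dimensional, L2-normalized embedding.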
################################## Arcface head #############################################################
class Arcface(Module):
# implementation of the ArcFace additive angular margin loss in https://arxiv.org/abs/1801.07698
def __init__(self, embedding_size=512, classnum=51332, s=64., m=0.5):
super(Arcface, self).__init__()
self.classnum = classnum
self.kernel = Parameter(torch.Tensor(embedding_size,classnum))
# initial kernel
self.kernel.data.uniform_(-1, 1).renorm_(2,1,1e-5).mul_(1e5)
self.m = m # the margin value, default is 0.5
self.s = s # scalar value default is 64, see normface https://arxiv.org/abs/1704.06369
self.cos_m = math.cos(m)
self.sin_m = math.sin(m)
self.mm = self.sin_m * m # issue 1
self.threshold = math.cos(math.pi - m)
def forward(self, embbedings, label):
# weights norm
nB = len(embbedings)
kernel_norm = l2_norm(self.kernel,axis=0)
# cos(theta+m)
cos_theta = torch.mm(embbedings,kernel_norm)
# output = torch.mm(embbedings,kernel_norm)
cos_theta = cos_theta.clamp(-1,1) # for numerical stability
cos_theta_2 = torch.pow(cos_theta, 2)
sin_theta_2 = 1 - cos_theta_2
sin_theta = torch.sqrt(sin_theta_2)
# cos(thema+m) = cos(theta)*cos(m)-sin(theta)*sin(m)
cos_theta_m = (cos_theta * self.cos_m - sin_theta * self.sin_m)
# this condition controls the theta+m should in range [0, pi]
# 0<=theta+m<=pi
# -m<=theta<=pi-m
cond_v = cos_theta - self.threshold
cond_mask = cond_v <= 0
keep_val = (cos_theta - self.mm) # when theta not in [0,pi], use cosface instead
cos_theta_m[cond_mask] = keep_val[cond_mask]
output = cos_theta * 1.0 # a little bit hacky way to prevent in_place operation on cos_theta
idx_ = torch.arange(0, nB, dtype=torch.long)
output[idx_, label] = cos_theta_m[idx_, label]
output *= self.s # scale up in order to make softmax work, first introduced in normface
return output
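# Usage sketch for this head (comments only; this is how train() above wires it, assuming
# self.head = Arcface(embedding_size, classnum) and conf.ce_loss = CrossEntropyLoss()):
#
#   thetas = head(embeddings, labels)    # scaled cos(theta + m) logits, shape [B, classnum]
#   loss = conf.ce_loss(thetas, labels)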
################################## Cosface head #############################################################
class Am_softmax(Module):
# implementation of additive margin softmax loss in https://arxiv.org/abs/1801.05599
def __init__(self,embedding_size=512,classnum=51332):
super(Am_softmax, self).__init__()
self.classnum = classnum
self.kernel = Parameter(torch.Tensor(embedding_size,classnum))
# initial kernel
self.kernel.data.uniform_(-1, 1).renorm_(2,1,1e-5).mul_(1e5)
self.m = 0.35 # additive margin recommended by the paper
self.s = 30. # see normface https://arxiv.org/abs/1704.06369
def forward(self,embbedings,label):
kernel_norm = l2_norm(self.kernel,axis=0)
cos_theta = torch.mm(embbedings,kernel_norm)
cos_theta = cos_theta.clamp(-1,1) # for numerical stability
phi = cos_theta - self.m
label = label.view(-1,1) #size=(B,1)
index = cos_theta.data * 0.0 #size=(B,Classnum)
index.scatter_(1,label.data.view(-1,1),1)
index = index.byte()
output = cos_theta * 1.0
output[index] = phi[index] #only change the correct predicted output
output *= self.s # scale up in order to make softmax work, first introduced in normface
return output
import numpy as np
import torch
from PIL import Image
from mtcnn_pytorch.src.get_nets import PNet, RNet, ONet
from mtcnn_pytorch.src.box_utils import nms, calibrate_box, get_image_boxes, convert_to_square
from mtcnn_pytorch.src.first_stage import run_first_stage
from mtcnn_pytorch.src.align_trans import get_reference_facial_points, warp_and_crop_face
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
class MTCNN():
def __init__(self):
self.pnet = PNet().to(device)
self.rnet = RNet().to(device)
self.onet = ONet().to(device)
self.pnet.eval()
self.rnet.eval()
self.onet.eval()
self.refrence = get_reference_facial_points(default_square= True)
def align(self, img):
_, landmarks = self.detect_faces(img)
facial5points = [[landmarks[0][j],landmarks[0][j+5]] for j in range(5)]
warped_face = warp_and_crop_face(np.array(img), facial5points, self.refrence, crop_size=(112,112))
return Image.fromarray(warped_face)
def align_multi(self, img, limit=None, min_face_size=30.0):
boxes, landmarks = self.detect_faces(img, min_face_size)
if limit:
boxes = boxes[:limit]
landmarks = landmarks[:limit]
faces = []
for landmark in landmarks:
facial5points = [[landmark[j],landmark[j+5]] for j in range(5)]
warped_face = warp_and_crop_face(np.array(img), facial5points, self.refrence, crop_size=(112,112))
faces.append(Image.fromarray(warped_face))
return boxes, faces
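# Typical call, as used by the FC handler: bboxes, faces = mtcnn.align_multi(image,
# conf.face_limit, conf.min_face_size). Each entry of faces is a 112x112 aligned PIL
# image and each bbox row is [xmin, ymin, xmax, ymax, score].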
def detect_faces(self, image, min_face_size=20.0,
thresholds=[0.5, 0.6, 0.7],
nms_thresholds=[0.5, 0.5, 0.5]):
"""
Arguments:
image: an instance of PIL.Image.
min_face_size: a float number.
thresholds: a list of length 3.
nms_thresholds: a list of length 3.
Returns:
two float numpy arrays of shapes [n_boxes, 4] and [n_boxes, 10],
bounding boxes and facial landmarks.
"""
# BUILD AN IMAGE PYRAMID
width, height = image.size
min_length = min(height, width)
min_detection_size = 12
factor = 0.707 # sqrt(0.5)
# scales for scaling the image
scales = []
# scales the image so that
# minimum size that we can detect equals to
# minimum face size that we want to detect
m = min_detection_size/min_face_size
min_length *= m
factor_count = 0
while min_length > min_detection_size:
scales.append(m*factor**factor_count)
min_length *= factor
factor_count += 1
# STAGE 1
# it will be returned
bounding_boxes = []
with torch.no_grad():
# run P-Net on different scales
for s in scales:
boxes = run_first_stage(image, self.pnet, scale=s, threshold=thresholds[0])
bounding_boxes.append(boxes)
# collect boxes (and offsets, and scores) from different scales
bounding_boxes = [i for i in bounding_boxes if i is not None]
bounding_boxes = np.vstack(bounding_boxes)
keep = nms(bounding_boxes[:, 0:5], nms_thresholds[0])
bounding_boxes = bounding_boxes[keep]
# use offsets predicted by pnet to transform bounding boxes
bounding_boxes = calibrate_box(bounding_boxes[:, 0:5], bounding_boxes[:, 5:])
# shape [n_boxes, 5]
bounding_boxes = convert_to_square(bounding_boxes)
bounding_boxes[:, 0:4] = np.round(bounding_boxes[:, 0:4])
# STAGE 2
img_boxes = get_image_boxes(bounding_boxes, image, size=24)
img_boxes = torch.FloatTensor(img_boxes).to(device)
output = self.rnet(img_boxes)
offsets = output[0].cpu().data.numpy() # shape [n_boxes, 4]
probs = output[1].cpu().data.numpy() # shape [n_boxes, 2]
keep = np.where(probs[:, 1] > thresholds[1])[0]
bounding_boxes = bounding_boxes[keep]
bounding_boxes[:, 4] = probs[keep, 1].reshape((-1,))
offsets = offsets[keep]
keep = nms(bounding_boxes, nms_thresholds[1])
bounding_boxes = bounding_boxes[keep]
bounding_boxes = calibrate_box(bounding_boxes, offsets[keep])
bounding_boxes = convert_to_square(bounding_boxes)
bounding_boxes[:, 0:4] = np.round(bounding_boxes[:, 0:4])
# STAGE 3
img_boxes = get_image_boxes(bounding_boxes, image, size=48)
if len(img_boxes) == 0:
return [], []
img_boxes = torch.FloatTensor(img_boxes).to(device)
output = self.onet(img_boxes)
landmarks = output[0].cpu().data.numpy() # shape [n_boxes, 10]
offsets = output[1].cpu().data.numpy() # shape [n_boxes, 4]
probs = output[2].cpu().data.numpy() # shape [n_boxes, 2]
keep = np.where(probs[:, 1] > thresholds[2])[0]
bounding_boxes = bounding_boxes[keep]
bounding_boxes[:, 4] = probs[keep, 1].reshape((-1,))
offsets = offsets[keep]
landmarks = landmarks[keep]
# compute landmark points
width = bounding_boxes[:, 2] - bounding_boxes[:, 0] + 1.0
height = bounding_boxes[:, 3] - bounding_boxes[:, 1] + 1.0
xmin, ymin = bounding_boxes[:, 0], bounding_boxes[:, 1]
landmarks[:, 0:5] = np.expand_dims(xmin, 1) + np.expand_dims(width, 1)*landmarks[:, 0:5]
landmarks[:, 5:10] = np.expand_dims(ymin, 1) + np.expand_dims(height, 1)*landmarks[:, 5:10]
bounding_boxes = calibrate_box(bounding_boxes, offsets)
keep = nms(bounding_boxes, nms_thresholds[2], mode='min')
bounding_boxes = bounding_boxes[keep]
landmarks = landmarks[keep]
return bounding_boxes, landmarks
from .visualization_utils import show_bboxes
from .detector import detect_faces
# -*- coding: utf-8 -*-
"""
Created on Mon Apr 24 15:43:29 2017
@author: zhaoy
"""
import numpy as np
import cv2
# from scipy.linalg import lstsq
# from scipy.ndimage import geometric_transform # , map_coordinates
from mtcnn_pytorch.src.matlab_cp2tform import get_similarity_transform_for_cv2
# reference facial points, a list of coordinates (x,y)
REFERENCE_FACIAL_POINTS = [
[30.29459953, 51.69630051],
[65.53179932, 51.50139999],
[48.02519989, 71.73660278],
[33.54930115, 92.3655014],
[62.72990036, 92.20410156]
]
DEFAULT_CROP_SIZE = (96, 112)
class FaceWarpException(Exception):
def __str__(self):
return 'In File {}:{}'.format(
__file__, super().__str__())
def get_reference_facial_points(output_size=None,
inner_padding_factor=0.0,
outer_padding=(0, 0),
default_square=False):
"""
Function:
----------
get reference 5 key points according to crop settings:
0. Set default crop_size:
if default_square:
crop_size = (112, 112)
else:
crop_size = (96, 112)
1. Pad the crop_size by inner_padding_factor in each side;
2. Resize crop_size into (output_size - outer_padding*2),
pad into output_size with outer_padding;
3. Output reference_5point;
Parameters:
----------
@output_size: (w, h) or None
size of aligned face image
@inner_padding_factor: (w_factor, h_factor)
padding factor for inner (w, h)
@outer_padding: (w_pad, h_pad)
each row is a pair of coordinates (x, y)
@default_square: True or False
if True:
default crop_size = (112, 112)
else:
default crop_size = (96, 112);
!!! make sure, if output_size is not None:
(output_size - outer_padding)
= some_scale * (default crop_size * (1.0 + inner_padding_factor))
Returns:
----------
@reference_5point: 5x2 np.array
each row is a pair of transformed coordinates (x, y)
"""
#print('\n===> get_reference_facial_points():')
#print('---> Params:')
#print(' output_size: ', output_size)
#print(' inner_padding_factor: ', inner_padding_factor)
#print(' outer_padding:', outer_padding)
#print(' default_square: ', default_square)
tmp_5pts = np.array(REFERENCE_FACIAL_POINTS)
tmp_crop_size = np.array(DEFAULT_CROP_SIZE)
# 0) make the inner region a square
if default_square:
size_diff = max(tmp_crop_size) - tmp_crop_size
tmp_5pts += size_diff / 2
tmp_crop_size += size_diff
#print('---> default:')
#print(' crop_size = ', tmp_crop_size)
#print(' reference_5pts = ', tmp_5pts)
if (output_size and
output_size[0] == tmp_crop_size[0] and
output_size[1] == tmp_crop_size[1]):
#print('output_size == DEFAULT_CROP_SIZE {}: return default reference points'.format(tmp_crop_size))
return tmp_5pts
if (inner_padding_factor == 0 and
outer_padding == (0, 0)):
if output_size is None:
#print('No paddings to do: return default reference points')
return tmp_5pts
else:
raise FaceWarpException(
'No paddings to do, output_size must be None or {}'.format(tmp_crop_size))
# check output size
if not (0 <= inner_padding_factor <= 1.0):
raise FaceWarpException('Not (0 <= inner_padding_factor <= 1.0)')
if ((inner_padding_factor > 0 or outer_padding[0] > 0 or outer_padding[1] > 0)
and output_size is None):
output_size = (tmp_crop_size * (1 + inner_padding_factor * 2)).astype(np.int32)
output_size += np.array(outer_padding)
#print(' deduced from paddings, output_size = ', output_size)
if not (outer_padding[0] < output_size[0]
and outer_padding[1] < output_size[1]):
raise FaceWarpException('Not (outer_padding[0] < output_size[0]'
'and outer_padding[1] < output_size[1])')
# 1) pad the inner region according inner_padding_factor
#print('---> STEP1: pad the inner region according inner_padding_factor')
if inner_padding_factor > 0:
size_diff = tmp_crop_size * inner_padding_factor * 2
tmp_5pts += size_diff / 2
tmp_crop_size += np.round(size_diff).astype(np.int32)
#print(' crop_size = ', tmp_crop_size)
#print(' reference_5pts = ', tmp_5pts)
# 2) resize the padded inner region
#print('---> STEP2: resize the padded inner region')
size_bf_outer_pad = np.array(output_size) - np.array(outer_padding) * 2
#print(' crop_size = ', tmp_crop_size)
#print(' size_bf_outer_pad = ', size_bf_outer_pad)
if size_bf_outer_pad[0] * tmp_crop_size[1] != size_bf_outer_pad[1] * tmp_crop_size[0]:
raise FaceWarpException('Must have (output_size - outer_padding)'
'= some_scale * (crop_size * (1.0 + inner_padding_factor)')
scale_factor = size_bf_outer_pad[0].astype(np.float32) / tmp_crop_size[0]
#print(' resize scale_factor = ', scale_factor)
tmp_5pts = tmp_5pts * scale_factor
# size_diff = tmp_crop_size * (scale_factor - min(scale_factor))
# tmp_5pts = tmp_5pts + size_diff / 2
tmp_crop_size = size_bf_outer_pad
#print(' crop_size = ', tmp_crop_size)
#print(' reference_5pts = ', tmp_5pts)
# 3) add outer_padding to make output_size
reference_5point = tmp_5pts + np.array(outer_padding)
tmp_crop_size = output_size
#print('---> STEP3: add outer_padding to make output_size')
#print(' crop_size = ', tmp_crop_size)
#print(' reference_5pts = ', tmp_5pts)
#print('===> end get_reference_facial_points\n')
return reference_5point
def get_affine_transform_matrix(src_pts, dst_pts):
"""
Function:
----------
get affine transform matrix 'tfm' from src_pts to dst_pts
Parameters:
----------
@src_pts: Kx2 np.array
source points matrix, each row is a pair of coordinates (x, y)
@dst_pts: Kx2 np.array
destination points matrix, each row is a pair of coordinates (x, y)
Returns:
----------
@tfm: 2x3 np.array
transform matrix from src_pts to dst_pts
"""
tfm = np.float32([[1, 0, 0], [0, 1, 0]])
n_pts = src_pts.shape[0]
ones = np.ones((n_pts, 1), src_pts.dtype)
src_pts_ = np.hstack([src_pts, ones])
dst_pts_ = np.hstack([dst_pts, ones])
# #print(('src_pts_:\n' + str(src_pts_))
# #print(('dst_pts_:\n' + str(dst_pts_))
A, res, rank, s = np.linalg.lstsq(src_pts_, dst_pts_)
# #print(('np.linalg.lstsq return A: \n' + str(A))
# #print(('np.linalg.lstsq return res: \n' + str(res))
# #print(('np.linalg.lstsq return rank: \n' + str(rank))
# #print(('np.linalg.lstsq return s: \n' + str(s))
if rank == 3:
tfm = np.float32([
[A[0, 0], A[1, 0], A[2, 0]],
[A[0, 1], A[1, 1], A[2, 1]]
])
elif rank == 2:
tfm = np.float32([
[A[0, 0], A[1, 0], 0],
[A[0, 1], A[1, 1], 0]
])
return tfm
def warp_and_crop_face(src_img,
facial_pts,
reference_pts=None,
crop_size=(96, 112),
align_type='similarity'):
"""
Function:
----------
apply affine transform 'trans' to uv
Parameters:
----------
@src_img: 3x3 np.array
input image
@facial_pts: could be
1)a list of K coordinates (x,y)
or
2) Kx2 or 2xK np.array
each row or col is a pair of coordinates (x, y)
@reference_pts: could be
1) a list of K coordinates (x,y)
or
2) Kx2 or 2xK np.array
each row or col is a pair of coordinates (x, y)
or
3) None
if None, use default reference facial points
@crop_size: (w, h)
output face image size
@align_type: transform type, could be one of
1) 'similarity': use similarity transform
2) 'cv2_affine': use the first 3 points to do affine transform,
by calling cv2.getAffineTransform()
3) 'affine': use all points to do affine transform
Returns:
----------
@face_img: output face image with size (w, h) = @crop_size
"""
if reference_pts is None:
if crop_size[0] == 96 and crop_size[1] == 112:
reference_pts = REFERENCE_FACIAL_POINTS
else:
default_square = False
inner_padding_factor = 0
outer_padding = (0, 0)
output_size = crop_size
reference_pts = get_reference_facial_points(output_size,
inner_padding_factor,
outer_padding,
default_square)
ref_pts = np.float32(reference_pts)
ref_pts_shp = ref_pts.shape
if max(ref_pts_shp) < 3 or min(ref_pts_shp) != 2:
raise FaceWarpException(
'reference_pts.shape must be (K,2) or (2,K) and K>2')
if ref_pts_shp[0] == 2:
ref_pts = ref_pts.T
src_pts = np.float32(facial_pts)
src_pts_shp = src_pts.shape
if max(src_pts_shp) < 3 or min(src_pts_shp) != 2:
raise FaceWarpException(
'facial_pts.shape must be (K,2) or (2,K) and K>2')
if src_pts_shp[0] == 2:
src_pts = src_pts.T
# #print('--->src_pts:\n', src_pts
# #print('--->ref_pts\n', ref_pts
if src_pts.shape != ref_pts.shape:
raise FaceWarpException(
'facial_pts and reference_pts must have the same shape')
if align_type == 'cv2_affine':
tfm = cv2.getAffineTransform(src_pts[0:3], ref_pts[0:3])
# #print(('cv2.getAffineTransform() returns tfm=\n' + str(tfm))
elif align_type == 'affine':
tfm = get_affine_transform_matrix(src_pts, ref_pts)
# #print(('get_affine_transform_matrix() returns tfm=\n' + str(tfm))
else:
tfm = get_similarity_transform_for_cv2(src_pts, ref_pts)
# #print(('get_similarity_transform_for_cv2() returns tfm=\n' + str(tfm))
# #print('--->Transform matrix: '
# #print(('type(tfm):' + str(type(tfm)))
# #print(('tfm.dtype:' + str(tfm.dtype))
# #print( tfm
face_img = cv2.warpAffine(src_img, tfm, (crop_size[0], crop_size[1]))
return face_img
import numpy as np
from PIL import Image
def nms(boxes, overlap_threshold=0.5, mode='union'):
"""Non-maximum suppression.
Arguments:
boxes: a float numpy array of shape [n, 5],
where each row is (xmin, ymin, xmax, ymax, score).
overlap_threshold: a float number.
mode: 'union' or 'min'.
Returns:
list with indices of the selected boxes
"""
# if there are no boxes, return the empty list
if len(boxes) == 0:
return []
# list of picked indices
pick = []
# grab the coordinates of the bounding boxes
x1, y1, x2, y2, score = [boxes[:, i] for i in range(5)]
area = (x2 - x1 + 1.0)*(y2 - y1 + 1.0)
ids = np.argsort(score) # in increasing order
while len(ids) > 0:
# grab index of the largest value
last = len(ids) - 1
i = ids[last]
pick.append(i)
# compute intersections
# of the box with the largest score
# with the rest of boxes
# left top corner of intersection boxes
ix1 = np.maximum(x1[i], x1[ids[:last]])
iy1 = np.maximum(y1[i], y1[ids[:last]])
# right bottom corner of intersection boxes
ix2 = np.minimum(x2[i], x2[ids[:last]])
iy2 = np.minimum(y2[i], y2[ids[:last]])
# width and height of intersection boxes
w = np.maximum(0.0, ix2 - ix1 + 1.0)
h = np.maximum(0.0, iy2 - iy1 + 1.0)
# intersections' areas
inter = w * h
if mode == 'min':
overlap = inter/np.minimum(area[i], area[ids[:last]])
elif mode == 'union':
# intersection over union (IoU)
overlap = inter/(area[i] + area[ids[:last]] - inter)
# delete all boxes where overlap is too big
ids = np.delete(
ids,
np.concatenate([[last], np.where(overlap > overlap_threshold)[0]])
)
return pick
def convert_to_square(bboxes):
"""Convert bounding boxes to a square form.
Arguments:
bboxes: a float numpy array of shape [n, 5].
Returns:
a float numpy array of shape [n, 5],
squared bounding boxes.
"""
square_bboxes = np.zeros_like(bboxes)
x1, y1, x2, y2 = [bboxes[:, i] for i in range(4)]
h = y2 - y1 + 1.0
w = x2 - x1 + 1.0
max_side = np.maximum(h, w)
square_bboxes[:, 0] = x1 + w*0.5 - max_side*0.5
square_bboxes[:, 1] = y1 + h*0.5 - max_side*0.5
square_bboxes[:, 2] = square_bboxes[:, 0] + max_side - 1.0
square_bboxes[:, 3] = square_bboxes[:, 1] + max_side - 1.0
return square_bboxes
def calibrate_box(bboxes, offsets):
"""Transform bounding boxes to be more like true bounding boxes.
'offsets' is one of the outputs of the nets.
Arguments:
bboxes: a float numpy array of shape [n, 5].
offsets: a float numpy array of shape [n, 4].
Returns:
a float numpy array of shape [n, 5].
"""
x1, y1, x2, y2 = [bboxes[:, i] for i in range(4)]
w = x2 - x1 + 1.0
h = y2 - y1 + 1.0
w = np.expand_dims(w, 1)
h = np.expand_dims(h, 1)
# this is what happening here:
# tx1, ty1, tx2, ty2 = [offsets[:, i] for i in range(4)]
# x1_true = x1 + tx1*w
# y1_true = y1 + ty1*h
# x2_true = x2 + tx2*w
# y2_true = y2 + ty2*h
# below is just more compact form of this
# are offsets always such that
# x1 < x2 and y1 < y2 ?
translation = np.hstack([w, h, w, h])*offsets
bboxes[:, 0:4] = bboxes[:, 0:4] + translation
return bboxes
def get_image_boxes(bounding_boxes, img, size=24):
"""Cut out boxes from the image.
Arguments:
bounding_boxes: a float numpy array of shape [n, 5].
img: an instance of PIL.Image.
size: an integer, size of cutouts.
Returns:
a float numpy array of shape [n, 3, size, size].
"""
num_boxes = len(bounding_boxes)
width, height = img.size
[dy, edy, dx, edx, y, ey, x, ex, w, h] = correct_bboxes(bounding_boxes, width, height)
img_boxes = np.zeros((num_boxes, 3, size, size), 'float32')
for i in range(num_boxes):
img_box = np.zeros((h[i], w[i], 3), 'uint8')
img_array = np.asarray(img, 'uint8')
img_box[dy[i]:(edy[i] + 1), dx[i]:(edx[i] + 1), :] =\
img_array[y[i]:(ey[i] + 1), x[i]:(ex[i] + 1), :]
# resize
img_box = Image.fromarray(img_box)
img_box = img_box.resize((size, size), Image.BILINEAR)
img_box = np.asarray(img_box, 'float32')
img_boxes[i, :, :, :] = _preprocess(img_box)
return img_boxes
def correct_bboxes(bboxes, width, height):
"""Crop boxes that are too big and get coordinates
with respect to cutouts.
Arguments:
bboxes: a float numpy array of shape [n, 5],
where each row is (xmin, ymin, xmax, ymax, score).
width: a float number.
height: a float number.
Returns:
dy, dx, edy, edx: a int numpy arrays of shape [n],
coordinates of the boxes with respect to the cutouts.
y, x, ey, ex: a int numpy arrays of shape [n],
corrected ymin, xmin, ymax, xmax.
h, w: a int numpy arrays of shape [n],
just heights and widths of boxes.
in the following order:
[dy, edy, dx, edx, y, ey, x, ex, w, h].
"""
x1, y1, x2, y2 = [bboxes[:, i] for i in range(4)]
w, h = x2 - x1 + 1.0, y2 - y1 + 1.0
num_boxes = bboxes.shape[0]
# 'e' stands for end
# (x, y) -> (ex, ey)
x, y, ex, ey = x1, y1, x2, y2
# we need to cut out a box from the image.
# (x, y, ex, ey) are corrected coordinates of the box
# in the image.
# (dx, dy, edx, edy) are coordinates of the box in the cutout
# from the image.
dx, dy = np.zeros((num_boxes,)), np.zeros((num_boxes,))
edx, edy = w.copy() - 1.0, h.copy() - 1.0
# if box's bottom right corner is too far right
ind = np.where(ex > width - 1.0)[0]
edx[ind] = w[ind] + width - 2.0 - ex[ind]
ex[ind] = width - 1.0
# if box's bottom right corner is too low
ind = np.where(ey > height - 1.0)[0]
edy[ind] = h[ind] + height - 2.0 - ey[ind]
ey[ind] = height - 1.0
# if box's top left corner is too far left
ind = np.where(x < 0.0)[0]
dx[ind] = 0.0 - x[ind]
x[ind] = 0.0
# if box's top left corner is too high
ind = np.where(y < 0.0)[0]
dy[ind] = 0.0 - y[ind]
y[ind] = 0.0
return_list = [dy, edy, dx, edx, y, ey, x, ex, w, h]
return_list = [i.astype('int32') for i in return_list]
return return_list
def _preprocess(img):
"""Preprocessing step before feeding the network.
Arguments:
img: a float numpy array of shape [h, w, c].
Returns:
a float numpy array of shape [1, c, h, w].
"""
img = img.transpose((2, 0, 1))
img = np.expand_dims(img, 0)
img = (img - 127.5)*0.0078125
return img
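# The scaling above maps uint8 pixel values into roughly [-1, 1]:
# 0.0078125 == 1/128, so (img - 127.5) * 0.0078125 is approximately (img - 127.5) / 128.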
import numpy as np
import torch
from torch.autograd import Variable
from .get_nets import PNet, RNet, ONet
from .box_utils import nms, calibrate_box, get_image_boxes, convert_to_square
from .first_stage import run_first_stage
def detect_faces(image, min_face_size=20.0,
thresholds=[0.6, 0.7, 0.8],
nms_thresholds=[0.7, 0.7, 0.7]):
"""
Arguments:
image: an instance of PIL.Image.
min_face_size: a float number.
thresholds: a list of length 3.
nms_thresholds: a list of length 3.
Returns:
two float numpy arrays of shapes [n_boxes, 4] and [n_boxes, 10],
bounding boxes and facial landmarks.
"""
# LOAD MODELS
pnet = PNet()
rnet = RNet()
onet = ONet()
onet.eval()
# BUILD AN IMAGE PYRAMID
width, height = image.size
min_length = min(height, width)
min_detection_size = 12
factor = 0.707 # sqrt(0.5)
# scales for scaling the image
scales = []
# scales the image so that
# minimum size that we can detect equals to
# minimum face size that we want to detect
m = min_detection_size/min_face_size
min_length *= m
factor_count = 0
while min_length > min_detection_size:
scales.append(m*factor**factor_count)
min_length *= factor
factor_count += 1
# STAGE 1
# it will be returned
bounding_boxes = []
with torch.no_grad():
# run P-Net on different scales
for s in scales:
boxes = run_first_stage(image, pnet, scale=s, threshold=thresholds[0])
bounding_boxes.append(boxes)
# collect boxes (and offsets, and scores) from different scales
bounding_boxes = [i for i in bounding_boxes if i is not None]
bounding_boxes = np.vstack(bounding_boxes)
keep = nms(bounding_boxes[:, 0:5], nms_thresholds[0])
bounding_boxes = bounding_boxes[keep]
# use offsets predicted by pnet to transform bounding boxes
bounding_boxes = calibrate_box(bounding_boxes[:, 0:5], bounding_boxes[:, 5:])
# shape [n_boxes, 5]
bounding_boxes = convert_to_square(bounding_boxes)
bounding_boxes[:, 0:4] = np.round(bounding_boxes[:, 0:4])
# STAGE 2
img_boxes = get_image_boxes(bounding_boxes, image, size=24)
img_boxes = torch.FloatTensor(img_boxes)
output = rnet(img_boxes)
offsets = output[0].data.numpy() # shape [n_boxes, 4]
probs = output[1].data.numpy() # shape [n_boxes, 2]
keep = np.where(probs[:, 1] > thresholds[1])[0]
bounding_boxes = bounding_boxes[keep]
bounding_boxes[:, 4] = probs[keep, 1].reshape((-1,))
offsets = offsets[keep]
keep = nms(bounding_boxes, nms_thresholds[1])
bounding_boxes = bounding_boxes[keep]
bounding_boxes = calibrate_box(bounding_boxes, offsets[keep])
bounding_boxes = convert_to_square(bounding_boxes)
bounding_boxes[:, 0:4] = np.round(bounding_boxes[:, 0:4])
# STAGE 3
img_boxes = get_image_boxes(bounding_boxes, image, size=48)
if len(img_boxes) == 0:
return [], []
img_boxes = torch.FloatTensor(img_boxes)
output = onet(img_boxes)
landmarks = output[0].data.numpy() # shape [n_boxes, 10]
offsets = output[1].data.numpy() # shape [n_boxes, 4]
probs = output[2].data.numpy() # shape [n_boxes, 2]
keep = np.where(probs[:, 1] > thresholds[2])[0]
bounding_boxes = bounding_boxes[keep]
bounding_boxes[:, 4] = probs[keep, 1].reshape((-1,))
offsets = offsets[keep]
landmarks = landmarks[keep]
# compute landmark points
width = bounding_boxes[:, 2] - bounding_boxes[:, 0] + 1.0
height = bounding_boxes[:, 3] - bounding_boxes[:, 1] + 1.0
xmin, ymin = bounding_boxes[:, 0], bounding_boxes[:, 1]
landmarks[:, 0:5] = np.expand_dims(xmin, 1) + np.expand_dims(width, 1)*landmarks[:, 0:5]
landmarks[:, 5:10] = np.expand_dims(ymin, 1) + np.expand_dims(height, 1)*landmarks[:, 5:10]
bounding_boxes = calibrate_box(bounding_boxes, offsets)
keep = nms(bounding_boxes, nms_thresholds[2], mode='min')
bounding_boxes = bounding_boxes[keep]
landmarks = landmarks[keep]
return bounding_boxes, landmarks
import torch
from torch.autograd import Variable
import math
from PIL import Image
import numpy as np
from .box_utils import nms, _preprocess
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# device = 'cpu'
def run_first_stage(image, net, scale, threshold):
"""Run P-Net, generate bounding boxes, and do NMS.
Arguments:
image: an instance of PIL.Image.
net: an instance of pytorch's nn.Module, P-Net.
scale: a float number,
scale width and height of the image by this number.
threshold: a float number,
threshold on the probability of a face when generating
bounding boxes from predictions of the net.
Returns:
a float numpy array of shape [n_boxes, 9],
bounding boxes with scores and offsets (4 + 1 + 4).
"""
# scale the image and convert it to a float array
width, height = image.size
sw, sh = math.ceil(width*scale), math.ceil(height*scale)
img = image.resize((sw, sh), Image.BILINEAR)
img = np.asarray(img, 'float32')
img = torch.FloatTensor(_preprocess(img)).to(device)
with torch.no_grad():
output = net(img)
probs = output[1].cpu().data.numpy()[0, 1, :, :]
offsets = output[0].cpu().data.numpy()
# probs: probability of a face at each sliding window
# offsets: transformations to true bounding boxes
boxes = _generate_bboxes(probs, offsets, scale, threshold)
if len(boxes) == 0:
return None
keep = nms(boxes[:, 0:5], overlap_threshold=0.5)
return boxes[keep]
def _generate_bboxes(probs, offsets, scale, threshold):
"""Generate bounding boxes at places
where there is probably a face.
Arguments:
probs: a float numpy array of shape [n, m].
offsets: a float numpy array of shape [1, 4, n, m].
scale: a float number,
width and height of the image were scaled by this number.
threshold: a float number.
Returns:
a float numpy array of shape [n_boxes, 9]
"""
# applying P-Net is equivalent, in some sense, to
# moving 12x12 window with stride 2
stride = 2
cell_size = 12
# indices of boxes where there is probably a face
inds = np.where(probs > threshold)
if inds[0].size == 0:
return np.array([])
# transformations of bounding boxes
tx1, ty1, tx2, ty2 = [offsets[0, i, inds[0], inds[1]] for i in range(4)]
# they are defined as:
# w = x2 - x1 + 1
# h = y2 - y1 + 1
# x1_true = x1 + tx1*w
# x2_true = x2 + tx2*w
# y1_true = y1 + ty1*h
# y2_true = y2 + ty2*h
offsets = np.array([tx1, ty1, tx2, ty2])
score = probs[inds[0], inds[1]]
# P-Net is applied to scaled images
# so we need to rescale bounding boxes back
bounding_boxes = np.vstack([
np.round((stride*inds[1] + 1.0)/scale),
np.round((stride*inds[0] + 1.0)/scale),
np.round((stride*inds[1] + 1.0 + cell_size)/scale),
np.round((stride*inds[0] + 1.0 + cell_size)/scale),
score, offsets
])
# why one is added?
return bounding_boxes.T
import torch
import torch.nn as nn
import torch.nn.functional as F
from collections import OrderedDict
import numpy as np
class Flatten(nn.Module):
def __init__(self):
super(Flatten, self).__init__()
def forward(self, x):
"""
Arguments:
x: a float tensor with shape [batch_size, c, h, w].
Returns:
a float tensor with shape [batch_size, c*h*w].
"""
# without this transpose the pretrained weights don't work
x = x.transpose(3, 2).contiguous()
return x.view(x.size(0), -1)
class PNet(nn.Module):
def __init__(self):
super(PNet, self).__init__()
# suppose we have input with size HxW, then
# after first layer: H - 2,
# after pool: ceil((H - 2)/2),
# after second conv: ceil((H - 2)/2) - 2,
# after last conv: ceil((H - 2)/2) - 4,
# and the same for W
self.features = nn.Sequential(OrderedDict([
('conv1', nn.Conv2d(3, 10, 3, 1)),
('prelu1', nn.PReLU(10)),
('pool1', nn.MaxPool2d(2, 2, ceil_mode=True)),
('conv2', nn.Conv2d(10, 16, 3, 1)),
('prelu2', nn.PReLU(16)),
('conv3', nn.Conv2d(16, 32, 3, 1)),
('prelu3', nn.PReLU(32))
]))
self.conv4_1 = nn.Conv2d(32, 2, 1, 1)
self.conv4_2 = nn.Conv2d(32, 4, 1, 1)
weights = np.load('mtcnn_pytorch/src/weights/pnet.npy',allow_pickle=True)[()]
for n, p in self.named_parameters():
p.data = torch.FloatTensor(weights[n])
def forward(self, x):
"""
Arguments:
x: a float tensor with shape [batch_size, 3, h, w].
Returns:
b: a float tensor with shape [batch_size, 4, h', w'].
a: a float tensor with shape [batch_size, 2, h', w'].
"""
x = self.features(x)
a = self.conv4_1(x)
b = self.conv4_2(x)
a = F.softmax(a, dim=-1)
return b, a
class RNet(nn.Module):
def __init__(self):
super(RNet, self).__init__()
self.features = nn.Sequential(OrderedDict([
('conv1', nn.Conv2d(3, 28, 3, 1)),
('prelu1', nn.PReLU(28)),
('pool1', nn.MaxPool2d(3, 2, ceil_mode=True)),
('conv2', nn.Conv2d(28, 48, 3, 1)),
('prelu2', nn.PReLU(48)),
('pool2', nn.MaxPool2d(3, 2, ceil_mode=True)),
('conv3', nn.Conv2d(48, 64, 2, 1)),
('prelu3', nn.PReLU(64)),
('flatten', Flatten()),
('conv4', nn.Linear(576, 128)),
('prelu4', nn.PReLU(128))
]))
self.conv5_1 = nn.Linear(128, 2)
self.conv5_2 = nn.Linear(128, 4)
weights = np.load('mtcnn_pytorch/src/weights/rnet.npy',allow_pickle=True)[()]
for n, p in self.named_parameters():
p.data = torch.FloatTensor(weights[n])
def forward(self, x):
"""
Arguments:
x: a float tensor with shape [batch_size, 3, h, w].
Returns:
b: a float tensor with shape [batch_size, 4].
a: a float tensor with shape [batch_size, 2].
"""
x = self.features(x)
a = self.conv5_1(x)
b = self.conv5_2(x)
a = F.softmax(a, dim=-1)
return b, a
class ONet(nn.Module):
def __init__(self):
super(ONet, self).__init__()
self.features = nn.Sequential(OrderedDict([
('conv1', nn.Conv2d(3, 32, 3, 1)),
('prelu1', nn.PReLU(32)),
('pool1', nn.MaxPool2d(3, 2, ceil_mode=True)),
('conv2', nn.Conv2d(32, 64, 3, 1)),
('prelu2', nn.PReLU(64)),
('pool2', nn.MaxPool2d(3, 2, ceil_mode=True)),
('conv3', nn.Conv2d(64, 64, 3, 1)),
('prelu3', nn.PReLU(64)),
('pool3', nn.MaxPool2d(2, 2, ceil_mode=True)),
('conv4', nn.Conv2d(64, 128, 2, 1)),
('prelu4', nn.PReLU(128)),
('flatten', Flatten()),
('conv5', nn.Linear(1152, 256)),
('drop5', nn.Dropout(0.25)),
('prelu5', nn.PReLU(256)),
]))
self.conv6_1 = nn.Linear(256, 2)
self.conv6_2 = nn.Linear(256, 4)
self.conv6_3 = nn.Linear(256, 10)
weights = np.load('mtcnn_pytorch/src/weights/onet.npy',allow_pickle=True)[()]
for n, p in self.named_parameters():
p.data = torch.FloatTensor(weights[n])
def forward(self, x):
"""
Arguments:
x: a float tensor with shape [batch_size, 3, h, w].
Returns:
c: a float tensor with shape [batch_size, 10].
b: a float tensor with shape [batch_size, 4].
a: a float tensor with shape [batch_size, 2].
"""
x = self.features(x)
a = self.conv6_1(x)
b = self.conv6_2(x)
c = self.conv6_3(x)
a = F.softmax(a, dim = -1)
return c, b, a
# -*- coding: utf-8 -*-
"""
Created on Tue Jul 11 06:54:28 2017
@author: zhaoyafei
"""
import numpy as np
from numpy.linalg import inv, norm, lstsq
from numpy.linalg import matrix_rank as rank
class MatlabCp2tormException(Exception):
def __str__(self):
return 'In File {}:{}'.format(
__file__, super().__str__())
def tformfwd(trans, uv):
"""
Function:
----------
apply affine transform 'trans' to uv
Parameters:
----------
@trans: 3x3 np.array
transform matrix
@uv: Kx2 np.array
each row is a pair of coordinates (x, y)
Returns:
----------
@xy: Kx2 np.array
each row is a pair of transformed coordinates (x, y)
"""
uv = np.hstack((
uv, np.ones((uv.shape[0], 1))
))
xy = np.dot(uv, trans)
xy = xy[:, 0:-1]
return xy
def tforminv(trans, uv):
"""
Function:
----------
apply the inverse of affine transform 'trans' to uv
Parameters:
----------
@trans: 3x3 np.array
transform matrix
@uv: Kx2 np.array
each row is a pair of coordinates (x, y)
Returns:
----------
@xy: Kx2 np.array
each row is a pair of inverse-transformed coordinates (x, y)
"""
Tinv = inv(trans)
xy = tformfwd(Tinv, uv)
return xy
def findNonreflectiveSimilarity(uv, xy, options=None):
options = options or {'K': 2}
K = options['K']
M = xy.shape[0]
x = xy[:, 0].reshape((-1, 1)) # use reshape to keep a column vector
y = xy[:, 1].reshape((-1, 1)) # use reshape to keep a column vector
# print('--->x, y:\n', x, y
tmp1 = np.hstack((x, y, np.ones((M, 1)), np.zeros((M, 1))))
tmp2 = np.hstack((y, -x, np.zeros((M, 1)), np.ones((M, 1))))
X = np.vstack((tmp1, tmp2))
# print('--->X.shape: ', X.shape
# print('X:\n', X
u = uv[:, 0].reshape((-1, 1)) # use reshape to keep a column vector
v = uv[:, 1].reshape((-1, 1)) # use reshape to keep a column vector
U = np.vstack((u, v))
# print('--->U.shape: ', U.shape
# print('U:\n', U
# We know that X * r = U
if rank(X) >= 2 * K:
r, _, _, _ = lstsq(X, U)
r = np.squeeze(r)
else:
raise Exception('cp2tform:twoUniquePointsReq')
# print('--->r:\n', r
sc = r[0]
ss = r[1]
tx = r[2]
ty = r[3]
Tinv = np.array([
[sc, -ss, 0],
[ss, sc, 0],
[tx, ty, 1]
])
# print('--->Tinv:\n', Tinv
T = inv(Tinv)
# print('--->T:\n', T
T[:, 2] = np.array([0, 0, 1])
return T, Tinv
def findSimilarity(uv, xy, options=None):
options = options or {'K': 2}
# uv = np.array(uv)
# xy = np.array(xy)
# Solve for trans1
trans1, trans1_inv = findNonreflectiveSimilarity(uv, xy, options)
# Solve for trans2
# manually reflect the xy data across the Y-axis
xyR = xy.copy()  # copy so the caller's xy is not reflected in place
xyR[:, 0] = -1 * xyR[:, 0]
trans2r, trans2r_inv = findNonreflectiveSimilarity(uv, xyR, options)
# manually reflect the tform to undo the reflection done on xyR
TreflectY = np.array([
[-1, 0, 0],
[0, 1, 0],
[0, 0, 1]
])
trans2 = np.dot(trans2r, TreflectY)
# Figure out if trans1 or trans2 is better
xy1 = tformfwd(trans1, uv)
norm1 = norm(xy1 - xy)
xy2 = tformfwd(trans2, uv)
norm2 = norm(xy2 - xy)
if norm1 <= norm2:
return trans1, trans1_inv
else:
trans2_inv = inv(trans2)
return trans2, trans2_inv
def get_similarity_transform(src_pts, dst_pts, reflective=True):
"""
Function:
----------
Find Similarity Transform Matrix 'trans':
u = src_pts[:, 0]
v = src_pts[:, 1]
x = dst_pts[:, 0]
y = dst_pts[:, 1]
[x, y, 1] = [u, v, 1] * trans
Parameters:
----------
@src_pts: Kx2 np.array
source points, each row is a pair of coordinates (x, y)
@dst_pts: Kx2 np.array
destination points, each row is a pair of transformed
coordinates (x, y)
@reflective: True or False
if True:
use reflective similarity transform
else:
use non-reflective similarity transform
Returns:
----------
@trans: 3x3 np.array
transform matrix from uv to xy
trans_inv: 3x3 np.array
inverse of trans, transform matrix from xy to uv
"""
if reflective:
trans, trans_inv = findSimilarity(src_pts, dst_pts)
else:
trans, trans_inv = findNonreflectiveSimilarity(src_pts, dst_pts)
return trans, trans_inv
def cvt_tform_mat_for_cv2(trans):
"""
Function:
----------
Convert Transform Matrix 'trans' into 'cv2_trans' which could be
directly used by cv2.warpAffine():
u = src_pts[:, 0]
v = src_pts[:, 1]
x = dst_pts[:, 0]
y = dst_pts[:, 1]
[x, y].T = cv_trans * [u, v, 1].T
Parameters:
----------
@trans: 3x3 np.array
transform matrix from uv to xy
Returns:
----------
@cv2_trans: 2x3 np.array
transform matrix from src_pts to dst_pts, could be directly used
for cv2.warpAffine()
"""
cv2_trans = trans[:, 0:2].T
return cv2_trans
def get_similarity_transform_for_cv2(src_pts, dst_pts, reflective=True):
"""
Function:
----------
Find Similarity Transform Matrix 'cv2_trans' which could be
directly used by cv2.warpAffine():
u = src_pts[:, 0]
v = src_pts[:, 1]
x = dst_pts[:, 0]
y = dst_pts[:, 1]
[x, y].T = cv_trans * [u, v, 1].T
Parameters:
----------
@src_pts: Kx2 np.array
source points, each row is a pair of coordinates (x, y)
@dst_pts: Kx2 np.array
destination points, each row is a pair of transformed
coordinates (x, y)
reflective: True or False
if True:
use reflective similarity transform
else:
use non-reflective similarity transform
Returns:
----------
@cv2_trans: 2x3 np.array
transform matrix from src_pts to dst_pts, could be directly used
for cv2.warpAffine()
"""
trans, trans_inv = get_similarity_transform(src_pts, dst_pts, reflective)
cv2_trans = cvt_tform_mat_for_cv2(trans)
return cv2_trans
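# Hedged usage sketch: the 5-point landmarks below are illustrative values only
# (roughly the common 112x112 reference layout), and `face_img` stands in for a
# BGR crop from the detector; the returned 2x3 matrix goes straight into cv2.warpAffine.
#
#     import cv2
#     import numpy as np
#     src_pts = np.array([[30.3, 51.7], [65.5, 51.5], [48.0, 71.7],
#                         [33.5, 92.4], [62.7, 92.2]], dtype=np.float32)   # detected landmarks
#     ref_pts = np.array([[38.3, 51.7], [73.5, 51.5], [56.0, 71.7],
#                         [41.5, 92.4], [70.7, 92.2]], dtype=np.float32)   # alignment reference
#     cv2_trans = get_similarity_transform_for_cv2(src_pts, ref_pts)
#     aligned = cv2.warpAffine(face_img, cv2_trans, (112, 112))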
if __name__ == '__main__':
"""
u = [0, 6, -2]
v = [0, 3, 5]
x = [-1, 0, 4]
y = [-1, -10, 4]
# In Matlab, run:
#
# uv = [u'; v'];
# xy = [x'; y'];
# tform_sim=cp2tform(uv,xy,'similarity');
#
# trans = tform_sim.tdata.T
# ans =
# -0.0764 -1.6190 0
# 1.6190 -0.0764 0
# -3.2156 0.0290 1.0000
# trans_inv = tform_sim.tdata.Tinv
# ans =
#
# -0.0291 0.6163 0
# -0.6163 -0.0291 0
# -0.0756 1.9826 1.0000
# xy_m=tformfwd(tform_sim, u,v)
#
# xy_m =
#
# -3.2156 0.0290
# 1.1833 -9.9143
# 5.0323 2.8853
# uv_m=tforminv(tform_sim, x,y)
#
# uv_m =
#
# 0.5698 1.3953
# 6.0872 2.2733
# -2.6570 4.3314
"""
u = [0, 6, -2]
v = [0, 3, 5]
x = [-1, 0, 4]
y = [-1, -10, 4]
uv = np.array((u, v)).T
xy = np.array((x, y)).T
print('\n--->uv:')
print(uv)
print('\n--->xy:')
print(xy)
trans, trans_inv = get_similarity_transform(uv, xy)
print('\n--->trans matrix:')
print(trans)
print('\n--->trans_inv matrix:')
print(trans_inv)
print('\n---> apply transform to uv')
print('\nxy_m = uv_augmented * trans')
uv_aug = np.hstack((
uv, np.ones((uv.shape[0], 1))
))
xy_m = np.dot(uv_aug, trans)
print(xy_m)
print('\nxy_m = tformfwd(trans, uv)')
xy_m = tformfwd(trans, uv)
print(xy_m)
print('\n---> apply inverse transform to xy')
print('\nuv_m = xy_augmented * trans_inv')
xy_aug = np.hstack((
xy, np.ones((xy.shape[0], 1))
))
uv_m = np.dot(xy_aug, trans_inv)
print(uv_m)
print('\nuv_m = tformfwd(trans_inv, xy)')
uv_m = tformfwd(trans_inv, xy)
print(uv_m)
uv_m = tforminv(trans, xy)
print('\nuv_m = tforminv(trans, xy)')
print(uv_m)
from PIL import ImageDraw
def show_bboxes(img, bounding_boxes, facial_landmarks=[]):
"""Draw bounding boxes and facial landmarks.
Arguments:
img: an instance of PIL.Image.
bounding_boxes: a float numpy array of shape [n, 5].
facial_landmarks: a float numpy array of shape [n, 10].
Returns:
an instance of PIL.Image.
"""
img_copy = img.copy()
draw = ImageDraw.Draw(img_copy)
for b in bounding_boxes:
draw.rectangle([
(b[0], b[1]), (b[2], b[3])
], outline='white')
for p in facial_landmarks:
for i in range(5):
draw.ellipse([
(p[i] - 1.0, p[i + 5] - 1.0),
(p[i] + 1.0, p[i + 5] + 1.0)
], outline='blue')
return img_copy
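# Hedged usage sketch (the photo path is a placeholder, and it assumes the MTCNN
# wrapper in this repo exposes detect_faces returning boxes and landmarks):
#
#     from PIL import Image
#     from mtcnn import MTCNN
#     img = Image.open('some_photo.jpg')
#     bounding_boxes, landmarks = MTCNN().detect_faces(img)
#     show_bboxes(img, bounding_boxes, landmarks).save('annotated.jpg')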
import requests
import json
import base64
def getByte(path):
with open(path, 'rb') as f:
img_byte = base64.b64encode(f.read())  # read the binary data and base64-encode it
img_str = img_byte.decode('ascii')
return img_str
# an image that contains no face
requestsss = {'inst_id': '1315584165094813698', 'img_url': "https://img0.baidu.com/it/u=52681052,678098948&fm=253&fmt=auto&app=120&f=JPEG?w=1000&h=400"}
# response: {'faces': {}, 'recognized': {}, 'unknown': {}, 'message': 'face detect failed,no faces find'}
# a face that is not in the facebank (unrecognized)
# requestsss = {'inst_id': '1315584165094813698', 'img_url': "https://gimg2.baidu.com/image_search/src=http%3A%2F%2Fwww.n63.com%2Fphotodir%2Fn63img%2F%3FN%3DX2hiJTI2ZGRXZ1drV2xXJTVEV2xXZldlZyU1QiUyNiUyQi5mJTI2b29vJTI3YXJfZmdfZllhbGYlNUQlNjBrJTI3ZmZZaFliJTI3%26v%3D.jpg&refer=http%3A%2F%2Fwww.n63.com&app=2002&size=f9999,10000&q=a80&n=0&g=0n&fmt=jpeg?sec=1630835867&t=717f70fd50bc5ccc7b81ad1f2fd9e938"}
# response: {'faces': {'0': {'left': 194.52226984500885, 'top': 104.55831748247147, 'right': 409.6213800907135, 'bottom': 377.34358298778534, 'confidence': 0.9952797889709473}}, 'recognized': {}, 'unknown': {'0': 'unknow'}, 'message': 'complete'}
req = json.dumps(requestsss)  # serialize the dict to a JSON string
res = requests.post('https://1290825930774375.cn-hangzhou.fc.aliyuncs.com/2016-08-15/proxy/faces/faces/', data=req)
print(res.json())
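# getByte above is defined but never used in this script; a hedged sketch of the
# registration-style call (field names taken from the commented-out sync code in
# utils.py; the student id and image URL below are placeholders):
#
#     payload = {'inst_id': '1315584165094813698',
#                'student_id': '1349972667086348289',
#                'env': 'rc',
#                'img_url': 'https://example.com/face.jpg'}
#     res = requests.post('https://1290825930774375.cn-hangzhou.fc.aliyuncs.com/2016-08-15/proxy/faces/faces/',
#                         data=json.dumps(payload))
#     print(res.json())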
import cv2
import argparse
from pathlib import Path
from PIL import Image
from mtcnn import MTCNN
from datetime import datetime
from PIL import Image
import numpy as np
from mtcnn_pytorch.src.align_trans import get_reference_facial_points, warp_and_crop_face
parser = argparse.ArgumentParser(description='take a picture')
parser.add_argument('--name', '-n', default='unknown', type=str, help='input the name of the recording person')
args = parser.parse_args()
from pathlib import Path
data_path = Path('data')
save_path = data_path / 'facebank' / args.name
if not save_path.exists():
save_path.mkdir()
# initialize the camera
cap = cv2.VideoCapture(0)
# the camera defaults to 640*480; adjust the resolution to match your camera
cap.set(3, 1280)
cap.set(4, 720)
mtcnn = MTCNN()
while cap.isOpened():
# grab frames one at a time
isSuccess, frame = cap.read()
# show the captured frame on screen in real time
if isSuccess:
frame_text = cv2.putText(frame,
'Press t to take a picture,q to quit.....',
(5, 100),
cv2.FONT_HERSHEY_SIMPLEX,
2,
(0, 255, 0),
3,
cv2.LINE_AA)
cv2.imshow("My Capture", frame_text)
# press 't' to take a picture
if cv2.waitKey(1) & 0xFF == ord('t'):
p = Image.fromarray(frame[..., ::-1])
try:
warped_face = np.array(mtcnn.align(p))[..., ::-1]
print(save_path / '{}.jpg'.format(str(datetime.now())[:-7].replace(":", "-").replace(" ", "-")))
cv2.imwrite(str(save_path / '{}.jpg'.format(str(datetime.now())[:-7].replace(":", "-").replace(" ", "-"))),
warped_face)
name = input("Enter the name of the next person to photograph (type 'continue' to keep shooting the same person)")
if name == "continue":
continue
save_path = data_path / 'facebank' / name
if not save_path.exists():
save_path.mkdir()
except:
print('no face captured')
if cv2.waitKey(1) & 0xFF == ord('q'):
cap.release()
cv2.destroyAllWindows()
break
# release the camera resource
ROSTemplateFormatVersion: '2015-09-01'
Transform: 'Aliyun::Serverless-2018-04-03'
Resources:
faces:
Type: 'Aliyun::Serverless::Service'
Properties:
VpcConfig:
VpcId: 'vpc-bp1tl9xp5tiofeobcev6n'
VSwitchIds: [ 'vsw-bp1hcr2yhzer8j6vt4u7i' ]
SecurityGroupId: 'sg-bp18y43m16vy66b1axzm'
NasConfig:
UserId: 10003
GroupId: 10003
MountPoints:
- ServerAddr: '0df8f4b72f-hoj6.cn-hangzhou.nas.aliyuncs.com:/'
MountDir: '/mnt/nas'
Description: 'face extraction'
faces:
Type: 'Aliyun::Serverless::Function'
Properties:
Initializer: 'index.initializer'
Handler: index.handler
Runtime: python3
CodeUri: './'
InitializationTimeout: 13
MemorySize: 1024
Timeout: 15
Events:
httpTrigger:
Type: HTTP
Properties:
AuthType: ANONYMOUS
Methods: ['POST', 'GET']
\ No newline at end of file
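# Hedged deployment note (assuming Alibaba Cloud's Funcraft CLI is installed and
# configured): running `fun deploy` against this template would create the
# `faces` service and function and the anonymous HTTP trigger, reachable at the
# .../2016-08-15/proxy/faces/faces/ path used by the test scripts in this repo.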
from Learner import face_learner
from config import get_config
from mtcnn import MTCNN
from utils import insert_feature_to_es
from PIL import Image
import cv2
import numpy as np
from datetime import datetime
from pathlib import Path
import os
import time
from utils import prepare_facebank_to_es
from elasticsearch7 import Elasticsearch
# run MTCNN on the face photos, crop the face region, and save it under the facebank folder
for root,dir,files in os.walk("/Users/wda/PycharmProjects/faces/data/tmp"):
for file in files:
i = 1
if(file.endswith(".jpg") or file.endswith(".jpeg")):
# data_path = Path('data')
name = root.split("/")[-1]
save_path = '/Users/wda/PycharmProjects/faces/data/facebank/1404722168218124290/'
try:
frame = cv2.imread(os.path.join(root,file))
p = Image.fromarray(frame[...,::-1])
mtcnn = MTCNN()
warped_face = np.array(mtcnn.align(p))[..., ::-1]
# if not save_path.exists():
# save_path.mkdir()
cv2.imwrite("/Users/wda/PycharmProjects/faces/data/facebank/1404722168218124290/image0_112*112.jpg",warped_face)
# cv2.imwrite(str(save_path/'{}.jpg'.format(str(datetime.now())[:-7].replace(":","-").replace(" ","-")+"-"+str(i))), warped_face)
i = i + 1
except IndexError as e:
print(e)
continue
except ValueError as e:
print(e)
continue
except TypeError as e:
print(e)
continue
if i == 2:
break
# host = "es-cn-7mz28jd5z001oqrc9.public.elasticsearch.aliyuncs.com"
# port = 9200
# user = "elastic"
# password = "4rfv%TGB"
# authen = None
# uripart = host + ':' + str(port)
# if user is not None:
# authen = user
# if authen is not None and password is not None:
# authen += ':' + password
# if authen is not None:
# uripart = authen + '@' + uripart
# protocol = 'http'
# if protocol is not None:
# protocol = protocol
# uri = protocol + '://' + uripart
# es = Elasticsearch(uri)
# print(es.ping())
# # # verified that inserting the 512-d feature into ES succeeds
conf = get_config(False)
learner = face_learner(conf, True)
learner.threshold = 1.44
if conf.device.type == 'cpu':
learner.load_state(conf, 'ir_se50.pth', True, True)
else:
# this branch is unused; the cpu path above is always taken
learner.load_state(conf, 'final.pth', True, True)
learner.model.eval()
print('learner loaded')
#
mtcnn = MTCNN()
# import oss2
# auth = oss2.Auth("LTAI5tFb87uVi1B4BcW6SJfH", "3PdUomiQg6yAduq97KX4rBNpkhzdml")
# bucket = oss2.Bucket(auth, "https://oss-cn-hangzhou.aliyuncs.com", "xmrc-resource")
# url = bucket.sign_url('GET','inst/1031482107660075104/student/1176335046671904770/face/20190924140010/3Fnks2mGnZAxzCtc',60)
# import requests
# from io import BytesIO
# response = requests.get(url)
# image = Image.open(BytesIO(response.content))
# bboxes, faces = mtcnn.align_multi(image, conf.face_limit, conf.min_face_size)
# success = learner.update_student_feature(conf, faces, 1349974664509480962, 1349972667086348289, "faces_rc")
# print(success)
#
from datetime import datetime
import matplotlib.pyplot as plt
plt.switch_backend('agg')
import io
from torchvision import transforms as trans
from data.data_pipe import de_preprocess
import torch
from model import l2_norm, MobileFaceNet
from elasticsearch7 import helpers
import pymysql
import pandas as pd
from tqdm import tqdm
def separate_bn_paras(modules):
if not isinstance(modules, list):
modules = [*modules.modules()]
paras_only_bn = []
paras_wo_bn = []
for layer in modules:
if 'model' in str(layer.__class__):
continue
if 'container' in str(layer.__class__):
continue
else:
if 'batchnorm' in str(layer.__class__):
paras_only_bn.extend([*layer.parameters()])
else:
paras_wo_bn.extend([*layer.parameters()])
return paras_only_bn, paras_wo_bn
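# Hedged sketch of how separate_bn_paras is typically consumed: weight decay is
# applied only to the non-BatchNorm parameters (the lr/decay values below are
# illustrative, not values taken from this repo):
#
#     paras_only_bn, paras_wo_bn = separate_bn_paras(model)
#     optimizer = torch.optim.SGD([
#         {'params': paras_wo_bn, 'weight_decay': 5e-4},
#         {'params': paras_only_bn},              # no weight decay on BN params
#     ], lr=1e-3, momentum=0.9)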
def prepare_facebank(conf, model, mtcnn, tta = True):
model.eval()
embeddings = []
names = ['Unknown']
for path in conf.facebank_path.iterdir():
if path.is_file():
continue
else:
embs = []
for file in path.iterdir():
if not file.is_file():
continue
else:
try:
img = Image.open(file)
except:
continue
if img.size != (112, 112):
img = mtcnn.align(img)
with torch.no_grad():
if tta:
mirror = trans.functional.hflip(img)
emb = model(conf.test_transform(img).to(conf.device).unsqueeze(0))
emb_mirror = model(conf.test_transform(mirror).to(conf.device).unsqueeze(0))
embs.append(l2_norm(emb + emb_mirror))
else:
embs.append(model(conf.test_transform(img).to(conf.device).unsqueeze(0)))
if len(embs) == 0:
continue
embedding = torch.cat(embs).mean(0,keepdim=True)
embeddings.append(embedding)
names.append(path.name)
embeddings = torch.cat(embeddings)
names = np.array(names)
torch.save(embeddings, conf.facebank_path/'facebank.pth')
np.save(conf.facebank_path/'names', names)
return embeddings, names
def load_facebank(conf):
embeddings = torch.load(conf.facebank_path/'facebank.pth')
names = np.load(conf.facebank_path/'names.npy')
return embeddings, names
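# Hedged usage sketch pairing the two functions above (the `update` flag is an
# assumption; scripts in this repo decide per run whether to rebuild the bank):
#
#     if update:
#         targets, names = prepare_facebank(conf, learner.model, mtcnn, tta=True)
#     else:
#         targets, names = load_facebank(conf)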
def prepare_facebank_to_es(es,conf, model, mtcnn, tta = True):
model.eval()
embeddings = []
names = ['Unknown']
for path in conf.facebank_path.iterdir():
if path.is_file():
continue
else:
embs = []
for file in path.iterdir():
if not file.is_file():
continue
else:
try:
img = Image.open(file)
except:
continue
if img.size != (112, 112):
img = mtcnn.align(img)
with torch.no_grad():
if tta:
mirror = trans.functional.hflip(img)
emb = model(conf.test_transform(img).to(conf.device).unsqueeze(0))
emb_mirror = model(conf.test_transform(mirror).to(conf.device).unsqueeze(0))
embs.append(l2_norm(emb + emb_mirror))
else:
emb = model(conf.test_transform(img).to(conf.device).unsqueeze(0))
embs.append(emb)
insert_to_es(es, "1315584165094813698", path.name, list(emb.numpy())[0])
if len(embs) == 0:
continue
embedding = torch.cat(embs).mean(0,keepdim=True)
embeddings.append(embedding)
names.append(path.name)
embeddings = torch.cat(embeddings)
names = np.array(names)
return embeddings, names
def insert_to_es(es,inst_id,student_id,features):
actions = [{
"_index": "faces",
"_source": {
"inst_id":inst_id,
"student_id":student_id,
"face_feature":list(features)
}
}]
helpers.bulk(es, actions)
def load_facebank(conf):
embeddings = torch.load(conf.facebank_path/'facebank.pth')
names = np.load(conf.facebank_path/'names.npy')
return embeddings, names
def face_reader(conf, conn, flag, boxes_arr, result_arr, learner, mtcnn, targets, tta):
while True:
try:
image = conn.recv()
except:
continue
try:
bboxes, faces = mtcnn.align_multi(image, limit=conf.face_limit)
except:
bboxes = []
if len(bboxes) > 0:
    # run recognition only when detection succeeded; `faces` is undefined when align_multi raised above
    results = learner.infer(conf, faces, targets, tta)
    print('bboxes in reader : {}'.format(bboxes))
bboxes = bboxes[:,:-1]  # shape: [10, 4], keep only the 10 highest-probability faces
bboxes = bboxes.astype(int)
bboxes = bboxes + [-1,-1,1,1] # personal choice
assert bboxes.shape[0] == results.shape[0],'bbox and faces number not same'
bboxes = bboxes.reshape([-1])
for i in range(len(boxes_arr)):
if i < len(bboxes):
boxes_arr[i] = bboxes[i]
else:
boxes_arr[i] = 0
for i in range(len(result_arr)):
if i < len(results):
result_arr[i] = results[i]
else:
result_arr[i] = -1
else:
for i in range(len(boxes_arr)):
boxes_arr[i] = 0 # by default,it's all 0
for i in range(len(result_arr)):
result_arr[i] = -1 # by default,it's all -1
print('boxes_arr : {}'.format(boxes_arr[:4]))
print('result_arr : {}'.format(result_arr[:4]))
flag.value = 0
hflip = trans.Compose([
de_preprocess,
trans.ToPILImage(),
trans.functional.hflip,
trans.ToTensor(),
trans.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])
])
def hflip_batch(imgs_tensor):
hfliped_imgs = torch.empty_like(imgs_tensor)
for i, img_ten in enumerate(imgs_tensor):
hfliped_imgs[i] = hflip(img_ten)
return hfliped_imgs
def get_time():
return (str(datetime.now())[:-10]).replace(' ','-').replace(':','-')
def gen_plot(fpr, tpr):
"""Create a pyplot plot and save to buffer."""
plt.figure()
plt.xlabel("FPR", fontsize=14)
plt.ylabel("TPR", fontsize=14)
plt.title("ROC Curve", fontsize=14)
plot = plt.plot(fpr, tpr, linewidth=2)
buf = io.BytesIO()
plt.savefig(buf, format='jpeg')
buf.seek(0)
plt.close()
return buf
def draw_box_name(bbox,name,frame):
frame = cv2.rectangle(frame,(bbox[0],bbox[1]),(bbox[2],bbox[3]),(0,0,255),6)
frame = cv2.putText(frame,
name,
(bbox[0],bbox[1]),
cv2.FONT_HERSHEY_SIMPLEX,
2,
(0,255,0),
3,
cv2.LINE_AA)
return frame
def transfer_model_to_android():
from config import get_config
model_pth = "/Users/wda/PycharmProjects/InsightFace_Pytorch-master/work_space/save/model_mobilefacenet.pth"
model_pt = "/Users/wda/PycharmProjects/InsightFace_Pytorch-master/work_space/save/model_mobilefacenet.pt"
conf = get_config(False)
# model = Retinaface(cfg = cfg_mnet, phase='eval', pre_train=False).eval()
# model = Backbone(conf.net_depth, conf.drop_ratio, conf.net_mode).eval()
model = MobileFaceNet(conf.embedding_size).to(conf.device).eval()
model.load_state_dict(torch.load(model_pth,map_location='cpu'))
model.eval()
input_tensor = torch.rand(1,3,112,112)
ir_se = torch.jit.trace(model,input_tensor)
ir_se.save(model_pt)
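# Hedged sanity check for the trace above (reusing model_pt/input_tensor from the
# function): the traced module should reproduce the eager outputs on the same input.
#
#     scripted = torch.jit.load(model_pt, map_location='cpu')
#     assert torch.allclose(scripted(input_tensor), model(input_tensor), atol=1e-5)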
import math
import cv2
import numpy as np
from PIL import Image
def letterbox_image(image, size):
ih, iw, _ = np.shape(image)
w, h = size
scale = min(w / iw, h / ih)
nw = int(iw * scale)
nh = int(ih * scale)
image = cv2.resize(image, (nw, nh), interpolation=cv2.INTER_CUBIC)  # bicubic resize (cv2 constant, not the PIL one)
new_image = np.ones([size[1], size[0], 3]) * 128
new_image[(h - nh) // 2:nh + (h - nh) // 2, (w - nw) // 2:nw + (w - nw) // 2] = image
return new_image
def retinaface_correct_boxes(result, input_shape, image_shape):
# map the normalized box coordinates back to the original image size
scale_for_offset_for_boxs = np.array([image_shape[1], image_shape[0], image_shape[1], image_shape[0]])
scale_for_landmarks_offset_for_landmarks = np.array([image_shape[1], image_shape[0], image_shape[1], image_shape[0],
image_shape[1], image_shape[0], image_shape[1], image_shape[0],
image_shape[1], image_shape[0]])
new_shape = image_shape * np.min(input_shape / image_shape)
offset = (input_shape - new_shape) / 2. / input_shape
scale = input_shape / new_shape
scale_for_boxs = [scale[1], scale[0], scale[1], scale[0]]
scale_for_landmarks = [scale[1], scale[0], scale[1], scale[0], scale[1], scale[0], scale[1], scale[0], scale[1],
scale[0]]
offset_for_boxs = np.array([offset[1], offset[0], offset[1], offset[0]]) * scale_for_offset_for_boxs
offset_for_landmarks = np.array(
[offset[1], offset[0], offset[1], offset[0], offset[1], offset[0], offset[1], offset[0], offset[1],
offset[0]]) * scale_for_landmarks_offset_for_landmarks
result[:, :4] = (result[:, :4] - np.array(offset_for_boxs)) * np.array(scale_for_boxs)
result[:, 5:] = (result[:, 5:] - np.array(offset_for_landmarks)) * np.array(scale_for_landmarks)
return result
# ---------------------------------#
# compute distances between face encodings
# ---------------------------------#
def face_distance(face_encodings, face_to_compare):
if len(face_encodings) == 0:
return np.empty((0))
# (n, )
return np.linalg.norm(face_encodings - face_to_compare, axis=1)
# ---------------------------------#
# compare faces
# ---------------------------------#
def compare_faces(known_face_encodings, face_encoding_to_check, tolerance=1):
dis = face_distance(known_face_encodings, face_encoding_to_check)
return list(dis <= tolerance), dis
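# Hedged usage sketch (random vectors stand in for real 512-d embeddings):
#
#     known = np.random.rand(3, 512)
#     probe = np.random.rand(512)
#     matches, dists = compare_faces(known, probe, tolerance=1)
#     best = int(np.argmin(dists)) if any(matches) else None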
# -------------------------------------#
# face alignment
# -------------------------------------#
def Alignment_1(img, landmark):
if landmark.shape[0] == 68:
x = landmark[36, 0] - landmark[45, 0]
y = landmark[36, 1] - landmark[45, 1]
elif landmark.shape[0] == 5:
x = landmark[0, 0] - landmark[1, 0]
y = landmark[0, 1] - landmark[1, 1]
# angle of the eye line relative to the horizontal
if x == 0:
angle = 0
else:
# convert the arctangent to degrees
angle = math.atan(y / x) * 180 / math.pi
center = (img.shape[1] // 2, img.shape[0] // 2)
# rotation center, rotation angle, scale factor
RotationMatrix = cv2.getRotationMatrix2D(center, angle, 1)
# affine warp
# arguments: input image, transform matrix, output size
new_img = cv2.warpAffine(img, RotationMatrix, (img.shape[1], img.shape[0]))
RotationMatrix = np.array(RotationMatrix)
new_landmark = []
for i in range(landmark.shape[0]):
pts = []
pts.append(RotationMatrix[0, 0] * landmark[i, 0] + RotationMatrix[0, 1] * landmark[i, 1] + RotationMatrix[0, 2])
pts.append(RotationMatrix[1, 0] * landmark[i, 0] + RotationMatrix[1, 1] * landmark[i, 1] + RotationMatrix[1, 2])
new_landmark.append(pts)
new_landmark = np.array(new_landmark)
return new_img, new_landmark
def search_similer_face(es,feature,inst_id,env):
if inst_id is None:
query = {
"size": 1,
"query": {
"script_score": {
"query": {
"match_all": {
}
},
"script": {
"source": "1 / (1 + l2norm(params.queryVector, 'face_feature'))",
"params": {
"queryVector": feature
}
}
},
}}
else:
query = {
"size": 1,
"query": {
"script_score": {
"query": {
"term": {
"inst_id":inst_id
}
},
"script": {
"source": "1 / (1 + l2norm(params.queryVector, 'face_feature'))",
"params": {
"queryVector": feature
}
}
},
}}
x = es.search(doc_type="_doc", body=query, params={"_source": "student_id"},index = env)
try:
result = x['hits']['hits'][0]['_score'],x['hits']['hits'][0]['_source']['student_id']
except IndexError:
return 10000,-1
return result
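# Hedged caller sketch: the score is 1 / (1 + L2 distance), so larger means more
# similar, and (10000, -1) is the "no document found" sentinel; the 0.5 cutoff
# below is illustrative, not a tuned threshold from this repo.
#
#     score, student_id = search_similer_face(es, feature, inst_id, 'faces_rc')
#     if student_id != -1 and score >= 0.5:
#         print('recognized as', student_id)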
def update_feature_to_es(es,feature,name,inst_id,env):
update_query = {
"query": {
"bool": {
"must": [
{
"term": {
"student_id": name
}
},
{
"term": {
"inst_id": inst_id
}
}
]
}
},
"script": {
"lang": "painless",
"source": "ctx._source.face_feature=params.face_feature",
"params": {
"face_feature": feature
}
}
}
x = es.update_by_query(doc_type="_doc", body=update_query,index=env)
try:
total = x['total']
return total
except KeyError:
return 0
def insert_feature_to_es(es,feature,name,inst_id,env):
data = {
"student_id":name,
"inst_id":inst_id,
"face_feature":feature
}
res = es.index(index = env,doc_type='_doc',body=data)
try:
total = res['result']
return 1 if total == 'created' else 0
except KeyError:
return 0
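# Hedged note: the l2norm script-score queries above require face_feature to be
# mapped as a dense_vector; a sketch of such an index mapping (dims=512 matches
# the embedding size used in this repo, the rest is an assumption):
#
#     es.indices.create(index=env, body={
#         "mappings": {
#             "properties": {
#                 "inst_id": {"type": "keyword"},
#                 "student_id": {"type": "keyword"},
#                 "face_feature": {"type": "dense_vector", "dims": 512}
#             }
#         }
#     })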
# def sync_data_to_es(env):
# try:
# if env == 'rc':
# mysql_conn_eagle = pymysql.connect(host="jdbc:mysql://rm-bp17594n0001qi1t67o.mysql.rds.aliyuncs.com:3306/eagle?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8",user="xiaomai",password="ACxi1bb4K24Q21IJ")
# mysql_conn_mfs = pymysql.connect(
# host="jdbc:mysql://rm-bp17594n0001qi1t67o.mysql.rds.aliyuncs.com:3306/mfs?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8",
# user="xiaomai", password="ACxi1bb4K24Q21IJ")
# elif env == 'dev':
# mysql_conn_eagle = pymysql.connect(host="jdbc:mysql://rm-bp1619xc3i4g0b45v0o.mysql.rds.aliyuncs.com:3306/eagle?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8",user="root",password="p0FqIsgTXTuiKIsF")
# mysql_conn_mfs = pymysql.connect(
# host="jdbc:mysql://rm-bp1619xc3i4g0b45v0o.mysql.rds.aliyuncs.com:3306/mfs?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8",
# user="root", password="p0FqIsgTXTuiKIsF")
# else:
# mysql_conn_mfs = ""
# mysql_conn_eagle = ""
# mysql_conn_eagle.ping()
# mysql_conn_mfs.ping()
#
# except ConnectionError as e:
# print("mysql connection error")
# cursor_eagle = mysql_conn_eagle.cursor()
# cursor_mfs = mysql_conn_mfs.cursor()
# inst_list_sql = """
# select distinct(group_id) from baidu_face
# """
# cursor_eagle.execute(inst_list_sql)
# inst_list = cursor_eagle.fetchall()
# all = pd.DataFrame([])
# all.columns = ['group_id','user_id','relative_url']
# for inst_id in inst_list:
# print("syncing face data for institution id " + str(inst_id))
# resource_sql = "select relative_url,inst_id as group_id,id from mfs.resource_store where inst_id = {0} ".format(inst_id)
# cursor_mfs.execute(resource_sql)
# resources = pd.DataFrame(cursor_mfs.fetchall())
#
# baidu_sql = "select group_id,resource_id as id,user_id from eagle.baidu_face where group_id = {0}".format(inst_id)
# cursor_eagle.execute(baidu_sql)
# faces = pd.DataFrame(cursor_eagle.fetchall())
#
# result = pd.merge(faces,resources,how='left',on=['id','group_id'])
# result = result[['group_id','user_id','relative_url']]
# result = result[result['relative_url'].notnull()]
# all = pd.concat([all,result],axis=0)
# all.to_csv(env+".csv",header=True)
if __name__ == '__main__':
# import requests
# import json
# import oss2
# auth = oss2.Auth("LTAI5tFb87uVi1B4BcW6SJfH", "3PdUomiQg6yAduq97KX4rBNpkhzdml")
# df_rc = pd.read_csv("/Users/wda/Downloads/rc.csv",header=0)
# df_rc.sort_values(['updated'],ascending=1,inplace=True)
# grouped_rc = df_rc.groupby(['user_id']).head(1)
#
# df_dev = pd.read_csv("/Users/wda/Downloads/dev.csv",header = 0)
# df_dev.sort_values(['updated'],ascending=1,inplace=True)
# grouped_dev = df_dev.groupby(['user_id']).head(1)
#
# # for index,row in grouped_dev.iterrows():
# # inst_id = row['group_id']
# # img_url = row['relative_url']
# # student_id = row['user_id']
# # bucket_name = row['bucket']
# # print(img_url)
# # import oss2
# # bucket = oss2.Bucket(auth, "https://oss-cn-hangzhou.aliyuncs.com", "xmdev-resource")
# # url = bucket.sign_url('GET',
# # img_url,
# # 60)
# # print(url)
# # requestsss = {'inst_id': inst_id,
# # 'img_url': img_url,"student_id":student_id,'env':'dev'}
# # req = json.dumps(requestsss)
# # res = requests.post('https://1290825930774375.cn-hangzhou.fc.aliyuncs.com/2016-08-15/proxy/faces/faces/', data=req)
# # print(res.json())
#
# for index,row in grouped_rc.iterrows():
# inst_id = row['group_id']
# img_url = row['relative_url']
# student_id = row['user_id']
# bucket_name = row['bucket']
# print(img_url)
# import oss2
# bucket = oss2.Bucket(auth, "https://oss-cn-hangzhou.aliyuncs.com", "xmrc-resource")
# url = bucket.sign_url('GET',
# img_url,
# 60)
# print(url)
# requestsss = {'inst_id': inst_id,
# 'img_url': img_url,"student_id":student_id,'env':'rc'}
# print(requestsss)
# req = json.dumps(requestsss)
# res = requests.post('https://1290825930774375.cn-hangzhou.fc.aliyuncs.com/2016-08-15/proxy/faces/faces/', data=req)
# print(res.json())
img = Image.open("/Users/wda/PycharmProjects/faces/data/facebank/1404722168218124290/image0_112*112.jpg")
tmp = np.array(img)
print(tmp)
\ No newline at end of file