Hardware components | ||||||
| × | 1 | ||||
Software apps and online services | ||||||
|
1) Introduction
Work out from home with an assistant personal trainer (PT): the assistant takes a video of your workout, compares it with the guide video, and then shows whether your workout matches the guideline or not.
2) How it works
Make a guide sample
First, input a guide video.
The code detects the person and then the pose.
The pose model detects the position of 17 keypoints on the human body.
Second, calculate the gradient (slope) between every pair of connected joints and save the result to a JSON file.
Score your workout
First, upload your workout video.
Second, the program calculates the difference between the gradients of each frame and those of the guide.
If the difference is more than 10%, 1% is subtracted from the score.
This is repeated for every frame to obtain the final score.
3) Setup
1 - Download and install the Anaconda distribution of Python.
2 - Install all the dependencies.
3 - Download the Intel Distribution of OpenVINO toolkit (my OpenVINO version: 2020.0.1.033).
Download the pre-trained models.
Download the code:
input : input video
model_pose : Pre-trained model downloaded
pose_detect.py : detect human
pose_estimator.py : detect pose
main.py : RUN
4) Running
python main.py --i <input video> --s <Bool>
--i: input video
--s: save the input as the guide sample
This is the guide video.
This is the workout video.
And this is the result: 97.26% - Very good :)
import argparse
import os
import sys
import cv2
import numpy as np
from openvino.inference_engine import IECore
from pose_detect import Detector
from pose_estimator import HumanPoseEstimator, cacul_score
# Add "<parent of this file's directory>/common" to the module search path.
sys.path.append(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'common'))
# import monitors
# Model file passed to Detector in run_demo.
Human_detect_model = './model_pose/mobilenet-ssd.xml'
# Model file passed to HumanPoseEstimator in run_demo.
Human_pose_model = './model_pose/single-human-pose-estimation-0001.xml'
# JSON file with the guide gradients; passed to cacul_score in run_demo.
guild_json = './input/guild.json'
def _str_to_bool(value):
    """Convert a command-line token to a bool.

    ``type=bool`` cannot be used with argparse because ``bool("False")`` is
    True (any non-empty string is truthy), so the token is interpreted here.

    Raises:
        argparse.ArgumentTypeError: if the token is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    token = str(value).strip().lower()
    if token in ("true", "t", "yes", "y", "1"):
        return True
    if token in ("false", "f", "no", "n", "0", ""):
        return False
    raise argparse.ArgumentTypeError(
        "expected a boolean value (True/False), got {!r}".format(value))


def build_argparser():
    """Build the command-line parser of the demo.

    Options:
        -i / --input: one or more paths (video or images); '' when omitted.
        -d / --device: inference device name, 'CPU' by default.
        --person_label: class id the detector reports for a person (15).
        --no_show: flag, do not display output.
        -u / --utilization_monitors: monitors to show initially.
        -s / --save: boolean; a bare ``-s`` means True, ``-s False`` means
            False, and the default is False.

    Returns:
        argparse.ArgumentParser: the configured parser.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--input", type=str, nargs='+', default='',
                        help="path to video or image/images")
    parser.add_argument("-d", "--device", type=str, default='CPU', required=False,
                        help="Specify the target to infer on CPU or GPU")
    parser.add_argument("--person_label", type=int, required=False, default=15,
                        help="Label of class person for detector")
    parser.add_argument("--no_show", help='Optional. Do not display output.',
                        action='store_true')
    # The original option string had a pasted shell command spliced into it;
    # the option is restored to its intended name.
    parser.add_argument("-u", "--utilization_monitors", default="", type=str,
                        help="Optional. List of monitors to show initially.")
    parser.add_argument("-s", "--save", type=_str_to_bool, nargs='?', const=True,
                        required=False, default=False,
                        help="Optional. Save the gradients of the input as the guide "
                             "sample instead of scoring it (True/False).")
    return parser
class ImageReader(object):
    """Iterator that loads a list of image files one by one.

    Each item is the array returned by ``cv2.imread(path, cv2.IMREAD_COLOR)``.
    """

    def __init__(self, file_names):
        """Remember the list of image paths to read."""
        self.file_names = file_names
        self.max_idx = len(file_names)
        self.idx = 0

    def __iter__(self):
        # Restart from the first file every time iteration begins.
        self.idx = 0
        return self

    def __next__(self):
        """Return the next decoded image.

        Raises:
            StopIteration: when every file has been returned.
            IOError: when a file cannot be read or decodes to an empty image.
        """
        if self.idx >= self.max_idx:
            raise StopIteration
        path = self.file_names[self.idx]
        img = cv2.imread(path, cv2.IMREAD_COLOR)
        # cv2.imread signals failure by returning None, so that has to be
        # checked before touching ``img.size``.
        if img is None or img.size == 0:
            raise IOError('Image {} cannot be read'.format(path))
        self.idx += 1
        return img
class VideoReader(object):
    """Iterator over the frames of a video file or a numbered capture device."""

    def __init__(self, file_name):
        """Store the capture source taken from the first element of *file_name*.

        A source that parses as an integer (e.g. ``'0'``) is kept as an int,
        anything else is kept unchanged.
        """
        source = file_name[0]
        try:
            self.file_name = int(source)
        except (TypeError, ValueError):
            # Not a number: keep the value as given (typically a file path).
            self.file_name = source
        self.cap = None

    def __iter__(self):
        """Open the capture.

        Raises:
            IOError: if the source cannot be opened.
        """
        if self.cap is not None:
            # Restarting iteration: drop the previous capture handle first.
            self.cap.release()
        self.cap = cv2.VideoCapture(self.file_name)
        if not self.cap.isOpened():
            raise IOError('Video {} cannot be opened'.format(self.file_name))
        return self

    def __next__(self):
        """Return the next frame; stop when no more frames can be read."""
        was_read, img = self.cap.read()
        if not was_read:
            # Free the capture once the stream is exhausted.
            self.cap.release()
            raise StopIteration
        return img
def run_demo(args):
    """Detect people and poses frame by frame, then save or score the gradients.

    With ``args.save`` set, the gradients of every frame are collected and
    handed to ``cacul_score.total_score_save``. Otherwise every frame is
    compared with the guide frame of the same index and the average score
    is printed and commented.

    Args:
        args: parsed options from ``build_argparser`` (``input``, ``device``,
            ``person_label``, ``save`` and ``no_show`` are read).

    Raises:
        ValueError: if no input was given.
    """
    # Validate the input before spending time on loading the models.
    if not args.input:
        raise ValueError('--input has to be set')

    ie = IECore()
    detector_person = Detector(ie, path_to_model_xml=Human_detect_model,
                               device=args.device,
                               label_class=args.person_label)
    single_human_pose_estimator = HumanPoseEstimator(ie, path_to_model_xml=Human_pose_model,
                                                     device=args.device)
    # 160 is passed as cacul_score's ``input_high`` argument.
    one_frame_score = cacul_score(guild_json, 160)

    # If the first path does not decode as an image, treat the input as a video.
    img = cv2.imread(args.input[0], cv2.IMREAD_COLOR)
    if img is None:
        frames_reader, delay = VideoReader(args.input), 1
    else:
        frames_reader, delay = ImageReader(args.input), 0

    show_output = not getattr(args, 'no_show', False)

    # One drawing colour per keypoint index.
    colors = [(0, 0, 255),
              (255, 0, 0), (0, 255, 0), (255, 0, 0), (0, 255, 0),
              (255, 0, 0), (0, 255, 0), (255, 0, 0), (0, 255, 0),
              (255, 0, 0), (0, 255, 0), (0, 0, 0), (0, 255, 255),
              (255, 0, 0), (0, 255, 0), (255, 0, 0), (0, 255, 0)]

    def fps(seconds):
        # infer_time is -1 until the first inference has run.
        return 1.0 / seconds if seconds > 0 else 0.0

    total_gradien = []
    gradient_index = 0
    array_bad_score = []
    for frame in frames_reader:
        bboxes = detector_person.detect(frame)
        human_poses = [single_human_pose_estimator.estimate(frame, bbox) for bbox in bboxes]

        # Keypoints of every detected person, concatenated in detection order.
        pose_array = []
        for pose, bbox in zip(human_poses, bboxes):
            cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[0] + bbox[2], bbox[1] + bbox[3]), (255, 0, 0), 2)
            for id_kpt, kpt in enumerate(pose):
                cv2.circle(frame, (int(kpt[0]), int(kpt[1])), 3, colors[id_kpt], -1)
                pose_array.append([kpt[0], kpt[1]])

        if args.save:
            if pose_array:
                # Keep this frame's gradients for the guide file.
                total_gradien.append(one_frame_score.caculator_gradient(pose_array))
            else:
                # No keypoints means nothing to index; the frame is left out
                # of the guide instead of crashing with an IndexError.
                print("no person detected in frame, frame skipped")
        else:
            if pose_array:
                one_frame_bad_score = one_frame_score.oneframe_cacul(pose_array, gradient_index)
                array_bad_score.append(one_frame_score.oneframe_score(one_frame_bad_score))
            else:
                print("no person detected in frame, frame skipped")
            # Advance even for skipped frames so that later frames are still
            # compared with the guide frame of the same index.
            gradient_index += 1

        if not show_output:
            continue

        imS = cv2.resize(frame, (600, 400))  # Resize image
        detection_time = detector_person.infer_time
        estimation_time = single_human_pose_estimator.infer_time
        cv2.putText(imS, 'summary: {:.1f} FPS (estimation: {:.1f} FPS / detection: {:.1f} FPS)'.format(
            fps(detection_time + estimation_time * len(human_poses)),
            fps(estimation_time),
            fps(detection_time)), (5, 15), cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 200))
        cv2.imshow('Human Pose Estimation Demo', imS)
        key = cv2.waitKey(delay)
        if key == 27:  # ESC stops the demo
            break

    if show_output:
        cv2.destroyAllWindows()

    if args.save:
        one_frame_score.total_score_save(total_gradien)
    elif array_bad_score:
        # total score
        final_score = np.average(array_bad_score)
        print("total score", final_score)
        # comment score
        one_frame_score.comment_on_score(final_score)
    else:
        # Averaging an empty list would only produce NaN.
        print("no frame could be scored")
if __name__ == "__main__":
    # Script entry point: parse the command line and run the demo with it.
    parser = build_argparser()
    args = parser.parse_args()
    run_demo(args)
import os
import cv2
class Detector(object):
    """Object detector running a single-input, single-output network.

    The network output is expected to hold rows of 7 values; a row is used as
    ``[_, label, confidence, xmin, ymin, xmax, ymax]`` with the four
    coordinates given as fractions of the frame width/height.
    """

    def __init__(self, ie, path_to_model_xml, label_class, scale=None, thr=0.3, device='CPU'):
        """Read the network, check its layout and load it on *device*.

        Args:
            ie: inference engine object providing ``read_network`` and
                ``load_network``.
            path_to_model_xml: path of the ``.xml`` model; the ``.bin`` file
                is looked up next to it under the same base name.
            label_class: class label to keep in the detections.
            scale: stored but not used by this class.
            thr: minimum confidence a detection must exceed.
            device: device name handed to ``load_network``.
        """
        self.OUTPUT_SIZE = 7
        self.CHANNELS_SIZE = 3
        self.model = ie.read_network(path_to_model_xml, os.path.splitext(path_to_model_xml)[0] + '.bin')

        assert len(self.model.input_info) == 1, "Expected 1 input blob"
        assert len(self.model.outputs) == 1, "Expected 1 output blob"

        self._input_layer_name = next(iter(self.model.input_info))
        self._output_layer_name = next(iter(self.model.outputs))

        input_shape = self.model.input_info[self._input_layer_name].input_data.shape
        output_shape = self.model.outputs[self._output_layer_name].shape

        # This check is about the INPUT blob (the original message said "output").
        assert len(input_shape) == 4 and input_shape[1] == self.CHANNELS_SIZE, \
            "Expected model input shape with %s channels " % (self.CHANNELS_SIZE)
        assert len(output_shape) == 4 and output_shape[3] == self.OUTPUT_SIZE, \
            "Expected model output shape with %s outputs" % (self.OUTPUT_SIZE)

        self._ie = ie
        self._exec_model = self._ie.load_network(self.model, device)
        self._scale = scale
        self._thr = thr
        self._label_class = label_class
        _, _, self.input_h, self.input_w = input_shape
        # Size of the last preprocessed frame; -1 until a frame has been seen.
        self._h = -1
        self._w = -1
        # Duration of the last inference in seconds; -1 until one has run.
        self.infer_time = -1

    def _preprocess(self, img):
        """Resize *img* to the network input size and return it as 1xCxHxW."""
        self._h, self._w, _ = img.shape
        if self._h != self.input_h or self._w != self.input_w:
            # dsize alone fixes the output size; scale factors passed next to
            # a non-zero dsize are ignored by cv2.resize.
            img = cv2.resize(img, dsize=(self.input_w, self.input_h))
        img = img.transpose(2, 0, 1)
        return img[None, ]

    def _infer(self, prep_img):
        """Run the network on *prep_img* and record the elapsed time."""
        t0 = cv2.getTickCount()
        output = self._exec_model.infer(inputs={self._input_layer_name: prep_img})
        self.infer_time = (cv2.getTickCount() - t0) / cv2.getTickFrequency()
        return output

    def _postprocess(self, bboxes):
        """Keep rows of the wanted class above the threshold as pixel boxes.

        Returns:
            list: ``[xmin, ymin, width, height]`` per kept detection, in
            pixels of the last preprocessed frame.
        """
        def coord_translation(bbox):
            xmin = int(self._w * bbox[0])
            ymin = int(self._h * bbox[1])
            xmax = int(self._w * bbox[2])
            ymax = int(self._h * bbox[3])
            return [xmin, ymin, xmax - xmin, ymax - ymin]

        return [coord_translation(bbox[3:]) for bbox in bboxes
                if bbox[1] == self._label_class and bbox[2] > self._thr]

    def detect(self, img):
        """Return the pixel boxes of the wanted class found in *img*."""
        img = self._preprocess(img)
        output = self._infer(img)
        bboxes = self._postprocess(output[self._output_layer_name][0][0])
        return bboxes
import os
import numpy as np
import json
import cv2
def preprocess_bbox(bbox, image):
    """Turn a detection box into the center and scale used for cropping.

    The box is clamped to the image, then its shorter side is enlarged so
    that width / height equals 0.75, and the size is padded by 25%.

    Args:
        bbox: ``[x, y, width, height]`` in pixels. It is not modified.
        image: array whose ``shape[0]`` is the height and ``shape[1]`` the width.

    Returns:
        tuple: ``(center, scale)`` where ``center`` is the ``[cx, cy]`` of
        the clamped box and ``scale`` the padded ``[width, height]`` as a
        float32 array.
    """
    aspect_ratio = 0.75
    img_h, img_w = image.shape[0], image.shape[1]

    # x is limited by the image WIDTH and y by the HEIGHT; the original
    # clipped x against the height too, which moved boxes on wide frames.
    x = np.clip(bbox[0], 0, img_w - 1)
    y = np.clip(bbox[1], 0, img_h - 1)
    x2 = np.min((img_w - 1, x + np.max((0, bbox[2] - 1))))
    y2 = np.min((img_h - 1, y + np.max((0, bbox[3] - 1))))
    width = x2 - x
    height = y2 - y

    center = np.array([np.float32(x + width * 0.5), np.float32(y + height * 0.5)])

    # Grow the shorter side until the box has the target aspect ratio.
    if width > aspect_ratio * height:
        height = width * 1.0 / aspect_ratio
    elif width < aspect_ratio * height:
        width = height * aspect_ratio

    scale = np.array([width, height], np.float32) * 1.25
    return center, scale
def extract_keypoints(heatmap, min_confidence=-100):
    """Locate the maximum of a 2-D heatmap.

    Returns:
        tuple: ``(value, (x, y))``. When the maximum is below
        *min_confidence* the location is ``(-1, -1)`` and the value is the
        heatmap entry at that index, i.e. its last element.
    """
    peak = np.unravel_index(np.argmax(heatmap, axis=None), heatmap.shape)
    # unravel_index yields (row, col); the result is reported as (x, y).
    location = (-1, -1) if heatmap[peak] < min_confidence else (int(peak[1]), int(peak[0]))
    col, row = location
    return heatmap[row][col], location
def affine_transform(pt, t):
    """Apply the affine matrix *t* to the 2-D point *pt*.

    The point is extended to ``[x, y, 1]`` and multiplied by *t*; the first
    two components of the product are returned.
    """
    homogeneous = [pt[0], pt[1], 1.]
    return np.dot(t, homogeneous)[:2]
class TransformedCrop(object):
    """Warp a person box to the network input and provide the reverse mapping."""

    def __init__(self, input_height=384, input_width=288, output_height=48, output_width=36):
        """Store the network input size and the heatmap (output) size."""
        self._num_keypoints = 17
        self.input_width = input_width
        self.input_height = input_height
        self.output_width = output_width
        self.output_height = output_height

    def __call__(self, img, bbox):
        """Crop *bbox* out of *img* for the network.

        Returns:
            tuple: the matrix mapping output-sized coordinates back to image
            coordinates, and the warped crop as a 1xCxHxW array.
        """
        center, scale = preprocess_bbox(bbox, img)
        net_size = [self.input_width, self.input_height]
        map_size = [self.output_width, self.output_height]

        forward, _ = self.get_trasformation_matrix(center, scale, net_size)
        warped = cv2.warpAffine(img, forward, (self.input_width, self.input_height), flags=cv2.INTER_LINEAR)
        _, backward = self.get_trasformation_matrix(center, scale, map_size)
        return backward, warped.transpose(2, 0, 1)[None,]

    @staticmethod
    def get_trasformation_matrix(center, scale, output_size):
        """Build the affine transforms between a box and an output rectangle.

        Three point pairs define the mapping: the box center, the center
        moved up by half the box width and the center moved left by half
        the box width. Only the width component of *scale* is used.

        Returns:
            tuple: ``(trans, rev_trans)``, image -> output and output -> image.
        """
        box_w, _ = scale
        half_out_w = output_size[0] * 0.5
        half_out_h = output_size[1] * 0.5

        # Reference points in the output rectangle.
        dst = np.zeros((3, 2), dtype=np.float32)
        dst[0, :] = [half_out_w, half_out_h]
        dst[1, :] = [half_out_w, half_out_h - half_out_w]
        dst[2, :] = [0, half_out_h]

        # Matching points in the source image.
        src = np.zeros((3, 2), dtype=np.float32)
        src[0, :] = center
        src[1, :] = center + [0, - box_w * 0.5]
        src[2, :] = center + [- box_w * 0.5, 0]

        rev_trans = cv2.getAffineTransform(np.float32(dst), np.float32(src))
        trans = cv2.getAffineTransform(np.float32(src), np.float32(dst))
        return trans, rev_trans
class HumanPoseEstimator(object):
    """Single-person pose estimator: one heatmap per keypoint, 17 in total."""

    def __init__(self, ie, path_to_model_xml, scale=None, thr=-100, device='CPU'):
        """Read the network, check its layout and load it on *device*.

        The ``.bin`` weights are looked up next to *path_to_model_xml* under
        the same base name. *scale* and *thr* are only stored.
        """
        weights_path = os.path.splitext(path_to_model_xml)[0] + '.bin'
        self.model = ie.read_network(path_to_model_xml, weights_path)

        assert len(self.model.input_info) == 1, "Expected 1 input blob"
        assert len(self.model.outputs) == 1, "Expected 1 output blob"

        self._input_layer_name = next(iter(self.model.input_info))
        self._output_layer_name = next(iter(self.model.outputs))

        self.CHANNELS_SIZE = 3
        self.OUTPUT_CHANNELS_SIZE = 17

        in_shape = self.model.input_info[self._input_layer_name].input_data.shape
        out_shape = self.model.outputs[self._output_layer_name].shape

        assert len(in_shape) == 4 and in_shape[1] == self.CHANNELS_SIZE, \
            "Expected model input blob with shape [1, 3, H, W]"
        assert len(out_shape) == 4 and out_shape[1] == self.OUTPUT_CHANNELS_SIZE, \
            "Expected model output shape [1, %s, H, W]" % (self.OUTPUT_CHANNELS_SIZE)

        self._ie = ie
        self._exec_model = self._ie.load_network(self.model, device)
        self._scale = scale
        self._thr = thr

        _, _, self.input_h, self.input_w = in_shape
        _, _, self.output_h, self.output_w = out_shape
        self._transform = TransformedCrop(self.input_h, self.input_w, self.output_h, self.output_w)
        # Duration of the last inference in seconds; -1 until one has run.
        self.infer_time = -1

    def _preprocess(self, img, bbox):
        """Delegate to the crop transform: (reverse matrix, network input)."""
        return self._transform(img, bbox)

    def _infer(self, prep_img):
        """Run the network, record the elapsed time, return the heatmaps."""
        started = cv2.getTickCount()
        result = self._exec_model.infer(inputs={self._input_layer_name: prep_img})
        elapsed_ticks = cv2.getTickCount() - started
        self.infer_time = (elapsed_ticks / cv2.getTickFrequency())
        return result[self._output_layer_name][0]

    @staticmethod
    def _postprocess(heatmaps, rev_trans):
        """Map the peak of every heatmap through *rev_trans*."""
        keypoints = []
        for heatmap in heatmaps:
            _, (x, y) = extract_keypoints(heatmap)
            keypoints.append(affine_transform([x, y], rev_trans))
        return keypoints

    def estimate(self, img, bbox):
        """Return the transformed keypoints of the person inside *bbox*."""
        rev_trans, net_input = self._preprocess(img, bbox)
        return self._postprocess(self._infer(net_input), rev_trans)
# return keypointout
# Index pairs into the keypoint list; caculator_gradient computes one slope
# per pair, between the two keypoints it names.
pairs = [(3, 4), (3, 6), (3, 5), (5, 7), (7, 9), (4, 5), (4, 6), (6, 8), (8, 10), (5, 11),
         (11, 13), (13, 15), (6, 12), (12, 14), (14, 16), (0, 1), (1, 3), (0, 2), (2, 4)
         ]


class cacul_score(object):
    """Compare per-frame keypoint slopes against a guide stored as JSON.

    The guide file holds one list of slopes per frame, in the order of
    ``pairs``.
    """

    def __init__(self, path_to_guild_json, input_high):
        """Load the guide file.

        Args:
            path_to_guild_json: path of the guide JSON; it is read here and
                is also the file ``total_score_save`` writes to.
            input_high: height compared with the 180 coach height when the
                tolerance is computed.
        """
        self.path_to_guild_json = path_to_guild_json
        with open(path_to_guild_json) as f:
            self.guild_json = json.load(f)  # read json
        self.lengh_array = len(self.guild_json)
        self.default_margin = 0.1  # base tolerance: 10% relative difference
        self.input_high = input_high
        self.coach_high = 180  # in cm

    def real_margin(self):
        """Return the tolerance: base margin plus |height difference| / 1000."""
        fin_margin = abs((self.coach_high - self.input_high) / 1000)
        return fin_margin + self.default_margin

    # first cacul gradient
    def caculator_gradient(self, array_point):
        """Return the slope between the two keypoints of every pair.

        Args:
            array_point: sequence of ``[x, y]`` points indexable by the
                indices used in ``pairs`` (0..16).

        Returns:
            list: one slope per pair; 0 is used when both points share the
            same x (vertical segment).
        """
        gradientResults = []
        for first, second in pairs:
            dx = array_point[second][0] - array_point[first][0]
            dy = array_point[second][1] - array_point[first][1]
            # A vertical segment has no finite slope; it is stored as 0.
            gradientResults.append(0 if dx == 0 else dy / dx)
        return gradientResults

    # second cacul the different between 2 gradient: guild and input
    # then cacul final score for 1 frame
    def oneframe_caculate_score(self, input_gradient, guild_index):
        """Return the penalty of one frame against guide frame *guild_index*.

        The penalty is 0 when the guide has no such frame or when the summed
        relative differences stay within 20 times the tolerance; otherwise
        it is the mean relative difference minus the tolerance.
        """
        if guild_index >= self.lengh_array:
            return 0

        frame_result = []
        # zip stops at the shorter list, so a short guide row cannot raise.
        for measured, expected in zip(input_gradient, self.guild_json[guild_index]):
            if abs(expected) == 0:
                # No relative difference can be formed against a zero slope.
                diff = 0
            else:
                diff = abs((abs(measured) - abs(expected)) / abs(expected))
            frame_result.append(diff)

        # cacul result
        diff_margin = self.real_margin()
        if sum(frame_result) <= (20 * diff_margin):
            return 0
        return np.average(frame_result) - diff_margin

    # function for main.py,
    # First - caculator for 1 frame
    def oneframe_cacul(self, array_point, guild_index):
        """Compute the slopes of *array_point* and return the frame penalty."""
        one_frame_gradian = self.caculator_gradient(array_point)
        return self.oneframe_caculate_score(one_frame_gradian, guild_index)

    # for main.py
    # Second - total score for 1 frame
    def oneframe_score(self, bad_score_one_frame_array):
        """Turn a penalty (or penalties) into a percentage rounded to 2 digits."""
        average_bad_score = np.average(bad_score_one_frame_array)
        return np.round((1 - average_bad_score) * 100, 2)

    # for main.py
    # Third - total score for all frame
    def comment_on_score(self, score):
        """Print *score* followed by the remark of the band it falls in."""
        print("your score: ", score)
        if score >= 95:
            print("you are working right")
        elif score > 90:
            print("execellent")
        elif score > 80:
            print("good")
        elif score > 70:
            print("not good")
        elif score > 60:
            print("please see guild")
        elif score > 50:
            print("keep working")
        elif score <= 50:
            # Explicit test (not a bare else) so that NaN prints no remark.
            print("please stop")

    # save json
    def total_score_save(self, input_total_gradient):
        """Write *input_total_gradient* to the guide file given at construction."""
        with open(self.path_to_guild_json, 'w') as json_file:
            json.dump(input_total_gradient, json_file)
        print("save json file")
Comments