cachep
Published under the LGPL license

Smart Assistant Trainer using Intel OpenVINO toolkit

Automatically review your workout form using the Intel OpenVINO toolkit.

Intermediate · Protip · 24 hours · 491 views

Things used in this project

Hardware components

Webcam, Logitech® HD Pro
×1

Software apps and online services

Intel OpenVINO™ toolkit

Story


Schematics

OpenVINO model files

Code

main.py

Python
import argparse
import os
import sys
import cv2
import numpy as np
from openvino.inference_engine import IECore

from pose_detect import Detector
from pose_estimator import HumanPoseEstimator, cacul_score

sys.path.append(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'common'))
# import monitors


# Paths to the OpenVINO IR models and to the recorded guide file
Human_detect_model = './model_pose/mobilenet-ssd.xml'
Human_pose_model = './model_pose/single-human-pose-estimation-0001.xml'
guild_json = './input/guild.json'

def build_argparser():
    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--input", type=str, nargs='+', default='', help="path to video or image/images")
    parser.add_argument("-d", "--device", type=str, default='CPU', required=False,
                        help="Specify the target to infer on CPU or GPU")
    parser.add_argument("--person_label", type=int, required=False, default=15, help="Label of class person for detector")
    parser.add_argument("--no_show", help='Optional. Do not display output.', action='store_true')
    parser.add_argument("-u", "--utilicd C:\Program Files (x86)\IntelSWTools\openvino_2020.4.287\deployment_tools\open_model_zoo\demos\python_demos zation_monitors", default="", type=str,
                        help="Optional. List of monitors to show initially.")
    parser.add_argument("-s", "--save", type = bool, required = False, default= False)
    return parser

class ImageReader(object):
    def __init__(self, file_names):
        self.file_names = file_names
        self.max_idx = len(file_names)

    def __iter__(self):
        self.idx = 0
        return self

    def __next__(self):
        if self.idx == self.max_idx:
            raise StopIteration
        img = cv2.imread(self.file_names[self.idx], cv2.IMREAD_COLOR)
        if img is None:
            raise IOError('Image {} cannot be read'.format(self.file_names[self.idx]))
        self.idx += 1
        return img


class VideoReader(object):
    def __init__(self, file_name):
        try:
            # A purely numeric argument selects a camera index.
            self.file_name = int(file_name[0])
        except ValueError:
            self.file_name = file_name[0]


    def __iter__(self):
        self.cap = cv2.VideoCapture(self.file_name)
        if not self.cap.isOpened():
            raise IOError('Video {} cannot be opened'.format(self.file_name))
        return self

    def __next__(self):
        was_read, img = self.cap.read()
        if not was_read:
            raise StopIteration
        return img


def run_demo(args):
    ie = IECore()
    detector_person = Detector(ie, path_to_model_xml=Human_detect_model,
                              device=args.device,
                              label_class=args.person_label)

    single_human_pose_estimator = HumanPoseEstimator(ie, path_to_model_xml=Human_pose_model,
                                                  device=args.device)

    # 160 is the trainee's height in cm, compared against the coach's height inside cacul_score
    one_frame_score = cacul_score(guild_json, 160)


    if args.input != '':
        img = cv2.imread(args.input[0], cv2.IMREAD_COLOR)
        frames_reader, delay = (VideoReader(args.input), 1) if img is None else (ImageReader(args.input), 0)
    else:
        raise ValueError('--input has to be set')

    total_gradien = []    # per-frame gradients collected in --save mode
    gradient_index = 0    # index of the matching guide frame
    array_bad_score = []  # per-frame scores collected in scoring mode

    for frame in frames_reader:
        bboxes = detector_person.detect(frame)
        human_poses = [single_human_pose_estimator.estimate(frame, bbox) for bbox in bboxes]

        colors = [(0, 0, 255),
                  (255, 0, 0), (0, 255, 0), (255, 0, 0), (0, 255, 0),
                  (255, 0, 0), (0, 255, 0),(255, 0, 0), (0, 255, 0),
                  (255, 0, 0), (0, 255, 0), (0, 0, 0), (0, 255, 255),
                  (255, 0, 0), (0, 255, 0),(255, 0, 0), (0, 255, 0)]

        for pose, bbox in zip(human_poses, bboxes):
            pose_array = []  # reset per person so gradients describe this pose only
            cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[0] + bbox[2], bbox[1] + bbox[3]), (255, 0, 0), 2)
            for id_kpt, kpt in enumerate(pose):
                cv2.circle(frame, (int(kpt[0]), int(kpt[1])), 3, colors[id_kpt], -1)
                pose_array.append([kpt[0], kpt[1]])
            if args.save:
                # Guide-recording mode: collect this frame's gradients so they
                # can be written to guild.json at the end of the run.
                gra_one_frame = one_frame_score.caculator_gradient(pose_array)
                total_gradien.append(gra_one_frame)
            else:
                # Scoring mode: compare this pose against the matching guide frame.
                one_frame_bad_score = one_frame_score.oneframe_cacul(pose_array, gradient_index)
                gradient_index += 1
                score_oneframe = one_frame_score.oneframe_score(one_frame_bad_score)
                array_bad_score.append(score_oneframe)

        imS = cv2.resize(frame, (600, 400))  # Resize image
        cv2.putText(imS, 'summary: {:.1f} FPS (estimation: {:.1f} FPS / detection: {:.1f} FPS)'.format(
            float(1 / (detector_person.infer_time + single_human_pose_estimator.infer_time * len(human_poses))),
            float(1 / single_human_pose_estimator.infer_time),
            float(1 / detector_person.infer_time)), (5, 15), cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 200))


        cv2.imshow('Human Pose Estimation Demo', imS)
        key = cv2.waitKey(delay)
        if key == 27:  # ESC quits
            cv2.destroyAllWindows()
            break
    if args.save:
        one_frame_score.total_score_save(total_gradien)
    else:
        #total score
        final_score = np.average(array_bad_score)
        print("total score", final_score)
        #comment score
        one_frame_score.comment_on_score(final_score)

if __name__ == "__main__":
    args = build_argparser().parse_args()
    run_demo(args)
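
The demo runs in two phases, selected by the --save flag. First record the coach's reference video, which writes the limb gradients of every frame to ./input/guild.json; then run the trainee's video without the flag to score it against that guide. The video file names below are placeholders:

python main.py -i coach_workout.mp4 --save
python main.py -i my_workout.mp4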

pose_detect.py

Python
import os
import cv2


class Detector(object):
    def __init__(self, ie, path_to_model_xml, label_class, scale=None, thr=0.3, device='CPU'):
        self.OUTPUT_SIZE = 7
        self.CHANNELS_SIZE = 3
        self.model = ie.read_network(path_to_model_xml, os.path.splitext(path_to_model_xml)[0] + '.bin')

        assert len(self.model.input_info) == 1, "Expected 1 input blob"

        assert len(self.model.outputs) == 1, "Expected 1 output blob"

        self._input_layer_name = next(iter(self.model.input_info))
        self._output_layer_name = next(iter(self.model.outputs))

        assert len(self.model.input_info[self._input_layer_name].input_data.shape) == 4 and \
               self.model.input_info[self._input_layer_name].input_data.shape[1] == self.CHANNELS_SIZE, \
            "Expected model output shape with %s channels " % (self.CHANNELS_SIZE)

        assert len(self.model.outputs[self._output_layer_name].shape) == 4 and \
               self.model.outputs[self._output_layer_name].shape[3] == self.OUTPUT_SIZE, \
            "Expected model output shape with %s outputs" % (self.OUTPUT_SIZE)

        self._ie = ie
        self._exec_model = self._ie.load_network(self.model, device)
        self._scale = scale
        self._thr = thr
        self._label_class = label_class
        _, _, self.input_h, self.input_w = self.model.input_info[self._input_layer_name].input_data.shape
        self._h = -1
        self._w = -1
        self.infer_time = -1

    def _preprocess(self, img):
        self._h, self._w, _ = img.shape
        if self._h != self.input_h or self._w != self.input_w:
            img = cv2.resize(img, dsize=(self.input_w, self.input_h))
        # HWC -> CHW, then add a batch dimension: [1, C, H, W]
        img = img.transpose(2, 0, 1)
        return img[None, ]

    def _infer(self, prep_img):
        t0 = cv2.getTickCount()
        output = self._exec_model.infer(inputs={self._input_layer_name: prep_img})
        self.infer_time = (cv2.getTickCount() - t0) / cv2.getTickFrequency()
        return output


    def _postprocess(self, bboxes):
        # Each SSD detection row is [image_id, label, confidence, xmin, ymin, xmax, ymax]
        # with coordinates normalized to [0, 1]; convert to pixel [xmin, ymin, w, h].

        def coord_translation(bbox):
            xmin = int(self._w * bbox[0])
            ymin = int(self._h * bbox[1])
            xmax = int(self._w * bbox[2])
            ymax = int(self._h * bbox[3])
            w_box = xmax - xmin
            h_box = ymax - ymin
            return [xmin, ymin, w_box, h_box]

        bboxes_new = [coord_translation(bbox[3:]) for bbox in bboxes if bbox[1] == self._label_class and bbox[2] > self._thr]

        return bboxes_new

    def detect(self, img):
        img = self._preprocess(img)
        output = self._infer(img)
        bboxes = self._postprocess(output[self._output_layer_name][0][0])
        return bboxes
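
For reference, a minimal standalone sketch of the Detector class, assuming the mobilenet-ssd IR files from main.py are present; the image path is a placeholder:

import cv2
from openvino.inference_engine import IECore
from pose_detect import Detector

ie = IECore()
detector = Detector(ie, path_to_model_xml='./model_pose/mobilenet-ssd.xml',
                    device='CPU', label_class=15)  # VOC class 15 = person

frame = cv2.imread('person.jpg', cv2.IMREAD_COLOR)  # placeholder image
for xmin, ymin, w, h in detector.detect(frame):     # boxes come back as [x, y, w, h]
    cv2.rectangle(frame, (xmin, ymin), (xmin + w, ymin + h), (255, 0, 0), 2)
cv2.imwrite('detections.jpg', frame)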

pose_estimator.py

Python
import os
import numpy as np
import json
import cv2


def preprocess_bbox(bbox, image):
    aspect_ratio = 0.75
    # Clamp the top-left corner: x against the image width, y against the height.
    bbox[0] = np.clip(bbox[0], 0, image.shape[1] - 1)
    bbox[1] = np.clip(bbox[1], 0, image.shape[0] - 1)
    x2 = np.min((image.shape[1] - 1, bbox[0] + np.max((0, bbox[2] - 1))))
    y2 = np.min((image.shape[0] - 1, bbox[1] + np.max((0, bbox[3] - 1))))

    bbox = [bbox[0], bbox[1], x2 - bbox[0], y2 - bbox[1]]

    cx_bbox = bbox[0] + bbox[2] * 0.5
    cy_bbox = bbox[1] + bbox[3] * 0.5
    center = np.array([np.float32(cx_bbox), np.float32(cy_bbox)])

    if bbox[2] > aspect_ratio * bbox[3]:
        bbox[3] = bbox[2] * 1.0 / aspect_ratio
    elif bbox[2] < aspect_ratio * bbox[3]:
        bbox[2] = bbox[3] * aspect_ratio

    s = np.array([bbox[2], bbox[3]], np.float32)
    scale = s * 1.25  # pad the crop by 25% around the person

    return center, scale


def extract_keypoints(heatmap, min_confidence=-100):
    # The keypoint is the location of the strongest response in the heatmap.
    ind = np.unravel_index(np.argmax(heatmap, axis=None), heatmap.shape)
    if heatmap[ind] < min_confidence:
        ind = (-1, -1)
    else:
        ind = (int(ind[1]), int(ind[0]))
    return heatmap[ind[1]][ind[0]], ind


def affine_transform(pt, t):
    transformed_point = np.dot(t, [pt[0], pt[1], 1.])[:2]
    return transformed_point


class TransformedCrop(object):
    def __init__(self, input_height=384, input_width=288, output_height=48, output_width=36):
        self._num_keypoints = 17
        self.input_width = input_width
        self.input_height = input_height
        self.output_width = output_width
        self.output_height = output_height

    def __call__(self, img, bbox):
        c, s = preprocess_bbox(bbox, img)
        trans, _ = self.get_trasformation_matrix(c, s, [self.input_width, self.input_height])
        transformed_image = cv2.warpAffine(img, trans, (self.input_width, self.input_height), flags=cv2.INTER_LINEAR)
        rev_trans = self.get_trasformation_matrix(c, s, [self.output_width, self.output_height])[1]

        return rev_trans, transformed_image.transpose(2, 0, 1)[None,]

    @staticmethod
    def get_trasformation_matrix(center, scale, output_size):
        w, h = scale
        points = np.zeros((3, 2), dtype=np.float32)
        transformed_points = np.zeros((3, 2), dtype=np.float32)

        transformed_points[0, :] = [output_size[0] * 0.5, output_size[1] * 0.5]
        transformed_points[1, :] = [output_size[0] * 0.5, output_size[1] * 0.5 - output_size[0] * 0.5]
        transformed_points[2, :] = [0, output_size[1] * 0.5]

        shift_y = [0, - w * 0.5]
        shift_x = [- w * 0.5, 0]

        points[0, :] = center
        points[1, :] = center + shift_y
        points[2, :] = center + shift_x

        rev_trans = cv2.getAffineTransform(np.float32(transformed_points), np.float32(points))

        trans = cv2.getAffineTransform(np.float32(points), np.float32(transformed_points))

        return trans, rev_trans


class HumanPoseEstimator(object):
    def __init__(self, ie, path_to_model_xml, scale=None, thr=-100, device='CPU'):
        self.model = ie.read_network(path_to_model_xml, os.path.splitext(path_to_model_xml)[0] + '.bin')

        assert len(self.model.input_info) == 1, "Expected 1 input blob"

        assert len(self.model.outputs) == 1, "Expected 1 output blob"

        self._input_layer_name = next(iter(self.model.input_info))
        self._output_layer_name = next(iter(self.model.outputs))
        self.CHANNELS_SIZE = 3
        self.OUTPUT_CHANNELS_SIZE = 17

        assert len(self.model.input_info[self._input_layer_name].input_data.shape) == 4 and \
               self.model.input_info[self._input_layer_name].input_data.shape[1] == self.CHANNELS_SIZE, \
            "Expected model input blob with shape [1, 3, H, W]"

        assert len(self.model.outputs[self._output_layer_name].shape) == 4 and \
               self.model.outputs[self._output_layer_name].shape[1] == self.OUTPUT_CHANNELS_SIZE, \
            "Expected model output shape [1, %s, H, W]" % (self.OUTPUT_CHANNELS_SIZE)

        self._ie = ie
        self._exec_model = self._ie.load_network(self.model, device)
        self._scale = scale
        self._thr = thr

        _, _, self.input_h, self.input_w = self.model.input_info[self._input_layer_name].input_data.shape
        _, _, self.output_h, self.output_w = self.model.outputs[self._output_layer_name].shape
        self._transform = TransformedCrop(self.input_h, self.input_w, self.output_h, self.output_w)
        self.infer_time = -1

    def _preprocess(self, img, bbox):
        return self._transform(img, bbox)

    def _infer(self, prep_img):
        t0 = cv2.getTickCount()
        output = self._exec_model.infer(inputs={self._input_layer_name: prep_img})
        self.infer_time = ((cv2.getTickCount() - t0) / cv2.getTickFrequency())
        return output[self._output_layer_name][0]

    @staticmethod
    def _postprocess(heatmaps, rev_trans):
        all_keypoints = [extract_keypoints(heatmap) for heatmap in heatmaps]
        all_keypoints_transformed = [affine_transform([kp[1][0], kp[1][1]], rev_trans) for kp in all_keypoints]

        return all_keypoints_transformed

    def estimate(self, img, bbox):
        rev_trans, preprocessed_img = self._preprocess(img, bbox)
        heatmaps = self._infer(preprocessed_img)
        keypoints = self._postprocess(heatmaps, rev_trans)
        return keypoints

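# Keypoint indices below follow the 17-point COCO ordering produced by
# single-human-pose-estimation-0001 (0 nose, 1-2 eyes, 3-4 ears, 5-6 shoulders,
# 7-8 elbows, 9-10 wrists, 11-12 hips, 13-14 knees, 15-16 ankles).
# Each pair is a limb segment whose slope is compared between coach and trainee.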
pairs = [(3, 4), (3, 6), (3, 5), (5, 7), (7, 9), (4, 5), (4, 6), (6, 8), (8, 10), (5, 11),
         (11, 13), (13, 15), (6, 12), (12, 14), (14, 16), (0, 1), (1, 3), (0, 2), (2, 4)
         ]


class cacul_score(object):
    def __init__(self, path_to_guild_json, input_high):
        with open(path_to_guild_json) as f:
            self.guild_json = json.load(f)  # per-frame guide gradients recorded with --save
        self.lengh_array = len(self.guild_json)
        print("loaded", self.lengh_array, "guide frames")
        self.default_margin = 0.1                        # 10% default tolerance
        self.input_high = input_high                     # trainee's height in cm
        self.coach_high = 180                            # coach's height in cm

    def real_margin(self):
        # Widen the tolerance in proportion to the trainee/coach height difference.
        fin_margin = abs((self.coach_high - self.input_high) / 1000)
        return fin_margin + self.default_margin

    # Step 1: compute the gradient (slope) of each limb segment.
    def caculator_gradient(self, array_point):
        gradientResults = []
        for p in pairs:
            node_one_x = array_point[p[0]][0]  # x1
            node_one_y = array_point[p[0]][1]  # y1
            node_two_x = array_point[p[1]][0]  # x2
            node_two_y = array_point[p[1]][1]  # y2
            if (node_two_x - node_one_x) == 0:
                m = 0  # vertical segment: avoid division by zero
            else:
                m = (node_two_y - node_one_y) / (node_two_x - node_one_x)
            gradientResults.append(m)
        return gradientResults

    # Step 2: compare the input gradients with the guide gradients and
    # turn the difference into a per-frame penalty.
    def oneframe_caculate_score(self, input_gradient, guild_index):
        frame_result = []
        if guild_index >= self.lengh_array:
            # The input video is longer than the guide; no reference frame left.
            bad_score = 0
        else:
            for i in range(len(input_gradient)):
                if abs(self.guild_json[guild_index][i]) == 0:
                    diff = 0
                else:
                    diff = abs((abs(input_gradient[i]) -
                                abs(self.guild_json[guild_index][i])) / abs(self.guild_json[guild_index][i]))
                frame_result.append(diff)
            # If the summed difference across all segments stays within the
            # margin, the frame counts as correct; otherwise the penalty is the
            # average difference minus the allowed margin.
            diff_margin = self.real_margin()
            sum_frame = sum(frame_result)
            if sum_frame <= (20 * diff_margin):
                bad_score = 0
            else:
                bad_score = np.average(frame_result) - diff_margin

        return bad_score
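
    # Worked example with hypothetical numbers: for input_high = 160 the margin
    # is |180 - 160| / 1000 + 0.1 = 0.12. If the 19 segment differences sum to
    # 5.7 (average 0.3), that exceeds the 20 * 0.12 = 2.4 threshold, so the
    # penalty is 0.3 - 0.12 = 0.18, which oneframe_score() turns into
    # (1 - 0.18) * 100 = 82.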

    # Called from main.py, first: compute the penalty for a single frame.
    def oneframe_cacul(self, array_point, guild_index):
        one_frame_gradian = self.caculator_gradient(array_point)
        one_frame_bad_score = self.oneframe_caculate_score(one_frame_gradian, guild_index)
        return one_frame_bad_score

    # Called from main.py, second: convert a frame's penalty into a 0-100 score.
    def oneframe_score(self, bad_score_one_frame_array):
        average_bad_score = np.average(bad_score_one_frame_array)
        total_score = np.round((1 - average_bad_score)* 100, 2)
        return total_score

    # Called from main.py, third: print feedback on the final averaged score.
    def comment_on_score(self, score):
        print("your score: ", score)
        if score >= 95:
            print("you are working out correctly")
        elif score > 90:
            print("excellent")
        elif score > 80:
            print("good")
        elif score > 70:
            print("not good")
        elif score > 60:
            print("please see the guide")
        elif score > 50:
            print("keep working")
        else:
            print("please stop")

    # Save the gradients recorded during a --save run to guild.json.
    def total_score_save(self, input_total_gradient):
        with open('./input/guild.json', 'w') as json_file:
            json.dump(input_total_gradient, json_file)
        print("saved guide json file")

Credits

cachep