explorer1
Published © GPL3+

Real-time Privacy Enabled Virtual Classroom and Conference

Enabling privacy for video calls by removing the background around the end user. The feature applies to smart classrooms and video conferencing.

Intermediate · Full instructions provided · 2 hours · 718 views

Things used in this project

Hardware components

Intel(R) Xeon(R) Gold 5118 CPU @ 2.30GHz (x86_64)
×1
Intel FPGA Arria10 PAC
×1
Raspberry Pi 4 Model B
×1

Software apps and online services

Intel OpenVINO™ toolkit

Story

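The approach: an OpenVINO instance-segmentation model produces a binary mask for each person in the frame, and a few OpenCV bitwise operations then remove, replace, or blur everything outside that mask. A minimal sketch of the background-removal step, using a synthetic frame and mask in place of real model output:

import cv2
import numpy as np

frame = np.full((240, 320, 3), 90, dtype=np.uint8)    # stand-in for a camera frame
person_mask = np.zeros((240, 320), dtype=np.uint8)    # 1 where the model found a person
person_mask[60:200, 110:210] = 1

white_person = np.zeros_like(frame)
white_person[person_mask.astype(bool)] = 255          # paint the person region white
# OR with the inverted person image turns every background pixel white.
private = cv2.bitwise_or(cv2.bitwise_not(white_person), frame)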

Custom parts and enclosures

OpenVINO Toolkit Download Proof

Linux

OpenVINO Toolkit Download Proof - fpga

Linux FPGA

Schematics

Application flow

Code

main.py

Python
Follow the instructions in the GitHub repository to execute.
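A typical invocation might look like the following (the model, label-file, and image names here are placeholders, not taken from the project):

python3 main.py -f 2 -m instance-segmentation-security-0050.xml \
    --labels coco_labels.txt -i 0 --custom_image b.jpg -d CPU
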
from __future__ import print_function

import logging as log
import os
import sys
import time
from argparse import ArgumentParser, SUPPRESS

import cv2
import numpy as np
from openvino.inference_engine import IENetwork, IECore

from customize_video_background_with_person_segment.tracker import StaticIOUTracker
from customize_video_background_with_person_segment.visualizer import Visualizer


def build_argparser():
    parser = ArgumentParser(add_help=False)
    args = parser.add_argument_group('Options')
    args.add_argument('-h', '--help', action='help', default=SUPPRESS,
                      help='Show this help message and exit.')
    args.add_argument('-f', '--function',
                      help='Required. Enter 1 to remove the background, '
                           '2 to change the background, or 3 to blur the background.',
                      required=True, type=str)
    args.add_argument('-m', '--model',
                      help='Required. Path to an .xml file with a trained model.',
                      required=True, type=str, metavar='"<path>"')
    args.add_argument('--labels',
                      help='Required. Path to a text file with class labels.',
                      required=True, type=str, metavar='"<path>"')
    args.add_argument('-i',
                      dest='input_source',
                      help='Required. Path to an image, video file or a numeric camera ID.',
                      required=True, type=str, metavar='"<path>"')
    args.add_argument('-d', '--device',
                      help='Optional. Specify the target device to infer on: CPU, GPU, FPGA, HDDL or MYRIAD. '
                           'The demo will look for a suitable plugin for device specified '
                           '(by default, it is CPU).',
                      default='CPU', type=str, metavar='"<device>"')
    args.add_argument('-l', '--cpu_extension',
                      help='Required for CPU custom layers. '
                           'Absolute path to a shared library with the kernels implementation.',
                      default=None, type=str, metavar='"<absolute_path>"')
    args.add_argument('--delay',
                      help='Optional. Interval in milliseconds of waiting for a key to be pressed.',
                      default=0, type=int, metavar='"<num>"')
    args.add_argument('--custom_image',
                      help='Optional. Path to a custom background image file '
                           '(used when changing the background).',
                      default='/Users/mahesh-4427/mahi_wfh/b.jpg', type=str, metavar='"<path>"')
    args.add_argument('-pt', '--prob_threshold',
                      help='Optional. Probability threshold for detections filtering.',
                      default=0.5, type=float, metavar='"<num>"')
    args.add_argument('--no_keep_aspect_ratio',
                      help='Optional. Force image resize not to keep aspect ratio.',
                      action='store_true')
    args.add_argument('--no_track',
                      help='Optional. Disable tracking.',
                      action='store_true')
    args.add_argument('--show_scores',
                      help='Optional. Show detection scores.',
                      action='store_true')
    args.add_argument('--show_boxes',
                      help='Optional. Show bounding boxes.',
                      action='store_true')
    args.add_argument('-pc', '--perf_counts',
                      help='Optional. Report performance counters.',
                      action='store_true')
    args.add_argument('-r', '--raw_output_message',
                      help='Optional. Output inference results raw values.',
                      action='store_true')
    args.add_argument("--no_show",
                      help="Optional. Don't show output",
                      action='store_true')
    return parser


def expand_box(box, scale):
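    # Scale the box about its center; segm_postprocess uses this to account
    # for the one-pixel border padded around the raw mask before resizing.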
    w_half = (box[2] - box[0]) * .5
    h_half = (box[3] - box[1]) * .5
    x_c = (box[2] + box[0]) * .5
    y_c = (box[3] + box[1]) * .5
    w_half *= scale
    h_half *= scale
    box_exp = np.zeros(box.shape)
    box_exp[0] = x_c - w_half
    box_exp[2] = x_c + w_half
    box_exp[1] = y_c - h_half
    box_exp[3] = y_c + h_half
    return box_exp


def segm_postprocess(box, raw_cls_mask, im_h, im_w):
    # Add zero border to prevent upsampling artifacts on segment borders.
    raw_cls_mask = np.pad(raw_cls_mask, ((1, 1), (1, 1)), 'constant', constant_values=0)
    extended_box = expand_box(box, raw_cls_mask.shape[0] / (raw_cls_mask.shape[0] - 1.0)).astype(int)
    w, h = np.maximum(extended_box[2:] - extended_box[:2], 1)
    x0, y0 = np.clip(extended_box[:2], a_min=0, a_max=[im_w, im_h])
    x1, y1 = np.clip(extended_box[2:], a_min=0, a_max=[im_w, im_h])

    raw_cls_mask = cv2.resize(raw_cls_mask, (w, h)) > 0.5
    mask = raw_cls_mask.astype(np.uint8)
    # Put an object mask in an image mask.
    im_mask = np.zeros((im_h, im_w), dtype=np.uint8)
    im_mask[y0:y1, x0:x1] = mask[(y0 - extended_box[1]):(y1 - extended_box[1]),
                            (x0 - extended_box[0]):(x1 - extended_box[0])]
    return im_mask


def main():
    log.basicConfig(format='[ %(levelname)s ] %(message)s', level=log.INFO, stream=sys.stdout)
    args = build_argparser().parse_args()
    model_xml = args.model
    model_bin = os.path.splitext(model_xml)[0] + '.bin'

    # Plugin initialization for specified device and load extensions library if specified.
    log.info('Creating Inference Engine...')
    ie = IECore()
    if args.cpu_extension and 'CPU' in args.device:
        ie.add_extension(args.cpu_extension, 'CPU')
    # Read IR
    log.info('Loading network files:\n\t{}\n\t{}'.format(model_xml, model_bin))
    net = IENetwork(model=model_xml, weights=model_bin)

    if 'CPU' in args.device:
        supported_layers = ie.query_network(net, 'CPU')
        not_supported_layers = [l for l in net.layers.keys() if l not in supported_layers]
        if len(not_supported_layers) != 0:
            log.error('Following layers are not supported by the plugin for specified device {}:\n {}'.
                      format(args.device, ', '.join(not_supported_layers)))
            log.error("Please try to specify cpu extensions library path in sample's command line parameters using -l "
                      "or --cpu_extension command line argument")
            sys.exit(1)

    required_input_keys = {'im_data', 'im_info'}
    assert required_input_keys == set(net.inputs.keys()), \
        'Demo supports only topologies with the following input keys: {}'.format(', '.join(required_input_keys))
    required_output_keys = {'boxes', 'scores', 'classes', 'raw_masks'}
    assert required_output_keys.issubset(net.outputs.keys()), \
        'Demo supports only topologies with the following output keys: {}'.format(', '.join(required_output_keys))

    n, c, h, w = net.inputs['im_data'].shape
    assert n == 1, 'Only batch 1 is supported by the demo application'

    log.info('Loading IR to the plugin...')
    exec_net = ie.load_network(network=net, device_name=args.device, num_requests=2)

    try:
        input_source = int(args.input_source)
    except ValueError:
        input_source = args.input_source
    cap = cv2.VideoCapture(input_source)
    if not cap.isOpened():
        log.error('Failed to open "{}"'.format(args.input_source))
    cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

    if args.no_track:
        tracker = None
    else:
        tracker = StaticIOUTracker()

    with open(args.labels, 'rt') as labels_file:
        class_labels = labels_file.read().splitlines()

    visualizer = Visualizer(class_labels, show_boxes=args.show_boxes, show_scores=args.show_scores)

    render_time = 0

    log.info('Starting inference...')
    print("To close the application, press 'CTRL+C' here or switch to the output window and press ESC key")
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        if args.no_keep_aspect_ratio:
            # Resize the image to a target size.
            scale_x = w / frame.shape[1]
            scale_y = h / frame.shape[0]
            input_image = cv2.resize(frame, (w, h))
        else:
            # Resize the image to keep the same aspect ratio and to fit it to a window of a target size.
            scale_x = scale_y = min(h / frame.shape[0], w / frame.shape[1])
            input_image = cv2.resize(frame, None, fx=scale_x, fy=scale_y)

        input_image_size = input_image.shape[:2]
        input_image = np.pad(input_image, ((0, h - input_image_size[0]),
                                           (0, w - input_image_size[1]),
                                           (0, 0)),
                             mode='constant', constant_values=0)
        # Change data layout from HWC to CHW.
        input_image = input_image.transpose((2, 0, 1))
        input_image = input_image.reshape((n, c, h, w)).astype(np.float32)
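        # 'im_info' is [height, width, scale] of the resized, unpadded image.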
        input_image_info = np.asarray([[input_image_size[0], input_image_size[1], 1]], dtype=np.float32)
        
        # Run the net.
        inf_start = time.time()
        outputs = exec_net.infer({'im_data': input_image, 'im_info': input_image_info})
        inf_end = time.time()
        det_time = inf_end - inf_start
        # Parse detection results of the current request
        boxes = outputs['boxes']
        boxes[:, 0::2] /= scale_x
        boxes[:, 1::2] /= scale_y
        scores = outputs['scores']
        classes = outputs['classes']
        masks = []
        for box, cls, raw_mask in zip(boxes, classes, outputs['raw_masks']):
            raw_cls_mask = raw_mask[cls, ...]
            mask = segm_postprocess(box, raw_cls_mask, frame.shape[0], frame.shape[1])
            masks.append(mask)

        # Filter out detections with low confidence.
        detections_filter = scores > args.prob_threshold
        scores = scores[detections_filter]
        classes = classes[detections_filter]
        boxes = boxes[detections_filter]

        masks = list(segm for segm, is_valid in zip(masks, detections_filter) if is_valid)
        render_start = time.time()

        if len(boxes) and args.raw_output_message:
            log.info('Detected boxes:')
            log.info('  Class ID | Confidence |     XMIN |     YMIN |     XMAX |     YMAX ')
            for box, cls, score, mask in zip(boxes, classes, scores, masks):
                log.info('{:>10} | {:>10f} | {:>8.2f} | {:>8.2f} | {:>8.2f} | {:>8.2f} '.format(cls, score, *box))

        # Get instance track IDs.
        masks_tracks_ids = None
        if tracker is not None:
            masks_tracks_ids = tracker(masks, classes)
            
        function = args.function
        c_img = args.custom_image
        # Visualize masks.
        frame = visualizer(function, c_img, frame, boxes, classes, scores, masks, masks_tracks_ids)

        # Draw performance stats.
        inf_time_message = 'Inference time: {:.3f} ms'.format(det_time * 1000)
        render_time_message = 'OpenCV rendering time: {:.3f} ms'.format(render_time * 1000)
        cv2.putText(frame, inf_time_message, (15, 15), cv2.FONT_HERSHEY_COMPLEX, 0.5, (200, 10, 10), 1)
        cv2.putText(frame, render_time_message, (15, 30), cv2.FONT_HERSHEY_COMPLEX, 0.5, (10, 10, 200), 1)

        # Print performance counters.
        if args.perf_counts:
            perf_counts = exec_net.requests[0].get_perf_counts()
            log.info('Performance counters:')
            print('{:<70} {:<15} {:<15} {:<15} {:<10}'.format('name', 'layer_type', 'exec_type', 'status',
                                                              'real_time, us'))
            for layer, stats in perf_counts.items():
                print('{:<70} {:<15} {:<15} {:<15} {:<10}'.format(layer, stats['layer_type'], stats['exec_type'],
                                                                  stats['status'], stats['real_time']))

        if not args.no_show:
            # Show resulting image.
            cv2.imshow('Results', frame)
        render_end = time.time()
        render_time = render_end - render_start

        if not args.no_show:
            key = cv2.waitKey(args.delay)
            esc_code = 27
            if key == esc_code:
                break

    cv2.destroyAllWindows()
    cap.release()


if __name__ == '__main__':
    sys.exit(main() or 0)

tracker.py

Python
import numpy as np


class StaticIOUTracker(object):
    def __init__(self, iou_threshold=0.5, age_threshold=10):
        super().__init__()
        self.history = []
        self.history_areas = []
        self.history_classes = []
        self.ids = []
        self.age = []
        self.iou_threshold = iou_threshold
        self.age_threshold = age_threshold
        self.last_id = 0

    def affinity(self, masks, classes):
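        # Affinity between a new mask and a tracked mask is their IoU:
        # intersection / union of the two binary masks, 0 if classes differ.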
        areas = [np.count_nonzero(mask) for mask in masks]
        affinity_matrix = np.zeros((len(masks), len(self.history)), dtype=np.float32)
        for i, (history_mask, history_area, history_class) in \
                enumerate(zip(self.history, self.history_areas, self.history_classes)):
            for j, (mask, area, cls) in enumerate(zip(masks, areas, classes)):
                if cls != history_class:
                    continue
                intersection = np.count_nonzero(np.logical_and(history_mask, mask))
                union = history_area + area - intersection
                iou = intersection / union
                affinity_matrix[j, i] = iou
        return affinity_matrix, areas

    def __call__(self, masks, classes):
        # Get affinity with history.
        affinity_matrix, areas = self.affinity(masks, classes)

        # Make assignment of currents masks to existing tracks.
        assignment = []
        indices = np.arange(len(self.history))
        for i in range(len(masks)):
            j = 0
            affinity_score = -1.0
            if affinity_matrix.shape[1] > 0:
                j = np.argmax(affinity_matrix[i])
                affinity_score = affinity_matrix[i, j]
            if affinity_score > self.iou_threshold:
                assignment.append(indices[j])
                affinity_matrix = np.delete(affinity_matrix, j, 1)
                indices = np.delete(indices, j)
            else:
                assignment.append(None)

        # Increase age for existing tracks.
        for i in range(len(self.age)):
            self.age[i] += 1

        # Update existing tracks.
        for i, j in enumerate(assignment):
            if j is not None:
                self.history[j] = masks[i]
                self.history_areas[j] = areas[i]
                self.age[j] = 0
                assignment[i] = self.ids[j]

        # Prune out too old tracks.
        alive = tuple(i for i, age in enumerate(self.age) if age < self.age_threshold)
        self.history = list(self.history[i] for i in alive)
        self.history_areas = list(self.history_areas[i] for i in alive)
        self.history_classes = list(self.history_classes[i] for i in alive)
        self.age = list(self.age[i] for i in alive)
        self.ids = list(self.ids[i] for i in alive)

        # Save new tracks.
        for i, j in enumerate(assignment):
            if j is None:
                self.history.append(masks[i])
                self.history_areas.append(areas[i])
                self.history_classes.append(classes[i])
                self.age.append(0)
                self.ids.append(self.last_id)
                assignment[i] = self.last_id
                self.last_id += 1

        return assignment
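
For reference, a minimal standalone check of the tracker with synthetic masks (not part of the project files; assumes the package is on PYTHONPATH):

import numpy as np
from customize_video_background_with_person_segment.tracker import StaticIOUTracker

tracker = StaticIOUTracker(iou_threshold=0.5, age_threshold=10)
mask = np.zeros((480, 640), dtype=np.uint8)
mask[100:300, 200:400] = 1              # one synthetic person mask
print(tracker([mask], [1]))             # first frame -> new track: [0]
shifted = np.roll(mask, 5, axis=1)      # same person, moved 5 px right
print(tracker([shifted], [1]))          # IoU ~ 0.95 -> same track: [0]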

visualizer.py

Python
from __future__ import print_function

import cv2
import numpy as np


class Visualizer(object):
    color_palette = np.array([[0, 113, 188]], dtype=np.uint8)
    def __init__(self, class_labels, confidence_threshold=0.8, show_boxes=False,
                 show_masks=True, show_scores=False):
        super().__init__()
        self.class_labels = [class_labels]
        self.confidence_threshold = confidence_threshold
        self.class_color_palette = np.asarray([2 ** 25 - 1, 2 ** 15 - 1, 2 ** 21 - 1])
        self.instance_color_palette = self.color_palette
        self.show_masks = show_masks
        self.show_boxes = show_boxes
        self.show_scores = show_scores

    def __call__(self, function, c_img, image, boxes, classes, scores, segms=None, ids=None):
        result = image.copy()

        # Filter out detections with low confidence.
        filter_mask = scores > self.confidence_threshold
        scores = scores[filter_mask]
        classes = classes[filter_mask]
        boxes = boxes[filter_mask]

        if self.show_masks and segms is not None:
            # Keep only the masks whose detections passed the confidence filter.
            segms = list(segm for segm, show in zip(segms, filter_mask) if show)
            result = self.overlay_masks(function, c_img, result, segms, classes, ids)

        if self.show_boxes:
            result = self.overlay_boxes(result, boxes, classes)

        result = self.overlay_class_names(result, boxes, classes, scores,
                                          show_score=self.show_scores)
        return result

    def compute_colors_for_labels(self, labels):
        colors = labels[:, None] * self.class_color_palette
        colors = (colors % 255).astype(np.uint8)
        return colors

    def overlay_boxes(self, image, boxes, classes):
        colors = self.compute_colors_for_labels(classes).tolist()
        for box, color in zip(boxes, colors):
            box = box.astype(int)
            top_left, bottom_right = box[:2].tolist(), box[2:].tolist()
            cv2.imshow("g",image)
            image = cv2.rectangle(
                image, tuple(top_left), tuple(bottom_right), tuple(color), 1
            )
        return image

    def overlay_masks(self, function, c_img, image, masks, classes, ids=None):
        bg = cv2.imread(c_img, cv2.IMREAD_UNCHANGED)
        segments_image = image.copy()
        aggregated_mask = np.zeros(image.shape[:2], dtype=np.uint8)
        aggregated_colored_mask = np.zeros(image.shape, dtype=np.uint8)

        # Paint every person mask white into a single aggregated mask image.
        mask_color = [255, 255, 255]
        for mask in masks:
            cv2.bitwise_or(aggregated_mask, mask, dst=aggregated_mask)
            cv2.bitwise_or(aggregated_colored_mask, np.asarray(mask_color, dtype=np.uint8),
                           dst=aggregated_colored_mask, mask=mask)

        # White wherever the background is, black wherever a person is.
        mask_inv = cv2.bitwise_not(aggregated_colored_mask)

        if function == '1':
            # Remove the background: OR with the inverted mask turns it white.
            image = cv2.bitwise_or(mask_inv, image)

        if function == '2':
            # Replace the background with the custom image.
            image = cv2.bitwise_or(mask_inv, image)
            bg = cv2.resize(bg, (image.shape[1], image.shape[0]), interpolation=cv2.INTER_LINEAR)
            bg = cv2.bitwise_or(bg, aggregated_colored_mask)
            image = cv2.bitwise_and(image, bg)

        if function == '3':
            # Blur the background while keeping the person sharp.
            image = cv2.bitwise_or(mask_inv, image)
            segments_image = cv2.bitwise_or(segments_image, aggregated_colored_mask)
            segments_image = cv2.GaussianBlur(segments_image, (235, 235), 0)
            image = cv2.bitwise_and(image, segments_image)

        if function == '0':
            # Keep only the person pixels; everything else becomes black.
            image = cv2.bitwise_and(image, aggregated_colored_mask)

        # Crop the bottom 70 pixels of the frame.
        image = image[0:image.shape[0] - 70, 0:image.shape[1]]

        return image

    def overlay_class_names(self, image, boxes, classes, scores, show_score=True):
        # The model detects a single class, so the label is fixed to 'person'.
        labels = ['person']  # [self.class_labels[i] for i in classes]
        template = '{}: {:.2f}' if show_score else '{}'
        white = (255, 255, 255)

        for box, score, label in zip(boxes, scores, labels):
            s = template.format(label, score)
            textsize = cv2.getTextSize(s, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)[0]
            position = ((box[:2] + box[2:] - textsize) / 2).astype(int)
            # Label drawing is left disabled for the privacy demo:
            # cv2.putText(image, s, tuple(position), cv2.FONT_HERSHEY_SIMPLEX, .5, white, 1)
        return image
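
overlay_masks composites with bitwise operations: OR with the inverted mask turns the background white, and since 255 AND x = x, a final AND drops in the replacement background while the person's pixels pass through unchanged. A self-contained sketch of the background-change mode (synthetic frame, mask, and background; not the project's exact code):

import cv2
import numpy as np

frame = np.full((240, 320, 3), 90, dtype=np.uint8)     # stand-in camera frame
person = np.zeros((240, 320), dtype=np.uint8)
person[60:200, 110:210] = 1                            # synthetic person mask

white = np.zeros_like(frame)
white[person.astype(bool)] = 255                       # person region painted white
no_bg = cv2.bitwise_or(cv2.bitwise_not(white), frame)  # person kept, background -> white
bg = np.full_like(frame, 40)                           # stand-in replacement background
bg = cv2.bitwise_or(bg, white)                         # person region -> white in bg
changed = cv2.bitwise_and(no_bg, bg)                   # person from frame, rest from bg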

Real-time Privacy Enabled Virtual Classroom/Discussion

Credits

explorer1

Thanks to Intel OpenVINO.
