Hackster is hosting Hackster Holidays, Ep. 7: Livestream & Giveaway Drawing. Watch previous episodes or stream live on Friday! Stream Hackster Holidays, Ep. 7 on Friday!
nengelmann
Published © MIT

MeetingCam - Run your AI and CV algorithms in meetings 🚀

Run your AI and CV algorithms in online meetings such as Zoom, Meets or Teams 🚀 and, by doing so, spread the love for computer vision! 💞

Beginner · Full instructions provided · 30 minutes · 373

Things used in this project

Hardware components

Computer (Linux OS: Ubuntu22.04)
×1
Webcam - any webcam will do
×1

Software apps and online services

Python
OpenCV
OpenCV

Story

Read more

Code

main.py

Python
#!/usr/bin/env python3

"""This script is the entry point of MeetingCam.
MeetingCam allows to manage and modify video streams and route them to web meeting tools like Teams, Meets or Zoom.
"""

import shutil
import sys
from pathlib import Path

import typer
from constants import DEPTHAI, TYPES, WEBCAM, TypeArgument
from device import DepthaiDevice, WebcamDevice
from jinja2 import Environment, FileSystemLoader
from plugins.plugin_utils import PluginRegistry
from print import Printer

# Root Typer application of MeetingCam; `add_completion=False` is passed so
# the shell-completion options are not added to the CLI.
app = typer.Typer(add_completion=False)

# Plugin discovery and registration happen at import time, before the CLI is
# invoked. NOTE(review): presumably every plugin ends up as a sub-app of `app`
# (main() iterates `app.registered_groups`) - confirm in PluginRegistry.
registry = PluginRegistry()
plugin_list = registry.search_plugins()
registry.register_plugins(app, plugin_list)

# Shared printer used by the commands below for all console output.
printer = Printer()


# app entry point of typer main app
@app.callback(
    invoke_without_command=True,
    epilog=printer.epilog(),
    no_args_is_help=False,
    help=(
        "AI and CV webcam utility for online meetings.\n\nRun your artificial"
        " intelligence and computer vision algorithms in online meetings such"
        " as Zoom, Meets or Teams! "
    ),
    context_settings={
        "help_option_names": ["-h", "--help"],
    },
)
def main(ctx: typer.Context) -> None:
    """
    Callback of the root Typer app, executed before any sub command.

    A subtitle is printed for every registered group whose name equals the
    invoked sub command. When no sub command was invoked at all, the title
    is printed and the help of the context is requested.

    Args:
        ctx --- the Typer context of the current invocation.
    """
    invoked = ctx.invoked_subcommand

    # collect the matching group names first, then announce them
    matching_names = [
        group.name for group in app.registered_groups if invoked == group.name
    ]
    for group_name in matching_names:
        printer.subtitle(group_name)

    if invoked is not None:
        return

    # bare invocation without a sub command
    printer.title()
    ctx.get_help()


# General-Commands
@app.command(
    help="List all camera devices",
    context_settings={"help_option_names": ["-h", "--help"]},
    rich_help_panel="General-Commands",
)
def list_devices(type: TYPES = TypeArgument) -> None:
    """Print the camera devices of the requested type and their virtual counterparts.

    Args:
        type --- device type selector; its ``value`` is compared against
            DEPTHAI, WEBCAM and "all".
    """

    selected = str(type.value)
    show_all = selected == "all"

    def report(heading: str, handler) -> None:
        # print an underlined heading followed by the handler's device table
        printer.console.print(heading, style="bold underline")
        printer.available_devices(
            handler.mapping, handler.available_devices_real
        )

    if selected == DEPTHAI or show_all:
        report("\nAvailable depthai devices:", DepthaiDevice(pipeline=None))
    if selected == WEBCAM or show_all:
        report("\nAvailable webcam devices:", WebcamDevice())


@app.command(
    help="List commands to add camera devices",
    context_settings={"help_option_names": ["-h", "--help"]},
    rich_help_panel="General-Commands",
)
def add_devices(type: TYPES = TypeArgument) -> None:
    """Print the commands needed to add virtual devices for the requested camera type.

    If any virtual device mapping already exists, only the reset hint is
    printed and the process exits. If a requested device type has no real
    device attached, a warning is printed and the process exits.

    Args:
        type --- device type selector; its ``value`` is compared against
            DEPTHAI, WEBCAM and "all".
    """

    selected = str(type.value)
    depthai_handler = DepthaiDevice(pipeline=None)
    webcam_handler = WebcamDevice()

    # existing virtual devices have to be reset before new ones can be added
    if depthai_handler.mapping or webcam_handler.mapping:
        printer.console.print("\nThere are already virtual devices available!")
        reset_devices()
        sys.exit(0)

    wants_depthai = selected == DEPTHAI or selected == "all"
    wants_webcam = selected == WEBCAM or selected == "all"

    if wants_depthai:
        depthai_devices = depthai_handler.available_devices_real
        if len(depthai_devices[0]) == 0:
            printer.console.print(
                "\nNo depthai device available. Make sure to connect a depthai"
                " device to your PC.\n",
                style=printer.warning_style,
            )
            sys.exit(0)
        printer.console.print("\nAdd depthai devices:", style="bold underline")
        printer.add_virtual_devices(depthai_devices, DEPTHAI)

    if wants_webcam:
        webcam_devices = webcam_handler.available_devices_real
        if len(webcam_devices[0]) == 0:
            printer.console.print(
                "\nNo webcam device available. Make sure to connect a webcam"
                " device to your PC.\n",
                style=printer.warning_style,
            )
            sys.exit(0)
        printer.console.print("\nAdd webcam devices:", style="bold underline")
        printer.add_virtual_devices(webcam_devices, WEBCAM)

    if wants_depthai or wants_webcam:
        # closing hint on how to undo the added virtual devices
        printer.console.print(
            "\nYou can [bold]reset[/bold] the added [bold]virtual"
            " devices[/bold] running:\n[bold][cyan]sudo modprobe -r"
            " v4l2loopback[/cyan][/bold]\nIf you want to run a reset, please"
            " close all applications which might access a camera device (e.g."
            " your browser) before running this command.\nIf you previously"
            " added devices, a reset is necessary before adding a new camera"
            " device.\n"
        )


@app.command(
    help="List reset command to reset virtual camera devices",
    context_settings={"help_option_names": ["-h", "--help"]},
    rich_help_panel="General-Commands",
)
def reset_devices() -> None:
    """Print the instructions on how to reset the added (virtual) camera devices.

    The function only delegates to ``printer.reset_devices()``; it is also
    called directly by ``add_devices`` when virtual devices already exist.
    """
    printer.reset_devices()


@app.command(
    help="Create new plugin",
    context_settings={"help_option_names": ["-h", "--help"]},
    rich_help_panel="General-Commands",
)
def create_plugin(
    name: str = typer.Option(default=None, help="Name of new plugin"),
    short_description: str = typer.Option(
        default=None, help="Short one line description of new plugin"
    ),
    description: str = typer.Option(
        default=None, help="Description of new plugin"
    ),
) -> None:
    """Create a new plugin from the plugin template.

    Missing values are asked for interactively. The plugin directory is
    created below ``src/meetingcam/plugins`` (relative to the current working
    directory) and filled with a rendered ``plugin.py`` and an empty
    ``__init__.py``.

    Args:
        name --- name of the new plugin, prompted for when not given.
        short_description --- one line description, prompted for when not given.
        description --- description, prompted for when not given.
    """

    if not name:
        name = typer.prompt("Name of new plugin")

    # The existence check has to run AFTER the prompt: checking first would
    # let an interactively entered name bypass it and overwrite the files of
    # an existing plugin.
    if name in plugin_list:
        typer.echo(f"Plugin {name} already exists.")
        sys.exit(0)

    if not short_description:
        short_description = typer.prompt(
            "Short one line description of new plugin"
        )
    if not description:
        description = typer.prompt("Description of new plugin")

    path = Path(f"src/meetingcam/plugins/{name}")
    path.mkdir(parents=True, exist_ok=True)
    template_path = Path("src/meetingcam/plugins/plugin_template.py")

    # render the template which lives next to the plugin directories
    env = Environment(loader=FileSystemLoader(template_path.parent.as_posix()))
    template = env.get_template(template_path.name)
    output = template.render(
        name=name, short_description=short_description, description=description
    )

    # write with an explicit encoding so the generated Python source does not
    # depend on the locale of the machine
    (path / "plugin.py").write_text(output, encoding="utf-8")
    (path / "__init__.py").write_text("", encoding="utf-8")

    typer.echo(f"Plugin {name} created successfully.")


@app.command(
    help="Delete plugin",
    context_settings={"help_option_names": ["-h", "--help"]},
    rich_help_panel="General-Commands",
)
def delete_plugin(
    name: str = typer.Option(default=None, help="Name of plugin to delete")
) -> None:
    """Delete a plugin directory after an interactive confirmation.

    The process exits with status 0 when the plugin is unknown or when the
    deletion is not confirmed.

    Args:
        name --- name of the plugin to delete.
    """

    if name not in plugin_list:
        typer.echo(f"Plugin {name} does not exist.")
        sys.exit(0)

    if not typer.confirm(f"Are you sure you want to delete plugin {name}?"):
        typer.echo(f"Plugin {name} not deleted.")
        sys.exit(0)

    # build the path once and use it for the removal
    path = Path(f"src/meetingcam/plugins/{name}")
    shutil.rmtree(path)
    typer.echo(f"Plugin {name} deleted successfully.")


# Run the Typer application only when this file is executed as a script.
if __name__ == "__main__":
    app()

runner.py

Python
"""This file contains a runner class which is initializing and running the main loop within MeetingCam."""

import cv2
import pyvirtualcam
from constants import DEPTHAI, WEBCAM
from device import DepthaiDevice, WebcamDevice
from plugins.plugin_utils import PluginBase, PluginDepthai
from utils import KeyHandler


class Runner:
    """Connect a plugin to a real and a virtual camera and run the frame loop."""

    def __init__(
        self,
        plugin: PluginBase | PluginDepthai,
        device_path: str | None = None,
    ) -> None:
        """Initialize the device handler, real and virtual camera devices.

        Args:
            plugin --- Plugin which should be run.
            device_path --- The device path for the real camera. Defaults to None.

        Raises:
            ValueError: If the plugin type is neither DEPTHAI nor WEBCAM.
        """

        # Anything that is neither None nor a plain str is expected to carry
        # the path in `.value` (presumably an enum member coming from the
        # CLI - TODO confirm against the callers).
        # `type(x) is not None` can never be False, so None must be tested
        # explicitly; otherwise `None.value` raises an AttributeError before
        # the device handler gets the chance to reject the missing path.
        if device_path is not None and type(device_path) is not str:
            device_path = device_path.value

        self.plugin = plugin

        if self.plugin.type == DEPTHAI:
            # initialize camera device handler with depthai as input device
            self.device_handler = DepthaiDevice(plugin.pipeline)
        elif self.plugin.type == WEBCAM:
            # initialize camera device handler with webcam as input device
            self.device_handler = WebcamDevice()
        else:
            raise ValueError(
                "Your plugin needs to have a plugin type assigned."
                " plugin.type: WEBCAM | DEPTHAI"
            )

        # both handler types resolve the virtual device the same way
        self.virtual_path = self.device_handler.init_device(device_path)

    def run(self) -> None:
        """Main loop for video frame capture and processing within MeetingCam.

        Frames are read from the real camera, passed through the plugin and
        sent to the virtual camera. The loop has no exit condition of its own.
        """

        if self.plugin.verbose:
            # print available hotkeys
            print(
                "\nThe following keyboard triggers and switches are available"
                " within this plugin:"
            )
            for h in KeyHandler().default_hotkeys:
                print(f"{h.hotkey}:    {h.description}")
            for h in self.plugin.hotkeys:
                print(f"{h.hotkey}:    {h.description}")
            print("")
        # initialize real camera, to get frames
        with self.device_handler.get_device() as r_cam:
            # custom setup for depthai (on device handling)
            if self.plugin.type == DEPTHAI:
                r_cam.setup(self.plugin.device_setup, self.plugin.acquisition)

            # initialize a virtual camera where the modified frames will be sent to
            with pyvirtualcam.Camera(
                width=r_cam.width,
                height=r_cam.height,
                fps=24,
                device=self.virtual_path,
            ) as v_cam:
                # initialize a keyboard keyhandler to get and use keystroke during runtime as trigger or switch
                with self.plugin.keyhandler() as keyhandler:
                    keyhandler.start()

                    # print in command line that the pipeline is running
                    self.device_handler.device_running()

                    # get frames from real camera, process it and sent it out via virtual camera
                    while True:
                        # get a frame and optionally some on camera detections
                        frame, detection = r_cam.get_frame()

                        # swap the red and blue channel while the bgr2rgb switch is active
                        if keyhandler.bgr2rgb:
                            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

                        frame = self.plugin.process(
                            frame, detection, keyhandler
                        )

                        # flip image horizontally while the mirror switch is active
                        if keyhandler.mirror:
                            frame = cv2.flip(frame, 1)

                        # channel swap applied to every frame before it is sent out
                        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                        # sent out the modified frame to the virtual camera
                        v_cam.send(frame)
                        v_cam.sleep_until_next_frame()

utils.py

Python
from types import MethodType
from typing import Any, Callable

import cv2
import depthai
from constants import MAX_HEIGHT, MAX_WIDTH
from numpy.typing import NDArray
from pynput import keyboard
from typing_extensions import Self


class VideoCapture(cv2.VideoCapture):
    """Handle video capture functionalities with OpenCV.

    Inherits from cv2.VideoCapture and extends functionalities with
    methods to safely enter, exit, get frames and frame rates from
    a video capture.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Open the capture and derive the output width and height.

        All arguments are forwarded to cv2.VideoCapture. ``width`` and
        ``height`` are the camera's reported frame size, capped at
        MAX_WIDTH and MAX_HEIGHT.
        """
        super().__init__(*args, **kwargs)
        cam_width = int(self.get(cv2.CAP_PROP_FRAME_WIDTH))
        cam_height = int(self.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.width = min(cam_width, MAX_WIDTH)
        self.height = min(cam_height, MAX_HEIGHT)

        self.img_handler = ImageHandler()

    def __enter__(self) -> Self:
        """Enter method for context management, returning self."""
        return self

    def __exit__(self, *args: Any) -> None:
        """Exit method for context management, releasing the video capture object."""
        self.release()

    def get_frame(self) -> tuple[NDArray[Any], None]:
        """Capture a frame from the video stream.

        Raises:
            RuntimeError: If image acquisition fails.

        Returns:
            A ``(frame, None)`` tuple. The second element is always None
            because a webcam delivers no detections; the tuple shape matches
            what DepthaiCapture.get_frame returns.
        """
        grabbed, frame = self.read()

        if not grabbed:
            raise RuntimeError(
                "Image acquisition failed. Make sure the specified camera is"
                " not running in another application."
            )

        # high image resolution is usually not supported by online meeting tools
        frame = self.img_handler.correct_img_size(frame)

        return frame, None

    def get_fps(self) -> int:
        """Retrieve the frames per second (FPS) of the video capture.

        Prints the total FPS and returns the FPS as an integer.
        """
        fps = int(self.get(cv2.CAP_PROP_FPS))
        print(f"total FPS: {fps}")
        return fps


class DepthaiCapture(depthai.Device):
    """Handle video capture functionalities with depthai.

    Inherits from depthai.Device and extends functionalities with
    methods to safely enter, exit and get frames from a depthai device.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the depthai.Device object and assign width and height properties.

        All arguments are forwarded to depthai.Device. ``width`` and
        ``height`` are fixed to MAX_WIDTH and MAX_HEIGHT.
        """
        super().__init__(*args, **kwargs)
        self.width = MAX_WIDTH
        self.height = MAX_HEIGHT
        self.img_handler = ImageHandler()

    def __enter__(self) -> Self:
        """Enter method for context management, returning self."""
        return self

    def __exit__(self, *args: Any) -> None:
        """Exit method for context management, closing the device."""
        self.close()

    def get_frame(self) -> tuple[NDArray[Any], Any]:
        """
        Capture a frame from the video stream.

        ``setup`` has to be called first, it installs the acquisition
        function used here.

        Returns:
            An ``(image, detections)`` tuple as delivered by the plugin's
            acquisition function, with the image limited to the maximum size.
        """
        # get image and detections from depthai plugin (image acquisition function)
        img, det = self.acquisition()

        # high image resolution is usually not supported by online meeting tools
        img = self.img_handler.correct_img_size(img)

        return img, det

    def setup(self, setup_func: Callable, acquisition_func: Callable) -> None:
        """Run the plugin's setup function and bind its acquisition function.

        Args:
            setup_func --- initialization function of plugin, called with this device
            acquisition_func --- handling function for image acquisition of plugin,
                bound to this device as ``self.acquisition``
        """
        setup_func(self)
        self.acquisition = MethodType(acquisition_func, self)

    def get_fps(self) -> int:
        """Retrieve the frames per second (FPS) of the video capture.

        Raises:
            NotImplementedError: Always, FPS retrieval is not implemented for
                depthai devices.
        """
        raise NotImplementedError


class KeyHandler(keyboard.GlobalHotKeys):
    """A class to handle global hotkeys and their functionalities.

    Inherits from keyboard.GlobalHotKeys and defines hotkey triggers. Every
    registered hotkey toggles a boolean attribute of the same name as its
    variable on this instance.
    """

    def __init__(
        self, plugin_hotkeys: "list[Hotkey] | None" = None, verbose=False
    ) -> None:
        """Initialize the KeyHandler with hotkeys and their respective states.

        Args:
            plugin_hotkeys: Hotkey objects (``hotkey``, ``variable`` and
                ``enabled`` attributes are read) registered in addition to the
                default hotkeys. None means no additional hotkeys.
            verbose: Print a message whenever a hotkey is triggered.

        Raises:
            ValueError: If a hotkey or variable is invalid or defined twice.
        """

        # maps the hotkey string to the callback toggling its variable
        self.hotkeys = {}
        self.verbose = verbose

        self.default_hotkeys = [
            Hotkey(
                "<ctrl>+<alt>+r",
                "bgr2rgb",
                False,
                "Switch RGB to BGR color schema.",
            ),
            Hotkey(
                "<ctrl>+<alt>+m", "mirror", True, "Mirror the camera stream."
            ),
        ]

        for h in self.default_hotkeys:
            self.add_trigger(h.hotkey, h.variable, h.enabled)

        # a None default avoids sharing one mutable default between instances
        for h in plugin_hotkeys or ():
            self.add_trigger(h.hotkey, h.variable, h.enabled)

        super().__init__(self.hotkeys)

    def __enter__(self) -> Self:
        """Enter method for context management, returning self."""
        return self

    def __exit__(self, *args: Any) -> None:
        """Exit method for context management, stopping the KeyHandler."""
        self.stop()

    def add_trigger(
        self, hotkey: str, variable: str, enabled: bool = True
    ) -> None:
        """Add a new trigger function for a specific hotkey.

        Args:
            hotkey: The hotkey string.
            variable: The name of the variable to toggle.
            enabled: Initial value of the variable.

        Raises:
            ValueError: If the hotkey or the variable fails validation.
        """
        self.validate(hotkey, variable)
        setattr(self, variable, enabled)

        def trigger_func():
            setattr(self, variable, not getattr(self, variable))
            if self.verbose:
                print(
                    "Triggered: ",
                    hotkey,
                    " ",
                    variable,
                    " ",
                    getattr(self, variable),
                )

        # remember which variable the callback toggles, validate() needs it
        # to detect a variable name that is registered twice
        trigger_func.variable = variable
        self.hotkeys[hotkey] = trigger_func

    def validate(self, hotkey: str, variable: str) -> None:
        """Check if a hotkey and variable names are valid and not existing.

        Args:
            hotkey: The hotkey string.
            variable: The name of the variable to toggle.

        Raises:
            ValueError: If one of the arguments is not a string or is
                already registered.
        """
        if not isinstance(hotkey, str):
            raise ValueError(
                "Hotkey must be a string. For example: '<Ctrl>+<Alt>+z'"
            )
        if not isinstance(variable, str):
            raise ValueError(
                "Variable must be a string. For example: 'trig_var'"
            )
        # the dict values are callbacks, so the variable names have to be
        # read from the callbacks instead of comparing against the values
        if any(
            getattr(callback, "variable", None) == variable
            for callback in self.hotkeys.values()
        ):
            raise ValueError(f"Variable name '{variable}' already exists.")
        if hotkey in self.hotkeys:
            raise ValueError(f"Hotkey '{hotkey}' already exists.")


class Hotkey:
    """Plain record describing one hotkey and the variable it toggles."""

    def __init__(
        self,
        hotkey: str,
        variable: str,
        enabled: bool = True,
        description: str = None,
    ) -> None:
        """Store the given values unchanged on the instance.

        Args:
            hotkey: The hotkey string.
            variable: The name of the variable to toggle.
            enabled: Whether the hotkey is enabled.
            description: A description of the hotkey.
        """
        # key combination and the variable bound to it
        self.hotkey, self.variable = hotkey, variable
        # state and human readable description
        self.enabled, self.description = enabled, description


class InvalidArgumentError(ValueError):
    """Raised for malformed extra arguments; a ValueError subclass."""

    def __init__(self, message: str = "Invalid argument") -> None:
        """Create the exception.

        Args:
            message --- text of the exception, a generic default is used
                when omitted.
        """
        super().__init__(message)


class ArgumentHandler:
    """A class to handle and validate command-line arguments."""

    def __init__(self, ctx_args: list[str]) -> None:
        """Initialize the ArgumentHandler and check the provided arguments."""
        self.check(ctx_args)

    def check(self, ctx_args: list[str]) -> None:
        """Check the validity of the command-line arguments.

        Raises:
            InvalidArgumentError: If the arguments are not correctly formatted.
        """
        if len(ctx_args) < 2 and len(ctx_args) != 0:
            raise InvalidArgumentError(
                "You need to provide extra arguments in the form '--name'"
                " 'argument'. "
            )

        if bool(len(ctx_args) % 2):
            raise InvalidArgumentError(
                "You need to provide extra arguments in the form '--name'"
                " 'argument'. Multiple arguments per parameter are not"
                " allowed. The following is invalid: '--your_param argument1"
                " argument2', while '--your_param argument1argument2' or"
                " '--your_param1 argument1 --your_param2 argument2' are valid."
            )

    def print(self, ctx_args: list[str]) -> None:
        """Print the list of provided command-line arguments or a message if none were provided."""
        if len(ctx_args) > 0:
            print("\nProvided extra arguments are:")
            for param, arg in zip(ctx_args[0::2], ctx_args[1::2]):
                print(param, ": ", arg)
        else:
            print("\nNo extra arguments provided.\n")

    def get(self, pram_name: str, ctx_args: list[str]) -> str | None:
        """Retrieve the value of a specific parameter from the command-line arguments.

        Args:
            pram_name --- the name of the parameter to retrieve.
            ctx_args --- a list of command-line arguments.

        Returns:
            The value of the parameter if found, otherwise None.
        """
        if pram_name in ctx_args:
            idx = ctx_args.index(pram_name) + 1
            arg = ctx_args[idx]
            return arg
        else:
            return None


class ImageHandler:
    """Collection of image processing helpers."""

    def __init__(self) -> None:
        """Nothing to set up, the handler keeps no state."""

    def correct_img_size(self, image: NDArray[Any]) -> NDArray[Any]:
        """Limit an image to the maximum width and height.

        Only a dimension exceeding its maximum is changed, so the aspect
        ratio is not preserved when just one of them is too large.

        Args:
            image --- the image to be processed.

        Returns:
            The unchanged image if it fits, otherwise the resized image.
        """
        height, width = image.shape[0], image.shape[1]

        # nothing to do while both dimensions stay within the limits
        if height <= MAX_HEIGHT and width <= MAX_WIDTH:
            return image

        target_height = min(height, MAX_HEIGHT)
        target_width = min(width, MAX_WIDTH)
        return cv2.resize(image, (target_width, target_height))

device.py

Python
"""This file facilitates the management and utilization of virtual and real camera devices."""

import re
import signal
import sys
from enum import Enum
from pathlib import Path
from types import FrameType
from typing import Any, Optional

import depthai
from constants import DEPTHAI, WEBCAM, PluginType
from print import Printer
from utils import DepthaiCapture, VideoCapture
from v4l2ctl import V4l2Capabilities, V4l2Device


class V4l2Capture(V4l2Device):
    """Handle real camera devices.

    Inherits from v4l2ctl.V4l2Device and extends functionalities with
    methods to safely enter and exit, so the device can be used in a
    ``with`` statement.
    """

    def __enter__(self) -> V4l2Device:
        """Enter method for context management, returning self."""
        return self

    def __exit__(self, *args: tuple[Any]) -> None:
        """Exit method for context management, flushing and closing the device.

        The exception details passed by the ``with`` statement are ignored
        and nothing is returned, so exceptions are not suppressed.
        """
        self.flush()
        self.close()


class DeviceHandler:
    """Base class for management and mapping of real and virtual devices."""

    def __init__(self) -> None:
        """Initialize the DeviceHandler by setting up necessary mappings and signal handler."""
        signal.signal(signal.SIGINT, self._interrupt)

        self.mapping = self.device_mapping()
        self.pprint = Printer()

    def _interrupt(self, signal: int, frame: FrameType | None) -> None:
        """Handle SIGINT signal by stopping the device and exiting the program."""
        self.pprint.device_stopped()
        sys.exit(0)

    def init_device(self, device_path: str | None) -> str:
        """Initialize the device by analyzing the device path and mapping it a virtual device.

        Args:
            device_path -- the path of the device to initialize, None if not specified

        Returns:
            A a string containing the path to the virtual device.
        """
        device_map = {
            device["path_real"]: device["path_virtual"]
            for device in self.mapping.values()
        }
        if device_path:
            # device is specified as argument in command line
            if device_path in device_map.keys():
                # check if specified device is available in real devices and has virtual counterpart
                self.real_path = device_path
                virtual_path = device_map[device_path]
                return virtual_path
            else:
                # if not print that the specified device needs to have a virtual counterpart and how to do it
                self.pprint.device_not_available(device_path)
                self.pprint.available_devices(
                    self.mapping, self.available_devices_real
                )
                self.pprint.add_device_first()
                sys.exit()
        else:
            raise ValueError(
                f"Device path is {device_path}, which is not a valid device"
                " path."
            )

    def device_mapping(self) -> dict[int, dict[str, str]]:
        """Create a mapping from real camera devices to virtual camera devices.

        Returns:
            A dictionary that contains information about real and virtual devices.
        """

        device_map = {}

        (paths_real, labels_real) = self.available_devices_real
        (paths_virtual, labels_virtual) = self.available_devices_virtual

        # extract the IDs (first number in virtual camera label) of available virtual cameras for mapping it to real cameras
        # the mapping index is same as real cameras path id. e.g. /dev/video_n_ with n as id
        if self.type == WEBCAM:
            mapping_ids = [
                int(re.findall(r"\d+", label)[0]) for label in labels_virtual
            ]
            ids_real = [
                int(path.replace("/dev/video", "")) for path in paths_real
            ]

            try:
                mapping_indices = [ids_real.index(id) for id in mapping_ids]
            except:
                mapping_indices = {}

        elif self.type == DEPTHAI:
            mapping_indices = []
            for i, mxid in enumerate(paths_real):
                for j, label in enumerate(labels_virtual):
                    if mxid in label:
                        mapping_indices.append(j)
        else:
            raise NotImplementedError(
                "Device type needs to be either 'webcam' or 'depthai'."
            )

        for i, j in enumerate(mapping_indices):
            device_map[i] = {
                "path_real": paths_real[j],
                "label_real": labels_real[j],
                "path_virtual": paths_virtual[i],
                "label_virtual": labels_virtual[i],
            }

        return device_map

    def get_available(self, real: bool) -> tuple[list[str], list[str]]:
        """Return a list of available real or virtual camera devices based on the input flag.

        Args:
            real --- a flag to determine whether to return real or virtual devices.

        Returns:
            A tuple containing lists of device paths and labels.
        """

        device_paths = []
        labels = []

        with V4l2Capture() as v4l2:
            for device in v4l2.iter_devices(skip_links=True):
                if (
                    real
                    and not self._is_virtual_device(device)
                    and V4l2Capabilities.VIDEO_CAPTURE in device.capabilities
                ):
                    device_path, label = self._get_device_info(device)
                elif (
                    not real
                    and self._is_virtual_device(device)
                    and V4l2Capabilities.VIDEO_OUTPUT in device.capabilities
                ):
                    device_path, label = self._get_device_info(device)
                else:
                    continue

                labels.append(label)
                device_paths.append(device_path)

        return device_paths, labels

    def device_running(self) -> None:
        """Print the running device."""
        self.pprint.device_running()

    def _is_virtual_device(self, device: V4l2Device) -> bool:
        """Check if the device is a virtual device.

        Args:
            device (V4l2Device) --- the device to check.

        Returns:
            True if the device is virtual, False otherwise.
        """
        virtual = True if "platform:v4l2loopback" in str(device.bus) else False
        return virtual

    def _get_device_info(self, device: V4l2Device) -> tuple[str, str]:
        """Extract and return device information including path and label.

        Args:
            device (V4l2Device) --- the device to extract information from.

        Returns:
            A tuple containing the device path (str) and label (str).

        Raises:
            AssertionError: If the device path does not follow the expected format or does not exist.
        """
        device_path = str(device.device)
        card_label = str(device.name)

        assert device_path.__contains__(
            "/dev/video"
        ), "Device name '{device_name}' should be of format '/dev/video_n_'."
        assert Path(
            device_path
        ).exists(), "Device path '{device_name}' does not exist."
        return device_path, card_label


class WebcamDevice(DeviceHandler):
    """Handles the management and mapping of real and virtual webcam devices."""

    def __init__(self) -> None:
        """Initialize the WebcamDevice class."""

        self.type = WEBCAM
        self.real_path = None
        self.available_devices_real = None
        self.available_devices_virtual = None

        # the available devices have to be known before the base class is
        # initialized, its constructor builds the mapping from them
        self.update_available()
        super().__init__()

    def get_device(self) -> VideoCapture:
        """Get a VideoCapture class instance can be used for webcam image acquisition.

        Raises:
            LookupError: If no real device path has been set by
                ``init_device`` yet.

        Returns:
            A VideoCapture class instance.
        """
        if not self.real_path:
            # f-string, so the message shows the actual value of the path
            raise LookupError(
                f"self.real_path is {self.real_path}. Device initialization"
                " 'self.init_device(device_path)' is needed before a device"
                " instance is created."
            )
        return VideoCapture(self.real_path)

    def update_available(self) -> None:
        """Update the list of available real and virtual devices."""
        self.available_devices_real = self.get_available(real=True)
        self.available_devices_virtual = self.get_available(real=False)


class DepthaiDevice(DeviceHandler):
    """Handles the management and mapping of depthai and virtual devices."""

    def __init__(self, pipeline) -> None:
        """Initialize the DepthaiDevice class.

        Args:
            pipeline --- pipeline object which is handed to DepthaiCapture in `get_device`.
        """
        self.type = DEPTHAI
        self.real_path = None
        self.available_devices_real = None
        self.available_devices_virtual = None

        # NOTE(review): the base initializer is not visible here and may rely
        # on the device lists being populated first -- keep this order.
        self.update_available()
        super().__init__()

        self.usb_speed = depthai.UsbSpeed.SUPER_PLUS
        self.device_info = None
        self.pipeline = pipeline

    def get_device(self) -> DepthaiCapture:
        """Get a DepthaiCapture class instance which can be used for depthai image acquisition.

        Raises:
            LookupError: If `self.real_path` is not set, i.e. the device has
                not been initialized yet.

        Returns:
            A DepthaiCapture class instance.
        """
        if self.real_path:
            self.device_info = depthai.DeviceInfo(self.real_path)
            return DepthaiCapture(
                pipeline=self.pipeline,
                deviceInfo=self.device_info,
                maxUsbSpeed=self.usb_speed,
            )

        # f-string so that the actual (unset) value shows up in the message.
        raise LookupError(
            f"self.real_path is {self.real_path!r}. Device initialization"
            " 'self.init_device(device_path)' is needed before a device"
            " instance is created."
        )

    def update_available(self) -> None:
        """Update the list of available real and virtual devices."""
        self.available_devices_real = self.get_available_depthai()
        self.available_devices_virtual = self.get_available(real=False)

    def get_available_depthai(
        self,
    ) -> tuple[list[str], list[str]]:
        """Get available depthai camera devices.

        Returns:
            A tuple (device_paths, labels). The device "path" of a depthai
            camera is its mxid, the label is derived from the device name.
        """
        device_paths = []
        labels = []

        # Local name differs from `self.device_info` on purpose: this loop
        # only lists devices and does not select one.
        for found in depthai.Device.getAllAvailableDevices():
            labels.append(f"OAK Device on port {found.name}")
            device_paths.append(found.mxid)

        return device_paths, labels


def device_choice(type: PluginType) -> Enum | Optional[str]:
    """Build the type used for a plugin's device path argument.

    Args:
        type --- the plugin type, either WEBCAM or DEPTHAI.

    Returns:
        An Enum class named "DevicePath" whose members are the real device
        paths found in the handler's mapping, or `Optional[str]` if the
        mapping is empty.

    Raises:
        ValueError: If the type is neither WEBCAM nor DEPTHAI.
    """
    if type != WEBCAM and type != DEPTHAI:
        raise ValueError(
            f"Device type needs to be '{WEBCAM}' or '{DEPTHAI}' not '{type}'"
        )

    handler = WebcamDevice() if type == WEBCAM else DepthaiDevice(pipeline=None)

    mapping = handler.mapping
    if not mapping:
        return Optional[str]

    # Each real device path is used as both member name and member value.
    real_paths = [entry["path_real"] for entry in mapping.values()]
    return Enum("DevicePath", dict(zip(real_paths, real_paths)))

plugin_template.py

Python
from typing import Any, Optional, Type

import cv2
import typer
from constants import WEBCAM, DevicePathWebcam
from device import device_choice
from numpy.typing import NDArray
from runner import Runner
from utils import Hotkey, KeyHandler

from ..plugin_utils import PluginBase

# Plugin meta data. The values in double curly braces are template
# placeholders; presumably they are filled in when a plugin is generated from
# this template (TODO confirm against the plugin creation code).
name = "{{name}}"  # modifiable
short_description = "{{short_description}}"  # modifiable
description = """
{{description}}"""  # modifiable

# Camera type this plugin is written for.
TYPE = WEBCAM
# Evaluated at import time; the result is used below as the type annotation
# of the `device_path` argument of the plugin's callback.
DevicePath = device_choice(TYPE)

# Appended to the description to form the plugin's help text.
plugin_txt = f"\n\n\n\nPlugin type: {TYPE}"

# Typer app of this plugin, carrying name and help texts defined above.
plugin_app = typer.Typer(
    name=name,
    context_settings={"help_option_names": ["-h", "--help"]},
    no_args_is_help=True,
    short_help=short_description,
    help=str(description + plugin_txt),
    invoke_without_command=True,
)
# Store the plugin type as an attribute on the typer app itself.
plugin_app.type = TYPE


class CustomPlugin(PluginBase):
    """A custom plugin.

    This class is based on a template and meant for modification to make it your custom plugin.
    """

    def __init__(
        self, arg1: str | None = None, arg2: str | None = None
    ) -> None:
        """Initialize the plugin with custom arguments, variables and classes.

        Args:
            arg1 --- argument you define.
            arg2 --- argument you define.

        """
        super().__init__()
        self.type = TYPE

        # modify arguments and
        # initialize additional variables or classes (e.g. your AI model)
        # e.g.:
        # self.arg1 = arg1

        # you can use the model path to save and load your model weights
        # e.g.:
        # self.model_path = (Path(self.model_dir) / "your_model.pth")

        # define custom triggers (hotkeys, which you can trigger and enable/disable functionality)
        # each hotkey is defined as
        #    Hotkey(key_combination, variable_name, is_enabled, description)
        # e.g.:
        # self.hotkeys = [
        #    Hotkey("<Ctrl>+<Alt>+z", "z_trigger", True, "toggle text"),
        #    Hotkey("<Ctrl>+<Alt>+p", "p_trigger", False, "toggle text"),
        # ]
        # this will get you two triggers (keyhandler.z_trigger, keyhandler.p_trigger), set to be True, False respectively
        # you can use them to enable/disable functionality based on the trigger state
        self.hotkeys = [
            Hotkey("<Ctrl>+<Alt>+z", "z_trigger", True, "toggle text"),
        ]
        # set verbose to True to print additional information like available hotkeys and hotkey changes
        self.verbose = True

    def process(
        self,
        image: NDArray[Any],
        detection: Any,
        keyhandler: KeyHandler,
    ) -> NDArray[Any]:
        """Process the webcam image and return a modified image.

        Args:
            image --- the input image (real camera frames) to be processed.
            detection --- the on camera detection (just in case of depthai camera).
            keyhandler --- keyhandler instance to enable/disable functionality by hotkey trigger.

        Returns:
            The processed image which will be sent to the virtual camera (video meeting stream)
        """

        # implement your custom image processing here
        # e.g. run opencv functions on the image or your AI model

        image = cv2.putText(
            image,
            "Your custom plugin",
            (50, 75),
            fontFace=cv2.FONT_HERSHEY_SIMPLEX,
            fontScale=1,
            color=(0, 255, 0),
            thickness=2,
        )

        # The trigger state is read once, so exactly one of the two texts is
        # drawn per frame even if the hotkey is pressed while processing.
        if keyhandler.z_trigger:
            # z_trigger <Ctrl>+<Alt>+z is True, print in additional text
            image = cv2.putText(
                image,
                "Toggle this text with <Ctrl>+<Alt>+z",
                (50, 200),
                fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                fontScale=0.7,
                color=(0, 255, 0),
                thickness=1,
            )
        else:
            # z_trigger <Ctrl>+<Alt>+z is False, print next steps
            image = cv2.putText(
                image,
                "Great! Now you can start to modify this plugin. ;)",
                (50, 300),
                fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                fontScale=0.7,
                color=(0, 255, 0),
                thickness=1,
            )

        return image


@plugin_app.callback(rich_help_panel="Plugin-Commands")
def main(
    device_path: DevicePath = DevicePathWebcam,
    arg1: Optional[str] = typer.Option(
        default="", help="Your custom argument 1."
    ),
    arg2: Optional[str] = typer.Option(
        default="", help="Your custom argument 2."
    ),
):
    # Callback of the plugin's typer app:
    # create the plugin from the command line arguments ...
    custom_plugin = CustomPlugin(arg1, arg2)
    # ... and let the runner execute it on the chosen camera device.
    Runner(custom_plugin, device_path).run()

plugin_utils.py

Python
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any

import depthai
import typer
from constants import DEPTHAI, WEBCAM
from numpy.typing import NDArray
from utils import KeyHandler


class PluginBase(ABC):
    """PluginBase class acts as a base class for plugins with image processing capabilities.

    This class is intended to be subclassed, with subclasses implementing the abstract
    `__init__` and `process` method to perform image processing tasks.

    Attributes (set in `__init__`):
        model_dir --- directory where models are stored, "./src/meetingcam/models".
        type --- plugin type, defaults to WEBCAM.
        hotkeys --- hotkey definitions handed to the KeyHandler, empty by default.
        verbose --- verbosity flag handed to the KeyHandler, False by default.
    """

    @abstractmethod
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the PluginBase instance with its default attributes.

        Args:
            *args --- Variable length argument list.
            **kwargs --- Arbitrary keyword arguments.

            With this custom arguments can be passed to plugins, e.g. --name YourName
        """
        self.model_dir = "./src/meetingcam/models"
        self.type = WEBCAM  # defaults to webcam
        # A list, matching the list of Hotkey objects which plugins assign.
        self.hotkeys = []
        self.verbose = False

    @abstractmethod
    def process(
        self,
        image: NDArray[Any],
        detection: Any,
        trigger: tuple[bool, bool, bool],
    ) -> NDArray[Any]:
        """Process an image through the plugin's image processing method.

        This method is abstract and should be overwritten in subclasses to implement
        specific image processing functionality.

        Args:
            image --- the image to be processed.
            detection --- detection data accompanying the image, if any.
            trigger --- state used to enable/disable functionality.
                NOTE(review): the plugin template reads hotkey states from a
                KeyHandler instance in this position; the tuple annotation
                looks stale -- confirm against the caller.

        Returns:
            The processed image.
        """

    def keyhandler(self) -> KeyHandler:
        """Return a new KeyHandler built from this plugin's hotkeys and verbosity."""
        return KeyHandler(self.hotkeys, self.verbose)


class PluginDepthai(PluginBase):
    """PluginBase class for Depthai plugins with on device compute."""

    @abstractmethod
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the PluginDepthai instance and mark it as depthai plugin.

        Runs the PluginBase initializer and then sets the plugin type to DEPTHAI.

        Args:
            *args --- Variable length argument list.
            **kwargs --- Arbitrary keyword arguments.

            With this custom arguments can be passed to plugins, e.g. --name YourName
        """
        super().__init__()
        self.type = DEPTHAI

    @abstractmethod
    def device_setup(self, device) -> None:
        """Setup of device before image acquisition loop, e.g. for get queue definition etc.

        Args:
            device --- depthai device
        """

    @abstractmethod
    def acquisition(
        self, device
    ) -> tuple[NDArray[Any], list[depthai.ImgDetection]]:
        """Acquire an image and optionally detections from camera queue and return them.

        Args:
            device --- depthai device

        Returns:
            A tuple of the acquired image and a list of detections.
        """


class PluginRegistry:
    """PluginRegistry class acts as registry for default and newly added plugins."""

    def register_plugins(
        self, main_app: "typer.main.Typer", plugin_list: list
    ) -> None:
        """Register plugins in typer main app.

        For every plugin name the object `plugin_app` is imported from the
        module `plugins.<name>.plugin` and added to the main app. Plugins
        which can not be imported are reported and skipped.

        Args:
            main_app --- the typer main app the plugins are added to.
            plugin_list --- names of the plugins (plugin directory names).
        """
        for name in plugin_list:
            plugin_app = self._import_plugin(
                f"plugins.{name}.plugin", "plugin_app"
            )
            if plugin_app is None:
                # _import_plugin signals a failed import with None; skip the
                # plugin instead of failing on the attribute access below.
                print(
                    f"Plugin {name} could not be imported. Continue without"
                    " registering this plugin."
                )
                continue

            # Values which were not set explicitly are not plain strings;
            # those are replaced by a fallback.
            info = plugin_app.info
            help_text = info.help if isinstance(info.help, str) else None
            short_help = (
                info.short_help if isinstance(info.short_help, str) else None
            )
            # Fall back to the directory name if the app has no own name.
            plugin_name = info.name if isinstance(info.name, str) else name

            main_app.add_typer(
                plugin_app,
                name=plugin_name,
                help=help_text,
                short_help=short_help,
            )

    def search_plugins(self, path: str = "src/meetingcam/plugins") -> list:
        """Check plugin directory for plugins and return list with found and valid plugins.

        Args:
            path --- directory which contains one sub directory per plugin.

        Returns:
            Names of all sub directories which passed the plugin check.
        """
        candidates = self._sortout(
            [entry for entry in Path(path).iterdir() if entry.is_dir()]
        )
        plugins = []

        for plugin_dir in candidates:
            plugin_name = plugin_dir.name

            if self._check_plugin(plugin_dir):
                plugins.append(plugin_name)
            else:
                print(
                    f"Plugin {plugin_name} is not valid. Continue without"
                    " registering this plugin."
                )

        return plugins

    def _check_plugin(self, path: Path) -> bool:
        """Validate a plugin directory. Currently every directory is accepted."""
        # TODO: Implement checks to validate plugin
        return True

    def _sortout(self, dirs: list) -> list:
        """Sort out directories which start with dot or underscores.

        The passed list is filtered in place and returned.
        """
        # Build the filtered list first and assign it afterwards. Removing
        # items while iterating over the same list skips the element which
        # follows each removed one.
        dirs[:] = [
            d for d in dirs if not str(d.name).startswith((".", "__"))
        ]
        return dirs

    def _import_plugin(self, modulename, name):
        """Import a named object from a module in the context of this function.
        Code adapted from: https://www.oreilly.com/library/view/python-cookbook/0596001673/ch15s04.html

        Args:
            modulename --- dotted name of the module to import.
            name --- name of the object to fetch from the module.

        Returns:
            The object, or None if the module can not be imported or does not
            define the name.
        """
        try:
            module = __import__(modulename, globals(), locals(), [name])
        except ImportError:
            return None
        return vars(module).get(name)

constants.py

Python
"""Definition of constants and types"""

from enum import Enum
from typing import NewType

import typer

# Definition of maximum image size for virtual camera.
# Higher resolutions are usually not supported on online meeting tools
# 720p:(1280, 720)  540p:(960, 540)
MAX_WIDTH = 1280
MAX_HEIGHT = 720

# Plugin types (avoid spelling errors with explicit type)
PluginType = NewType("PluginType", str)
DEPTHAI = PluginType("depthai")
WEBCAM = PluginType("webcam")

# Argument and Option types
# Selectable camera types: "all" plus the two plugin types.
# NOTE(review): the enum class is named "DevicePath" although its members are
# camera types -- confirm whether the name is relied upon before renaming.
TYPES = Enum("DevicePath", {"all": "all", WEBCAM: WEBCAM, DEPTHAI: DEPTHAI})
# Option default for the camera type; plain string, no placeholders needed.
TypeArgument = typer.Option(default=WEBCAM, help="Choose camera type")
# Required (default=...) positional arguments for the real camera device.
DevicePathWebcam = typer.Argument(
    default=..., help="Path to real camera device, e.g. /dev/video0."
)
DevicePathDepthai = typer.Argument(
    default=...,
    help="Path (mxid) to real camera device, e.g. 14442C1021C694D000.",
)

print.py

Python
"""This file contains a printing class which is used for common terminal prints."""

from typing import Any

import pyfiglet
import rich
from constants import DEPTHAI, WEBCAM
from rich.console import Console
from rich.style import Style
from rich.table import Table
from rich.text import Text
from v4l2ctl import V4l2Device


class Printer:
    """Handles printing functions for common terminal prints"""

    def __init__(self) -> None:
        """
        Initialize the Printer.

        Instantiates a Console object and initializes styles for different types of messages (danger, warning, ok).
        """
        self.console = Console()
        self.danger_style = Style(color="red", blink=True, bold=True)
        self.warning_style = Style(color="yellow", blink=True, bold=True)
        self.ok_style = Style(color="green", blink=True, bold=True)

    def available_devices(
        self,
        device_map: dict[int, dict[str, Any]],
        available_devices_real: tuple[list[str], list[str]],
    ) -> None:
        """
        Print available real devices and their virtual counterparts to the console.

        Args:
            device_map --- a mapping of device ids to device properties including paths and labels.
            available_devices_real --- a tuple containing the list of paths and the list of labels of real devices.

        One table row is printed per real device. The virtual camera name is
        looked up in the device map by the real device path and printed as
        "None" if the device has no virtual counterpart.
        """
        paths_real, labels_real = available_devices_real

        # Virtual label per real path; a later map entry wins over an earlier
        # one with the same real path.
        virtual_labels = {
            entry["path_real"]: entry["label_virtual"]
            for entry in device_map.values()
        }

        table = Table()
        table.add_column(
            "Camera name", justify="right", style="cyan", no_wrap=True
        )
        table.add_column("Camera path", style="magenta", no_wrap=True)
        table.add_column(
            "Virtual camera name", justify="right", style="cyan", no_wrap=True
        )

        for path, label in zip(paths_real, labels_real):
            table.add_row(
                str(label),
                str(path),
                str(virtual_labels.get(path)),
            )

        self.console.print(table)

    def add_virtual_devices(
        self,
        available_devices_real: tuple[list[str], list[str]],
        type: str | int,
    ) -> None:
        """
        Print instructions to add virtual devices to the console.

        Args:
            available_devices_real --- a tuple containing the list of paths and the list of labels of real devices.
            type --- the device type, either WEBCAM or DEPTHAI.

        Raises:
            NotImplementedError: If the type is neither WEBCAM nor DEPTHAI.

        For every real device a `modprobe v4l2loopback` command is printed
        which adds its virtual counterpart, followed by one command which
        adds all devices at once. Nothing is printed if there are no devices.
        """
        device_paths, labels = available_devices_real

        if type == WEBCAM:
            # The virtual device gets the same number as the real one.
            ids = [
                int(path.replace("/dev/video", "")) for path in device_paths
            ]
            video_nr_single = [f" video_nr={idx}" for idx in ids]
            video_nr_multi = " video_nr=" + ",".join(str(idx) for idx in ids)
        elif type == DEPTHAI:
            # Depthai devices are identified by their path; no video number.
            ids = device_paths
            video_nr_single = [""] * len(ids)
            video_nr_multi = ""
        else:
            raise NotImplementedError(
                "Device type needs to be either 'webcam' or 'depthai'."
            )

        card_labels = [
            f"MeetingCam{idx} {label}" for idx, label in zip(ids, labels)
        ]

        # A list of pairs instead of a dict keyed by label: two cameras of
        # the same model share a label and must both get their command.
        cli_cmd_single = [
            (
                label,
                "sudo modprobe v4l2loopback devices=1"
                f"{video_nr} card_label='{card_label}'",
            )
            for label, video_nr, card_label in zip(
                labels, video_nr_single, card_labels
            )
        ]

        # Labels are joined as 'label one','label two' within the command.
        joined_labels = "','".join(card_labels)
        cli_cmd_multi = (
            "sudo modprobe v4l2loopback"
            f" devices={len(ids)}{video_nr_multi} card_label='{joined_labels}'"
        )

        if not device_paths:
            return

        self.console.print(
            "\n[bold]Add[/bold] a [bold]single device[/bold] with one of"
            " the following commands:"
        )
        for label, cmd in cli_cmd_single:
            self.console.print(f"\n{label}:")
            self.console.print(Text(cmd), style="cyan bold")
        self.console.print(
            "\n\n[bold]Add all devices[/bold] with the following command:"
        )
        self.console.print(Text("\n" + cli_cmd_multi), style="cyan bold")

    def device_not_available(self, device: V4l2Device) -> None:
        """Print a warning message indicating the specified device is not available.

        Args:
            device (V4l2Device) --- the device that is not available.
        """
        self.console.print("\nWarning! :warning:", style=self.warning_style)
        text = (
            "[bold]Device not available.[/bold]\nThe specified device"
            f" '{device}' is not available or does not have a virtual"
            " counterpart."
        )
        self.console.print(text)

    def add_device_first(self) -> None:
        """Print a message guiding the user to run add devices first."""
        self.console.print(
            "\nYou'll need to add a device first. Run [cyan"
            " bold]add-devices[/cyan bold] command to see how to add a"
            " virtual counterpart to a camera device.\n"
        )

    def reset_devices(self) -> None:
        """Print instructions on how to reset virtual devices."""
        self.console.print(
            "\n1. [bold]Close[/bold] all [bold]applications which access"
            " camera[/bold] devices, including browser."
            "\n2. Run: [bold][cyan]sudo modprobe -r v4l2loopback[/cyan][/bold]"
            "\n\nA reset is necessary if you want to access your real camera"
            " device normally or if you want to add another device to be used"
            " with MeetingCam.\nIf the reset command isn't working make"
            " sure all applications which might access cameras are closed. If"
            " this still isn't working, restart your system.\n"
        )

    def device_running(self) -> None:
        """Print a message indicating that the device is currently running and how to access the stream."""
        self.console.print(
            "Device running! :arrow_forward:", style=self.ok_style
        )
        self.console.print(
            "You can now access the modified camera stream in Meets, Teams or"
            " Zoom. :rocket:",
            style=Style(bold=True),
        )
        self.console.print(
            "Press `<Ctrl>+C` to stop the running stream and access your"
            " device normally."
        )

    def device_stopped(self) -> None:
        """Print a message indicating that the device has stopped and can now be accessed normally."""
        self.console.print(
            "Device stopped. :raised_hand:", style=self.danger_style
        )
        self.console.print(
            "You can access your device now as usual. MeetingCam has stopped.",
            style=Style(bold=True),
        )

    def title(self) -> None:
        """Print MeetingCam title."""
        title = pyfiglet.figlet_format("MeetingCam", font="big")
        title = Text(title)
        # Two colors are applied by character offset within the rendered
        # banner: green up to offset 234, magenta from offset 235 on.
        title.stylize("bold green", 0, 234)
        title.stylize("bold magenta", 235)
        rich.print(title)

    def subtitle(self, name: str) -> None:
        """Print Plugin title.

        Args:
            name --- the plugin name to be printed as banner.
        """
        title = pyfiglet.figlet_format(name, font="big")
        title = Text(title)
        rich.print(title)

    def epilog(self) -> str:
        """Get epilog text to be printed underneath help."""
        text = (
            "Submit feedback, issues and questions via"
            " https://github.com/nengelmann/MeetingCam/issues.\nPlease"
            " consider to star https://github.com/nengelmann/MeetingCam if you"
            " find it helpful. "
        )
        return text

MeetingCam

Here is the repository containing MeetingCam. Follow the installation instructions and run your custom AI and CV algorithm in a meeting today! 🚀

Credits

nengelmann
1 project • 0 followers

Comments