We asked ourselves how to share computer vision projects with friends and colleagues. How can we spread the love for computer vision and open source tools like OpenCV? Can we build something that makes it fast, fun and easy to show your own projects?
We introduce MeetingCam.
MeetingCam allows you to run your AI and CV algorithms in meetings such as Zoom, Meets or Teams! It lets anyone show their own computer vision projects during online meetings, makes it quick and easy to set up a new project or wrap an existing one into a MeetingCam plugin, and allows you to share your project with others.
Background
You can use OpenCV and other tools to process and visualize images and videos. You can also use OpenCV to run processing functions directly on a webcam stream, which is awesome!
However, you cannot easily run your processing on a webcam stream while the camera is being accessed by a meeting tool like Zoom, Meets or Teams.
This is where MeetingCam comes into play. It uses libraries like pyvirtualcam, opencv-python and typer to make it easy to run custom image processing on a webcam stream and forward it to the meeting tool of your choice.
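The forwarding idea can be sketched in a few lines. This is a minimal sketch under stated assumptions, not MeetingCam's actual runner: the function names `fit_within` and `forward` and the 1280x720 size limit are made up for illustration, and a virtual camera is assumed to already exist at `virtual_path`.

```python
from typing import Callable


def fit_within(width: int, height: int, max_w: int = 1280, max_h: int = 720) -> tuple[int, int]:
    """Clamp a frame size to an assumed maximum that meeting tools accept."""
    return min(width, max_w), min(height, max_h)


def forward(real_path: str, virtual_path: str, process: Callable, fps: int = 24) -> None:
    """Read frames from a webcam, process them and send them to a virtual camera."""
    # Imported lazily so the helper above works without the camera libraries.
    import cv2
    import pyvirtualcam

    cap = cv2.VideoCapture(real_path)
    try:
        width, height = fit_within(
            int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
            int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
        )
        with pyvirtualcam.Camera(
            width=width, height=height, fps=fps, device=virtual_path
        ) as cam:
            while True:
                grabbed, frame = cap.read()
                if not grabbed:
                    raise RuntimeError("Image acquisition failed.")
                frame = cv2.resize(frame, (width, height))
                frame = process(frame)  # custom processing on a BGR frame
                # OpenCV delivers BGR, pyvirtualcam expects RGB by default.
                cam.send(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                cam.sleep_until_next_frame()
    finally:
        cap.release()
```

Calling `forward("/dev/video0", "/dev/video2", lambda frame: frame)` would pass the stream through unmodified; both device paths are hypothetical and depend on your setup.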
The project is available here: MeetingCam.
It comes with an MIT license, which gives you the flexibility to do nearly anything you want.
It works with any webcam, but also with an Azure Kinect or even Depthai cameras (OAKs). Currently, it only runs on Linux systems, but this might be extended in the future (contributions are welcome).
Plugins
The project is based on a plugin architecture, which means you can use this tool with the example plugins, with your custom plugins or even with plugins from friends or colleagues.
A plugin is basically a tiny Python package inside the MeetingCam plugin directory. It contains the necessary code to initialize and run a specific image processing pipeline, which can be customized to your needs.
Plugins are automatically invoked and can be managed (run, create, delete) with MeetingCam. Think of a plugin as an app you can run. This way you can create and maintain lots of different "apps" which you can run during your meetings.
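To illustrate how plugins stored as small packages can be picked up automatically, here is a minimal sketch. It is not MeetingCam's plugin registry: the function name `find_plugins` is hypothetical, and the only assumption taken from the project is that each plugin package contains a `plugin.py` file.

```python
import tempfile
from pathlib import Path


def find_plugins(plugin_dir: Path) -> list[str]:
    """Return the names of sub-packages that contain a plugin.py file."""
    return sorted(
        entry.name
        for entry in plugin_dir.iterdir()
        if entry.is_dir() and (entry / "plugin.py").is_file()
    )


# Build a throwaway plugin directory to demonstrate the lookup.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for name in ("face_detector", "my_plugin"):
        (root / name).mkdir()
        (root / name / "__init__.py").write_text("")
        (root / name / "plugin.py").write_text("# plugin code\n")
    (root / "not_a_plugin").mkdir()  # no plugin.py, so it is skipped
    found = find_plugins(root)

print(found)  # ['face_detector', 'my_plugin']
```

Each discovered name could then be registered as its own command, which is what makes a plugin feel like an app you can run.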
Features
- Tool for running CV/AI in meetings
- Set of example plugins
- Creation of custom plugins
- Hotkey triggers to trigger events during meetings
- Support for Depthai on device compute or as webcam
- Support for Roboflow models and inference
Install as described in the repository's README.
Run main.py to see all available commands and options.
python ./src/meetingcam/main.py
The list-devices command gives you an overview of the webcams available on your system.
python src/meetingcam/main.py list-devices
Then add a camera device you want to use and modify.
python src/meetingcam/main.py add-devices
Now copy and execute the command for the camera you want to use. MeetingCam deliberately does not run these system-level operations for you, so you stay in control. However, it constructs the command based on your specific cameras and setup to make it as easy as possible to use.
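As a rough illustration of what constructing such a command can look like, the sketch below builds a v4l2loopback call for a list of real camera ids. The exact command MeetingCam prints may differ: the function name `build_add_command`, the label scheme `MeetingCam<id>`, the starting number 10 and the use of `exclusive_caps` are assumptions; only the v4l2loopback module parameters themselves (`devices`, `video_nr`, `card_label`, `exclusive_caps`) are real.

```python
def build_add_command(real_ids: list[int], first_virtual_nr: int = 10) -> str:
    """Build a modprobe call that creates one loopback device per real camera."""
    if not real_ids:
        raise ValueError("At least one real camera id is required.")
    count = len(real_ids)
    video_nrs = ",".join(str(first_virtual_nr + i) for i in range(count))
    # The label carries the id of the real camera (hypothetical naming scheme).
    labels = ",".join(f'"MeetingCam{real_id}"' for real_id in real_ids)
    exclusive = ",".join("1" for _ in range(count))
    return (
        f"sudo modprobe v4l2loopback devices={count} "
        f"video_nr={video_nrs} card_label={labels} exclusive_caps={exclusive}"
    )


command = build_add_command([0, 2])
print(command)
```

Always use the command printed by add-devices on your own machine rather than this sketch.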
After that you are able to run all plugins with the following command.
python src/meetingcam/main.py PLUGIN_NAME [OPTIONS] DEVICE_PATH
See also the examples below!
Example Plugins
first-person face detector
Draws your face detection and name into your webcam stream. You can hide the bounding box and the name imprint with hotkey triggers (<Ctrl>+<Alt>+f / <Ctrl>+<Alt>+n).
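The hotkey switches can be pictured as boolean flags that a callback flips on every key press. The following is a minimal sketch of that pattern without any keyboard library; the class name `ToggleState` and the variable names `show_box` and `show_name` are hypothetical. In MeetingCam, a dictionary of this kind is handed to pynput's `keyboard.GlobalHotKeys`.

```python
from typing import Callable


class ToggleState:
    """Hold boolean switches that hotkey callbacks can flip."""

    def __init__(self) -> None:
        self.callbacks: dict[str, Callable[[], None]] = {}

    def add_trigger(self, hotkey: str, variable: str, enabled: bool = True) -> None:
        """Register a switch and the callback that toggles it."""
        setattr(self, variable, enabled)

        def toggle() -> None:
            setattr(self, variable, not getattr(self, variable))

        self.callbacks[hotkey] = toggle


state = ToggleState()
state.add_trigger("<ctrl>+<alt>+f", "show_box", True)
state.add_trigger("<ctrl>+<alt>+n", "show_name", True)

# Simulate pressing <Ctrl>+<Alt>+f once: the bounding box is now hidden.
state.callbacks["<ctrl>+<alt>+f"]()
print(state.show_box, state.show_name)  # False True
```

A plugin's processing function can then check such flags on every frame to decide what to draw.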
roboflow general
The roboflow general plugin allows you to run any object detection or instance segmentation model from roboflow on your webcam stream. You can now use models you built with roboflow in your meetings!
depthai yolov5
The example depthai plugin is a yolov5 model trained on the COCO dataset. The model computation takes place on the depthai device, and the detections are drawn into the image before it is streamed to your meeting tool of choice.
Getting Started
We recommend starting with the first-person face detector.
- Make sure you're in the MeetingCam root directory
- Download and convert a face detection model, then opt out from sending usage data with the following commands.
omz_downloader --name ultra-lightweight-face-detection-rfb-320 --output_dir src/meetingcam/models
omz_converter --name ultra-lightweight-face-detection-rfb-320 --download_dir src/meetingcam/models --output_dir src/meetingcam/models --precision=FP16
opt_in_out --opt_out
- Get the command to add a camera device (create a virtual camera). This is only needed if you have not done so already.
python src/meetingcam/main.py add-devices
- Then copy and execute the command of the camera you want to use.
- List your devices and remember the device_path of the camera you want to use (e.g. /dev/video0).
python src/meetingcam/main.py list-devices
- Run the face detection plugin!
python src/meetingcam/main.py face-detector --name yourname /dev/video0
If anything goes wrong, the output on the command line should give you good guidance. If there are bigger roadblocks, feel free to open an issue here.
Custom Plugins
Now the fun begins! Write your own plugin or wrap an existing project into a plugin. To do so, run:
python src/meetingcam/main.py create-plugin
You will be asked for a plugin name, a short one-line description and a full description. This will create a new tiny Python package inside /src/meetingcam/plugins.
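The scaffolding step can be sketched as follows. MeetingCam renders its plugin template with Jinja2; to stay dependency-free, this sketch swaps in `string.Template` from the standard library. The two-line template and the function signature are stand-ins for illustration only.

```python
import tempfile
from pathlib import Path
from string import Template

# Stand-in for the plugin template; the real template file is far longer.
PLUGIN_TEMPLATE = Template(
    'name = "$name"\n'
    'short_description = "$short_description"\n'
)


def create_plugin(plugins_dir: Path, name: str, short_description: str) -> Path:
    """Create a plugin package with a rendered plugin.py and an empty __init__.py."""
    package = plugins_dir / name
    package.mkdir(parents=True, exist_ok=False)  # refuse to overwrite a plugin
    (package / "plugin.py").write_text(
        PLUGIN_TEMPLATE.substitute(name=name, short_description=short_description)
    )
    (package / "__init__.py").write_text("")
    return package


with tempfile.TemporaryDirectory() as tmp:
    package = create_plugin(Path(tmp), "my_plugin", "Draws a banner")
    created = sorted(p.name for p in package.iterdir())
    rendered = (package / "plugin.py").read_text()

print(created)  # ['__init__.py', 'plugin.py']
```

Using `exist_ok=False` is a design choice of this sketch: it fails loudly instead of silently overwriting an existing plugin.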
Run the new plugin to ensure the creation worked smoothly.
python src/meetingcam/main.py your_plugin /dev/your_device
Then you can modify the /src/meetingcam/plugins/your_plugin/plugin.py file according to your needs. The generated template code is commented and self-explanatory. If any questions remain, check out the custom plugin section and our FAQ, and if things are still unclear, open an issue here.
Go and start with MeetingCam here. Create your custom plugin with your own CV and AI ideas and show them in web meetings!
Spread the love for computer vision and open source tools like OpenCV.
#!/usr/bin/env python3
"""This script is the entry point of MeetingCam.
MeetingCam allows to manage and modify video streams and route them to web meeting tools like Teams, Meets or Zoom.
"""
import shutil
import sys
from pathlib import Path
import typer
from constants import DEPTHAI, TYPES, WEBCAM, TypeArgument
from device import DepthaiDevice, WebcamDevice
from jinja2 import Environment, FileSystemLoader
from plugins.plugin_utils import PluginRegistry
from print import Printer
app = typer.Typer(add_completion=False)
registry = PluginRegistry()
plugin_list = registry.search_plugins()
registry.register_plugins(app, plugin_list)
printer = Printer()
# app entry point of typer main app
@app.callback(
    invoke_without_command=True,
    epilog=printer.epilog(),
    no_args_is_help=False,
    help=(
        "AI and CV webcam utility for online meetings.\n\nRun your artificial"
        " intelligence and computer vision algorithms in online meetings such"
        " as Zoom, Meets or Teams! "
    ),
    context_settings={
        "help_option_names": ["-h", "--help"],
    },
)
def main(ctx: typer.Context) -> None:
    """
    Typer app entry point.
    Print title, subtitle or help, depending on context.
    """
    for plugin in app.registered_groups:
        if ctx.invoked_subcommand == plugin.name:
            printer.subtitle(plugin.name)
    if ctx.invoked_subcommand is None:
        printer.title()
        ctx.get_help()
# General-Commands
@app.command(
    help="List all camera devices",
    context_settings={"help_option_names": ["-h", "--help"]},
    rich_help_panel="General-Commands",
)
def list_devices(type: TYPES = TypeArgument) -> None:
    """List all camera devices, camera paths and their virtual counterpart."""
    type = str(type.value)
    if type == DEPTHAI or type == "all":
        depthai_handler = DepthaiDevice(pipeline=None)
        printer.console.print(
            "\nAvailable depthai devices:", style="bold underline"
        )
        printer.available_devices(
            depthai_handler.mapping, depthai_handler.available_devices_real
        )
    if type == WEBCAM or type == "all":
        webcam_handler = WebcamDevice()
        printer.console.print(
            "\nAvailable webcam devices:", style="bold underline"
        )
        printer.available_devices(
            webcam_handler.mapping, webcam_handler.available_devices_real
        )
@app.command(
    help="List commands to add camera devices",
    context_settings={"help_option_names": ["-h", "--help"]},
    rich_help_panel="General-Commands",
)
def add_devices(type: TYPES = TypeArgument) -> None:
    """List commands on how to add camera devices to be used with MeetingCam."""
    type = str(type.value)
    depthai_handler = DepthaiDevice(pipeline=None)
    webcam_handler = WebcamDevice()
    if depthai_handler.mapping or webcam_handler.mapping:
        printer.console.print("\nThere are already virtual devices available!")
        reset_devices()
        sys.exit(0)
    if type == DEPTHAI or type == "all":
        if len(depthai_handler.available_devices_real[0]) > 0:
            printer.console.print(
                "\nAdd depthai devices:", style="bold underline"
            )
            printer.add_virtual_devices(
                depthai_handler.available_devices_real, DEPTHAI
            )
        else:
            printer.console.print(
                "\nNo depthai device available. Make sure to connect a depthai"
                " device to your PC.\n",
                style=printer.warning_style,
            )
            sys.exit(0)
    if type == WEBCAM or type == "all":
        if len(webcam_handler.available_devices_real[0]) > 0:
            printer.console.print(
                "\nAdd webcam devices:", style="bold underline"
            )
            printer.add_virtual_devices(
                webcam_handler.available_devices_real, WEBCAM
            )
        else:
            printer.console.print(
                "\nNo webcam device available. Make sure to connect a webcam"
                " device to your PC.\n",
                style=printer.warning_style,
            )
            sys.exit(0)
    if type == DEPTHAI or type == WEBCAM or type == "all":
        printer.console.print(
            "\nYou can [bold]reset[/bold] the added [bold]virtual"
            " devices[/bold] running:\n[bold][cyan]sudo modprobe -r"
            " v4l2loopback[/cyan][/bold]\nIf you want to run a reset, please"
            " close all applications which might access a camera device (e.g."
            " your browser) before running this command.\nIf you previously"
            " added devices, a reset is necessary before adding a new camera"
            " device.\n"
        )
@app.command(
    help="List reset command to reset virtual camera devices",
    context_settings={"help_option_names": ["-h", "--help"]},
    rich_help_panel="General-Commands",
)
def reset_devices() -> None:
    """List command on how to reset the added (virtual) camera devices."""
    printer.reset_devices()
@app.command(
    help="Create new plugin",
    context_settings={"help_option_names": ["-h", "--help"]},
    rich_help_panel="General-Commands",
)
def create_plugin(
    name: str = typer.Option(default=None, help="Name of new plugin"),
    short_description: str = typer.Option(
        default=None, help="Short one line description of new plugin"
    ),
    description: str = typer.Option(
        default=None, help="Description of new plugin"
    ),
) -> None:
    """Create a new plugin"""
    if not name:
        name = typer.prompt("Name of new plugin")
    # check after prompting, so that interactively entered names are validated too
    if name in plugin_list:
        typer.echo(f"Plugin {name} already exists.")
        sys.exit(0)
    if not short_description:
        short_description = typer.prompt(
            "Short one line description of new plugin"
        )
    if not description:
        description = typer.prompt("Description of new plugin")
    path = Path(f"src/meetingcam/plugins/{name}")
    path.mkdir(parents=True, exist_ok=True)
    # render the plugin template with the provided name and descriptions
    template_path = Path("src/meetingcam/plugins/plugin_template.py")
    env = Environment(loader=FileSystemLoader(template_path.parent.as_posix()))
    template = env.get_template(template_path.name)
    output = template.render(
        name=name, short_description=short_description, description=description
    )
    with open(f"{path}/plugin.py", "w") as f:
        f.write(output)
    with open(f"{path}/__init__.py", "w") as f:
        f.write("")
    typer.echo(f"Plugin {name} created successfully.")
@app.command(
    help="Delete plugin",
    context_settings={"help_option_names": ["-h", "--help"]},
    rich_help_panel="General-Commands",
)
def delete_plugin(
    name: str = typer.Option(default=None, help="Name of plugin to delete")
) -> None:
    """Delete a plugin"""
    if name not in plugin_list:
        typer.echo(f"Plugin {name} does not exist.")
        sys.exit(0)
    confirm = typer.confirm(f"Are you sure you want to delete plugin {name}?")
    if not confirm:
        typer.echo(f"Plugin {name} not deleted.")
        sys.exit(0)
    else:
        path = Path(f"src/meetingcam/plugins/{name}")
        shutil.rmtree(path)
        typer.echo(f"Plugin {name} deleted successfully.")


if __name__ == "__main__":
    app()
"""This file contains a runner class which is initializing and running the main loop within MeetingCam."""
import cv2
import pyvirtualcam
from constants import DEPTHAI, WEBCAM
from device import DepthaiDevice, WebcamDevice
from plugins.plugin_utils import PluginBase, PluginDepthai
from utils import KeyHandler
class Runner:
    def __init__(
        self,
        plugin: PluginBase | PluginDepthai,
        device_path: str | None = None,
    ) -> None:
        """Initialize the device handler, real and virtual camera devices.
        Args:
            plugin --- Plugin which should be run.
            device_path --- The device path for the real camera. Defaults to None.
        """
        # an Enum member may be passed instead of a plain string, unwrap it
        if device_path is not None and not isinstance(device_path, str):
            device_path = device_path.value
        self.plugin = plugin
        if self.plugin.type == DEPTHAI:
            # initialize camera device handler with depthai as input device
            self.device_handler = DepthaiDevice(plugin.pipeline)
            self.virtual_path = self.device_handler.init_device(device_path)
        elif self.plugin.type == WEBCAM:
            # initialize camera device handler with webcam as input device
            self.device_handler = WebcamDevice()
            self.virtual_path = self.device_handler.init_device(device_path)
        else:
            raise ValueError(
                "Your plugin needs to have a plugin type assigned."
                " plugin.type: WEBCAM | DEPTHAI"
            )

    def run(self) -> None:
        """Main loop for video frame capture and processing within MeetingCam."""
        if self.plugin.verbose:
            # print available hotkeys
            print(
                "\nThe following keyboard triggers and switches are available"
                " within this plugin:"
            )
            for h in KeyHandler().default_hotkeys:
                print(f"{h.hotkey}: {h.description}")
            for h in self.plugin.hotkeys:
                print(f"{h.hotkey}: {h.description}")
            print("")
        # initialize real camera, to get frames
        with self.device_handler.get_device() as r_cam:
            # custom setup for depthai (on device handling)
            if self.plugin.type == DEPTHAI:
                r_cam.setup(self.plugin.device_setup, self.plugin.acquisition)
            # initialize a virtual camera where the modified frames will be sent to
            with pyvirtualcam.Camera(
                width=r_cam.width,
                height=r_cam.height,
                fps=24,
                device=self.virtual_path,
            ) as v_cam:
                # initialize a keyhandler to use keystrokes during runtime as trigger or switch
                with self.plugin.keyhandler() as keyhandler:
                    keyhandler.start()
                    # print in command line that the pipeline is running
                    self.device_handler.device_running()
                    # get frames from real camera, process them and send them out via virtual camera
                    while True:
                        # get a frame and optionally some on camera detections
                        frame, detection = r_cam.get_frame()
                        # swap the red and blue channels if <Ctrl>+<Alt>+r keys are pressed
                        if keyhandler.bgr2rgb:
                            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
                        frame = self.plugin.process(
                            frame, detection, keyhandler
                        )
                        # flip image if <Ctrl>+<Alt>+m keys are pressed
                        if keyhandler.mirror:
                            frame = cv2.flip(frame, 1)
                        # OpenCV uses BGR, the virtual camera expects RGB
                        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                        # send out the modified frame to the virtual camera
                        v_cam.send(frame)
                        v_cam.sleep_until_next_frame()
from types import MethodType
from typing import Any, Callable
import cv2
import depthai
from constants import MAX_HEIGHT, MAX_WIDTH
from numpy.typing import NDArray
from pynput import keyboard
from typing_extensions import Self
class VideoCapture(cv2.VideoCapture):
    """Handle video capture functionalities with OpenCV.
    Inherits from cv2.VideoCapture and extends functionalities with
    methods to safely enter, exit, get frames and frame rates from
    a video capture.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the VideoCapture object with width and height properties."""
        super().__init__(*args, **kwargs)
        cam_width = int(self.get(cv2.CAP_PROP_FRAME_WIDTH))
        cam_height = int(self.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.width = min(cam_width, MAX_WIDTH)
        self.height = min(cam_height, MAX_HEIGHT)
        self.img_handler = ImageHandler()

    def __enter__(self) -> Self:
        """Enter method for context management, returning self."""
        return self

    def __exit__(self, *args: Any) -> None:
        """Exit method for context management, releasing the video capture object."""
        self.release()

    def get_frame(self) -> tuple[NDArray[Any], None]:
        """Capture a frame from the video stream.
        Raises:
            RuntimeError: If image acquisition fails.
        Returns:
            A frame which has been captured by the camera, and None in place of detections.
        """
        (grabbed, frame) = self.read()
        if not grabbed:
            raise RuntimeError(
                "Image acquisition failed. Make sure the specified camera is"
                " not running in another application."
            )
        # high image resolution is usually not supported by online meeting tools
        frame = self.img_handler.correct_img_size(frame)
        # return the frame and None, there is no detection available from a webcam
        return frame, None

    def get_fps(self) -> int:
        """Retrieve the frames per second (FPS) of the video capture.
        Prints the total FPS and returns the FPS as an integer.
        """
        fps = self.get(cv2.CAP_PROP_FPS)
        print(f"total FPS: {int(fps)}")
        return int(fps)
class DepthaiCapture(depthai.Device):
    """Handle video capture functionalities with depthai.
    Inherits from depthai.Device and extends functionalities with
    methods to safely enter, exit, get frames and frame rates from
    a depthai device.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the depthai.Device object and assign width and height properties."""
        super().__init__(*args, **kwargs)
        self.width = MAX_WIDTH
        self.height = MAX_HEIGHT
        self.img_handler = ImageHandler()

    def __enter__(self) -> Self:
        """Enter method for context management, returning self."""
        return self

    def __exit__(self, *args: Any) -> None:
        """Exit method for context management, closing the depthai device."""
        self.close()

    def get_frame(self) -> tuple[NDArray[Any], Any]:
        """
        Capture a frame from the video stream.
        Returns:
            A frame and potentially detections which have been captured by the camera.
        """
        # get image and detections from depthai plugin (image acquisition function)
        img, det = self.acquisition()
        # high image resolution is usually not supported by online meeting tools
        img = self.img_handler.correct_img_size(img)
        return img, det

    def setup(self, setup_func: Callable, acquisition_func: Callable) -> None:
        """Run the plugin's setup function and bind the plugin's acquisition function to this device.
        Args:
            setup_func --- initialization function of plugin
            acquisition_func --- handling function for image acquisition of plugin
        """
        setup_func(self)
        self.acquisition = MethodType(acquisition_func, self)

    def get_fps(self) -> int:
        """Retrieve the frames per second (FPS) of the video capture.
        Not implemented for depthai devices yet.
        """
        # fps =
        # print(f"total FPS: {int(fps)}")
        # return int(fps)
        raise NotImplementedError
class KeyHandler(keyboard.GlobalHotKeys):
    """A class to handle global hotkeys and their functionalities.
    Inherits from keyboard.GlobalHotKeys and defines hotkey triggers.
    """

    def __init__(
        self, plugin_hotkeys: "list[Hotkey] | None" = None, verbose: bool = False
    ) -> None:
        """Initialize the KeyHandler with hotkeys and their respective states.
        Args:
            plugin_hotkeys: A list of Hotkey objects defined by the plugin.
            verbose: Print every triggered hotkey if True.
        """
        self.hotkeys = {}
        # names of all toggle variables, used to detect duplicates
        self.variables = set()
        self.verbose = verbose
        self.default_hotkeys = [
            Hotkey(
                "<ctrl>+<alt>+r",
                "bgr2rgb",
                False,
                "Switch RGB to BGR color schema.",
            ),
            Hotkey(
                "<ctrl>+<alt>+m", "mirror", True, "Mirror the camera stream."
            ),
        ]
        for h in self.default_hotkeys:
            self.add_trigger(h.hotkey, h.variable, h.enabled)
        # avoid a mutable default argument, fall back to an empty list
        for h in plugin_hotkeys or []:
            self.add_trigger(h.hotkey, h.variable, h.enabled)
        super().__init__(self.hotkeys)

    def __enter__(self) -> Self:
        """Enter method for context management, returning self."""
        return self

    def __exit__(self, *args: Any) -> None:
        """Exit method for context management, stopping the KeyHandler."""
        self.stop()

    def add_trigger(
        self, hotkey: str, variable: str, enabled: bool = True
    ) -> None:
        """Add a new trigger function for a specific hotkey.
        Args:
            hotkey: The hotkey string.
            variable: The name of the variable to toggle.
            enabled: The initial state of the variable.
        """
        self.validate(hotkey, variable)
        self.variables.add(variable)
        setattr(self, variable, enabled)

        def trigger_func():
            setattr(self, variable, not getattr(self, variable))
            if self.verbose:
                print(
                    "Triggered: ",
                    hotkey,
                    " ",
                    variable,
                    " ",
                    getattr(self, variable),
                )

        self.hotkeys[hotkey] = trigger_func

    def validate(self, hotkey: str, variable: str) -> None:
        """Check if a hotkey and variable names are valid and not existing.
        Args:
            hotkey: The hotkey string.
            variable: The name of the variable to toggle."""
        if not isinstance(hotkey, str):
            raise ValueError(
                "Hotkey must be a string. For example: '<Ctrl>+<Alt>+z'"
            )
        if not isinstance(variable, str):
            raise ValueError(
                "Variable must be a string. For example: 'trig_var'"
            )
        # self.hotkeys maps hotkeys to functions, so variable names are tracked separately
        if variable in self.variables:
            raise ValueError(f"Variable name '{variable}' already exists.")
        if hotkey in self.hotkeys.keys():
            raise ValueError(f"Hotkey '{hotkey}' already exists.")
class Hotkey:
    """Hotkey data structure."""

    def __init__(
        self,
        hotkey: str,
        variable: str,
        enabled: bool = True,
        description: str | None = None,
    ) -> None:
        """Initialize the Hotkey object with a hotkey string, a variable name and a boolean value indicating whether the hotkey is enabled or not.
        Args:
            hotkey: The hotkey string.
            variable: The name of the variable to toggle.
            enabled: A boolean value indicating whether the hotkey is enabled or not.
            description: A description of the hotkey.
        """
        self.hotkey = hotkey
        self.variable = variable
        self.enabled = enabled
        self.description = description
class InvalidArgumentError(ValueError):
    """Custom exception for invalid arguments, inherits from ValueError."""

    def __init__(self, message: str = "Invalid argument") -> None:
        """Initialize the exception with a custom or default message."""
        super().__init__(message)


class ArgumentHandler:
    """A class to handle and validate command-line arguments."""

    def __init__(self, ctx_args: list[str]) -> None:
        """Initialize the ArgumentHandler and check the provided arguments."""
        self.check(ctx_args)

    def check(self, ctx_args: list[str]) -> None:
        """Check the validity of the command-line arguments.
        Raises:
            InvalidArgumentError: If the arguments are not correctly formatted.
        """
        # a single argument can never form a '--name argument' pair
        if len(ctx_args) == 1:
            raise InvalidArgumentError(
                "You need to provide extra arguments in the form '--name'"
                " 'argument'. "
            )
        # an odd number of arguments means at least one pair is incomplete
        if len(ctx_args) % 2:
            raise InvalidArgumentError(
                "You need to provide extra arguments in the form '--name'"
                " 'argument'. Multiple arguments per parameter are not"
                " allowed. The following is invalid: '--your_param argument1"
                " argument2', while '--your_param argument1argument2' or"
                " '--your_param1 argument1 --your_param2 argument2' are valid."
            )

    def print(self, ctx_args: list[str]) -> None:
        """Print the list of provided command-line arguments or a message if none were provided."""
        if len(ctx_args) > 0:
            print("\nProvided extra arguments are:")
            for param, arg in zip(ctx_args[0::2], ctx_args[1::2]):
                print(param, ": ", arg)
        else:
            print("\nNo extra arguments provided.\n")

    def get(self, pram_name: str, ctx_args: list[str]) -> str | None:
        """Retrieve the value of a specific parameter from the command-line arguments.
        Args:
            pram_name --- the name of the parameter to retrieve.
            ctx_args --- a list of command-line arguments.
        Returns:
            The value of the parameter if found, otherwise None.
        """
        if pram_name in ctx_args:
            idx = ctx_args.index(pram_name) + 1
            arg = ctx_args[idx]
            return arg
        else:
            return None
class ImageHandler:
    """A class to handle image processing functions."""

    def __init__(self) -> None:
        """Initialize placeholder."""
        pass

    def correct_img_size(self, image: NDArray[Any]) -> NDArray[Any]:
        """Check image for maximum image size and resize if exceeded.
        The resizing will just be applied on the exceeding dimension (width, height or both) without taking the aspect ratio into account.
        Args:
            image --- the image to be processed.
        Returns:
            The processed image.
        """
        if image.shape[0] > MAX_HEIGHT or image.shape[1] > MAX_WIDTH:
            h = min(image.shape[0], MAX_HEIGHT)
            w = min(image.shape[1], MAX_WIDTH)
            image = cv2.resize(image, (w, h))
        return image
"""This file facilitates the management and utilization of virtual and real camera devices."""
import re
import signal
import sys
from enum import Enum
from pathlib import Path
from types import FrameType
from typing import Any, Optional
import depthai
from constants import DEPTHAI, WEBCAM, PluginType
from print import Printer
from utils import DepthaiCapture, VideoCapture
from v4l2ctl import V4l2Capabilities, V4l2Device
class V4l2Capture(V4l2Device):
    """Handle real camera devices.
    Inherits from v4l2ctl.V4l2Device and extends functionalities with
    methods to safely enter and exit.
    """

    def __enter__(self) -> V4l2Device:
        """Enter method for context management, returning self."""
        return self

    def __exit__(self, *args: tuple[Any]) -> None:
        """Exit method for context management, releasing the devices."""
        self.flush()
        self.close()
class DeviceHandler:
    """Base class for management and mapping of real and virtual devices."""

    def __init__(self) -> None:
        """Initialize the DeviceHandler by setting up necessary mappings and signal handler."""
        # create the printer first, the interrupt handler depends on it
        self.pprint = Printer()
        signal.signal(signal.SIGINT, self._interrupt)
        self.mapping = self.device_mapping()

    def _interrupt(self, signum: int, frame: FrameType | None) -> None:
        """Handle SIGINT signal by stopping the device and exiting the program."""
        self.pprint.device_stopped()
        sys.exit(0)

    def init_device(self, device_path: str | None) -> str:
        """Initialize the device by analyzing the device path and mapping it to a virtual device.
        Args:
            device_path -- the path of the device to initialize, None if not specified
        Returns:
            A string containing the path to the virtual device.
        """
        device_map = {
            device["path_real"]: device["path_virtual"]
            for device in self.mapping.values()
        }
        if device_path:
            # device is specified as argument in command line
            if device_path in device_map.keys():
                # the specified device is available in real devices and has a virtual counterpart
                self.real_path = device_path
                virtual_path = device_map[device_path]
                return virtual_path
            else:
                # if not, print that the specified device needs a virtual counterpart and how to add it
                self.pprint.device_not_available(device_path)
                self.pprint.available_devices(
                    self.mapping, self.available_devices_real
                )
                self.pprint.add_device_first()
                sys.exit()
        else:
            raise ValueError(
                f"Device path is {device_path}, which is not a valid device"
                " path."
            )
    def device_mapping(self) -> dict[int, dict[str, str]]:
        """Create a mapping from real camera devices to virtual camera devices.
        Returns:
            A dictionary that contains information about real and virtual devices.
        """
        device_map = {}
        (paths_real, labels_real) = self.available_devices_real
        (paths_virtual, labels_virtual) = self.available_devices_virtual
        # extract the IDs (first number in virtual camera label) of available virtual cameras for mapping them to real cameras
        # the mapping index is the same as the real camera's path id, e.g. /dev/video_n_ with n as id
        if self.type == WEBCAM:
            mapping_ids = [
                int(re.findall(r"\d+", label)[0]) for label in labels_virtual
            ]
            ids_real = [
                int(path.replace("/dev/video", "")) for path in paths_real
            ]
            try:
                mapping_indices = [
                    ids_real.index(real_id) for real_id in mapping_ids
                ]
            except ValueError:
                # a virtual camera refers to a real camera which is not available
                mapping_indices = []
        elif self.type == DEPTHAI:
            mapping_indices = []
            for i, mxid in enumerate(paths_real):
                for j, label in enumerate(labels_virtual):
                    if mxid in label:
                        mapping_indices.append(j)
        else:
            raise NotImplementedError(
                "Device type needs to be either 'webcam' or 'depthai'."
            )
        for i, j in enumerate(mapping_indices):
            device_map[i] = {
                "path_real": paths_real[j],
                "label_real": labels_real[j],
                "path_virtual": paths_virtual[i],
                "label_virtual": labels_virtual[i],
            }
        return device_map
    def get_available(self, real: bool) -> tuple[list[str], list[str]]:
        """Return a list of available real or virtual camera devices based on the input flag.
        Args:
            real --- a flag to determine whether to return real or virtual devices.
        Returns:
            A tuple containing lists of device paths and labels.
        """
        device_paths = []
        labels = []
        with V4l2Capture() as v4l2:
            for device in v4l2.iter_devices(skip_links=True):
                if (
                    real
                    and not self._is_virtual_device(device)
                    and V4l2Capabilities.VIDEO_CAPTURE in device.capabilities
                ):
                    device_path, label = self._get_device_info(device)
                elif (
                    not real
                    and self._is_virtual_device(device)
                    and V4l2Capabilities.VIDEO_OUTPUT in device.capabilities
                ):
                    device_path, label = self._get_device_info(device)
                else:
                    continue
                labels.append(label)
                device_paths.append(device_path)
        return device_paths, labels

    def device_running(self) -> None:
        """Print the running device."""
        self.pprint.device_running()

    def _is_virtual_device(self, device: V4l2Device) -> bool:
        """Check if the device is a virtual device.
        Args:
            device (V4l2Device) --- the device to check.
        Returns:
            True if the device is virtual, False otherwise.
        """
        return "platform:v4l2loopback" in str(device.bus)

    def _get_device_info(self, device: V4l2Device) -> tuple[str, str]:
        """Extract and return device information including path and label.
        Args:
            device (V4l2Device) --- the device to extract information from.
        Returns:
            A tuple containing the device path (str) and label (str).
        Raises:
            AssertionError: If the device path does not follow the expected format or does not exist.
        """
        device_path = str(device.device)
        card_label = str(device.name)
        assert (
            "/dev/video" in device_path
        ), f"Device path '{device_path}' should be of format '/dev/video_n_'."
        assert Path(
            device_path
        ).exists(), f"Device path '{device_path}' does not exist."
        return device_path, card_label
class WebcamDevice(DeviceHandler):
    """Handles the management and mapping of real and virtual webcam devices."""

    def __init__(self) -> None:
        """Initialize the WebcamDevice class."""
        self.type = WEBCAM
        self.real_path = None
        self.available_devices_real = None
        self.available_devices_virtual = None
        self.update_available()
        super().__init__()

    def get_device(self) -> VideoCapture:
        """Get a VideoCapture class instance which can be used for webcam image acquisition.
        Raises:
            LookupError in case the device path is incorrect.
        Returns:
            A VideoCapture class instance.
        """
        if self.real_path:
            return VideoCapture(self.real_path)
        else:
            raise LookupError(
                f"self.real_path is {self.real_path}. Device initialization"
                " 'self.init_device(device_path)' is needed before a device"
                " instance is created."
            )

    def update_available(self) -> None:
        """Update the list of available real and virtual devices."""
        self.available_devices_real = self.get_available(real=True)
        self.available_devices_virtual = self.get_available(real=False)
class DepthaiDevice(DeviceHandler):
    """Handles the management and mapping of depthai and virtual devices."""

    def __init__(self, pipeline) -> None:
        """Initialize the DepthaiDevice class."""
        self.type = DEPTHAI
        self.real_path = None
        self.available_devices_real = None
        self.available_devices_virtual = None
        self.update_available()
        super().__init__()
        self.usb_speed = depthai.UsbSpeed.SUPER_PLUS
        self.device_info = None
        self.pipeline = pipeline

    def get_device(self) -> DepthaiCapture:
        """Get a DepthaiCapture class instance which can be used for depthai image acquisition.
        Raises:
            LookupError in case the device path is incorrect.
        Returns:
            A DepthaiCapture class instance.
        """
        if self.real_path:
            self.device_info = depthai.DeviceInfo(self.real_path)
            return DepthaiCapture(
                pipeline=self.pipeline,
                deviceInfo=self.device_info,
                maxUsbSpeed=self.usb_speed,
            )
        else:
            raise LookupError(
                f"self.real_path is {self.real_path}. Device initialization"
                " 'self.init_device(device_path)' is needed before a device"
                " instance is created."
            )

    def update_available(self) -> None:
        """Update the list of available real and virtual devices."""
        self.available_devices_real = self.get_available_depthai()
        self.available_devices_virtual = self.get_available(real=False)

    def get_available_depthai(self) -> tuple[list[str], list[str]]:
        """Get available depthai camera devices.
        Returns:
            Device paths and labels.
        """
        device_paths = []
        labels = []
        device_info = depthai.Device.getAllAvailableDevices()
        for device in device_info:
            labels.append(f"OAK Device on port {device.name}")
            device_paths.append(device.mxid)
        return device_paths, labels


def device_choice(type: PluginType) -> Enum | Optional[str]:
    """Get default values for device paths if available"""
    if type == WEBCAM:
        device_handler = WebcamDevice()
    elif type == DEPTHAI:
        device_handler = DepthaiDevice(pipeline=None)
    else:
        raise ValueError(
            f"Device type needs to be '{WEBCAM}' or '{DEPTHAI}' not '{type}'"
        )

    if device_handler.mapping:
        devices = {
            d["path_real"]: d["path_real"]
            for d in device_handler.mapping.values()
        }
        DevicePath = Enum("DevicePath", devices)
        return DevicePath
    else:
        return Optional[str]
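
The Enum construction in `device_choice` can be tried in isolation. The following is a minimal sketch, not MeetingCam's code: `build_device_choice` and `example_mapping` are hypothetical names, and the mapping layout (integer keys, `path_real` and `path_virtual` entries) is assumed from how the mapping is read elsewhere in this listing.

```python
from enum import Enum
from typing import Optional


def build_device_choice(mapping: dict[int, dict[str, str]]):
    """Return an Enum of real device paths, or Optional[str] if nothing is mapped.

    Hypothetical helper that mirrors the second half of device_choice.
    """
    if mapping:
        # Member name and value are both the real device path.
        members = {d["path_real"]: d["path_real"] for d in mapping.values()}
        return Enum("DevicePath", members)
    return Optional[str]


# Assumed example data: one webcam mapped to one virtual device.
example_mapping = {
    0: {"path_real": "/dev/video0", "path_virtual": "/dev/video2"},
}

DevicePath = build_device_choice(example_mapping)
print([member.value for member in DevicePath])
```

Because the member names are device paths rather than Python identifiers, members are looked up by subscript, e.g. `DevicePath["/dev/video0"]`, not by attribute access.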


from typing import Any, Optional, Type

import cv2
import typer
from constants import WEBCAM, DevicePathWebcam
from device import device_choice
from numpy.typing import NDArray
from runner import Runner
from utils import Hotkey, KeyHandler

from ..plugin_utils import PluginBase

name = "{{name}}"  # modifiable
short_description = "{{short_description}}"  # modifiable
description = """
{{description}}"""  # modifiable

TYPE = WEBCAM
DevicePath = device_choice(TYPE)

plugin_txt = f"\n\n\n\nPlugin type: {TYPE}"
plugin_app = typer.Typer(
    name=name,
    context_settings={"help_option_names": ["-h", "--help"]},
    no_args_is_help=True,
    short_help=short_description,
    help=str(description + plugin_txt),
    invoke_without_command=True,
)
plugin_app.type = TYPE


class CustomPlugin(PluginBase):
    """A custom plugin.

    This class is based on a template and is meant to be modified into your custom plugin.
    """

    def __init__(
        self, arg1: str | None = None, arg2: str | None = None
    ) -> None:
        """Initialize the plugin with custom arguments, variables and classes.

        Args:
            arg1 --- argument you define.
            arg2 --- argument you define.
        """
        super().__init__()
        self.type = TYPE

        # modify arguments and
        # initialize additional variables or classes (e.g. your AI model)
        # e.g.:
        # self.arg1 = arg1

        # you can use the model path to save and load your model weights
        # e.g.:
        # self.model_path = (Path(self.model_dir) / "your_model.pth")

        # define custom hotkeys to enable/disable functionality at runtime
        # e.g.:
        # self.hotkeys = [
        #     Hotkey("<Ctrl>+<Alt>+z", "z_trigger", True, "toggle text"),
        #     Hotkey("<Ctrl>+<Alt>+p", "p_trigger", False, "toggle text"),
        #     # Hotkey(key_combination, variable_name, is_enabled, description)
        # ]
        # this gives you two triggers (keyhandler.z_trigger, keyhandler.p_trigger), initially True and False respectively
        # you can use them to enable/disable functionality based on the trigger state
        self.hotkeys = [
            Hotkey("<Ctrl>+<Alt>+z", "z_trigger", True, "toggle text"),
        ]

        # set verbose to True to print additional information like available hotkeys and hotkey changes
        self.verbose = True

    def process(
        self,
        image: NDArray[Any],
        detection: Any,
        keyhandler: KeyHandler,
    ) -> NDArray[Any]:
        """Process the webcam image and return a modified image.

        Args:
            image --- the input image (real camera frames) to be processed.
            detection --- the on-camera detection (only for depthai cameras).
            keyhandler --- keyhandler instance to enable/disable functionality by hotkey trigger.
        Returns:
            The processed image which will be sent to the virtual camera (video meeting stream)
        """
        # implement your custom image processing here
        # e.g. run opencv functions on the image or your AI model
        image = cv2.putText(
            image,
            "Your custom plugin",
            (50, 75),
            fontFace=cv2.FONT_HERSHEY_SIMPLEX,
            fontScale=1,
            color=(0, 255, 0),
            thickness=2,
        )

        # If z_trigger (<Ctrl>+<Alt>+z) is True, draw additional text
        if keyhandler.z_trigger:
            image = cv2.putText(
                image,
                "Toggle this text with <Ctrl>+<Alt>+z",
                (50, 200),
                fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                fontScale=0.7,
                color=(0, 255, 0),
                thickness=1,
            )

        # If z_trigger (<Ctrl>+<Alt>+z) is False, draw the next steps
        if not keyhandler.z_trigger:
            image = cv2.putText(
                image,
                "Great! Now you can start to modify this plugin. ;)",
                (50, 300),
                fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                fontScale=0.7,
                color=(0, 255, 0),
                thickness=1,
            )
        return image


@plugin_app.callback(rich_help_panel="Plugin-Commands")
def main(
    device_path: DevicePath = DevicePathWebcam,
    arg1: Optional[str] = typer.Option(
        default="", help="Your custom argument 1."
    ),
    arg2: Optional[str] = typer.Option(
        default="", help="Your custom argument 2."
    ),
):
    # define plugin
    plugin = CustomPlugin(arg1, arg2)
    # define runner
    runner = Runner(plugin, device_path)
    # run
    runner.run()
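
The hotkey-driven branching in `process` can be explored without a camera. The sketch below is only an illustration: `HotkeySketch`, `KeyHandlerSketch` and `overlay_lines` are hypothetical stand-ins, and the real `Hotkey` and `KeyHandler` classes in `utils` are not shown in this listing, so their behaviour is assumed to be "one boolean attribute per hotkey, flipped when the combination is pressed".

```python
from dataclasses import dataclass


@dataclass
class HotkeySketch:
    """Hypothetical stand-in for utils.Hotkey (same four fields as the template)."""

    combination: str
    variable: str
    enabled: bool
    description: str


class KeyHandlerSketch:
    """Hypothetical stand-in for utils.KeyHandler: one boolean attribute per hotkey."""

    def __init__(self, hotkeys: list[HotkeySketch]) -> None:
        for hotkey in hotkeys:
            setattr(self, hotkey.variable, hotkey.enabled)

    def toggle(self, variable: str) -> bool:
        """Flip a trigger, as a key press is assumed to do, and return the new state."""
        new_state = not getattr(self, variable)
        setattr(self, variable, new_state)
        return new_state


def overlay_lines(keyhandler: KeyHandlerSketch) -> list[str]:
    """Return the text lines the template plugin would draw for the current state."""
    lines = ["Your custom plugin"]
    if keyhandler.z_trigger:
        lines.append("Toggle this text with <Ctrl>+<Alt>+z")
    else:
        lines.append("Great! Now you can start to modify this plugin. ;)")
    return lines


handler = KeyHandlerSketch(
    [HotkeySketch("<Ctrl>+<Alt>+z", "z_trigger", True, "toggle text")]
)
before = overlay_lines(handler)
handler.toggle("z_trigger")
after = overlay_lines(handler)
```

Keeping the decision logic in a function that returns plain data, as `overlay_lines` does here, is one way to unit-test a plugin's behaviour separately from the OpenCV drawing calls.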


from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any

import depthai
import typer
from constants import DEPTHAI, WEBCAM
from numpy.typing import NDArray
from utils import KeyHandler


class PluginBase(ABC):
    """PluginBase class acts as a base class for plugins with image processing capabilities.

    This class is intended to be subclassed, with subclasses implementing the abstract
    `__init__` and `process` methods to perform image processing tasks.

    Attributes:
        model_dir --- directory where models are stored. Defaults to "./src/meetingcam/models".
    """

    @abstractmethod
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the PluginBase instance with the default model directory.

        Args:
            *args --- Variable length argument list.
            **kwargs --- Arbitrary keyword arguments.
        These allow custom arguments to be passed to plugins, e.g. --name YourName
        """
        self.model_dir = "./src/meetingcam/models"
        self.type = WEBCAM  # defaults to webcam
        self.hotkeys = {}
        self.verbose = False

    @abstractmethod
    def process(
        self,
        image: NDArray[Any],
        detection: Any,
        keyhandler: KeyHandler,
    ) -> NDArray[Any]:
        """Process an image through the plugin's image processing method.

        This method is abstract and should be overridden in subclasses to implement
        specific image processing functionality.

        Args:
            image --- the image to be processed.
            detection --- the on-camera detection (only for depthai cameras).
            keyhandler --- keyhandler instance to enable/disable functionality by hotkey trigger.
        Returns:
            The processed image.
        """
        pass

    def keyhandler(self) -> KeyHandler:
        """Return the keyhandler for this plugin."""
        return KeyHandler(self.hotkeys, self.verbose)


class PluginDepthai(PluginBase):
    """PluginBase class for Depthai plugins with on-device compute."""

    @abstractmethod
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the PluginDepthai instance with the default model directory.

        Args:
            *args --- Variable length argument list.
            **kwargs --- Arbitrary keyword arguments.
        These allow custom arguments to be passed to plugins, e.g. --name YourName
        """
        super().__init__()
        self.type = DEPTHAI

    @abstractmethod
    def device_setup(self, device) -> None:
        """Set up the device before the image acquisition loop, e.g. define the output queues.

        Args:
            device --- depthai device
        """
        pass

    @abstractmethod
    def acquisition(
        self, device
    ) -> tuple[NDArray[Any], list[depthai.ImgDetection]]:
        """Acquire an image and optionally detections from the camera queue and return them.

        Args:
            device --- depthai device
        """
        pass


class PluginRegistry:
    """PluginRegistry class acts as registry for default and newly added plugins."""

    def register_plugins(
        self, main_app: typer.main.Typer, plugin_list: list
    ) -> None:
        """Register plugins in typer main app"""
        for name in plugin_list:
            plugin_app = self._import_plugin(
                f"plugins.{name}.plugin", "plugin_app"
            )
            help = (
                plugin_app.info.help
                if isinstance(plugin_app.info.help, str)
                else None
            )
            short_help = (
                plugin_app.info.short_help
                if isinstance(plugin_app.info.short_help, str)
                else None
            )
            # Fall back to the directory name if the plugin defines no name
            plugin_name = (
                plugin_app.info.name
                if isinstance(plugin_app.info.name, str)
                else name
            )
            main_app.add_typer(
                plugin_app, name=plugin_name, help=help, short_help=short_help
            )

    def search_plugins(self, path: str = "src/meetingcam/plugins") -> list:
        """Check plugin directory for plugins and return list with found and valid plugins."""
        dirs = [d for d in Path(path).iterdir() if d.is_dir()]
        dirs = self._sortout(dirs)
        plugins = []
        for dir in dirs:
            plugin_name = dir.name
            valid = self._check_plugin(dir)
            if valid:
                plugins.append(plugin_name)
            else:
                print(
                    f"Plugin {plugin_name} is not valid. Continue without"
                    " registering this plugin."
                )
        return plugins

    def _check_plugin(self, path: Path) -> bool:
        # TODO: Implement checks to validate plugin
        return True

    def _sortout(self, dirs: list) -> list:
        """Sort out directories which start with a dot or a double underscore."""
        # Build a new list: removing items while iterating would skip entries
        return [d for d in dirs if not d.name.startswith((".", "__"))]

    def _import_plugin(self, modulename, name):
        """Import a named object from a module in the context of this function.

        Code adapted from: https://www.oreilly.com/library/view/python-cookbook/0596001673/ch15s04.html
        """
        try:
            module = __import__(modulename, globals(), locals(), [name])
        except ImportError:
            return None
        return vars(module)[name]
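
A directory filter like `_sortout` is easy to get subtly wrong, because removing items from a list while iterating over it makes the loop skip the element that follows each removal. The sketch below contrasts the two approaches. The function names and the directory names are hypothetical examples, not taken from the MeetingCam repository.

```python
from pathlib import Path


def filter_by_removal(dirs: list[Path]) -> list[Path]:
    """Remove hidden/private directories in place while iterating (skips entries)."""
    for d in dirs:
        if d.name.startswith(".") or d.name.startswith("__"):
            dirs.remove(d)
    return dirs


def filter_by_rebuild(dirs: list[Path]) -> list[Path]:
    """Build a new list that keeps only regular plugin directories."""
    return [d for d in dirs if not d.name.startswith((".", "__"))]


# Hypothetical directory listing of a plugin folder.
candidates = [Path(".git"), Path("__pycache__"), Path("face_detector")]

removed_in_place = filter_by_removal(list(candidates))
rebuilt = filter_by_rebuild(list(candidates))

print([d.name for d in removed_in_place])
print([d.name for d in rebuilt])
```

With this input the in-place variant still returns `__pycache__`, because deleting `.git` shifts the list under the running iterator. The rebuilt list contains only `face_detector`.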
"""Definition of constants and types"""
from enum import Enum
from typing import NewType

import typer

# Definition of maximum image size for virtual camera.
# Higher resolutions are usually not supported by online meeting tools
# 720p:(1280, 720) 540p:(960, 540)
MAX_WIDTH = 1280
MAX_HEIGHT = 720

# Plugin types (avoid spelling errors with explicit type)
PluginType = NewType("PluginType", str)
DEPTHAI = PluginType("depthai")
WEBCAM = PluginType("webcam")

# Argument and Option types
TYPES = Enum("DevicePath", {"all": "all", WEBCAM: WEBCAM, DEPTHAI: DEPTHAI})
TypeArgument = typer.Option(default=WEBCAM, help="Choose camera type")
DevicePathWebcam = typer.Argument(
    default=..., help="Path to real camera device, e.g. /dev/video0."
)
DevicePathDepthai = typer.Argument(
    default=...,
    help="Path (mxid) to real camera device, e.g. 14442C1021C694D000.",
)

"""This file contains a printing class which is used for common terminal prints."""
from typing import Any

import pyfiglet
import rich
from constants import DEPTHAI, WEBCAM
from rich.console import Console
from rich.style import Style
from rich.table import Table
from rich.text import Text
from v4l2ctl import V4l2Device


class Printer:
    """Handles printing functions for common terminal prints"""

    def __init__(self) -> None:
        """
        Initialize the Printer.

        Instantiates a Console object and initializes styles for different types of messages (danger, warning, ok).
        """
        self.console = Console()
        self.danger_style = Style(color="red", blink=True, bold=True)
        self.warning_style = Style(color="yellow", blink=True, bold=True)
        self.ok_style = Style(color="green", blink=True, bold=True)

    def available_devices(
        self,
        device_map: dict[int, dict[str, Any]],
        available_devices_real: tuple[list[str], list[str]],
    ) -> None:
        """
        Print available real and virtual devices to the console.

        Args:
            device_map --- a mapping of device ids to device properties including paths and labels.
            available_devices_real --- a tuple containing the lists of paths and labels of real devices.

        The method extracts real and mapped device information and formats them for console printing.
        """
        (paths_real, labels_real) = available_devices_real
        devices_real = [
            "|" + label + "|" + str(path) + "|"
            for label, path in zip(labels_real, paths_real)
        ]
        devices_real = (
            str(devices_real).replace("'", "").replace(", ", "\n")[1:-1]
        )

        devices_mapped = [
            "|"
            + v["label_real"]
            + "|"
            + v["path_real"]
            + " -> |"
            + v["label_virtual"]
            + "|"
            + v["path_virtual"]
            + "|"
            for v in device_map.values()
        ]
        devices_mapped = (
            str(devices_mapped).replace("'", "").replace(", ", "\n")[1:-1]
        )

        table = Table()
        table.add_column(
            "Camera name", justify="right", style="cyan", no_wrap=True
        )
        table.add_column("Camera path", style="magenta", no_wrap=True)
        table.add_column(
            "Virtual camera name", justify="right", style="cyan", no_wrap=True
        )

        # One row per real camera; the virtual name stays None if unmapped
        for rp, rn in zip(
            available_devices_real[0], available_devices_real[1]
        ):
            cam = {"label_real": rn, "path_real": rp, "label_virtual": None}
            for d in device_map.values():
                if d["path_real"] == rp:
                    cam["label_virtual"] = d["label_virtual"]
            table.add_row(
                str(cam["label_real"]),
                str(cam["path_real"]),
                str(cam["label_virtual"]),
            )
        self.console.print(table)

    def add_virtual_devices(
        self,
        available_devices_real: tuple[list[str], list[str]],
        type: str | int,
    ) -> None:
        """
        Print instructions to add virtual devices to the console.

        Args:
            available_devices_real --- a tuple containing the lists of paths and labels of real devices.
            type --- the device type, either 'webcam' or 'depthai'.

        The method uses the device paths and labels to create instructions for adding virtual devices and prints them to the console.
        """
        (device_paths, labels) = available_devices_real
        cli_cmd_single = {}
        labels_str = []

        if type == WEBCAM:
            ids = [
                int(path.replace("/dev/video", "")) for path in device_paths
            ]
            vd_nrs = ids
            for idx, label in zip(ids, labels):
                cli_cmd_single[label] = (
                    "`sudo modprobe v4l2loopback devices=1"
                    f" video_nr={idx} card_label='MeetingCam{idx} {label}'`"
                )
                labels_str.append(f"MeetingCam{idx} {label}")
            labels_str = str(labels_str).replace(", ", ",")[1:-1]
            vd_nrs = str(vd_nrs).replace(" ", "")[1:-1]
            cli_cmd_multi = (
                "`sudo modprobe v4l2loopback"
                f" devices={len(ids)} video_nr={str(vd_nrs)} card_label='{labels_str[1:-1]}'`"
            )
        elif type == DEPTHAI:
            ids = device_paths
            for idx, label in zip(ids, labels):
                cli_cmd_single[label] = (
                    "`sudo modprobe v4l2loopback devices=1"
                    f" card_label='MeetingCam{idx} {label}'`"
                )
                labels_str.append(f"MeetingCam{idx} {label}")
            labels_str = str(labels_str).replace(", ", ",")[1:-1]
            cli_cmd_multi = (
                "`sudo modprobe v4l2loopback"
                f" devices={len(ids)} card_label='{labels_str[1:-1]}'`"
            )
        else:
            raise NotImplementedError(
                "Device type needs to be either 'webcam' or 'depthai'."
            )

        if len(device_paths) > 0:
            self.console.print(
                "\n[bold]Add[/bold] a [bold]single device[/bold] with one of"
                " the following commands:"
            )
            for label, cmd in cli_cmd_single.items():
                cmd = Text(cmd[1:-1])
                self.console.print(f"\n{label}:")
                self.console.print(cmd, style="cyan bold")
            self.console.print(
                "\n\n[bold]Add all devices[/bold] with the following command:"
            )
            cmd = Text("\n" + cli_cmd_multi[1:-1])
            self.console.print(cmd, style="cyan bold")

    def device_not_available(self, device: V4l2Device) -> None:
        """Print a warning message indicating the specified device is not available.

        Args:
            device (V4l2Device) --- the device that is not available.
        """
        self.console.print("\nWarning! :warning:", style=self.warning_style)
        text = (
            "[bold]Device not available.[/bold]\nThe specified device"
            f" '{device}' is not available or does not have a virtual"
            " counterpart."
        )
        self.console.print(text)

    def add_device_first(self) -> None:
        """Print a message guiding the user to run add devices first."""
        self.console.print(
            "\nYou'll need to add a device first. Run [cyan"
            " bold]add-devices[/cyan bold] command to see how to add a"
            " virtual counterpart to a camera device.\n"
        )

    def reset_devices(self) -> None:
        """Print instructions on how to reset virtual devices."""
        self.console.print(
            "\n1. [bold]Close[/bold] all [bold]applications which access"
            " camera[/bold] devices, including browser."
            "\n2. Run: [bold][cyan]sudo modprobe -r v4l2loopback[/cyan][/bold]"
            "\n\nA reset is necessary if you want to access your real camera"
            " device normally or if you want to add another device to be used"
            " with MeetingCam.\nIf the reset command isn't working, make"
            " sure all applications which might access cameras are closed. If"
            " this still isn't working, restart your system.\n"
        )

    def device_running(self) -> None:
        """Print a message indicating that the device is currently running and how to access the stream."""
        self.console.print(
            "Device running! :arrow_forward:", style=self.ok_style
        )
        self.console.print(
            "You can now access the modified camera stream in Meets, Teams or"
            " Zoom. :rocket:",
            style=Style(bold=True),
        )
        self.console.print(
            "Press `<Ctrl>+C` to stop the running stream and access your"
            " device normally."
        )

    def device_stopped(self) -> None:
        """Print a message indicating that the device has stopped and can now be accessed normally."""
        self.console.print(
            "Device stopped. :raised_hand:", style=self.danger_style
        )
        self.console.print(
            "You can access your device now as usual. MeetingCam has stopped.",
            style=Style(bold=True),
        )

    def title(self) -> None:
        """Print MeetingCam title."""
        title = pyfiglet.figlet_format("MeetingCam", font="big")
        title = Text(title)
        title.stylize("bold green", 0, 234)
        title.stylize("bold magenta", 235)
        rich.print(title)

    def subtitle(self, name: str) -> None:
        """Print Plugin title."""
        title = pyfiglet.figlet_format(name, font="big")
        title = Text(title)
        rich.print(title)

    def epilog(self) -> str:
        """Get epilog text to be printed underneath help."""
        text = (
            "Submit feedback, issues and questions via"
            " https://github.com/nengelmann/MeetingCam/issues.\nPlease"
            " consider starring https://github.com/nengelmann/MeetingCam if you"
            " find it helpful. "
        )
        return text
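
The command string that `add_virtual_devices` assembles for several webcams can be reproduced with plain string joins, which may be easier to follow than slicing the `str()` of a list. This is a sketch under assumptions: `modprobe_command` is a hypothetical helper, the camera labels are made up, and the output format is copied from the webcam branch above rather than from the v4l2loopback documentation.

```python
def modprobe_command(device_paths: list[str], labels: list[str]) -> str:
    """Build one v4l2loopback modprobe command covering all given webcams.

    Hypothetical helper that mirrors the 'add all devices' string of the
    webcam branch in add_virtual_devices.
    """
    # /dev/video2 -> 2; the same number is reused for the virtual device.
    ids = [int(path.replace("/dev/video", "")) for path in device_paths]
    video_nrs = ",".join(str(idx) for idx in ids)
    # Each label is quoted separately, matching the original output format.
    card_labels = ",".join(
        f"'MeetingCam{idx} {label}'" for idx, label in zip(ids, labels)
    )
    return (
        "sudo modprobe v4l2loopback"
        f" devices={len(ids)} video_nr={video_nrs} card_label={card_labels}"
    )


# Made-up example: two webcams with hypothetical labels.
command = modprobe_command(["/dev/video0", "/dev/video2"], ["Cam A", "Cam B"])
print(command)
```

Building the pieces with `str.join` avoids depending on how Python renders a list as text, so labels that contain commas or quotes would at least not be altered by the `replace` calls.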