Timothy Lovett
Published © CC BY-NC-SA

Sense Staff

Unihiker powered depth sensing staff accessory for detecting obstructions and alerting over bluetooth

BeginnerFull instructions provided6 hours272

Things used in this project

Hardware components

UNIHIKER - IoT Python Programming Single Board Computer with Touchscreen
DFRobot UNIHIKER - IoT Python Programming Single Board Computer with Touchscreen
×1
Oak-D Lite
×1
Lenovo X5 Bone Conduction Headphones
×1
Bendable Phone Tripod
Not the exact one I used but close enough. The one I used I've had for some time / no manufacturer listed.
×1
Wristband
×1

Story

Read more

Custom parts and enclosures

obstruction.wav

Code

SenseStaff.py

Python
import cv2
import numpy as np
import depthai as dai
import threading
import datetime
from unihiker import Audio

# Weights to use when blending depth/rgb image
rgbWeight = 0.4
depthWeight = 0.6

fps = 30
monoResolution = dai.MonoCameraProperties.SensorResolution.THE_720_P

audio = Audio()
audio_playing = False
audio_lock = threading.Lock()

def send_alert(area, height, color_intensity):
    global audio_playing
    global audio_lock

    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{timestamp}] Alert: Object detected in {area} area, height: {height}, color intensity: {color_intensity}")

    if not audio_playing and area == "front":
        threading.Thread(target=play_audio, daemon=True).start()

def play_audio():
    global audio_playing
    global audio_lock

    with audio_lock:
        if not audio_playing:
            audio_playing = True
            audio.play('obstruction.wav')
            audio_playing = False

def process_rgb_image(frame, color_threshold, min_height, screen_width):
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    mask = cv2.inRange(gray, color_threshold, 255)
    vertical_sum = np.sum(mask, axis=1)
    detected_height = np.count_nonzero(vertical_sum)

    if detected_height >= min_height:
        left_sum = np.sum(mask[:, :screen_width//3])
        front_sum = np.sum(mask[:, screen_width//3:(2*screen_width)//3])
        right_sum = np.sum(mask[:, (2*screen_width)//3:])

        if max(left_sum, front_sum, right_sum) == front_sum:
            area = "front"
        else:
            area = "other"

        avg_intensity = np.mean(gray[mask > 0])
        send_alert(area, detected_height, avg_intensity)

    cv2.imshow("Mask", mask)

pipeline = dai.Pipeline()
device = dai.Device()
queueNames = []

camRgb = pipeline.create(dai.node.ColorCamera)
left = pipeline.create(dai.node.MonoCamera)
right = pipeline.create(dai.node.MonoCamera)
stereo = pipeline.create(dai.node.StereoDepth)

disparityOut = pipeline.create(dai.node.XLinkOut)
disparityOut.setStreamName("disp")
queueNames.append("disp")

rgbCamSocket = dai.CameraBoardSocket.CAM_A

camRgb.setBoardSocket(rgbCamSocket)
camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_720_P)
camRgb.setFps(30)

try:
    calibData = device.readCalibration()
    lensPosition = calibData.getLensPosition(rgbCamSocket)
    if lensPosition:
        camRgb.initialControl.setManualFocus(lensPosition)
except:
    raise

left.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)
left.setBoardSocket(dai.CameraBoardSocket.LEFT)
left.setFps(30)

right.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)
right.setBoardSocket(dai.CameraBoardSocket.RIGHT)
right.setFps(30)

stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.HIGH_DENSITY)
stereo.setLeftRightCheck(True)
stereo.setExtendedDisparity(True)
stereo.setSubpixel(False)
stereo.setDepthAlign(rgbCamSocket)

left.out.link(stereo.left)
right.out.link(stereo.right)
stereo.disparity.link(disparityOut.input)

with device:
    device.startPipeline(pipeline)

    frameDisp = None

    depthWindowName = "depth"
    cv2.namedWindow(depthWindowName, cv2.WINDOW_NORMAL)
    cv2.setWindowProperty(depthWindowName, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)

    screen_width = 240  
    screen_height = 320
    color_threshold = 200
    min_height = 5

    while True:
        latestPacket = None

        queueEvents = device.getQueueEvents(("disp",))
        if len(queueEvents) > 0:
            packets = device.getOutputQueue("disp").tryGetAll()
            if len(packets) > 0:
                latestPacket = packets[-1]

        if latestPacket is not None:
            frameDisp = latestPacket.getFrame()
            maxDisparity = stereo.initialConfig.getMaxDisparity()
            frameDisp = (frameDisp * 255. / maxDisparity).astype(np.uint8)
            frameDisp = cv2.applyColorMap(frameDisp, cv2.COLORMAP_HOT)
            frameDisp = np.ascontiguousarray(frameDisp)
            frameDisp = cv2.rotate(frameDisp, cv2.ROTATE_90_CLOCKWISE)
            frameDisp = cv2.resize(frameDisp, (screen_width, screen_height))
            process_rgb_image(frameDisp, color_threshold, min_height, screen_width)
            cv2.imshow(depthWindowName, frameDisp)

        if cv2.getWindowProperty(depthWindowName, cv2.WND_PROP_VISIBLE) < 1:
            break
        cv2.waitKey(1)

Credits

Timothy Lovett

Timothy Lovett

16 projects • 15 followers
Maker. I spent over a decade working on backend systems in various languages.

Comments