Yeoh Wui Keat
Created October 27, 2021

Smart Baby Sleep Monitor

This portable, intelligent system provides non-intrusive sleep monitoring to help reduce the risk of SIDS (sudden infant death syndrome).


Things used in this project

Hardware components

AMD Kria KV260 Vision AI Starter Kit
for MPSoC (Arm + FPGA) based image analytics
×1

Software apps and online services

TensorFlow
AMD Vivado Design Suite
AMD PetaLinux

Hand tools and fabrication machines

3D Printer (generic)

Story


Custom parts and enclosures

Mechanical Design

Smart Baby Sleep Monitoring mechanical design/assembly.

Schematics

Full Hardware Setup

Full setup of the Kria KV260, camera, thermal imager, HDMI LCD, wireless speaker, and keyboard.

Code

Smart Baby Sleep Monitoring Main Program.

Python
Main Application Program. The program needs four additional support files to run properly: (a) beep-01a.wav, (b) haarcascade_frontalface_default.xml, (c) MobileNetSSD_deploy.caffemodel, (d) MobileNetSSD_deploy.prototxt.txt. These files are open source and can easily be downloaded from the internet; a pre-flight check for them is sketched after the listing.
import os
import os.path
import sys
import threading
import serial
import time
import argparse
import imutils
import cv2
import numpy as np

from subprocess import call
from imutils.video import VideoStream
from imutils.video import FPS
from multiprocessing import Process
from multiprocessing import Queue

def initializeSerialPort(Portname, Baudrate):
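    # open the requested serial port and report whether it succeeded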
    global serialport
    serialport.port = Portname
    serialport.baudrate = Baudrate
    serialport.timeout = 2
    try:
        serialport.open()
        return True
    except serial.SerialException as e:
        sys.stderr.write('Could not open serial port {}: {}\n'.format(serialport.name, e))
        return False

def readData(Length):
    global serialport
    data = serialport.read(Length)
    return data        
    
def getTemperature():
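    # background thread: repeatedly send the query byte to the thermal imager,
    # read back a 7-byte reply, and parse it into max_temp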
    global serialport, max_temp, port_status
    
    while port_status:
        if serialport.write(txdata) == 1:
            result = readData(7)
            try:
                # parse the first five bytes of the reply and apply the fixed +8 offset
                max_temp = round(float(result[0:5]) + 8, 2)
            except ValueError:
                pass

def playSound():
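    # background thread: play the alert tone whenever sound_enable is set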
    global port_status, sound_enable
      
    while port_status:
        if sound_enable:
            call(["aplay", "./beep-01a.wav"])

        time.sleep(0.1)

def classifyFrame(net, inputQueue, outputQueue):
    # keep looping
    while True:
        # check to see if there is a frame in our input queue
        if not inputQueue.empty():
            # grab the frame from the input queue, resize it, and
            # construct a blob from it
            frame = inputQueue.get()
            frame = cv2.resize(frame, (227, 227))
            blob  = cv2.dnn.blobFromImage(frame, 0.007843, (227, 227), 127.5)   # 224, 227, or 299 also work, with a matching resize above

            # set the blob as input to our deep learning object
            # detector and obtain the detections
            net.setInput(blob)
            detections = net.forward()

            # write the detections to the output queue
            outputQueue.put(detections)
               
def startMonitoring():
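    # main loop: MobileNet-SSD person detection runs in a child process while
    # this loop does Haar-cascade face detection and drives the alarm logic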
    global face_cascade, max_temp, idx, sound_enable, port_status, inputQueue, outputQueue, vs
    
    # face-size threshold and area-of-interest (AOI) bounds, tuned for this camera placement
    face_size_th = 2000

    aoi_x1 = 200 - 75
    aoi_x2 = 200 + 75
    aoi_y1 = 13
    aoi_y2 = 290

    face_count = 0
    face_size = 0
    face_not_detect = 0
    
    # video source index: 0 = infrared, 2 = RGB
    videosrc = 2

    person_in_aoi = False
    person = False
    
    # construct the argument parse and parse the arguments
    ap = argparse.ArgumentParser()
    ap.add_argument("-c", "--confidence", type=float, default=0.2,
        help="minimum probability to filter weak detections")
    args = vars(ap.parse_args())

    # initialize the list of class labels MobileNet SSD was trained to
    # detect, then generate a set of bounding box colors for each class
    CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
               "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
               "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
               "sofa", "train", "tvmonitor"]
    COLORS = np.random.uniform(0, 255, size=(len(CLASSES), 3))
    detections  = None

    # initialize the video stream, allow the camera sensor to warm up,
    # and initialize the FPS counter
    print("[INFO] starting video stream...")
    vs = VideoStream(src=videosrc).start()
    time.sleep(2.0)
    fps = FPS().start()

    # loop over the frames from the video stream
    while True:
        # grab the frame from the threaded video stream, resize it, and
        # grab its dimensions
        frame = vs.read()
        frame = imutils.resize(frame, width=400)
        frame = imutils.rotate(frame, 270)    # rotate to match the camera's mounting orientation
        (fH, fW) = frame.shape[:2]

        # if the input queue *is* empty, give the current frame to
        # classify
        if inputQueue.empty():
            inputQueue.put(frame)

        # if the output queue *is not* empty, grab the detections
        if not outputQueue.empty():
            detections = outputQueue.get()

        # check to see if our detections are not None (and if so, we'll
        # draw the detections on the frame)
        if detections is not None:
            # loop over the detections
            for i in np.arange(0, detections.shape[2]):
                # extract the confidence (i.e., probability) associated
                # with the prediction
                confidence = detections[0, 0, i, 2]

                # filter out weak detections by ensuring the `confidence`
                # is greater than the minimum confidence
                if confidence < args["confidence"]:
                    continue

                # otherwise, extract the index of the class label from
                # the `detections`, then compute the (x, y)-coordinates
                # of the bounding box for the object
                idx = int(detections[0, 0, i, 1])
                dims = np.array([fW, fH, fW, fH])
                box = detections[0, 0, i, 3:7] * dims
                (startX, startY, endX, endY) = box.astype("int")
                
                centroidX = startX + abs(endX - startX) / 2
                centroidY = startY + abs(endY - startY) / 2
                
                person = (CLASSES[idx] == "person")

                # a person counts as "in the AOI" when the bounding-box
                # centroid falls inside the AOI rectangle
                if person and aoi_x1 < centroidX < aoi_x2 and aoi_y1 < centroidY < aoi_y2:
                    person_in_aoi = True
                else:
                    person_in_aoi = False
            
            face_count = 0
            gray  = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = face_cascade.detectMultiScale(gray, 1.1, 8, minSize=(55, 55), flags=cv2.CASCADE_SCALE_IMAGE)
            for (x, y, w, h) in faces:
                if max_temp > 37.5:    # fever threshold in degrees Celsius
                    cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 2)
                    cv2.putText(frame, str(max_temp), (x, y - 13), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
                else:
                    cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
                    cv2.putText(frame, str(max_temp), (x, y - 13), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
                
                face_size = w * h    
                face_count = face_count + 1
                print("Detected Face Size: {}\t Person: {}\t Person in AOI: {}".format(face_size, person, person_in_aoi))                    
                
            # alarm logic: sound the alert when a person is in the AOI but no face
            # has been seen for 30 consecutive frames (e.g. the baby may have rolled
            # face-down), or when a face is seen with a temperature above 37.5 C
            if person_in_aoi and face_count == 0 and face_not_detect == 30:
                sound_enable = True
                face_size = 0
            elif person_in_aoi and face_count == 0 and face_not_detect < 30:
                sound_enable = False
                face_not_detect = face_not_detect + 1
            elif person_in_aoi and face_count > 0 and max_temp <= 37.5:
                sound_enable = False
                face_not_detect = 0
            elif person_in_aoi and face_count > 0 and max_temp > 37.5:
                sound_enable = True
                face_not_detect = 0
            else:
                sound_enable = False
                face_size = 0
                face_not_detect = 0

        cv2.namedWindow('BabyMon', cv2.WINDOW_NORMAL)
        cv2.moveWindow('BabyMon', 0, 0)
        
        # AOI border color: green = face OK, yellow = face too small,
        # red = person present but no face, cyan = no person in AOI
        if person_in_aoi and face_size >= face_size_th:
            cv2.rectangle(frame, (aoi_x1, aoi_y1), (aoi_x2, aoi_y2), (0, 255, 0), 2)
        elif person_in_aoi and 0 < face_size < face_size_th:
            cv2.rectangle(frame, (aoi_x1, aoi_y1), (aoi_x2, aoi_y2), (0, 255, 255), 2)
        elif person_in_aoi and face_size == 0:
            cv2.rectangle(frame, (aoi_x1, aoi_y1), (aoi_x2, aoi_y2), (0, 0, 255), 2)
        else:
            cv2.rectangle(frame, (aoi_x1, aoi_y1), (aoi_x2, aoi_y2), (255, 255, 0), 2)

        frame = cv2.resize(frame, None, fx=1.99, fy=1.49)    # scale the frame up for the attached display
        cv2.setWindowProperty('BabyMon',cv2.WND_PROP_FULLSCREEN,cv2.WINDOW_FULLSCREEN)
        cv2.imshow('BabyMon', frame)
        
        key = cv2.waitKey(1) & 0xFF
        # if the `q` key was pressed, break from the loop
        if key == ord("q"):
            break

        if not port_status:
            break
        
        # break out of the loop if the display window has been closed
        try:
            cv2.getWindowProperty('BabyMon', 0)
        except cv2.error:
            break
        # update the FPS counter
        fps.update()

    # stop the timer and display FPS information
    fps.stop()
    print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
    print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))

    # do a bit of cleanup
    cv2.destroyAllWindows()
    vs.stop()

def quitProgram():
    # stop the worker threads before releasing the serial port, so that
    # getTemperature() is not caught mid-write on a closed port
    global serialport, port_status, t1, t2
    port_status = False
    t1.join()
    t2.join()
    serialport.close()
    sys.exit()
                        
txdata = [63]   # ASCII code for '?', the query byte sent to the thermal imager
max_temp = 0.0
idx = 0

port_status  = False
sound_enable = False

# initialize the input queue (frames), output queue (detections),
# and the list of actual detections returned by the child process
inputQueue  = Queue(maxsize=1)
outputQueue = Queue(maxsize=1)

serialport   = serial.Serial()
port_status  = initializeSerialPort('/dev/ttyACM0', 115200)
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')

print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe('MobileNetSSD_deploy.prototxt.txt', 'MobileNetSSD_deploy.caffemodel')

print("[INFO] starting threads...")
t1 = threading.Thread(target=getTemperature)
t1.start()

t2 = threading.Thread(target=playSound)
t2.start()

print("[INFO] starting process...")
p1 = Process(target=classifyFrame, args=(net, inputQueue, outputQueue,))
p1.daemon = True
p1.start()

startMonitoring()
quitProgram()
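
The listing above assumes its four support files sit in the working directory. A minimal pre-flight check along these lines (file names taken from the description above) can replace an obscure crash at startup with a clear error message:

import os
import sys

# support files required by the main program
REQUIRED_FILES = [
    "beep-01a.wav",
    "haarcascade_frontalface_default.xml",
    "MobileNetSSD_deploy.caffemodel",
    "MobileNetSSD_deploy.prototxt.txt",
]

missing = [f for f in REQUIRED_FILES if not os.path.isfile(f)]
if missing:
    # the Haar cascade, for example, ships with OpenCV under data/haarcascades/
    sys.stderr.write("Missing support files: {}\n".format(", ".join(missing)))
    sys.exit(1)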

Credits

Yeoh Wui Keat

My interests are AI on the edge, and image and digital signal processing in embedded systems for industrial applications.
