Vrishak Vemuri
Created November 24, 2023

isDoorLocked()

Simple, efficient solution to detect if the door is locked on nearly all doors.

19
isDoorLocked()

Things used in this project

Hardware components

Raspberry Pi 4 Model B
Raspberry Pi 4 Model B
Any Pi that can run OpenCV
×1
Ultrasonic Sensor - HC-SR04 (Generic)
Ultrasonic Sensor - HC-SR04 (Generic)
×3
Camera Module
Raspberry Pi Camera Module
×1

Software apps and online services

OpenCV
OpenCV
Maker service
IFTTT Maker service
Adafruit IO
Onshape

Hand tools and fabrication machines

Soldering iron (generic)
Soldering iron (generic)
Solder Wire, Lead Free
Solder Wire, Lead Free

Story

Read more

Schematics

Schematic

Main schematic

Code

Full code

Python
This is the code to be run on the pi.
import cv2
import numpy as np
from skimage.metrics import structural_similarity
import time 
from Adafruit_IO import Client, Data
import yaml

from gpiozero import DistanceSensor
u1 = DistanceSensor(echo=18, trigger=23, max_distance=.8, threshold_distance=.7)
u2 = DistanceSensor(echo=24, trigger=25, max_distance=.8, threshold_distance=.7)
u3 = DistanceSensor(echo=12, trigger=16, max_distance=.8, threshold_distance=.7)


logins = yaml.safe_load(open('priv.yml'))
aio = Client(logins['username'], logins['key'])
# read image
tmplt_handle = cv2.imread('templates/template_open2.png', cv2.IMREAD_UNCHANGED)
tmplt_deadbolt = cv2.imread('templates/template_deadbolt.png', cv2.IMREAD_UNCHANGED)
compoh = cv2.imread('templates/oh.png')
compod = cv2.imread('templates/od.png')
compch = cv2.imread('templates/ch.png')
compcd = cv2.imread('templates/cd.png')

def getMatch(img, tmplt):
    hh, ww = tmplt.shape[:2]
    tmplt_mask = tmplt[:,:,3]
    tmplt_mask = cv2.merge([tmplt_mask,tmplt_mask,tmplt_mask])
    tmplt2 = tmplt[:,:,0:3]
    corrimg = cv2.matchTemplate(img,tmplt2,cv2.TM_CCORR_NORMED, mask=tmplt_mask)
    min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(corrimg)
    xx = max_loc[0]
    yy = max_loc[1]
    pt1 = (xx,yy)
    pt2 = (xx+ww, yy+hh)
    newimg = img[pt1[1]:pt2[1], pt1[0]:pt2[0]]
    newimg = newimg[len(newimg)//3:2*len(newimg)//3, len(newimg[0])//3:2*len(newimg[0])//3]
    return newimg

def imgsim(first, second):
    first_gray = cv2.cvtColor(first, cv2.COLOR_BGR2GRAY)
    second_gray = cv2.cvtColor(second, cv2.COLOR_BGR2GRAY)
    score, diff = structural_similarity(first_gray, second_gray, full=True)
    return score*100


cam = cv2.VideoCapture(0)
b2hstatus = ''
b1hstatus = ''
b1hcounter = 0
b2dstatus = ''
b1dstatus = ''
b1dcounter = 0

uploadqueue = []

LASTUPLOAD = time.time()
LAST_USUPLOAD = time.time()
while True:
    FPSSTART = time.time()

    ret, img = cam.read()
    #img = cv2.imread('../dchc.png')
    handle, deadbolt = getMatch(img, tmplt_handle), getMatch(img, tmplt_deadbolt)

    ohsim, chsim = imgsim(handle, compoh), imgsim(handle, compch)
    odsim, cdsim = imgsim(deadbolt, compod), imgsim(deadbolt, compcd)
    #print(ohsim, chsim, odsim, cdsim)

    hstatus = 'inconclusive' if ohsim<30 and chsim<30 else 'locked' if chsim>ohsim else 'unlocked'
    dstatus = 'inconclusive' if odsim<30 and cdsim<30 else 'locked' if cdsim>odsim else 'unlocked'
    
    if(b1hstatus!=hstatus): #Fail change.
        b1hstatus = hstatus
        b1hcounter = 0
    elif(b1hstatus==hstatus and b2hstatus!=b1hstatus): #waiting to change
        b1hcounter += 1
        if(b1hcounter > 15): #Success change.
            uploadqueue.append(f'{b1hstatus}')
            b1hcounter = 0
            b2hstatus = b1hstatus

    if(b1dstatus!=dstatus): #Fail change.
        b1dstatus = dstatus
        b1dcounter = 0
    elif(b1dstatus==dstatus and b2dstatus!=b1dstatus): #waiting to change
        b1dcounter += 1
        if(b1dcounter > 15): #Success change.
            uploadqueue.append(f'{b1dstatus}')
            b1dcounter = 0
            b2dstatus = b1dstatus



    flagonthisiter = False
    if(uploadqueue and time.time()-LASTUPLOAD>1): #max 60 readings/min = 1 reading/sec
        item = uploadqueue.pop(0)
        data = Data(value=item)
        if(item[0] == 'h'): #handle
            aio.create_data('handle', data)
            flagonthisiter = True
        else: #deadbolt
            aio.create_data('deadbolt',data)
            flagonthisiter = True
        print(f"upload {item}")
        LASTUPLOAD = time.time()
    
    if(not flagonthisiter and time.time()-LAST_USUPLOAD>1):
        if(min(u1.distance,u2.distance,u3.distance)<.7):
            aio.create_data('ultrasonic',Data(value='door closed'))
        else:
            aio.create_data('ultrasonic',Data(value='door open'))
        LAST_USUPLOAD=time.time()
    FPSEND = time.time()
    print(f"FPS: {int(1/(FPSEND-FPSSTART))}")
    #print(b1hcounter, b1dcounter, uploadqueue)

Credits

Vrishak Vemuri

Vrishak Vemuri

3 projects • 3 followers
Your avid electronics enthusiast =D // Making magic blue smoke since summer 2018 // Currently in high school (class of 2025)

Comments