Juan Pablo Risso
Published © Apache-2.0

Computer Vision as Motion Sensor for SmartThings

Using a Raspberry Pi 3 and a PiCam, this computer-vision-powered sensor detects faces and sends presence data to SmartThings over the LAN using UPnP.

Intermediate | Protip | 2 hours | 16,017
Computer Vision as Motion Sensor for SmartThings

Things used in this project

Story

Read more

Code

Camera Python Script

Python
#!/usr/bin/python2.7

""" Computer Vision Camera for SmartThings

Copyright 2016 Juan Pablo Risso <juano23@gmail.com>

Dependencies: python-twisted, cv2 (OpenCV), imutils, picamera, pyimagesearch

Licensed under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of the
License at:

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

import argparse
import logging
import urllib2
from time import sleep
from time import time

import cv2
import imutils
from picamera import PiCamera
from picamera.array import PiRGBArray
from twisted.internet import reactor
from twisted.internet.defer import succeed
from twisted.internet.protocol import DatagramProtocol
from twisted.web import server, resource
from twisted.web._newclient import ResponseFailed
from twisted.web.client import Agent
from twisted.web.http_headers import Headers
from twisted.web.iweb import IBodyProducer
from zope.interface import implements

# SSDP multicast port and group that discovery (M-SEARCH) requests arrive on.
SSDP_PORT = 1900
SSDP_ADDR = '239.255.255.250'
# Identifier placed in the USN of discovery responses and status messages.
UUID = 'd1c58eb4-9220-11e4-96fa-123b93f75cba'
# Discovery response template; filled with (location URL, search target,
# UUID, device target) by SSDPServer.datagramReceived.
SEARCH_RESPONSE = 'HTTP/1.1 200 OK\r\nCACHE-CONTROL:max-age=30\r\nEXT:\r\nLOCATION:%s\r\nSERVER:Linux, UPnP/1.0, Pi_Garage/1.0\r\nST:%s\r\nUSN:uuid:%s::%s'

# Initialize the camera and grab a reference to the raw camera capture.
camera = PiCamera()
camera.resolution = (640, 480)
camera.framerate = 32
rawCapture = PiRGBArray(camera, size=(640, 480))
# Initial "consecutive frames without a face" counter handed to MonitorCamera.
auxcount = 0

# Construct the face detector and allow the camera to warm up.
# NOTE(review): FaceDetector is not imported anywhere in this file; presumably
# it comes from the pyimagesearch package named in the module docstring
# (e.g. `from pyimagesearch.facedetector import FaceDetector`) - confirm and
# add the import, otherwise this line raises NameError.
fd = FaceDetector("cascades/haarcascade_frontalface_default.xml")
# `time` in this module is the time.time() function (from time import time),
# so `time.sleep` does not exist; use the directly imported sleep() instead.
sleep(0.1)

def determine_ip_for_host(host):
    """Determine local IP address used to communicate with a particular host.

    Opens a throw-away UDP socket on an ephemeral port, connects it to
    ``host`` and reads back the local address the socket was bound to.

    Args:
        host: address of the remote host (passed to the UDP transport's
            ``connect``).

    Returns:
        The local IP address reported by the transport for that connection.
    """
    test_sock = DatagramProtocol()
    test_sock_listener = reactor.listenUDP(0, test_sock) # pylint: disable=no-member
    try:
        # Connecting a UDP socket only selects the route; the port is arbitrary.
        test_sock.transport.connect(host, 1900)
        return test_sock.transport.getHost().host
    finally:
        # Always release the temporary port, even if connect()/getHost() fails.
        test_sock_listener.stopListening()

class StringProducer(object):
    """Body producer that feeds a fixed in-memory string to a Twisted request.

    Attributes set by the constructor:
        body: the string to send.
        length: ``len(body)``.
    """
    implements(IBodyProducer)

    def __init__(self, body):
        self.body = body
        self.length = len(self.body)

    def startProducing(self, consumer): # pylint: disable=invalid-name
        """Write the whole body to ``consumer`` in one call.

        Returns an already-fired Deferred (result ``None``) because nothing
        is left to produce afterwards.
        """
        payload = self.body
        consumer.write(payload)
        finished = succeed(None)
        return finished

    def pauseProducing(self): # pylint: disable=invalid-name
        """No-op: the body is written in a single call, nothing to pause."""
        return None

    def stopProducing(self): # pylint: disable=invalid-name
        """No-op: the body is written in a single call, nothing to stop."""
        return None

class SSDPServer(DatagramProtocol):
    """Receive and respond to M-SEARCH discovery requests from SmartThings hub.

    Creating an instance starts listening on the SSDP multicast port, joins
    the SSDP multicast group and registers ``stop`` to run before the reactor
    shuts down.

    Args:
        interface: interface passed to joinGroup/leaveGroup.
        status_port: TCP port advertised in the LOCATION URL of responses.
        device_target: device URN; a search is answered when its ST header is
            a non-empty substring of this value.
    """

    def __init__(self, interface='', status_port=0, device_target=''):
        self.interface = interface
        self.device_target = device_target
        self.status_port = status_port
        self.port = reactor.listenMulticast(SSDP_PORT, self, listenMultiple=True) # pylint: disable=no-member
        self.port.joinGroup(SSDP_ADDR, interface=interface)
        reactor.addSystemEventTrigger('before', 'shutdown', self.stop) # pylint: disable=no-member

    def datagramReceived(self, data, addr):
        """Parse one SSDP datagram and answer matching M-SEARCH requests.

        Datagrams that are not a complete, well-formed request are ignored
        rather than raising, since they come straight from the network.

        Args:
            data: raw datagram payload.
            addr: ``(host, port)`` tuple of the sender.
        """
        host, port = addr
        try:
            header, _ = data.split(b'\r\n\r\n')[:2]
        except ValueError:
            # No blank line terminating the header section.
            return
        lines = header.split('\r\n')
        cmd = lines.pop(0).split(' ')
        if len(cmd) < 2:
            logging.debug('Ignored malformed SSDP request line from %s:%d', host, port)
            return

        headers = {}
        for line in lines:
            # Normalise "Name: value" to "Name:value" before splitting.
            name, sep, value = line.replace(': ', ':', 1).partition(':')
            if not sep:
                # Empty line or a line without a colon - not a header.
                continue
            headers[name.lower()] = value

        logging.debug('SSDP command %s %s - from %s:%d with headers %s', cmd[0], cmd[1], host, port, headers)

        search_target = headers.get('st', '')

        # An empty search target is a substring of every string, so it must be
        # rejected explicitly or every ST-less M-SEARCH would be answered.
        if (cmd[0] == 'M-SEARCH' and cmd[1] == '*'
                and search_target and search_target in self.device_target):
            logging.info('Received %s %s for %s from %s:%d', cmd[0], cmd[1], search_target, host, port)
            url = 'http://%s:%d/status' % (determine_ip_for_host(host), self.status_port)
            response = SEARCH_RESPONSE % (url, search_target, UUID, self.device_target)
            self.port.write(response, (host, port))
        else:
            logging.debug('Ignored SSDP command %s %s', cmd[0], cmd[1])

    def stop(self):
        """Leave multicast group and stop listening"""
        self.port.leaveGroup(SSDP_ADDR, interface=self.interface)
        self.port.stopListening()

class StatusServer(resource.Resource):
    """HTTP server that serves the status of the camera to the
       SmartThings hub.

    Args:
        device_target: device URN placed in the USN of status messages.
        subscription_list: dict mapping callback URL -> subscription details;
            updated in place by SUBSCRIBE requests.
        garage_door_status: dict whose 'last_state' entry holds the current
            camera state (the name is kept for caller compatibility).
    """
    isLeaf = True
    def __init__(self, device_target, subscription_list, garage_door_status):
        self.device_target = device_target
        self.subscription_list = subscription_list
        self.garage_door_status = garage_door_status
        resource.Resource.__init__(self)

    def render_SUBSCRIBE(self, request): # pylint: disable=invalid-name
        """Handle subscribe requests from ST hub - hub wants to be notified of
           camera status updates. Registers (or renews) the callback URL for
           24 hours and returns an empty body."""
        headers = request.getAllHeaders()
        logging.debug("SUBSCRIBE: %s", headers)
        if 'callback' in headers:
            # Drop the first and last character (the hub sends "<url>").
            cb_url = headers['callback'][1:-1]

            if cb_url not in self.subscription_list:
                self.subscription_list[cb_url] = {}
                logging.info('Added subscription %s', cb_url)
            self.subscription_list[cb_url]['expiration'] = time() + 24 * 3600

        return ""

    def render_GET(self, request): # pylint: disable=invalid-name
        """Handle polling requests from ST hub.

        Returns the status message for '/status' and an empty body for any
        other path."""
        if request.path == '/status':
            # Report motion only for a confirmed 'active' state, so any other
            # value (e.g. a not-yet-determined state) is not shown as motion.
            if self.garage_door_status['last_state'] == 'active':
                cmd = 'status-active'
            else:
                cmd = 'status-inactive'
            msg = '<msg><cmd>%s</cmd><usn>uuid:%s::%s</usn></msg>' % (cmd, UUID, self.device_target)
            logging.info("Polling request from %s for %s - returned %s",
                         request.getClientIP(),
                         request.path,
                         cmd)
            return msg
        else:
            logging.info("Received bogus request from %s for %s",
                         request.getClientIP(),
                         request.path)
            return ""

class MonitorCamera(object):
    """Monitors camera status, generating notifications whenever its state changes.

    Constructing an instance schedules the first frame check on the reactor;
    every check schedules the next one.

    Args:
        device_target: device URN placed in the USN of notifications.
        subscription_list: dict mapping callback URL -> subscription details
            (each with an 'expiration' timestamp).
        camera_status: dict whose 'last_state' entry is updated on every
            state change.
        inactive_frames: number of consecutive frames without a face after
            which the state is reported as 'inactive'.
    """
    def __init__(self, device_target, subscription_list, camera_status, inactive_frames=60): # pylint: disable=too-many-arguments
        self.device_target = device_target
        self.subscription_list = subscription_list
        self.camera_status = camera_status
        self.inactive_frames = inactive_frames

        current_state = 'inactive'
        reactor.callLater(0, self.check_garage_state, current_state, auxcount) # pylint: disable=no-member

    def check_garage_state(self, current_state, auxcount):
        """Capture one frame, update the presence state and reschedule itself.

        Args:
            current_state: state ('active'/'inactive') after the previous check.
            auxcount: consecutive frames without a face before this check.
        """
        self.current_state = current_state
        self.auxcount = auxcount
        camera.capture(rawCapture, format="bgr", use_video_port=True)
        # Resize the raw frame and convert it to grayscale for detection.
        frame = imutils.resize(rawCapture.array, width=640)
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        faceRects = fd.detect(gray, scaleFactor=1.1, minNeighbors=10,
                              minSize=(30, 30))
        # len() works whether the detector returns an empty tuple or an array;
        # comparing an array against () does not give a reliable boolean.
        if len(faceRects) > 0:
            auxcount = 0
            if current_state == 'inactive':
                current_state = 'active'
                self._change_state(current_state)
        else:
            auxcount = auxcount + 1
            # Fires once, on the frame where the threshold is reached.
            if auxcount == self.inactive_frames:
                current_state = 'inactive'
                self._change_state(current_state)

        # Reset the capture buffer so the next capture starts from scratch.
        rawCapture.truncate(0)

        # Schedule next check
        reactor.callLater(0, self.check_garage_state, current_state, auxcount) # pylint: disable=no-member

    def _change_state(self, new_state):
        """Log the transition, record ``new_state`` and notify the hubs."""
        logging.info('State changed from %s to %s', self.camera_status['last_state'], new_state)
        self.camera_status['last_state'] = new_state
        self.notify_hubs()

    def notify_hubs(self):
        """Notify the subscribed SmartThings hubs that a state change has occurred"""
        if self.camera_status['last_state'] == 'inactive':
            cmd = 'status-inactive'
        else:
            cmd = 'status-active'
        msg = '<msg><cmd>%s</cmd><usn>uuid:%s::%s</usn></msg>' % (cmd, UUID, self.device_target)
        agent = Agent(reactor)
        for subscription in self.subscription_list:
            # Skip subscriptions whose expiration time has passed.
            if self.subscription_list[subscription]['expiration'] > time():
                logging.info("Notifying hub %s", subscription)
                req = agent.request(
                    'POST',
                    subscription,
                    # Header values must be strings, not integers.
                    Headers({'CONTENT-LENGTH': [str(len(msg))]}),
                    StringProducer(msg))
                req.addCallback(self.handle_response)
                req.addErrback(self.handle_error)

    def handle_response(self, response): # pylint: disable=no-self-use
        """Handle the SmartThings hub returning a status code to the POST.
           This is actually unexpected - it typically closes the connection
           for POST/PUT without giving a response code."""
        if response.code == 202:
            logging.info("Status update accepted")
        else:
            logging.error("Unexpected response code: %s", response.code)

    def handle_error(self, response): # pylint: disable=no-self-use
        """Handle errors raised while performing the NOTIFY. There doesn't seem
           to be a way to avoid ResponseFailed - the SmartThings Hub
           doesn't generate a proper response code for POST or PUT, and if
           NOTIFY is used, it ignores the body."""
        if isinstance(response.value, ResponseFailed):
            logging.debug("Response failed (expected)")
        else:
            logging.error("Unexpected response: %s", response)

def main():
    """Command-line entry point.

    Parses the options, configures logging, starts the SSDP discovery
    responder and the HTTP status site, creates the camera monitor and then
    runs the reactor until it is stopped. The --pollingfreq option is parsed
    but not used by this function.
    """
    parser = argparse.ArgumentParser(description='Provides camera active/inactive status to a SmartThings hub')
    parser.add_argument('--httpport', dest='http_port', help='HTTP port number', default=8080, type=int)
    parser.add_argument('--deviceindex', dest='device_index', help='Device index', default=1, type=int)
    parser.add_argument('--pollingfreq', dest='polling_freq', help='Number of seconds between polling camera state', default=5, type=int)
    parser.add_argument('--debug', dest='debug', help='Enable debug messages', default=False, action='store_true')
    options = parser.parse_args()

    device_target = 'urn:schemas-upnp-org:device:RPi_Computer_Vision:%d' % (options.device_index)

    logging.basicConfig(format='%(asctime)-15s %(levelname)-8s %(message)s',
                        level=logging.DEBUG if options.debug else logging.INFO)

    # State shared between the HTTP status site and the camera monitor.
    camera_status = {'last_state': 'unknown'}
    subscription_list = {}

    logging.info('Initializing camera')

    # Discovery: answer the hub's SSDP M-SEARCH requests.
    SSDPServer(status_port=options.http_port, device_target=device_target)

    # Subscriptions and polling: HTTP site on the configured port.
    status_resource = StatusServer(device_target, subscription_list, camera_status)
    reactor.listenTCP(options.http_port, server.Site(status_resource)) # pylint: disable=no-member

    logging.info('Initialization complete')

    # Watch the camera and push notifications on every state change.
    MonitorCamera(device_target=device_target,
                  subscription_list=subscription_list,
                  camera_status=camera_status)

    reactor.run() # pylint: disable=no-member


if __name__ == "__main__":
    main()

SmartApp

Groovy
/**
 *  Raspberry Pi - Computer Vision (Connect)
 *
 *  Copyright 2016 Juan Pablo Risso <juano23@gmail.com>
 *
 *  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 *  in compliance with the License. You may obtain a copy of the License at:
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
 *  for the specific language governing permissions and limitations under the License.
 *
 */

/* SmartApp registration metadata: display name, namespace, author, description,
   category and icons. singleInstance is set to true. */
definition(
    name: "Computer Vision (Connect)",
    namespace: "juano2310",
    author: "Juan Pablo Risso",
    description: "Open source Computer Vision Camera using a Raspberry Pi and a PiCam",
    category: "SmartThings Labs",
    iconUrl: "https://www.raspberrypi.org/wp-content/uploads/2015/08/raspberry-pi-logo.png",
    iconX2Url: "https://www.raspberrypi.org/wp-content/uploads/2015/08/raspberry-pi-logo.png",
    singleInstance: true
)

/* Single preferences page whose content is produced by selectRPi(). */
preferences {
    page(name:"selectRPi", title:"Searching for devices...", content:"selectRPi")
}

/* Device type URN used both for the SSDP search and for matching responses. */
def searchTarget() {
    "urn:schemas-upnp-org:device:RPi_Computer_Vision"
}

/* Build the discovery page: subscribe to SSDP responses, trigger a discovery,
   and list the devices found so far. The page refreshes every 3 seconds. */
def selectRPi() {
    // Subscribe first so responses to the discovery below are received.
    ssdpSubscribe()
    ssdpDiscover()

    def foundDevices = getDevicesForDialog()

    return dynamicPage(name:"selectRPi", title:"", nextPage:"", refreshInterval: 3, install:true, uninstall: true) {
        section("") {
            image "http://media.point2.com/p2a/module/6e99/6311/70bc/e95acda9115d52af1106/original.jpg"
            input "selectedRPi", "enum", required:false, title:"Select Raspberry Pi (${foundDevices.size() ?: 0} found)", multiple:true, options:foundDevices
        }
    }
}

/* Generate the list of devices for the preferences dialog.
   Returns a map of USN (plain String) -> "Raspberry Pi (<ip>)" label. */
def getDevicesForDialog() {
    getDevices().collectEntries { usn, device ->
        // Use a real String as the key: a GString key does not hash like the
        // equal String, so later String lookups in the map would miss it.
        String key = device.ssdpUSN.toString()
        [(key): "Raspberry Pi (" + convertHexToIP(device.networkAddress) + ')']
    }
}

/* Lifecycle hook: log the chosen settings, then set the app up. */
def installed() {
    log.debug("Installed with settings: ${settings}")
    return initialize()
}

/* Lifecycle hook: log the new settings, then set the app up again. */
def updated() {
    log.debug("Updated with settings: ${settings}")
    return initialize()
}

/* Common setup run by installed() and updated(): clears subscriptions and
   schedules, creates child devices for the selected Pis, and (re)subscribes
   the children now and every 5 minutes. */
def initialize() {
    log.debug('Initializing')
    // NOTE(review): unsubscribe() presumably also removes the ssdpTerm
    // subscription made by ssdpSubscribe(), and nothing here re-adds it - confirm.
    unsubscribe()
    addDevices()
    unschedule()
    // Periodically renew the children's subscriptions.
    runEvery5Minutes("subscribeToDevices")
    subscribeToDevices()
}

/* Return the map of discovered devices kept in state (USN -> parsed event),
   creating an empty map first when none is stored yet. */
def getDevices() {
    if (!state.devices) {
        state.devices = [:]
    }
    def count = state.devices.size()
    log.debug("There are ${count} devices at this time")
    return state.devices
}

/* Send a LAN discovery hub command for our search target. */
void ssdpDiscover() {
    def target = searchTarget()
    log.trace "Lan discovery ${target}"
    def discovery = new physicalgraph.device.HubAction("lan discovery ${target}", physicalgraph.device.Protocol.LAN)
    sendHubCommand(discovery)
}

/* Route location events named "ssdpTerm.<search target>" to ssdpHandler. */
void ssdpSubscribe() {
    def eventName = "ssdpTerm.${searchTarget()}"
    subscribe(location, eventName, ssdpHandler)
}

/* Handle an SSDP response event: remember newly discovered devices and, when
   a known device reports a different address, update the stored entry and the
   data values of the matching child device. */
def ssdpHandler(evt) {
    if(evt.name == "ping") { return "" }
    def description = evt.description
    log.debug('Received Response: ' + description)
    def parsedEvent = parseLanMessage(description)
    parsedEvent << ["hub": evt?.hubId]
    if (!parsedEvent?.ssdpTerm?.contains("${searchTarget()}")) {
        // Response for some other search target.
        return
    }

    def devices = getDevices()
    def ip = convertHexToIP(parsedEvent.networkAddress)
    def port = convertHexToInt(parsedEvent.deviceAddress)
    // Plain String key: a GString key would not match later String lookups.
    String usn = parsedEvent.ssdpUSN.toString()
    def known = devices[usn]

    if (!known) {
        // First time this device is seen.
        devices[usn] = parsedEvent
        return
    }

    if (known.ip != ip || known.port != port) {
        known.ip = ip
        known.port = port
        // Keep the raw hex address in step too; addDevices() reads these fields.
        known.networkAddress = parsedEvent.networkAddress
        known.deviceAddress = parsedEvent.deviceAddress
        getChildDevices().each { child ->
            if (child.getDeviceDataByName("ssdpUSN") == parsedEvent.ssdpUSN) {
                child.updateDataValue("ip", ip)
                // Data values are stored as strings (addDevices() does the same).
                child.updateDataValue("port", port.toString())
            }
        }
    }
}

/* Create a child device for every selected Raspberry Pi that does not have
   one yet. Selections without discovery data are skipped with a warning. */
def addDevices() {
    def devices = getDevices()
    selectedRPi.each { ssdpUSN ->
        def discovered = devices[ssdpUSN]
        if (!discovered) {
            // The selection refers to a device that is no longer in state.
            log.warn("No discovery data for ${ssdpUSN}, skipping")
            return
        }

        // Make the dni the MAC followed by the index from the USN, unless it's the USN ending in :1
        // that device has just the MAC address as its DNI and receives all the notifications from
        // the RPi
        def dni = ssdpUSN.endsWith(":1") ? discovered.mac : discovered.mac + ':' + ssdpUSN.split(':').last()

        // Check if child already exists
        def d = getChildDevices()?.find { it.device.deviceNetworkId == dni }

        if (!d) {
            def ip = convertHexToIP(discovered.networkAddress).toString()
            def port = convertHexToInt(discovered.deviceAddress).toString()
            log.debug("Adding ${dni} for ${ssdpUSN} / ${ip}:${port}")
            d = addChildDevice("smartthings", "Computer Vision Room Presence", dni, discovered.hub, [
                "label": "Computer Vision Presence" ,
                "data": [
                    "ip": ip,
                    "port": port,
                    "ssdpUSN": ssdpUSN,
                    "ssdpPath": discovered.ssdpPath
                ]
            ])
        } else {
            log.debug("This device already exists")
        }
    }
}

/* Ask every child device to (re)subscribe to its Raspberry Pi. */
def subscribeToDevices() {
    log.debug "subscribeToDevices() called"
    getAllChildDevices().each { child ->
        child.subscribe()
    }
}

/* Parse a hexadecimal string (e.g. a port number) into an Integer. */
private Integer convertHexToInt(hex) {
    Integer parsed = Integer.parseInt(hex, 16)
    return parsed
}

/* Turn the 8-digit hex form of an IPv4 address into dotted-quad notation,
   converting each pair of hex digits to its decimal value. */
private String convertHexToIP(hex) {
    def octets = [0, 2, 4, 6].collect { start ->
        convertHexToInt(hex[start..(start + 1)])
    }
    return octets.join(".")
}

Device Handler

Groovy
/**
 *  Raspberry Pi - Computer Vision Room Presence (Device Handler)
 *
 *  Copyright 2016 Juan Pablo Risso <juano23@gmail.com>
 *
 *  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 *  in compliance with the License. You may obtain a copy of the License at:
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
 *  for the specific language governing permissions and limitations under the License.
 *
 */
// Device handler metadata: capabilities/commands, simulator messages and tiles
metadata {
	definition (name: "Computer Vision Room Presence", namespace: "smartthings", author: "Juan Pablo Risso") {
		capability "Motion Sensor"
		capability "Sensor"
        capability "Refresh"
        command "subscribe"        
	}

	// Canned messages for the simulator
	simulator {
		status "active": "motion: 1"
		status "inactive": "motion: 0"
	}

	// One main motion tile plus a refresh button
	tiles(scale: 2) {
		multiAttributeTile(name:"motion", type: "generic", width: 6, height: 4){
			tileAttribute ("device.motion", key: "PRIMARY_CONTROL") {
				attributeState "active", label:'motion', icon:"st.motion.motion.active", backgroundColor:"#53a7c0"
				attributeState "inactive", label:'no motion', icon:"st.motion.motion.inactive", backgroundColor:"#ffffff"
			}
		}
		standardTile("refresh", "device.refresh", inactiveLabel: false, decoration: "flat", width: 2, height: 2) {
			state "default", action:"refresh.refresh", icon:"st.secondary.refresh"
		}

        main "motion"
        details (["motion", "refresh"])
	}
}

// Parse a LAN message: read <cmd> and <usn> from the XML body and apply the
// command to every sibling child device whose ssdpUSN matches the <usn>
void parse(String description) {
    def usn = getDataValue('ssdpUSN')
    def parsedEvent = parseLanMessage(description)
    log.debug "Parsing garage DT ${device.deviceNetworkId} ${usn}"
    if (parsedEvent['body'] == null) {
        // Nothing to do for messages without a body
        return
    }

    def xmlTop = new XmlSlurper().parseText(parsedEvent.body)
    def cmd = xmlTop.cmd[0]
    def targetUsn = xmlTop.usn[0].toString()
    log.debug "Processing command ${cmd} for ${targetUsn}"

    // Status commands and the motion value each one stands for
    def motionByCommand = ['status-active': 'active', 'status-inactive': 'inactive']
    def cmdName = cmd.toString()

    parent.getChildDevices().each { child ->
        def childUsn = child.getDataValue("ssdpUSN").toString()
        if (childUsn != targetUsn) {
            return
        }
        if (cmdName == 'refresh') {
            log.debug "Instructing child ${child.device.label} to refresh"
            child.refresh()
        } else if (motionByCommand.containsKey(cmdName)) {
            def value = motionByCommand[cmdName]
            log.debug "Updating ${child.device.label} to ${value}"
            child.sendEvent(name: 'motion', value: value)
        }
    }
}

// Refresh command: renew the subscription and poll the current status.
// Both hub actions are returned; an action that is built but not returned
// is never handed to the hub.
def refresh() {
    log.debug "Executing 'refresh'"
    return [subscribeAction(), getRequest()]
}

// Subscribe command: returns the SUBSCRIBE hub action built by subscribeAction()
def subscribe() {
    def action = subscribeAction()
    return action
}

// Build the SUBSCRIBE request for the device's ssdpPath, with the hub's
// local address (plus the optional callbackPath) as the CALLBACK URL
private subscribeAction(callbackPath="") {
    log.debug "Subscribe requested"
    def hubip = device.hub.getDataValue("localIP")
    def hubport = device.hub.getDataValue("localSrvPortTCP")
    def subscribeHeaders = [
        HOST: getHostAddress(),
        CALLBACK: "<http://${hubip}:${hubport}/notify$callbackPath>",
        NT: "upnp:event",
        TIMEOUT: "Second-3600"
    ]
    return new physicalgraph.device.HubAction(
        method: "SUBSCRIBE",
        path: getDataValue("ssdpPath"),
        headers: subscribeHeaders)
}

// Build the GET request that polls the device's ssdpPath for its status
def getRequest() {
    // Define the path before logging it; the log line used an undefined name.
    def path = getDataValue("ssdpPath")
    log.debug "Sending request for ${path} from ${device.deviceNetworkId}"
    new physicalgraph.device.HubAction(
        'method': 'GET',
        'path': path,
        'headers': [
            'HOST': getHostAddress(),
        ], device.deviceNetworkId)
}

// "<ip>:<port>" built from the device's stored data values
private getHostAddress() {
    def ip = getDataValue("ip")
    def port = getDataValue("port")
    return ip + ":" + port
}

Credits

Juan Pablo Risso
6 projects • 33 followers
I am a geek, technology Innovator, Software Engineer and Maker. I work at Samsung / SmartThings propelling the Internet of Things.

Comments