Hackster is hosting Hackster Holidays, Ep. 6: Livestream & Giveaway Drawing. Watch previous episodes or stream live on Monday! Stream Hackster Holidays, Ep. 6 on Monday!
Michael Darby - 314Reactor
Published © GPL3+

The Raspbinator

A Terminator-inspired, Raspberry Pi-driven facial recognition talking robot head.

Intermediate • Full instructions provided • 10 hours • 4,457
The Raspbinator

Things used in this project

Hardware components

Raspberry Pi 3 Model B
Raspberry Pi 3 Model B
×1
Adafruit PWM HAT
×1
battery box
×1
speaker
×1
batteries
×1
usb sound card
×1
pan and tilt kit
×1
Camera Module
Raspberry Pi Camera Module
×1
camera cable
×1
microphone
×1
3.5mm jack splitter
×1

Software apps and online services

wit.ai

Hand tools and fabrication machines

Soldering iron (generic)
Soldering iron (generic)

Story

Read more

Code

Main Bot code

Python
This is the main code: it moves the 'eye' and runs the facial recognition.
#some code/annotations from https://github.com/adafruit/Adafruit_Python_PCA9685/blob/master/examples/simpletest.py 
#and https://thecodacus.com/face-recognition-opencv-train-recognizer/

# import packages
from __future__ import division
import sys
import os
sys.path.append('/usr/local/lib/python2.7/site-packages')
from picamera.array import PiRGBArray
from picamera import PiCamera
import time
import cv2
import numpy as np
from PIL import Image
import time
import bot_9_import as b9

# Import the PCA9685 module.
import Adafruit_PCA9685

# Initialise the PCA9685 PWM driver using the default I2C address (0x40).
pwm = Adafruit_PCA9685.PCA9685()

# Servo positions, passed as the third argument of pwm.set_pwm() by the
# look-around loop at the bottom of this script.
# NOTE(review): servo_min_v (420) is larger than servo_max_v (390) - confirm
# this is intentional for the way the tilt servo is mounted.
servo_min_v = 420
servo_max_v = 390
servo_min_h = 355
servo_max_h = 400

# Scale factor applied to each camera frame before face detection
# (see face_detect), and its inverse.
scaledown = 0.2
scaleup = 1 / scaledown

# Haar cascade used to find faces, and the LBPH recogniser used to identify
# them.
# NOTE(review): createLBPHFaceRecognizer looks like the older opencv_contrib
# factory name - confirm against the installed OpenCV version.
cascadePath = "haarcascade_frontalface_alt.xml"
faceCascade = cv2.CascadeClassifier(cascadePath)
recognizer = cv2.face.createLBPHFaceRecognizer()

# OpenCV font constant.
font = cv2.FONT_HERSHEY_PLAIN

def get_images_and_labels(path):
    """Build the training set for the face recogniser from a folder of images.

    Every file in ``path`` whose name does not end in '.sad' is opened,
    converted to 8-bit greyscale and scanned with the module-level
    ``faceCascade``. Each detected face region is cropped out and paired with
    the file's numeric label.

    The label is the part of the file name before the first '.', with the
    text "subject" removed, parsed as an int (e.g. "subject03.normal" -> 3).

    Returns a tuple ``(images, labels)`` of two parallel lists: the cropped
    face arrays and their integer labels.
    """
    face_crops = []
    face_labels = []

    for file_name in os.listdir(path):
        # '.sad' files are held back from training.
        if file_name.endswith('.sad'):
            continue

        full_path = os.path.join(path, file_name)

        # Greyscale pixel data as a uint8 numpy array.
        pixels = np.array(Image.open(full_path).convert('L'), 'uint8')

        # Numeric subject id encoded in the file name.
        base_name = os.path.split(full_path)[1]
        subject_id = int(base_name.split(".")[0].replace("subject", ""))

        # One training sample per face found in the picture.
        for (x, y, w, h) in faceCascade.detectMultiScale(pixels):
            face_crops.append(pixels[y: y + h, x: x + w])
            face_labels.append(subject_id)

    return face_crops, face_labels

# Helper function to make setting a servo pulse width simpler.
def set_servo_pulse(channel, pulse):
    """Set the pulse width of one PCA9685 channel.

    channel -- output channel number, passed straight to pwm.set_pwm().
    pulse   -- pulse width. It is multiplied by 1000 and divided by the
               length of one tick in microseconds, so it is presumably given
               in milliseconds (TODO confirm against callers).

    The two intermediate values are printed, as in the Adafruit example this
    helper was taken from.
    """
    pulse_length = 1000000    # 1,000,000 us per second
    pulse_length //= 60       # 60 Hz
    print('{0}us per period'.format(pulse_length))
    pulse_length //= 4096     # 12 bits of resolution
    print('{0}us per bit'.format(pulse_length))
    # Floor division of a float pulse (e.g. 1.5) still yields a float; convert
    # so that set_pwm always receives an integer tick count.
    ticks = int(pulse * 1000 // pulse_length)
    pwm.set_pwm(channel, 0, ticks)
    
def face_detect():
    """Look for faces in the camera stream and start a conversation per frame.

    Frames are read continuously from the module-level ``camera``. Each frame
    is converted to greyscale, scaled by ``scaledown``, histogram-equalised
    and passed to ``faceCascade``. When a frame contains no face the capture
    buffer is cleared and the function returns, handing control back to the
    look-around loop. Otherwise every detected face is run through
    ``recognizer``, the name chosen for the last face is passed to
    ``b9.openConversation`` and, once that returns, the next frame is read
    after a two second pause.
    """
    # Labels the recogniser was trained with, mapped to the spoken name.
    known_faces = {1: 'John Connor', 2: 'Sarah Connor', 3: 'Mike'}
    unknown_face = 'Identify yourself'

    for frame in camera.capture_continuous(rawCapture, format="bgr", use_video_port=True):
        # Raw NumPy array representing the captured image.
        image = frame.array

        # Greyscale and a smaller image for quicker (but less accurate) detection.
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        small_img = cv2.resize(gray, (0, 0), fx=scaledown, fy=scaledown, interpolation=cv2.INTER_LINEAR)

        # Equalise the histogram of the greyscale image in place.
        cv2.equalizeHist(small_img, small_img)

        # Run the cascade face detection.
        faces = faceCascade.detectMultiScale(small_img)

        # No face: clear the capture buffer and return to the eye movement loop.
        if len(faces) == 0:
            rawCapture.truncate(0)
            break

        # Default answer, so humanid is always bound even when the predicted
        # label is not one of the known ids.
        humanid = unknown_face
        for (x, y, w, h) in faces:
            faceid, conf = recognizer.predict(small_img[y: y + h, x: x + w])
            # Print the predicted label and its confidence value (debug).
            print('{0} {1}'.format(faceid, conf))
            # NOTE(review): OpenCV documents the LBPH confidence as a distance
            # (lower means a closer match), so "conf > 180" may be inverted -
            # confirm before relying on this threshold.
            if conf > 180:
                humanid = known_faces.get(faceid, unknown_face)
            else:
                humanid = unknown_face

        # Print the humanid for debug purposes.
        print(humanid)
        # Hand the name over to the chatbot; this blocks until the
        # conversation ends.
        b9.openConversation(humanid)
        # Clear the capture buffer ready for the next frame, then pause.
        rawCapture.truncate(0)
        time.sleep(2)

# Set frequency to 60hz, good for servos.
pwm.set_pwm_freq(60)

# Folder holding the training images, relative to the working directory.
path = 'testfaces'
# Collect the cropped training faces and their corresponding labels.
images, labels = get_images_and_labels(path)

# Training on an empty set fails inside OpenCV with an unhelpful message, so
# stop early with a clear one instead.
if not images:
    raise SystemExit("No faces found in training folder '{0}'".format(path))

# Perform the training
recognizer.train(images, np.array(labels))

# Initialise the camera and the buffer that receives each raw frame.
camera = PiCamera()
camera.resolution = (320, 240)
camera.framerate = 32
rawCapture = PiRGBArray(camera, size=(320, 240))

# Allow the camera to warm up.
time.sleep(0.1)

# This loop moves the camera in a square motion to look around, running a
# face detection around each movement. Ctrl+C leaves the loop so that the
# clean-up below actually runs.
# NOTE(review): channel 0 is driven with servo_min_v and servo_max_h, and
# channel 1 with servo_max_v and servo_min_h - confirm the v/h pairing is
# intended. The last step also runs face_detect() before its sleep, unlike
# the other three steps; the order is kept as written.
try:
    while True:
        # Move servo on channel 0 between extremes.
        face_detect()
        pwm.set_pwm(0, 0, servo_min_v)
        time.sleep(2)
        face_detect()
        pwm.set_pwm(0, 0, servo_max_h)
        time.sleep(2)
        # Move servo on channel 1 between extremes.
        face_detect()
        pwm.set_pwm(1, 0, servo_max_v)
        time.sleep(2)
        face_detect()
        pwm.set_pwm(1, 0, servo_min_h)
        face_detect()
        time.sleep(2)
except KeyboardInterrupt:
    pass
finally:
    # Release the camera and close any OpenCV windows.
    camera.close()
    cv2.destroyAllWindows()

The Chatbot code

Python
This code handles speech-to-text and text-to-speech, as well as the main chatbot logic of the program.
#!/usr/bin/env python

#imports
import sys
import subprocess
import time
import random
import pymongo
import datetime
import sys
import time
import numpy
from colors import *
from pymongo import MongoClient
from pprint import pprint
from difflib import SequenceMatcher
from wit import Wit

#main class where all the workings happen
class talkLoop(object):
    """Chatbot core: remembers what was said in MongoDB and picks replies.

    Two collections are used:

    * ``responses`` - documents of the form
      ``{"whatbotsaid": <text>, "humanReply": [<text>, ...]}``
    * ``allwords``  - documents of the form ``{"word": <text>}``

    A reply is chosen by fuzzy-matching the latest human input against the
    stored ``whatbotsaid`` texts and answering with something a human once
    replied to the best match.
    """

    def __init__(self, client, db, responses, allwords, inputwords, globalReply, botAccuracy, botAccuracyLower):
        """Store the database handles, starting texts and match thresholds.

        botAccuracy is the similarity a candidate must exceed for the 'on'
        setting of mongoFuzzyMatch, botAccuracyLower the one for 'med'.
        """
        self.client = client
        self.db = db
        self.responses = responses
        self.allwords = allwords
        self.inputwords = inputwords
        self.globalReply = globalReply
        self.botAccuracy = botAccuracy
        self.botAccuracyLower = botAccuracyLower
        # replyTumbler() reads wordsIn, which used to exist only after the
        # first updateDB() call; seed both from the starting values.
        self.wordsIn = inputwords
        self.bResponse = globalReply

    def similar(self, a, b):
        """Return the SequenceMatcher similarity ratio of a and b (0.0 - 1.0)."""
        return SequenceMatcher(None, a, b).ratio()

    def get_random_doc(self):
        """Return one randomly chosen document from the allwords collection."""
        count = self.allwords.count()
        return self.allwords.find()[random.randrange(count)]

    def sentenceGen(self):
        """Return a sentence of 1 to 10 randomly drawn stored words.

        Every word is followed by a single space, so the result ends with a
        trailing space.
        """
        length = random.randint(1, 10)
        words = []
        for _ in range(length):
            doc = self.get_random_doc()
            if "word" in doc:
                words.append(doc["word"])
        return ''.join(word + ' ' for word in words)

    def dbSearch(self, searchIn):
        """Return a random stored human reply to the bot utterance searchIn.

        Raises LookupError when no document matches searchIn or the matching
        document holds no replies.
        """
        doc = self.responses.find_one({"whatbotsaid": searchIn})
        replies = doc.get('humanReply') if doc is not None else None
        if not replies:
            raise LookupError("no stored human replies for {0!r}".format(searchIn))
        return random.choice(replies)

    def mongoFuzzyMatch(self, inputString, searchZone, termZone, setting):
        """Return the stored text that is most similar to inputString.

        inputString -- the text to match.
        searchZone  -- the collection to scan (every document is read).
        termZone    -- the document field to compare, e.g. 'whatbotsaid'.
        setting     -- 'on'  : only candidates scoring above botAccuracy,
                       'med' : only candidates scoring above botAccuracyLower,
                       'off' : every candidate regardless of score.
                       Any other value matches nothing.

        Returns the best-scoring candidate, or the string 'none_match' when
        no candidate qualifies (including when the collection is empty).
        """
        thresholds = {'med': self.botAccuracyLower, 'on': self.botAccuracy}
        scores = {}
        for doc in searchZone.find():
            if termZone not in doc:
                continue
            candidate = doc[termZone]
            score = self.similar(inputString, candidate)
            if setting == 'off':
                scores[candidate] = score
            elif setting in thresholds and score > thresholds[setting]:
                scores[candidate] = score
        if not scores:
            return 'none_match'
        return max(scores, key=scores.get)

    def replyTumbler(self):
        """Choose the bot's next reply to the most recent human input.

        The input is matched against earlier bot utterances, first with the
        high and then with the lower accuracy threshold; on a match a stored
        human reply to that utterance is returned. With no match, a random
        sentence is generated once at least twenty words are stored;
        otherwise the best match at any accuracy is used.
        """
        for setting in ('on', 'med'):
            searchSaid = self.mongoFuzzyMatch(self.wordsIn, self.responses, 'whatbotsaid', setting)
            if searchSaid != 'none_match':
                return self.dbSearch(searchSaid)

        if int(self.allwords.count()) < 20:
            searchSaid = self.mongoFuzzyMatch(self.wordsIn, self.responses, 'whatbotsaid', 'off')
            return self.dbSearch(searchSaid)
        return self.sentenceGen()

    def updateDB(self, wordsIn, bResponse):
        """Record that a human answered bResponse with wordsIn.

        wordsIn is added to the humanReply list of the document for
        bResponse (created if necessary), and every word of wordsIn that is
        not yet known is added to the allwords collection.
        """
        self.wordsIn = wordsIn
        self.bResponse = bResponse

        if self.responses.find_one({"whatbotsaid": bResponse}) is None:
            self.responses.insert_one({"whatbotsaid": bResponse, "humanReply": [wordsIn]})
        else:
            # update_one replaces the deprecated Collection.update.
            self.responses.update_one(
                {"whatbotsaid": bResponse},
                {'$addToSet': {"humanReply": wordsIn}},
                upsert=True,
            )

        for word in wordsIn.split(' '):
            # Consecutive or trailing spaces produce empty strings; do not
            # store those as words.
            if not word:
                continue
            if self.allwords.find_one({"word": word}) is None:
                self.allwords.insert_one({"word": word})

#the function called from the main code that will run the main class with all the necessary parameters and start the loop
def openConversation(personName):
    """Hold a spoken conversation with the person named personName.

    The bot greets the person through espeak, then loops: record speech with
    sox ``rec`` into test.wav, send it to wit.ai for speech-to-text, store
    the exchange in MongoDB, and speak a reply chosen by talkLoop. The loop
    ends when the person says "goodbye" / "good bye" or when an error occurs;
    the error is reported on stderr and the function returns normally.
    """
    def say(reply):
        # Speak the reply with espeak and echo it to the terminal in blue.
        subprocess.call(['espeak', reply])
        sys.stdout.write(BLUE)
        print(reply)
        sys.stdout.write(RESET)

    # The wit.ai API key (placeholder - sign up for your own at wit.ai).
    client_wit = Wit('YOURKEYHERE')

    # MongoDB connection; closed again when the conversation is over.
    client = MongoClient('localhost', 27017)
    try:
        db = client.words_database
        responses = db.responses
        allwords = db.allwords

        # First input and the two levels of search accuracy.
        inputWords = "hello"
        globalReply = "hello"
        botAccuracy = 0.725
        botAccuracyLower = 0.45

        talkClass = talkLoop(client, db, responses, allwords, inputWords, globalReply, botAccuracy, botAccuracyLower)
        # Store the starting exchange.
        talkClass.updateDB(inputWords, globalReply)
        # Run the greeting through the reply tumbler twice so that the bot
        # can open with a learned greeting other than a direct human reply
        # to 'hello'.
        inputWords = talkClass.replyTumbler()
        talkClass.updateDB(inputWords, globalReply)
        globalReply = talkClass.replyTumbler()
        # Combine the greeting with the name from the face identification code.
        globalReply = str(globalReply + " " + personName)
        say(globalReply)

        try:
            while True:
                # Record with sox; the 'silence' effect trims leading silence
                # and stops the recording once the speaker has finished.
                subprocess.call(['rec', 'test.wav', 'rate', '32k',
                                 'silence', '1', '0.1', '5%', '1', '1.0', '5%'])
                # Send the recording to wit.ai for speech-to-text.
                with open('test.wav', 'rb') as f:
                    resp = client_wit.speech(f, None, {'Content-Type': 'audio/wav'})
                heard = resp.get('_text') if resp else None
                # Nothing recognised: listen again rather than storing an
                # empty reply.
                if not heard or not str(heard).strip():
                    continue
                inputWords = str(heard)
                # Saying goodbye ends the conversation and returns to the
                # caller.
                if inputWords.strip().lower() in ("goodbye", "good bye"):
                    break
                # Echo the recognised words in red (debug/testing purposes).
                sys.stdout.write(RED)
                print(inputWords)
                sys.stdout.write(RESET)
                # Remember the human's response to the bot's last reply, then
                # answer.
                talkClass.updateDB(inputWords, globalReply)
                globalReply = talkClass.replyTumbler()
                say(globalReply)
        except Exception as err:
            # Best effort: end the conversation, but say why instead of
            # hiding the error. Ctrl+C is not caught here and propagates.
            sys.stdout.write(RESET)
            sys.stderr.write('Conversation ended by error: {0!r}\n'.format(err))
    finally:
        client.close()

colors.py

Python
Python file defining the colour codes used to colour the chatbot's terminal output. (Credit: https://stackoverflow.com/questions/37340049/how-do-i-print-colored-output-to-the-terminal-in-python)
# Terminal escape sequences, written to stdout by the chatbot before and
# after each line it prints.

# Colours.
RED = "\x1b[1;31m"
BLUE = "\x1b[1;34m"
CYAN = "\x1b[1;36m"
GREEN = "\x1b[0;32m"

# Attributes and reset.
RESET = "\x1b[0;0m"
BOLD = "\x1b[;1m"
REVERSE = "\x1b[;7m"

Github file

https://github.com/adafruit/Adafruit_Python_PCA9685/blob/master/examples/simpletest.py

Credits

Michael Darby - 314Reactor

Michael Darby - 314Reactor

56 projects • 146 followers
I like to keep fit, explore and of course make projects.

Comments