Hackster is hosting Hackster Holidays, Finale: Livestream & Giveaway Drawing. Watch previous episodes or stream live on Tuesday!Stream Hackster Holidays, Finale on Tuesday!
Devin MuiJaiveer SinghJaimie JinJesse Liang
Published © CC BY-NC-SA

Blindsight - Virtual Eyes Through Haptic Feedback

Blind people have a big problem: they can’t see. Blindsight simulates a novel sense of sight with haptic feedback vibration motors and ML.

IntermediateWork in progressOver 2 days8,268

Things used in this project

Hardware components

Raspberry Pi Zero Wireless
Raspberry Pi Zero Wireless
×1
Wide Angle FOV160° 5-Megapixel Camera Module for Raspberry Pi
×1
Coin Mobile Phone Vibration Motors
×8
LED (generic)
LED (generic)
×8
Adafruit PowerBoost 500 Charger - Rechargeable 5V Lipo USB Boost @ 500mA+
×1
Lithium Polymer Battery 3.7V 2000mAH
×2
Qi charger deck
Bitcraze Qi charger deck
×1
Fabric
×1

Software apps and online services

Raspbian
Raspberry Pi Raspbian
Snappy Ubuntu Core
Snappy Ubuntu Core
Python 3.6
OpenCV
OpenCV
AWS EC2
Amazon Web Services AWS EC2
Firebase
Google Firebase
Google Cloud Services for OCR
TensorFlow
TensorFlow
Nvidia CUDA
Autodesk Inventor
Cura

Hand tools and fabrication machines

Soldering iron (generic)
Soldering iron (generic)
Hot glue gun (generic)
Hot glue gun (generic)
3D Printer (generic)
3D Printer (generic)
Sewing Machine

Story

Read more

Custom parts and enclosures

! MK2 - LED Holder

These LEDs indicate motor direction. That way, video viewers can see what motors are on at the current moment.

MK2 - Complete Assembly

MK2 - Central Module Base

MK2 - Central Module Lid

MK2 - Qi Pad + PowerBoost Backing

MK1 - Battery Module Lid

MK1 - Battery Module Base

MK1 - Central Module Lid

MK1 - Central Module Base

Schematics

Raspberry Pi Hardware Schematic

This is the Fritzing schematic that shows the necessary hardware components and their wiring.

Code

blindsight.py

Python
This is the code we run on the Raspberry Pi
from picamera import PiCamera
from picamera.array import PiRGBArray
from gpiozero import Button, PWMOutputDevice, DigitalInputDevice, LED
from subprocess import check_call
from time import sleep

import requests
import numpy as np
import cv2
import time
import sys
import base64
import face_recognition
import glob
import os

url = ":<INSERT URL OF SERVER>"

# These pin values define where the components are plugged in
powerPin = -1 # YOUR PIN NUMBERS
touchPin = -1 # YOUR PIN NUMBERS
motorNEPin = -1 # YOUR PIN NUMBERS
motorNPin = -1 # YOUR PIN NUMBERS
motorNWPin = -1 # YOUR PIN NUMBERS
motorEPin = -1 # YOUR PIN NUMBERS
motorWPin = -1 # YOUR PIN NUMBERS
motorSEPin = -1 # YOUR PIN NUMBERS
motorSPin = -1 # YOUR PIN NUMBERS
motorSWPin = -1 # YOUR PIN NUMBERS

# These pin values define where the indicator LEDs are plugged in
LEDNEPin = -1 # YOUR PIN NUMBERS
LEDNPin = -1 # YOUR PIN NUMBERS
LEDNWPin = -1 # YOUR PIN NUMBERS
LEDEPin = -1 # YOUR PIN NUMBERS
LEDWPin = -1 # YOUR PIN NUMBERS
LEDSEPin = -1 # YOUR PIN NUMBERS
LEDSPin = -1 # YOUR PIN NUMBERS
LEDSWPin = -1 # YOUR PIN NUMBERS

# This function is called to shutdown the RPi
def shutdown():
	check_call(['sudo', 'poweroff'])

# This code is used to enable a shutdown button if you want one
powerSwitch = Button(powerPin, hold_time=5)
powerSwitch.when_held = shutdown

# This is the capacitive touch sensor, or "wake button"
touchBtn = DigitalInputDevice(touchPin)

# Defining motors around the armband (North, South, East, West)
motorNE = PWMOutputDevice(motorNEPin)
motorN = PWMOutputDevice(motorNPin)
motorNW = PWMOutputDevice(motorNWPin)
motorE = PWMOutputDevice(motorEPin)
motorW = PWMOutputDevice(motorWPin)
motorSE = PWMOutputDevice(motorSEPin)
motorS = PWMOutputDevice(motorSPin)
motorSW = PWMOutputDevice(motorSWPin)

# Similarly, defining the LEDs
LEDNE = LED(LEDNEPin)
LEDN = LED(LEDNPin)
LEDNW = LED(LEDNWPin)
LEDE = LED(LEDEPin)
LEDW = LED(LEDWPin)
LEDSE = LED(LEDSEPin)
LEDS = LED(LEDSPin)
LEDSW = LED(LEDSWPin)

# Create an arry to iterate through these motors + LEDs
motors = [motorNE, motorN, motorNW, motorE, motorW, motorSE, motorS, motorSW, LEDNE, LEDN, LEDNW, LEDE, LEDW, LEDSE, LEDS, LEDSW]

# Initialize the camera module
camera = PiCamera()
camera.resolution = (640, 640) # Low resolution makes the processing faster
camera.framerate = 32 # Low framerate as well
rawCapture = PiRGBArray(camera, size=(640, 640))

# Initialize this boolean flag variable
reachedObject = False

# This function will issue a high-powered "pulse" on all motors
def pulse(strength=1.0):
	for motor in motors:
		motor.value = 1.0
	time.sleep(0.5)
	print("pulsed")
	for motor in motors:
		motor.value = 0
	time.sleep(0.25)

# This function is used for object tracking motor updates
def updateMotors(p1, p2, image):
	yFactor = 0
	tolerance = 100 # How close we need to get to the exact center
	
	xOffset = (p1[0] + p2[0])/2 - (640/2) # Calculated from the bounding box
	yOffset = (p1[1] + p2[1])/2 - (640/2) + yFactor # Includes offset for camera elevation
	cv2.putText(image, "Center : " + str(xOffset) +" "+ str(yOffset), (100,90), cv2.FONT_HERSHEY_SIMPLEX, 0.25, (50,170,50), 1);
	if(abs(xOffset) <= tolerance and abs(yOffset) <= tolerance):
                # If we are centered over the object, tell the user we are done
		cv2.putText(image, "CENTERED", (100,100), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255,255,50), 3);
		return True
		pulse()
		pulse()
	updateMotors = []
	# Otherwise, determine which motors need to be updated, and append
	if(xOffset > tolerance):
		updateMotors.append(motorE)
		updateMotors.append(LEDE)
	if(xOffset < -tolerance):
		updateMotors.append(motorW)
		updateMotors.append(LEDW)
	if(yOffset > tolerance):
		updateMotors.append(motorN)
		updateMotors.append(LEDN)
	if(xOffset < -tolerance):
		updateMotors.append(motorS)
		updateMotors.append(LEDS)
	if(xOffset > tolerance and yOffset > tolerance):
		updateMotors.append(motorNE)
		updateMotors.append(LEDNE)
	if(xOffset < -tolerance) and yOffset > tolerance:
		updateMotors.append(motorNW)
		updateMotors.append(LEDNW)
	if(xOffset > tolerance and yOffset < -tolerance):
		updateMotors.append(motorSE)
		updateMotors.append(LEDSE)
	if(xOffset < -tolerance) and yOffset < -tolerance:
		updateMotors.append(motorSW)
		updateMotors.append(LEDSW)
	# Turn off all previous motors
	for motor in motors:
		motor.value = 0
	time.sleep(0.25)
	# Turn on the motors to indicate direction
	for motor in updateMotors:
		motor.value = 1

# A simple helper function to grab a still and convert to base 64
def base64ify():
	camera.start_preview()
	sleep(0.25)
	camera.capture('foo.jpg', format="jpeg")
	camera.stop_preview()
	image_64 = base64.b64encode(open('foo.jpg', "rb").read())
	return str(image_64.decode('utf-8'))

# This function will tell the phone to speak whatever words are sent
def postCaption(words):
	print(words)
	r = requests.post(url+"/caption", json={"caption":words})

# This function is used for the show-attend-tell model
def describeScene():
	img64 = base64ify()
	r = requests.post(url+"/show-attend-tell", json={"b64" : 'data:image/jpeg;base64,' + img64})
	postCaption("I see " + str(r.json()['response']['caption']))

# This function is used for the OCR
def readText():
	img64 = base64ify()
	r = requests.post(url+"/ocr", json={"b64" : 'data:image/jpeg;base64,' + img64})
	print(r.json())
	if("err" in r.json()):
                # In case text is too blurry
		postCaption("Sorry, I couldn't read that")
	else:
		try:
			postCaption(r.json()['texts'][0]['description'])
		except:
			postCaption("Sorry, I couldn't read that")

# This function "trains" the facial recognition by simply storing the image
def trainFacialRecognition(name):
	camera.start_preview()
	sleep(0.25)
	camera.capture('./faces/' + name + '.jpg', format="jpeg")
	camera.stop_preview()
	# Nice verbal confirmation so user knows how Christy pronounces name
	postCaption("Nice to meet " + name)
	
# This function iterates through the saved faces to find a match
def identifyPerson():
	camera.start_preview()
	sleep(0.25)
	camera.capture('./unknown.jpg', format="jpeg")
	camera.stop_preview()
	postCaption("This will take a while")
	for img in glob.glob("./faces/*.jpg"):
		known_img = face_recognition.load_image_file(img)
		unknown_img = face_recognition.load_image_file("./unknown.jpg")
		known_encoding = face_recognition.face_encodings(known_img)[0]
		unknown_encoding = face_recognition.face_encodings(unknown_img)[0]

		if face_recognition.compare_faces([known_encoding], unknown_encoding)[0]:
			person = os.path.basename(img) # return image
			postCaption("It's " + person[:-3])
			return
	# If the face isn't there, it's probably somebody new
	postCaption("I don't know who it is")

# This function uses OpenCV to track an object
def objectTracking():
	rawCapture.truncate(0)
	img64 = base64ify()
	#Get contents of image
	r = requests.post(url+"/object-recognition", json={"b64" : 'data:image/jpeg;base64,' + img64})
	if r.json()['response']['num_detections'] == 0:
		postCaption('No objects found')
	else:
		bb_orig = r.json()['response']['boxes'][0]
		postCaption('tracking ' + r.json()['response']['classes'][0])
		tracker = cv2.TrackerMedianFlow_create()
		
		# this is the output format
		yMin = int(640 * bb_orig[0])
		xMin = int(640 * bb_orig[1])
		yMax = int(640 * bb_orig[2])
		xMax = int(640 * bb_orig[3])

		# this is what the bbox rect wants
		height = yMax - yMin 
		width = xMax - xMin

		bbox = (xMin, yMin, xMax, xMin)
		
		# Initialize tracker with first frame and bounding box
		n = 0
		ok = True
		for frame in camera.capture_continuous(rawCapture, format="bgr", use_video_port=True):
			image = frame.array
			if n == 0:
				ok = tracker.init(image, bbox)
				n += 1
			else:
				ok, bbox = tracker.update(image)
			if ok:
				# Tracking success
				p1 = (int(bbox[0]), int(bbox[1]))
				p2 = (int(bbox[0] + bbox[2]), int(bbox[1] + bbox[3]))
				cv2.rectangle(image, p1, p2, (255,0,0), 2, 1)
				if updateMotors(p1, p2, image):
					pulse()
					break
			else :
				# Tracking failure
				cv2.putText(image, "Searching...", (100,80), cv2.FONT_HERSHEY_SIMPLEX, 0.75,(0,0,255),2)
	 
			rawCapture.truncate(0)
		camera.close()

# This function handles voice input
def processVoice():
	r = requests.get(url + '/voice')

	if r.json()['command'] != None:
		pulse()
		cmd = r.json()['command'].split(' ')
		firstWord = cmd[0]
		
		#Triggers tracking
		objectTrackingWords = [
			'detect',
			'find'
		]
		#Triggers describing
		objectDescribingWords = [
			'describe',
			'show',
			'tell'
		]
		#Triggers face training
		faceTrainingWords = [
			'friend',
			'meet',
			'meat'
		]
		#Triggers face recognizing
		faceRecognizeWords = [
			'who',
			"who's",
			'whose'
		]
		#Triggers OCR
		readingWords = [
			'read'
		]

		# Based on what the command seems to be saying, execute the action
		if firstWord in objectDescribingWords:
			describeScene()
		elif firstWord in objectTrackingWords:
			objectTracking()
		elif firstWord in faceTrainingWords:
			trainFacialRecognition(cmd[1])
		elif firstWord in faceRecognizeWords:
			identifyPerson()
		elif firstWord in readingWords:
			readText()
		else:
                        # If the command was not recognized, inform user
			postCaption("Sorry, I didn't catch that")
		return True
	return False

# Simple wake up function for startup
def wakeUp():
	requests.post(url+'/phone-wait')

if __name__ == '__main__':
	while True:
                # Wake by button
		if touchBtn.value == 1:
			pulse(0.5)
			i = 0
			while touchBtn.value == 1:
				sleep(0.25)
				i += 1
			if(i < 8):
				wakeUp()
				print("wake")
			else:
				pulse()
				describeScene()
				print("des")
		else:
			print("processing")
			processVoice()
		sleep(0.25)

server.js

JavaScript
This is our server to communicate between the Raspberry Pi and application. Not my most elegant work.
// libraries
var express = require('express');
var app = express();
var bodyParser = require('body-parser');
var shelljs = require('shelljs');
var base64ToImage = require('base64-to-image');
var admin = require('firebase-admin');
var vision = require('@google-cloud/vision');
var mongoose = require('mongoose');

// mongo set up
mongoose.connect('mongodb://localhost/test');

var schema = new mongoose.Schema({
	file: { type: String },
	processed: { type: Boolean, default: false },
	response: { type: Object, default: null },
	date: { type: Date }
});

var ObjDetImage = mongoose.model('Object_Detection', schema);
var SatImage = mongoose.model('Show_Detection', schema);

// directories setup
var imagesPath = __dirname + '/images/';
var modelFile = __dirname + '/show-attend-tell/289999.npy';

// server setup
app.use(bodyParser.json({ 'limit': '5mb' }));
app.use(bodyParser.urlencoded({ extended: true }))
app.set('port', process.env.PORT || 3000)

// google firebase setup
var serviceAccount = require('<json file here>');

admin.initializeApp({
	credential: admin.credential.cert(serviceAccount),
	databaseURL: '...'
});

// Creates a google vision client
var client = new vision.ImageAnnotatorClient({
	keyFilename: __dirname + '<json file here>'
});

// variable setup
var command = null;
var phoneWait = false;
var caption = null;

// this resets the variables
app.get('/reset', function(req, res){
	command = null;
	phoneWait = false;
	caption = null;
	res.json({ success: true });
});

// get variables
app.get('/stats', function(req, res){
	res.json({
		command: command,
		phoneWait: phoneWait,
		caption: caption
	});
});

// rpi sends req to server
app.post('/phone-wait', function(req, res){
	phoneWait = true;
	res.json({ phoneWait: phoneWait });
});

// phone constantly calls this and on true, gets triggered to do voice command
app.get('/phone-wait', function(req, res){
	phoneWait = false; // no loops
	res.json({ phoneWait: phoneWait });
});

// phone gets command
app.post('/voice', function(req, res){
	command = req.body.command;
	res.json({ command: command });
});

// rpi constantly calls this and on value != null, gets triggered to send pic to obj rec
app.get('/voice', function(req, res){
	var cmd = command;
	command = null; // no loops
	res.json({ command: cmd });
});

// rpi then sends the b64 encoded image here
app.post('/object-recognition', function(req, res){
	var b64 = req.body.b64; // convert b64 to image...
	// random filename
	var filename = base64ToImage(b64, imagesPath).fileName;
	console.log(imagesPath + filename);
	// execute python
	shelljs.exec('python mongo-listener.py -src=' + imagesPath + filename + ' -m="object-detection"', function(code, stdout, stderr){
		ObjDetImage.findOne({ file: imagesPath + filename }, function(err, img){
			if(err) console.log(err);
			console.log(img);
			res.json({ response: img.response });
		});
	});
});

// or here
app.post('/show-attend-tell', function(req, res){
	var b64 = req.body.b64; // convert b64 to image...
	// random filename
	var filename = base64ToImage(b64, imagesPath).fileName;
	console.log(imagesPath + filename);
	// execute python
	shelljs.exec('python mongo-listener.py -src=' + imagesPath + filename + ' -m="show-attend-tell"', function(code, stdout, stderr){
		SatImage.findOne({ file: imagesPath + filename }, function(err, img){
			if(err) console.log(err);

			res.json({ response: img.response });
		});
	});
});

// or here!
app.post('/ocr', function(req, res){
	var b64 = req.body.b64; // convert b64 to image...
	// random filename
	var filename = base64ToImage(b64, imagesPath).fileName;
	// Performs label detection on the image file
	console.log('file uploading');
	console.log('no errors');
	// upload to google cloud vision
	client
		.textDetection(imagesPath + filename)
		.then(response => {
			var texts = response[0].textAnnotations;
			res.json({ texts: texts });
		})
		.catch(err => {
			console.error('ERROR:', err);
			res.json({ err: err });
		});
});

// tell the phone what to say
app.post('/caption', function(req, res){
	caption = req.body.caption;
	res.json({caption: caption});
});

// phone gets caption
app.get('/caption', function(req, res){
	var capt = caption;
	caption = null; // no loops
	res.json({caption: capt});
});

function getOutputJSON(str){
	return JSON.parse(str.substring(str.indexOf('{')-1, str.indexOf('}')+1)); 
}

// run the server
app.listen(app.get('port'), '0.0.0.0', function(){
	console.log('app listening on port: ' + app.get('port'));
});

server.py

Python
This is the code for the caption generator server. This is similar to the object detection server.
#!/usr/bin/python
# imports
import tensorflow as tf
import sys
import json
import pymongo

from pymongo import MongoClient
from config import Config
from model import CaptionGenerator
from dataset import prepare_test_data, custom_prepare_test_data

# basic mongo setup
client = MongoClient()
db = client.test
images_collection = db.show_detections

# flags
FLAGS = tf.app.flags.FLAGS

tf.flags.DEFINE_string('phase', 'test',
					   'The phase can be train, eval or test')

tf.flags.DEFINE_boolean('load', False,
						'Turn on to load a pretrained model from either \
						the latest checkpoint or a specified file')

tf.flags.DEFINE_string('model_file', './289999.npy',
					   'If sepcified, load a pretrained model from this file')

tf.flags.DEFINE_boolean('load_cnn', False,
						'Turn on to load a pretrained CNN model')

tf.flags.DEFINE_string('cnn_model_file', './vgg16_no_fc.npy',
					   'The file containing a pretrained CNN model')

tf.flags.DEFINE_boolean('train_cnn', False,
						'Turn on to train both CNN and RNN. \
						 Otherwise, only RNN is trained')

tf.flags.DEFINE_integer('beam_size', 3,
						'The size of beam search for caption generation')
						
# main function
def main(argv):
  # configuration from flags
	config = Config()
	config.phase = FLAGS.phase
	config.train_cnn = FLAGS.train_cnn
	config.beam_size = FLAGS.beam_size

  # setup tensorflow session
	with tf.Session() as sess:

		# load model into VRAM
		model = CaptionGenerator(config)
		model.load(sess, FLAGS.model_file)

		print "Running show-attend-tell machine learning server..."
    
    # forever generate captions for new files that are put onto MongoDB
		while True:
			for image in images_collection.find().sort('date', pymongo.DESCENDING):
				if not image["processed"]:
					config.image_file = image["file"]
					
					# testing phase
					data, vocabulary = custom_prepare_test_data(config)
					tf.get_default_graph().finalize()
					prediction = model.test(sess, data, vocabulary)
					# bc there is only one item
					images_collection.find_one_and_update({ "file": image["file"] }, { "$set": { "response": { 'caption': prediction['caption'][0], 'prob': prediction['prob'][0] }, "processed": True } })

if __name__ == '__main__':
	tf.app.run()

mongo-listener.py

Python
This listens for a change in MongoDB to serve back to the user.
# imports
import argparse
import datetime

from pymongo import MongoClient

# basic mongo setup
client = MongoClient()
db = client.test
images_collection = None

# run this if this file is not imported
if __name__ == '__main__':
  # command line arguments
	parser = argparse.ArgumentParser()
	parser.add_argument('-src', '--source', dest='image_path', type=str,
						default=None, help='Path to image')
	parser.add_argument('-m', '--model', dest='model', type=str,
						default=None, help='Which model to predict the image')

  # tensorflow models
	models = [
		'show-attend-tell',
		'object-detection'
	]
  
  # check for errors in args
	args = parser.parse_args()
	if not args.image_path:
		raise ValueError('Arguments must contain a source image!')
	if not args.model:
		raise ValueError('Arguments must contain a model!')
	if args.model not in models:
		raise ValueError('Model name must be valid!')

	if args.model == 'object-detection':
		images_collection = db.object_detections
	elif args.model == 'show-attend-tell':
		images_collection = db.show_detections

  # create image object
	image = {
		"file": args.image_path,
		"processed": False,
		"response": None,
		"date": datetime.datetime.utcnow()
	}
  
  # insert into MongoDB for ML server to recognize
	images_collection.insert(image)

  # listen for a change in MongoDB and then return response to server
	while True:
		if images_collection.find_one({ "file": args.image_path })["processed"]:
			break

Credits

Devin Mui
4 projects • 13 followers
My favorite data structure is a linked list 🐍. What's yours? Undergraduate CS @ University of Southern California 22'
Jaiveer Singh
1 project • 4 followers
Incoming EECS+Business at UC Berkeley M.E.T. '22
Jaimie Jin
0 projects • 2 followers
Jesse Liang
1 project • 2 followers
Self-taught programmer. Hackathon hacker. From San Francisco, but attending UCSD.

Comments