Waking up lazily on the Friday morning of the last weekend of 2017 and getting my plans together for a New Year's party with friends, I was checking the headlines to start my day. News of the fire in the Bronx, NY dimmed my spirits. A 3-year-old playing with a stove started a fire that spread through the entire building and claimed 12 lives. Most of the people who were killed were on a higher floor (see picture below).
Firefighters often start helping from the floor where the fire started (see below: excerpt from the book "Special Report: Operational Considerations for High rise Firefighting"), but this incident showed that it may not be the floor with the most casualties. That got me thinking: what if firefighters could go first to the floor that needed the most help and, as a result, save more people? I was determined to build something that empowers firefighters to do their job optimally, that is, to save more people.
So with that, let's get started!
RASPBERRY PI:
The Raspberry Pi part of this project is easy to set up. Simply install the required components and run the Python file.
I used Ubuntu MATE because I originally tested on an Ubuntu computer and did not want to modify much code.
First, go to the Walabot site, download the Raspberry Pi SDK, and install it using GDebi.
Next, install the required Python packages:
pip install enum
pip install requests
cd /usr/share/walabot/python
pip install WalabotAPI-1.0.21.zip
Next, plug in the Walabot via USB.
Then paste this code into "firefury.py":
from __future__ import print_function
from sys import platform
from os import system
from imp import load_source
from enum import Enum
from os.path import join
import time
import requests
import json

# Location of the WalabotAPI.py file shipped with the SDK
if platform == 'win32':
    modulePath = join('C:/', 'Program Files', 'Walabot', 'WalabotSDK',
                      'python', 'WalabotAPI.py')
elif platform.startswith('linux'):
    modulePath = join('/usr', 'share', 'walabot', 'python', 'WalabotAPI.py')

# pip-installed module, used for all sensor calls below
import WalabotAPI as walabotAPI
walabotAPI.Init()
# Copy loaded from the SDK path; only its PROF_SENSOR constant is used below
wlbt = load_source('WalabotAPI', modulePath)
wlbt.Init()


def PrintTrackerTargets(targets):
    # Debug helper: clear the screen and print the y position of each target
    system('cls' if platform == 'win32' else 'clear')
    if targets:
        for i, target in enumerate(targets):
            print('y: {}'.format(target.yPosCm))
    else:
        print('No Target Detected')


class Placement(Enum):
    Empty = 0
    Back = 1
    Front = 2


class State(Enum):
    Idle = 0
    Bi = 1
    Fi = 2
    Bo = 3
    Fo = 4


def _get_placement(targets):
    # Classify the first target as in front of or behind the sensor
    if len(targets) == 0:
        return Placement.Empty
    if targets[0].yPosCm > 0:
        return Placement.Front
    if targets[0].yPosCm <= 0:
        return Placement.Back


class PeopleCounter:
    def __init__(self):
        self.placement = Placement.Empty
        self.state = State.Idle
        self.count = 0
        # state_machine[current state][placement] -> next state
        self.state_machine = {
            State.Idle:
                {Placement.Empty: State.Idle,
                 Placement.Back: State.Bi,
                 Placement.Front: State.Fo},
            State.Bi:
                {Placement.Empty: State.Idle,
                 Placement.Back: State.Bi,
                 Placement.Front: State.Fi},
            State.Fi:
                {Placement.Empty: State.Idle,
                 Placement.Back: State.Bi,
                 Placement.Front: State.Fi},
            State.Fo:
                {Placement.Empty: State.Idle,
                 Placement.Back: State.Bo,
                 Placement.Front: State.Fo},
            State.Bo:
                {Placement.Empty: State.Idle,
                 Placement.Back: State.Bo,
                 Placement.Front: State.Fo},
        }

    def update_state_get_count(self, targets):
        self.placement = _get_placement(targets)
        prev_state = self.state
        self.state = self.state_machine[self.state][self.placement]
        # Front -> back -> gone lowers the count; back -> front -> gone raises it
        if prev_state == State.Bo and self.state == State.Idle:
            self._decrement()
        elif prev_state == State.Fi and self.state == State.Idle:
            self._increment()
        return self.count

    def _increment(self):
        self.count += 1
        return State.Idle

    def _decrement(self):
        self.count = max(self.count - 1, 0)
        return State.Idle


def PeopleCounterApp():
    people_counter = PeopleCounter()
    # Scan arena: (min, max, resolution) for range, phi and theta
    rArenaMin, rArenaMax, rArenaRes = 5, 120, 5
    phiArenaMin, phiArenaMax, phiArenaRes = -60, 60, 3
    thetaArenaMin, thetaArenaMax, thetaArenaRes = -20, 20, 10
    walabotAPI.SetSettingsFolder()
    walabotAPI.ConnectAny()
    walabotAPI.SetProfile(wlbt.PROF_SENSOR)
    walabotAPI.SetArenaR(rArenaMin, rArenaMax, rArenaRes)
    walabotAPI.SetArenaPhi(phiArenaMin, phiArenaMax, phiArenaRes)
    walabotAPI.SetArenaTheta(thetaArenaMin, thetaArenaMax, thetaArenaRes)
    walabotAPI.SetDynamicImageFilter(walabotAPI.FILTER_TYPE_MTI)
    walabotAPI.Start()
    try:
        num_of_people = 0
        while True:
            # Check how many people there are
            walabotAPI.Trigger()
            targets = walabotAPI.GetSensorTargets()
            print(targets)
            targets1 = len(targets)
            Address = "75 Mountain Street, Brockton, CA"
            floor = 3
            Room = "12A"
            payload = {'Address': Address, 'floor': floor, 'Room': Room, 'Peeps': targets1}
            r=requests.post("
            targets = sorted(targets, key=lambda x: x.zPosCm, reverse=True)
            prev_num_of_people = num_of_people
            num_of_people = people_counter.update_state_get_count(targets)
            if prev_num_of_people != num_of_people:
                print('# {} #\n'.format(num_of_people))
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        walabotAPI.Stop()
        walabotAPI.Disconnect()
    print('Terminated successfully!')


if __name__ == '__main__':
    PeopleCounterApp()
With that, you are all set up on the Raspberry Pi.
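If you want to sanity-check the counting logic before the Walabot is plugged in, here is a minimal sketch that replays made-up frames through the same transition table as PeopleCounter. FakeTarget, placement_of and simulate are hypothetical names used only for this illustration, and the y positions are invented; nothing here talks to the sensor.

```python
from collections import namedtuple
from enum import Enum

# Hypothetical stand-in for a Walabot target; only yPosCm matters here.
FakeTarget = namedtuple('FakeTarget', ['yPosCm'])


class Placement(Enum):
    Empty = 0
    Back = 1
    Front = 2


class State(Enum):
    Idle = 0
    Bi = 1
    Fi = 2
    Bo = 3
    Fo = 4


# Same transitions as PeopleCounter.state_machine in firefury.py
TRANSITIONS = {
    State.Idle: {Placement.Empty: State.Idle, Placement.Back: State.Bi, Placement.Front: State.Fo},
    State.Bi: {Placement.Empty: State.Idle, Placement.Back: State.Bi, Placement.Front: State.Fi},
    State.Fi: {Placement.Empty: State.Idle, Placement.Back: State.Bi, Placement.Front: State.Fi},
    State.Fo: {Placement.Empty: State.Idle, Placement.Back: State.Bo, Placement.Front: State.Fo},
    State.Bo: {Placement.Empty: State.Idle, Placement.Back: State.Bo, Placement.Front: State.Fo},
}


def placement_of(targets):
    # Mirror of _get_placement: classify the first target by its y position
    if not targets:
        return Placement.Empty
    return Placement.Front if targets[0].yPosCm > 0 else Placement.Back


def simulate(frames):
    # Feed one list of targets per frame and return the final people count
    state, count = State.Idle, 0
    for targets in frames:
        prev = state
        state = TRANSITIONS[state][placement_of(targets)]
        if prev == State.Fi and state == State.Idle:
            count += 1
        elif prev == State.Bo and state == State.Idle:
            count = max(count - 1, 0)
    return count


# Back -> front -> gone counts as one person entering; the reverse as leaving.
walk_in = [[FakeTarget(-30)], [FakeTarget(20)], []]
walk_out = [[FakeTarget(20)], [FakeTarget(-30)], []]

print(simulate(walk_in))                       # 1
print(simulate(walk_in + walk_in + walk_out))  # 1
```

Two people walking in followed by one walking out leaves a count of 1, which is the behaviour you should expect from the real counter when the sensor is mounted in a doorway.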
Windows Server Deployment:
Since this program should only be administered by fire department admins, all of the servers and skills stay under their control. In a real deployment, therefore, you should not publish your skills.
First, go to the MySQL Server site, download MySQL Server for Windows, and install it.
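The server code further down reads and writes a table called users.user with the columns Address, floor, Room and Peeps, so that table has to exist before the first post arrives. The sketch below is one way it might be created. The column types and lengths are my assumptions (floor is text because the server code treats it as a string), and create_schema is a hypothetical helper, not part of any library.

```python
# Assumed schema for the users.user table that app1.py reads and writes.
# Column types and lengths are guesses; adjust them to your own data.
SCHEMA_STATEMENTS = [
    "CREATE DATABASE IF NOT EXISTS users",
    (
        "CREATE TABLE IF NOT EXISTS users.user ("
        "Address VARCHAR(255) NOT NULL, "
        "floor VARCHAR(16) NOT NULL, "
        "Room VARCHAR(16) NOT NULL, "
        "Peeps INT NOT NULL DEFAULT 0)"
    ),
]


def create_schema(connection):
    """Run each statement on an open DB-API connection (for example pymysql)."""
    cursor = connection.cursor()
    try:
        for statement in SCHEMA_STATEMENTS:
            cursor.execute(statement)
        connection.commit()
    finally:
        cursor.close()
    return len(SCHEMA_STATEMENTS)
```

Calling create_schema once with a pymysql connection opened the same way as in app1.py, but without the db argument because the database may not exist yet, should be enough. You could equally paste the two statements into the MySQL command line.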
Next, install the required Python packages:
pip install flask
pip install flask_cors
pip install pymysql
pip install flask_sqlalchemy
pip install flask_ask
Next, create a new file called app1.py and paste the following code in:
from flask import Flask
from flask import request
from flask_cors import CORS
import json
import requests
import pymysql

app = Flask(__name__)
CORS(app)
app.config["DEBUG"] = True
db = pymysql.connect(host='localhost',port=3306,user='root',passwd='Srivibhu10',db='users')


@app.route('/post', methods=['POST'])
def Us():
    # Coerce to strings so the len() checks below also work for numeric JSON values
    Given_Address = str(request.json['Address'])
    Given_floor = str(request.json['floor'])
    Given_Room = str(request.json['Room'])
    Given_Peeps = request.json['Peeps']
    cursor = db.cursor()
    select_Address = ("select count(*) from users.user where Address=%s ")
    select_Addrss_Floor = ("select count(*) from users.user where Address=%s and floor=%s")
    select_Addrss_Floor_Room = ("select count(*) from users.user where Address=%s and floor=%s and Room=%s")
    if len(Given_Address) > 0 and len(Given_floor) > 0 and len(Given_Room) > 0 and str(Given_Peeps) != "0":
        # Check whether this address/floor/room is already in the database
        check_allfields = (Given_Address, Given_floor, Given_Room)
        cursor.execute(select_Addrss_Floor_Room, check_allfields)
        rows = cursor.fetchone()
        if rows[0] > 0:
            # Update the people count of the existing record
            Update_noofpeople = ("UPDATE users.user set Peeps= %s where Address=%s and Floor=%s and Room=%s")
            updated_data = (Given_Peeps, Given_Address, Given_floor, Given_Room)
            cursor.execute(Update_noofpeople, updated_data)
            db.commit()
            cursor.close()
            return "Hi..Updated"
        else:
            # Add new record
            Insert_newrecord = ("Insert into users.user(Address,floor,Room,Peeps) VALUES (%s,%s,%s,%s)")
            Insert_allfields = (Given_Address, Given_floor, Given_Room, Given_Peeps)
            cursor.execute(Insert_newrecord, Insert_allfields)
            db.commit()
            cursor.close()
            return "Hi....Inserted"
    else:
        cursor.close()
        return "Hi in ELSE BLOCK ...."


def GetPeopleCounts(Address):
    # Total people per floor at this address, busiest floor first
    select_noofrows = ("select floor,sum(peeps) from users.user where UPPER(Address)=UPPER(%s) group by floor order by sum(peeps) desc")
    cursor = db.cursor()
    cursor.execute(select_noofrows, (Address,))
    rows = cursor.fetchall()
    # End the read transaction so the next call sees fresh rows
    db.commit()
    cursor.close()
    if rows:
        return_msg = "Number of people in each floor at this address "
        for row in rows:
            return_msg = return_msg + " Floor " + str(row[0]) + " People " + str(int(row[1]))
        return return_msg
    else:
        return "No Records Found"


if __name__ == '__main__':
    app.run()
You can use ngrok to create a static URL for localhost:5000, like I did. That way, even if your computer gets a new IP address, you can still post to the same URL.
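To make the posting step concrete, here is a minimal sketch of how a client could build the JSON request that the /post route expects. It uses Python 3's standard urllib instead of the requests library so it has no extra dependency. SERVER_URL is a placeholder rather than a real endpoint, and build_request and send are hypothetical helper names.

```python
import json
import urllib.request

# Placeholder only: substitute the public URL that ngrok gives you for port 5000.
SERVER_URL = "https://example.invalid/post"


def build_request(url, address, floor, room, people):
    """Build a JSON POST for the /post route without sending it."""
    payload = {
        "Address": address,
        "floor": str(floor),  # sent as text, matching how the server checks it
        "Room": room,
        "Peeps": people,
    }
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(req, timeout=5):
    """Send the request and return the server's reply text."""
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.read().decode("utf-8")


req = build_request(SERVER_URL, "75 Mountain Street, Brockton, CA", 3, "12A", 2)
print(req.get_method(), req.full_url)
print(req.data.decode("utf-8"))
```

Nothing is sent until you call send(req). The Content-Type header matters because the Flask route reads request.json, which is only populated for JSON requests.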
To finish our project off, let's create an Alexa skill that points to our flask_ask program.
Only the launch intent matters to us. When the skill launches, it immediately asks for the address, returns the values, and then exits. This keeps the interaction short and saves time.
Let's create our Flask_Ask program now:
Simply create a file called flaskaskpy.py and paste the following code in:
from flask import Flask
from flask_ask import Ask, request, session, question, statement
from flask_cors import CORS
import json
from app1 import GetPeopleCounts

app = Flask(__name__)
ask = Ask(app, '/')
app.config["DEBUG"] = True


@ask.launch
def launch():
    speech_text = "What Address to check"
    return question(speech_text)


@ask.intent('MainIntent')
def MainIntent(Address):
    #return statement("{}".format(Address))
    #print (GetPeopleCounts("{}".format(Address)))
    return statement(GetPeopleCounts("{}".format(Address)))


if __name__ == '__main__':
    app.run(debug=True, port = 5001)
If you are running this program on localhost, you should use ngrok and forward port 5001.
Skill Creation:
Simply create a skill called fire fury and fill out the fields as follows:
Here is the link to the finished skill: https://www.amazon.com/dp/B078RCJFYC/ref=sr_1_1?s=digital-skills&ie=UTF8&qid=1515158289&sr=1-1&keywords=fire+fury
After that, we can finally test out our device!
Example Conversation:
- Human: "Alexa, open fire fury"
- Alexa: "Which address do you want to check?"
- Human: "75 Mountain Street, Brockton, CA"
- Alexa: "Floor 3 People 8, Floor 6 People 3, Floor 8 People 2, Floor 7 People 1 "
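The order of floors in that reply comes from the GROUP BY query in GetPeopleCounts, which sums the people per floor and lists the busiest floor first. Here is a self-contained sketch of the same query using Python's built-in sqlite3 in place of MySQL, so it can be run without a server. The rows are made-up sample data, and rank_floors and to_speech are hypothetical helpers. The comma-separated format follows the example conversation rather than the exact string app1.py builds.

```python
import sqlite3


def rank_floors(rows, address):
    """Return (floor, total people) pairs for an address, busiest floor first."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE user (Address TEXT, floor TEXT, Room TEXT, Peeps INTEGER)")
    conn.executemany("INSERT INTO user VALUES (?, ?, ?, ?)", rows)
    cursor = conn.execute(
        "SELECT floor, SUM(Peeps) FROM user "
        "WHERE UPPER(Address) = UPPER(?) "
        "GROUP BY floor ORDER BY SUM(Peeps) DESC",
        (address,),
    )
    ranked = cursor.fetchall()
    conn.close()
    return ranked


def to_speech(ranked):
    """Format the ranked floors like the example conversation."""
    return ", ".join(
        "Floor {} People {}".format(floor, int(total)) for floor, total in ranked)


# Made-up sample rows: (Address, floor, Room, Peeps)
ADDRESS = "75 Mountain Street, Brockton, CA"
sample_rows = [
    (ADDRESS, "3", "12A", 5),
    (ADDRESS, "3", "12B", 3),
    (ADDRESS, "6", "2A", 3),
    (ADDRESS, "8", "1C", 2),
    (ADDRESS, "7", "4D", 1),
    ("1 Other Road", "1", "1A", 9),
]

print(to_speech(rank_floors(sample_rows, "75 mountain street, brockton, ca")))
# Floor 3 People 8, Floor 6 People 3, Floor 8 People 2, Floor 7 People 1
```

Note that the two rooms on floor 3 are added together, the address match ignores case, and the row for the other address is left out.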
Here is a video of it working with a known address:
Here is a video of it working with an unknown address:
That's all, you are now finished! Your building now has a lower fire mortality rate with just a few hours of work!