Coming back from vacation, my wife and I were surprised to find water covering the floor of our apartment, and it wasn't even clean water: it was drain water everywhere. After clearing the drain and cleaning the floor, I had this question: why don't we have an alarm system for potential drain clogs?
A clogged drain not only brings your home to a halt, it also takes money out of your pocket: clearing one costs $206 on average according to HomeAdvisor, on top of the hidden costs of damaged carpets, wooden furniture, etc.
Our idea is to give homeowners, as well as enterprises like city/compound maintenance departments and specialized service providers, an efficient and intelligent system that alerts whoever is in charge as early as possible so they can take action, contributing an important feature to smart cities.
The Idea
Although clogs can be detected through a number of techniques, such as gas sensors or internal mechanisms, our team focused on using sound as the input: knocking on an open tube produces a different sound than knocking on a closed one.
Following this simple concept, if we can train a model on the sound patterns occurring on the tube surface during clogs, as well as the patterns occurring in open pipes, we can apply that model to detect proactively when a clog starts to form, and then ring some bells.
Project in Detail
Three phases are implemented in this project: gathering data, learning, and prediction.
Before applying this system in real life, we needed a controlled simulation environment with a pipe, flowing water, and some way to simulate a clog. So we took a tube and a water hose connected to a water source, ran the experiment in the bathtub, and used the tub surface to close the tube, representing the clog.
In this video, we explain how we built the environment and how we collected data for the model training.
And in this next video, we show how we tested the system and the model: in open mode, then in clog mode, and back to open mode.
So, let's explore our implementation step by step:
1. Gathering Data, Data Analysis Phase & Learning Phase
A. The Experiment
In this scenario we use a small water pipe connected to our hardware and a sound sensor. The hardware reads the sensor value and sends it to the cloud.
We did this for 10 minutes with a blocked tube, then another 10 minutes with an unblocked tube.
B. Hardware
I. Arduino
To detect the water sound inside the pipe we need a sound sensor. However, the Raspberry Pi 3 doesn't have analog GPIO pins, so we use an Arduino, which does.
We connect the Grove sound sensor to the Grove Arduino shield, attach the shield to the Arduino Uno, then connect the Arduino and the Raspberry Pi with a USB cable.
For more information about the Grove sound sensor, check its datasheet, which includes sample code for reading sensor values. We use the sample code almost as-is, with small changes:
- In the code below, the sensor is connected to A0 on the shield.
- To write to the serial port, we use the Serial.begin() function; the baud rate is set to 115200 to communicate with the Raspberry Pi.
- Data is sent to the Raspberry Pi only if it is above a certain threshold, to cut out noise.
- Many trials were run to choose the threshold and delay values: the threshold came out to 400 and the delay to 10 milliseconds.
- The threshold was chosen to filter normal noise and ensure that only meaningful data is sent to the cloud.
- The delay was chosen so that the sensor detects any change in the flow sound inside the tube immediately.
#define SOUND_SENSOR A0
#define THRESHOLD_VALUE 400

void setup()
{
  Serial.begin(115200);
  pinMode(SOUND_SENSOR, INPUT);
}

void loop()
{
  int sensorValue = analogRead(SOUND_SENSOR); // use A0 to read the electrical signal
  // send the value to the Raspberry Pi only when it is above the noise threshold
  if (sensorValue > THRESHOLD_VALUE)
  {
    Serial.println(sensorValue);
  }
  delay(10);
}
II. Raspberry Pi 3
To install Android Things on the Raspberry Pi, download the latest version from the Android Things Console; in this project we use version OIR1.170720.017. Follow the steps on the Raspberry Pi site to install the operating system; for Windows you can use these steps.
After installation, connect the Raspberry Pi to your computer over USB. Then, in your computer's console, use the command below to get the Raspberry Pi's IP:
nmap -sn 192.168.1.*
After getting the IP, connect to your Raspberry using below command:
adb connect <ip_address_android_things>
To connect your Raspberry Pi to Wi-Fi (substitute your SSID & password):
adb shell am startservice \
-n com.google.wifisetup/.WifiSetupService \
-a WifiSetupService.Connect \
-e ssid ***** \
-e passphrase ****
C. Google Cloud
Google offers a free tier for all users for one year, with a ceiling of $300. Thanks, Google :).
Follow the screens below to create a new project in Google Cloud:
I. Pub/Sub
Google Cloud Pub/Sub is a fully managed real-time messaging service that allows you to send and receive messages between independent applications. Follow the screen below to create a new topic.
II. IoT Core
A fully managed service to easily and securely connect, manage, and ingest data from globally dispersed devices. IoT Core is still in beta; to get access you need to submit a request to Google with a justification. We made the request, our justification was this contest, and Google approved. Thanks to Google again :).
The Raspberry Pi sends sensor data to IoT Core, which forwards the readings to the Pub/Sub topic created in the previous step.
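The telemetry that reaches Pub/Sub is a base64-encoded JSON payload. A minimal Python sketch of the decoding step (the layout, a JSON object with a `data` array, follows the Cloud Function's code; the exact keys such as `soundValue` are illustrative assumptions):

```python
import base64
import json

def decode_pubsub_message(b64_payload: str) -> dict:
    """Decode a base64-encoded Pub/Sub payload into its first telemetry record."""
    raw = base64.b64decode(b64_payload).decode("utf-8")
    envelope = json.loads(raw)
    # The device batches readings in a "data" array; take the first record.
    return envelope["data"][0]

# Simulate what the device would publish (field names are assumptions).
payload = base64.b64encode(
    json.dumps({"data": [{"soundValue": 512, "timestamp": 1500000000}]}).encode("utf-8")
).decode("ascii")

record = decode_pubsub_message(payload)
print(record["soundValue"])  # → 512
```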
III. Cloud Functions
Cloud Functions is a serverless environment to build and connect cloud services. The trigger for this function is the Pub/Sub topic created in step I.
The function fires when a new value is written to Pub/Sub and writes it to Cloud Datastore under the kind "SoundValue".
'use strict';
const Datastore = require('@google-cloud/datastore');
const ds = Datastore({
projectId:".............."
});
const kind = 'SoundValue';
function toDatastore (obj, nonIndexed) {
nonIndexed = nonIndexed || [];
const results = [];
Object.keys(obj).forEach((k) => {
if (obj[k] === undefined) {
return;
}
results.push({
name: k,
value: obj[k],
excludeFromIndexes: nonIndexed.indexOf(k) !== -1
});
});
return results;
}
exports.subscribe = function subscribe(event, callback) {
const pubsubMessage = event.data;
let buffer = {};
buffer = Buffer.from(pubsubMessage.data, 'base64').toString() ;
let dataArray = JSON.parse(buffer);
let data = dataArray.data[0];
let key = ds.key(kind);
let result = toDatastore(data, ['description']) ;
const item = {
key: key,
data: result
};
ds.save(
item,
(err) => {
data.id = item.key.id;
// invoke the callback exactly once, after the save completes
callback(err);
}
);
};
IV. Cloud DataStore
Google Cloud Datastore is a NoSQL document database built for automatic scaling, high performance, and ease of application development. While the Cloud Datastore interface has many of the same features as traditional databases, as a NoSQL database it differs from them in the way it describes relationships between data objects.
No setup is needed here: once the Cloud Function writes sensor values to Datastore, the data appears as in the screen below.
V. BigQuery
We gathered a 10-minute sample from the normal pipe and a 10-minute sample from the blocked pipe, with exactly one hour between the two iterations. After downloading the data from Datastore and adding a classification label to each row, we have two CSV files, one per category. As a best practice, upload the CSV files to Cloud Storage first; in the screen below we create a new bucket and upload the two CSVs.
Since this bucket is used for analysis only, there is no need to choose a Multi-Regional bucket.
Then create a new dataset and a new table in BigQuery, and load the two CSV files from the bucket into the new table.
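The labeling step can be sketched in Python with the standard csv module (a hedged sketch: the file names and the exported column layout are assumptions, not the exact Datastore export format):

```python
import csv

def label_rows(rows, label):
    """Append a classification column to each exported Datastore row."""
    return [row + [label] for row in rows]

def write_labeled(in_path, out_path, label):
    """Read a raw export, add the class label, and write the labeled CSV."""
    with open(in_path, newline="") as f:
        rows = list(csv.reader(f))
    header, body = rows[0] + ["class"], label_rows(rows[1:], label)
    with open(out_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(body)

# e.g. write_labeled("open_raw.csv", "open.csv", "open")
#      write_labeled("clog_raw.csv", "clog.csv", "clog")
print(label_rows([["1500000000", "512"]], "open"))  # → [['1500000000', '512', 'open']]
```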
VI. Data Studio
Then we use Data Studio to draw some insights; it reads the data from the BigQuery table.
The final report shows three graphs plus a telemetries table.
From the graphs we can see the difference between the two categories in the number of telemetries and the sum of values per minute. Based on these insights we can design a simple model: the pipe is considered blocked if
- in 3 successive minutes, the count of telemetry values above the noise threshold (400) is more than 350,
and
- in 3 successive minutes, the count of telemetry values above the spark threshold (720) is more than 10.
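These two rules can be written as a small Python check over a three-minute window of raw samples (a sketch of the model only, not the on-device code; the simulated windows are illustrative):

```python
NOISE_THRESHOLD = 400   # a sample above this counts as a meaningful telemetry
SPARK_THRESHOLD = 720   # a sample above this counts as a spike

def is_clogged(samples):
    """Apply both rules to three successive minutes of raw sensor samples."""
    telemetries = sum(1 for v in samples if v > NOISE_THRESHOLD)
    spikes = sum(1 for v in samples if v > SPARK_THRESHOLD)
    return telemetries > 350 and spikes > 10

# Simulated clog window: many loud samples, including spikes.
clog_window = [650] * 400 + [800] * 20
# Simulated open-pipe window: mostly quiet samples.
open_window = [200] * 5000 + [650] * 50

print(is_clogged(clog_window))  # → True
print(is_clogged(open_window))  # → False
```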
D. Software
To create a new Android Things project, you can check these samples, especially the ones below:
- Simple PIO
- Weather Station
- Tensorflow
- Cloud IOT Sensor hub
I. Serial Port
We want to be notified when an external USB device is attached to the Android device. This is done by adding an intent-filter entry to MainActivity so that the system starts it when a USB device is plugged in.
<!-- Launch when USB device attached -->
<intent-filter>
<action android:name="android.hardware.usb.action.USB_DEVICE_ATTACHED" />
</intent-filter>
<meta-data
android:name="android.hardware.usb.action.USB_DEVICE_ATTACHED"
android:resource="@xml/device_filter"/>
To ensure that you are notified only when the Arduino is connected to USB, and not any other device, add a new file "xml/device_filter.xml" in the res folder. The Arduino vendor ID is 9025.
<?xml version="1.0" encoding="utf-8"?>
<resources>
<usb-device vendor-id="9025"/>
</resources>
The following code will ensure that only data received from Arduino on USB port is considered.
private static final int USB_VENDOR_ID = 0x2341; // 9025
private static final int USB_PRODUCT_ID = 0x0043; // 67
if (UsbManager.ACTION_USB_DEVICE_ATTACHED.equals(action)) {
UsbDevice device = intent.getParcelableExtra(UsbManager.EXTRA_DEVICE);
if (device != null && device.getVendorId() == USB_VENDOR_ID &&
device.getProductId() == USB_PRODUCT_ID) {
Log.i(TAG, "Device found: " + device.getDeviceName());
}
}
To open a serial connection between the Arduino and the Android device, we use felHR85's USBSerial library. The code below initializes the USB port; note that we use the same baud rate, 115200.
connection = usbManager.openDevice(device);
serialDevice = UsbSerialDevice.createUsbSerialDevice(device, connection);
if (serialDevice != null) {
if (serialDevice.open()) {
serialDevice.setBaudRate(115200);
serialDevice.setDataBits(UsbSerialInterface.DATA_BITS_8);
serialDevice.setStopBits(UsbSerialInterface.STOP_BITS_1);
serialDevice.setParity(UsbSerialInterface.PARITY_NONE);
serialDevice.setFlowControl(UsbSerialInterface.FLOW_CONTROL_OFF);
serialDevice.read(callback);
Log.i(TAG, "Serial connection opened");
}
}
The expected data is numeric, so we discard any empty or non-numeric values.
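The same filtering can be sketched in Python (assuming, as in our setup, one ASCII value per serial line):

```python
def parse_reading(line: str):
    """Return the integer sensor value, or None for empty/non-numeric input."""
    text = line.strip()
    if not text:
        return None
    try:
        return int(text)
    except ValueError:
        return None

# Keep only valid numeric readings from a raw serial stream.
readings = [parse_reading(l) for l in ["512\n", "", "noise", "730\n"]]
print([r for r in readings if r is not None])  # → [512, 730]
```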
II. Connect to Cloud
In the data-gathering phase, we need to send the sensor data to the Google cloud. We follow this example.
Set the information below based on the IDs & names created in Google Cloud:
options.projectId = ".........";
options.registryId = ".......";
options.deviceId = "......";
options.cloudRegion = ".....";
To register the device so it can send data, we need to extract the public key from the Raspberry Pi. The code below logs and returns the Raspberry Pi's certificate in PEM format:
public String getCertificatePEM() throws GeneralSecurityException {
StringBuilder sb = new StringBuilder();
sb.append("-----BEGIN CERTIFICATE-----\n");
sb.append(Base64.encodeToString(certificate.getEncoded(), Base64.DEFAULT));
sb.append("-----END CERTIFICATE-----\n");
Log.i(TAG, sb.toString());
return sb.toString();
}
This public key is saved in a file called "cloud_iot_auth_certificate.pem". The command below pulls the file from the Raspberry Pi to your computer:
adb pull /sdcard/cloud_iot_auth_certificate.pem
To set the public key for the device created in the cloud, use the command below:
gcloud beta iot devices create <DEVICE_ID> --project=<PROJECT_ID> --region=<CLOUD_REGION> --registry=<REGISTRY_ID> --public-key path=cloud_iot_auth_certificate.pem,type=rs256
When the Raspberry Pi reads sensor values, it sends them to the cloud:
private static final int MQTT_QOS = 1;
private static final boolean SHOULD_RETAIN = false;
private void sendMessage(String mqttTopic, byte[] mqttMessage) throws MqttException {
mqttClient.publish(mqttTopic, mqttMessage, MQTT_QOS, SHOULD_RETAIN);
}
2. Prediction Phase
We had two models that could be used:
A. Fisher Linear discriminant analysis through R, Python & TensorFlow
A machine learning classifier that assigns readings to two classes, developed in R using Fisher's rules: class 1 represents open mode and class 2 represents clog mode.
As we have 20 minutes of readings (10 for open mode and 10 for clog mode), each minute of data was stored in a separate file, represented as a vector, and labeled with the matching mode; each file contains 60,000 readings (60 seconds × 1,000 milliseconds).
In the code below, the classifier matrix w and offset w0 are stored in wClassifiers and w0Classifiers, which are then used for validation over 6 minutes of data (3 open & 3 clog).
require("MASS")
#Reading sensor values stored into a list of matrices####
classList <- list()
#open mode
classMat <- numeric() #initialize an empty class matrix
for (i in 1:10) {
fileName = paste("open", i , ".csv", sep = "")
x=read.csv(fileName)
y = x
#Lineup matrix columns to a single vector
xdim=y[,1]
for (j in 2:60000) {
xdim=append(xdim,y[,j])
}
#Add new vector as a column in class matrix
classMat = cbind(classMat,xdim)
}
#Add each class matrix to a class list of all modes (classes)
classList[[1]] = classMat
#repeat for clog mode
classMat <- numeric() #reset the class matrix for the clog class
for (i in 1:10) {
fileName = paste("clog", i , ".csv", sep = "")
x=read.csv(fileName)
y = x
#Lineup matrix columns to a single vector
xdim=y[,1]
for (j in 2:60000) {
xdim=append(xdim,y[,j])
}
#Add new vector as a column in class matrix
classMat = cbind(classMat,xdim)
}
#Add each class matrix to a class list of all modes (classes)
classList[[2]] = classMat
#Train classifiers matrix wClassifiers####
#First class (open) mean
class_1 = classList[[1]]
class_1_SUM = class_1[,1]
for (i in 2:10) {
class_1_SUM = class_1_SUM + class_1[,i]
}
#Calculate class 1 mean
m1 = class_1_SUM / 10
#Initialize Class 2 Matrix and summation vector
class_2 = vector(mode="numeric", length=60000)
class_2_SUM = class_2
#Second class (clog) mean
class_2_MAT = classList[[2]]
for (i in 1:10) {
xdim = class_2_MAT[,i]
class_2 = cbind(class_2,xdim)
}
#Remove initialization vector
class_2 = class_2[,-1]
for (i in 1:10) {
xdim = class_2[,i]
class_2_SUM = class_2_SUM + xdim
}
#Calculate class 2 mean
m2 = class_2_SUM / 10
#First class (open) variance
class_1_var <- matrix(0,60000,60000)
for (i in 1:10) {
xvar = class_1[,i] - m1
xvart = t(xvar)
class_1_var = class_1_var + (xvar %*% xvart)
}
#Second class (clog) variance
class_2_var <- matrix(0,60000,60000)
for (i in 1:10) {
xvar = class_2[,i] - m2
xvart = t(xvar)
class_2_var = class_2_var + (xvar %*% xvart)
}
#Variance matrix
SW = class_1_var + class_2_var
#Invert variance matrix
SWinv = ginv(SW)
#Calculate w vector
w = 0.1 * ( SWinv %*% (m2 - m1) )
#Calculate w0
wtr = t(w)
w0 = -0.5 * ( wtr %*% (m1 + m2) )
#Check formula, expected class 1 -> -ve values, class 2 -> +ve values
wClassifiers = w
w0Classifiers = w0
wClassifiers = as.matrix(w)
w0Classifiers = as.matrix(w0)
####################### Training Complete #######################
#Check classifiers, expected target class -> -ve values, others -> +ve values
####################### Start Testing #######################
#Read Testing values ####
Readings <- numeric()
for (i in 1:6) {
fileName = paste("../Test/A1", i , ".csv", sep = "")
#Read the test file as a matrix
x=read.csv(fileName)
y=x
#Lineup matrix columns to a single vector
xdim=y[,1]
for (j in 2:60000) {
xdim=append(xdim,y[,j])
}
#Add vector as a column in class matrix
Readings = cbind(Readings,xdim)
}
#Apply classifiers
w0MAT = replicate ( 6, as.vector (w0Classifiers) )
classification = t(wClassifiers) %*% Readings + w0MAT
#Assign "open" to values > 0
classification[classification > 0] <- "Open"
#Assign "clog" to values < 0
classification[classification < 0] <- "Clog"
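To make the linear algebra concrete, here is a toy two-feature version of the same Fisher discriminant in pure Python (the hand-rolled 2×2 inverse stands in for ginv over the 60,000-dimensional vectors above; the feature values are illustrative, not our real data):

```python
def mean(vectors):
    """Component-wise mean of a list of equal-length vectors."""
    n = len(vectors)
    return [sum(v[i] for v in vectors) / n for i in range(len(vectors[0]))]

def scatter(vectors, m):
    """Within-class scatter: sum of outer products of (x - m)."""
    s = [[0.0, 0.0], [0.0, 0.0]]
    for v in vectors:
        d = [v[0] - m[0], v[1] - m[1]]
        for i in range(2):
            for j in range(2):
                s[i][j] += d[i] * d[j]
    return s

def inv2(m):
    """Inverse of a 2x2 matrix."""
    det = m[0][0] * m[1][1] - m[0][1] * m[1][0]
    return [[m[1][1] / det, -m[0][1] / det], [-m[1][0] / det, m[0][0] / det]]

# Toy data: class 1 = open, class 2 = clog (features could be per-minute
# telemetry count and spike count -- an assumption for illustration).
open_x = [[50.0, 1.0], [60.0, 2.0], [40.0, 0.0]]
clog_x = [[400.0, 12.0], [450.0, 15.0], [380.0, 11.0]]

m1, m2 = mean(open_x), mean(clog_x)
s1, s2 = scatter(open_x, m1), scatter(clog_x, m2)
sw = [[s1[i][j] + s2[i][j] for j in range(2)] for i in range(2)]
swinv = inv2(sw)

# w = Sw^-1 (m2 - m1);  w0 = -0.5 * w . (m1 + m2), as in the R code above
diff = [m2[0] - m1[0], m2[1] - m1[1]]
w = [swinv[0][0] * diff[0] + swinv[0][1] * diff[1],
     swinv[1][0] * diff[0] + swinv[1][1] * diff[1]]
w0 = -0.5 * (w[0] * (m1[0] + m2[0]) + w[1] * (m1[1] + m2[1]))

def classify(x):
    """Positive side of the discriminant is clog, negative side is open."""
    return "Clog" if w[0] * x[0] + w[1] * x[1] + w0 > 0 else "Open"

print(classify([45.0, 1.0]))   # → Open
print(classify([420.0, 13.0])) # → Clog
```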
With the model created and tested, we can move it to TensorFlow as a "SavedModel" using a Python implementation, after exporting it to a MetaGraph.
The following code builds the SavedModel:
import tensorflow as tf
from tensorflow.python.saved_model import tag_constants

expdir = '/tmp/clog_savedmodel' # SavedModelBuilder requires a directory that doesn't exist yet
builder = tf.saved_model.builder.SavedModelBuilder(expdir)
with tf.Session(graph=tf.Graph()) as savemod:
    builder.add_meta_graph_and_variables(savemod, [tag_constants.TRAINING], signature_def_map=sigmap, assets_collection=assetscol)
builder.save()
The model then needs to be deployed via the Google Cloud Platform Console as follows:
Create the model version and point "Source" to the training artifacts stored in Cloud Storage; the new version then appears in the versions list, ready to be called and to serve.
TensorFlow - Using the Model for Predictions
In the prediction phase, the Raspberry Pi reads sensor data and uses the TensorFlow model generated during training to detect whether the pipe is blocked (LED on) or not (LED off).
To initialize the TensorFlow model:
private boolean predictionMode; //if false, training mode; if true, prediction mode
private PredictionInterface predictionInterface;
private static List<Float> input_signal;
//Tensorflow
predictionMode = true;
predictionInterface = new PredictionInterface(getApplicationContext());
input_signal = new ArrayList<>();
To initialize the LED:
private static final String LED1_PIN = "BCM17";
private Handler mHandler = new Handler();
private Gpio mLed1;
PeripheralManagerService service = new PeripheralManagerService();
try {
mLed1 = service.openGpio(LED1_PIN);
mLed1.setDirection(Gpio.DIRECTION_OUT_INITIALLY_LOW);
} catch (IOException e) {
Log.e(TAG, "Error on PeripheralIO API", e);
}
The code below runs the TensorFlow model and turns the LED on or off based on the model output:
private void activityPrediction(float soundValue)
{
input_signal.add(soundValue);
// Perform inference using Tensorflow
float[] results = predictionInterface.getActivityProb(toFloatArray(input_signal));
//result will be true or false. if true, turn the LED on
try {
if (round(results[0], 2) > 0) {
mLed1.setValue(true);
} else {
mLed1.setValue(false);
}
} catch (IOException e) {
Log.e(TAG, "Error on PeripheralIO API", e);
}
// Clear all the values
input_signal.clear();
}
The TensorFlow model file "optimized_har.pb" is saved in the assets folder.
B. Time-series model, count of telemetries per minute
A sample counts as a reading only when it exceeds a certain value (THRESHOLD_VALUE, set to 400 as above), which keeps noise and lower water-flow rates in the tube from being counted as readings.
Data Analysis
Data analysis showed that in open mode the number of readings per minute is below 100, while in clog mode values are far higher (reaching 900 per minute), though in rare cases they too fell below 100. Those rare cases never repeat consecutively, however: over three consecutive minutes the total number of readings in clog mode always exceeded 350, while three minutes of open mode always summed to less than 300. So we could confidently set this rule:
Rule #1 → Over three consecutive minutes, if total readings > 350, then a clog is detected.
We also found that the maximum value reached in open mode never exceeds a certain value (SPARK_VALUE), found to be 770, so we added this rule:
Rule #2 → If a reading's value > SPARK_VALUE, then a clog is most likely present.
Combining both rules gave us an easy way to implement the detection logic, as shown below. Note that this code is deployed on the Arduino, which evaluates the telemetries against our model and tells the Raspberry Pi whether the pipe is clogged or open.
#define SPARK_VALUE 770 // spark threshold from the analysis above
// Counters (globals; shown here for completeness)
long watch = 0, timer = 0;
int count = 0, count_spike = 0;
int count1 = 0, count2 = 0, count3 = 0;
int count_spike1 = 0, count_spike2 = 0, count_spike3 = 0;

void loop()
{
  watch++;
  timer++;
  int sensorValue = analogRead(SOUND_SENSOR); // use A0 to read the electrical signal
  if (sensorValue > THRESHOLD_VALUE)
  {
    count++;
  }
  if (sensorValue > SPARK_VALUE)
  {
    count_spike++;
  }
  // 6000 iterations * 10 ms delay = one minute
  if (timer >= 6000)
  {
    int minute_count = watch % 18000; // position within a 3-minute window
    if (minute_count < 6000)
    {
      count1 = count;
      count_spike1 = count_spike;
    }
    else if (minute_count >= 6000 && minute_count < 12000)
    {
      count2 = count;
      count_spike2 = count_spike;
    }
    else // minute_count >= 12000
    {
      count3 = count;
      count_spike3 = count_spike;
    }
    if (count_spike1 + count_spike2 + count_spike3 >= 10 && count1 + count2 + count3 >= 350)
      Serial.println(2); // blocked
    else
      Serial.println(0); // not blocked
    timer = 0;
    count = 0;
    count_spike = 0;
  }
  delay(10);
}
On the Raspberry Pi side, the code below shows how we switch from data-gathering mode to the prediction modes:
// 0 => Training mode. 1 => prediction - Simple model. 2 => prediction - Tensorflow
private int predictionMode;
private static int BLOCK_COUNT = 60;
//dataStr is the received String from USB port
if(!dataStr.isEmpty())
{
float datafloat ;
try {
datafloat = Float.valueOf(dataStr);
if( datafloat < 1000)
{
if ( predictionMode == 0 )// training mode
collectSensorOnChange(SENSOR_TYPE_SOUND_DETECTION , datafloat );
else if ( predictionMode == 1 )// Prediction mode - count model
activityPredictionSimpleModel(datafloat);
else if ( predictionMode == 2 )// Prediction mode - tensorflow
activityPrediction(datafloat);
}
} catch(NumberFormatException e) {
Log.e(TAG, "Error receiving USB data", e);
}
}
The code below turns the LED on or off based on the input received from the Arduino:
private void activityPredictionSimpleModel(float count)
{
try {
if ( count >= 2 ) { // Arduino sends 2 for a blocked pipe, 0 for open
mLed1.setValue(true);
} else {
mLed1.setValue(false);
}
} catch (IOException e) {
Log.e(TAG, "Error on PeripheralIO API", e);
}
}