Guillermo Perez Guillen
Published © MIT

Smart Alert Using Blues and Machine Learning

Innovative device made with Blues, gas sensors and Edge Impulse. Useful for visual impaired people to detect smoke, and alcoholic drinks

IntermediateFull instructions provided8 hours76

Things used in this project

Hardware components

Blues Swan
Blues Swan
×1
Carbon Monoxide Sensor (MQ7)
DFRobot Carbon Monoxide Sensor (MQ7)
×1
MQ-135 Gas Sensor
×1
USB-TTL UART Serial Adapter (CP2104)
M5Stack USB-TTL UART Serial Adapter (CP2104)
×1
Buzzer
Buzzer
×1
LED (generic)
LED (generic)
×1
Rechargeable Battery, 4.8 V
Rechargeable Battery, 4.8 V
×1

Software apps and online services

Arduino IDE
Arduino IDE
Edge Impulse Studio
Edge Impulse Studio
Python
Python

Hand tools and fabrication machines

Box, General Purpose
Box, General Purpose

Story

Read more

Schematics

Schematic for data collection

Schematic for inference

Code

data_collection.ino

Arduino
Code used in data collection section
// Author: Guillermo Perez Guillen

#include <Wire.h>

// Constants
#define SAMPLING_FREQ_HZ    100       // Sampling frequency (Hz)
#define SAMPLING_PERIOD_MS  1000 / SAMPLING_FREQ_HZ   // Sampling period (ms)
#define NUM_SAMPLES         100       // 100 samples at 100 Hz is 1 sec window

int analogPin1 = PA3; //A0 MQ-135   
int analogPin2 = PC3; //A2 MQ-7   
int val1 = 0;           
int val2 = 0;           

void setup() {
  // Enable button pin
  pinMode(USER_BTN, INPUT_PULLUP); //BTN pin
  pinMode(LED_BUILTIN, OUTPUT); //LED pin

  // Start serial
  Serial1.setRx(PA10);                       //Redefine required Serial1 RX pin
  Serial1.setTx(PA9);                       //Redefine required Serial1 TX pin    
  Serial1.begin(115200);
}

void loop() {
  uint8_t len = 0;
  uint8_t addr = 0;
  uint8_t i;

  unsigned long timestamp;
  unsigned long start_timestamp;

  // Wait for button press
  while (digitalRead(USER_BTN) == 1);

  // Turn on LED to show we're recording
  digitalWrite(LED_BUILTIN, LOW);

  // Print header
  Serial1.println("timestamp,MQ-135,MQ-7");
 
  // Record samples in buffer
  start_timestamp = millis();
  for (int i = 0; i < NUM_SAMPLES; i++) {

    // Take timestamp so we can hit our target frequency
    timestamp = millis();

    // Read and print values
    Serial1.print(timestamp - start_timestamp);
    Serial1.print(",");
    val1 = analogRead(analogPin1);     // read the input pin1
    Serial1.print(val1);             // debug value MQ-135
    Serial1.print(",");
    val2 = analogRead(analogPin2);     // read the input pin2
    Serial1.println(val2);             // debug value MQ-7
        
    // Wait just long enough for our sampling period
    while (millis() < timestamp + SAMPLING_PERIOD_MS);
  }

  // Print empty line to transmit termination of recording
  Serial1.println();

  // Turn off LED to show we're done
  digitalWrite(LED_BUILTIN, HIGH);

  // Make sure the button has been released for a few milliseconds
  while (digitalRead(USER_BTN) == 0);
  delay(100);
}

serial-data-collect-csv.py

Python
Code used in data collection section
import argparse
import os
import uuid

# Third-party libraries
import serial
import serial.tools.list_ports

# Settings
DEFAULT_BAUD = 115200       # Must match transmitting program baud rate
DEFAULT_LABEL = "_unknown"  # Label prepended to all CSV files

# Create a file with unique filename and write CSV data to it
def write_csv(data, dir, label):

    # Keep trying if the file exists
    exists = True
    while exists:

        # Generate unique ID for file (last 12 characters from uuid4 method)
        uid = str(uuid.uuid4())[-12:]
        filename = label + "." + uid + ".csv"
        
        # Create and write to file if it does not exist
        out_path = os.path.join(dir, filename)
        if not os.path.exists(out_path):
            exists = False
            try:
                with open(out_path, 'w') as file:
                    file.write(data)
                print("Data written to:", out_path)
            except IOError as e:
                print("ERROR", e)
                return    

# Command line arguments
parser = argparse.ArgumentParser(description="Serial Data Collection CSV")
parser.add_argument('-p',
                    '--port',
                    dest='port',
                    type=str,
                    required=True,
                    help="Serial port to connect to")
parser.add_argument('-b',
                    '--baud',
                    dest='baud',
                    type=int,
                    default=DEFAULT_BAUD,
                    help="Baud rate (default = " + str(DEFAULT_BAUD) + ")")
parser.add_argument('-d',
                    '--directory',
                    dest='directory',
                    type=str,
                    default=".",
                    help="Output directory for files (default = .)")
parser.add_argument('-l',
                    '--label',
                    dest='label',
                    type=str,
                    default=DEFAULT_LABEL,
                    help="Label for files (default = " + DEFAULT_LABEL + ")")
                    
# Print out available serial ports
print()
print("Available serial ports:")
available_ports = serial.tools.list_ports.comports()
for port, desc, hwid in sorted(available_ports):
    print("  {} : {} [{}]".format(port, desc, hwid))
    
# Parse arguments
args = parser.parse_args()
port = args.port
baud = args.baud
out_dir = args.directory
label = args.label

# Configure serial port
ser = serial.Serial()
ser.port = port
ser.baudrate = baud

# Attempt to connect to the serial port
try:
    ser.open()
except Exception as e:
    print("ERROR:", e)
    exit()
print()
print("Connected to {} at a baud rate of {}".format(port, baud))
print("Press 'ctrl+c' to exit")

# Serial receive buffer
rx_buf = b''

# Make output directory
try:
    os.makedirs(out_dir)
except FileExistsError:
    pass

# Loop forever (unless ctrl+c is captured)
try:
    while True:
        
        # Read bytes from serial port
        if ser.in_waiting > 0:
            while(ser.in_waiting):
                
                # Read bytes
                rx_buf += ser.read()
                
                # Look for an empty line
                if rx_buf[-4:] == b'\r\n\r\n':

                    # Strip extra newlines (convert \r\n to \n)
                    buf_str = rx_buf.decode('utf-8').strip()
                    buf_str = buf_str.replace('\r', '')

                    # Write contents to file
                    write_csv(buf_str, out_dir, label)
                    rx_buf = b''

# Look for keyboard interrupt (ctrl+c)
except KeyboardInterrupt:
    pass

# Close serial port
print("Closing serial port")
ser.close()

alert_inference.ino

Arduino
Code used in inference section
// Author: Guillermo Perez Guillen 

#include <smart-alert_inferencing.h>
#include <Wire.h>

static uint8_t recv_cmd[8] = {};

// Settings
#define THRESHOLD           0.8       // Threshold for performing action 0.8

// Constants
#define SAMPLING_FREQ_HZ    100       // Sampling frequency (Hz)
#define SAMPLING_PERIOD_MS  1000 / SAMPLING_FREQ_HZ   // Sampling period (ms) 1000
#define NUM_CHANNELS        EI_CLASSIFIER_RAW_SAMPLES_PER_FRAME // 2 channels
#define NUM_READINGS        EI_CLASSIFIER_RAW_SAMPLE_COUNT      // 100 readings
#define NUM_CLASSES         EI_CLASSIFIER_LABEL_COUNT           // 3 classes

#define led1 D10 //PA4
#define led2 D11 //PA7

// Means and standard deviations from our dataset curation
static const float means[] = {0.4686, -0.4075, 8.3669, 0.0717, 4.7533, -9.9816};
static const float std_devs[] = {2.9989, 7.0776, 6.8269, 60.9333, 101.3666, 109.0392};

int analogPin1 = PA3; //A0    
int analogPin2 = PC3; //A2    
int val1 = 0;           
int val2 = 0;           

float test0 = 0.0;
float test1 = 0.0;
float test2 = 0.0;

float myArray[3];

void setup() {  
  // Enable button pin
  pinMode(USER_BTN, INPUT_PULLUP); //BTN pin
  pinMode(LED_BUILTIN, OUTPUT); //LED pin

  pinMode(led1, OUTPUT); 
  pinMode(led2, OUTPUT);
    
  // Start serial
  Serial1.setRx(PA10);                       //Redefine required Serial1 RX pin
  Serial1.setTx(PA9);                       //Redefine required Serial1 TX pin    
  Serial1.begin(115200);
}

void loop() {  
  uint8_t len = 0;
  uint8_t addr = 0;
  uint8_t i;
  //uint32_t val = 0;
  
  unsigned long timestamp;
  ei_impulse_result_t result;
  int err;
  float input_buf[EI_CLASSIFIER_DSP_INPUT_FRAME_SIZE];
  signal_t signal;

  // Wait for button press
  while (digitalRead(USER_BTN) == 1);

  // Turn on LED to show we're recording
  digitalWrite(LED_BUILTIN, LOW);

  // Record samples in buffer
  for (int i = 0; i < NUM_READINGS; i++) {

    // Take timestamp so we can hit our target frequency
    timestamp = millis();

    // Get raw readings from the gas sensor  
    val1 = analogRead(analogPin1);
    int mq135 = val1;
    val2 = analogRead(analogPin2);
    int mq7 = val2;

    // Perform standardization on each reading
    // Use the values from means[] and std_devs[]
    mq135 = (mq135 - means[0]) / std_devs[0];
    mq7 = (mq7 - means[1]) / std_devs[1];

    // Fill input_buf with the standardized readings. Recall tha the order
    // is [mq135, mq7 ...]
    input_buf[(NUM_CHANNELS * i) + 0] = mq135;
    input_buf[(NUM_CHANNELS * i) + 1] = mq7;

    // Wait just long enough for our sampling period
    while (millis() < timestamp + SAMPLING_PERIOD_MS);
  }

  // Turn off LED to show we're done recording
  digitalWrite(LED_BUILTIN, HIGH);

  // Turn the raw buffer into a signal for inference
  err = numpy::signal_from_buffer(input_buf, 
                                  EI_CLASSIFIER_DSP_INPUT_FRAME_SIZE, 
                                  &signal);
  if (err != 0) {
    Serial1.print("ERROR: Failed to create signal from buffer: ");
    Serial1.println(err);
    return;
  }

  // Run the impulse
  err = run_classifier(&signal, &result, false);
  if (err != 0) {
    Serial1.print("ERROR: Failed to run classifier: ");
    Serial1.println(err);
    return;
  }

  // Print the results
  Serial1.println("Predictions");
  for (int i = 0; i < EI_CLASSIFIER_LABEL_COUNT; i++) {
    Serial1.print("  ");
    Serial1.print(result.classification[i].label);
    Serial1.print(": ");
    Serial1.println(result.classification[i].value);
    myArray[i] = (result.classification[i].value);
  }
    
  if (myArray[0] > 0.65) {         //ALCOHOL
    digitalWrite(led1, HIGH);
    digitalWrite(led2, HIGH);
    delay(500);
    digitalWrite(led1, LOW);
    digitalWrite(led2, LOW);
    delay(500);
    digitalWrite(led1, HIGH);
    digitalWrite(led2, HIGH);
    delay(500);
    digitalWrite(led1, LOW);
    digitalWrite(led2, LOW);    
  }
  if (myArray[1] > 0.65) {         //GAS
    digitalWrite(led1, HIGH);
    digitalWrite(led2, HIGH);
    delay(500);
    digitalWrite(led1, LOW);
    digitalWrite(led2, LOW);
  }
  if (myArray[2] > 0.65) {         //SMOKE
    digitalWrite(led1, HIGH);
    digitalWrite(led2, HIGH);
    delay(500);
    digitalWrite(led1, LOW);
    digitalWrite(led2, LOW);
    delay(500);
    digitalWrite(led1, HIGH);
    digitalWrite(led2, HIGH);
    delay(500);
    digitalWrite(led1, LOW);
    digitalWrite(led2, LOW);
  delay(500);
    digitalWrite(led1, HIGH);
    digitalWrite(led2, HIGH);
    delay(500);
    digitalWrite(led1, LOW);
    digitalWrite(led2, LOW);
  }
  else {
    digitalWrite(led1, LOW);
    digitalWrite(led2, LOW);
  }
  
  // Make sure the button has been released for a few milliseconds
  while (digitalRead(USER_BTN) == 0);
  delay(100);
}

Credits

Guillermo Perez Guillen

Guillermo Perez Guillen

57 projects • 63 followers
Electronics and Communications Engineer (ECE) & Renewable Energy: 12 prizes in Hackster / Hackaday Prize Finalist 2021-22-23

Comments