Hackster is hosting Hackster Holidays, Ep. 6: Livestream & Giveaway Drawing. Watch previous episodes or stream live on Monday!Stream Hackster Holidays, Ep. 6 on Monday!
Oliver Crask
Published © GPL3+

Particulater: Air Quality Monitoring for Everyone

Easily measure air quality, avoid polluted areas, and improve your health and that of your community.

IntermediateWork in progress5 hours13,072
Particulater: Air Quality Monitoring for Everyone

Things used in this project

Hardware components

Raspberry Pi Zero Wireless
Raspberry Pi Zero Wireless
×1
Hologram Nova
Hologram Nova
×1
SparkFun Atmospheric Sensor Breakout - BME280
SparkFun Atmospheric Sensor Breakout - BME280
×1
Plantower PMS5003
×1

Software apps and online services

Hologram Data Router
Hologram Data Router
Losant Platform
Losant Platform

Hand tools and fabrication machines

Soldering iron (generic)
Soldering iron (generic)

Story

Read more

Schematics

Fritzing Diagram

Code

Particulater.py

Python
Main program for this project. Use in same folder as Adafruit_BME280.py. Requires sudo for Hologram connection.
# Initialise Hologram
from Hologram.HologramCloud import HologramCloud
hologram = HologramCloud(dict(), network='cellular')

# Import libraries for PMS sensor
import serial
import sys
import time

debug=0
# GPIO Serial may be ttyAMA0 or ttyS0

from Adafruit_BME280 import *

# Set up BME280
sensor = BME280(t_mode=BME280_OSAMPLE_8, p_mode=BME280_OSAMPLE_8, h_mode=BME280_OSAMPLE_8)


class g3sensor(): # Class with all functions relating to decoding of serial messages
    def __init__(self):
        if debug: print "init"
	self.endian = sys.byteorder
    
    def conn_serial_port(self, device):
        if debug: print device
        self.serial = serial.Serial(device, baudrate=9600)
        if debug: print "conn ok"

    def check_keyword(self):
        if debug: print "check_keyword"
        while True:
            token = self.serial.read()
    	    token_hex=token.encode('hex')
    	    if debug: print token_hex
    	    if token_hex == '42':
    	        if debug: print "get 42"
    	        token2 = self.serial.read()
    	        token2_hex=token2.encode('hex')
    	        if debug: print token2_hex
    	        if token2_hex == '4d':
    	            if debug: print "get 4d"
                    return True
		elif token2_hex == '00': # fixme
		    if debug: print "get 00"
		    token3 = self.serial.read()
		    token3_hex=token3.encode('hex')
		    if token3_hex == '4d':
			if debug: print "get 4d"
			return True
		    
    def vertify_data(self, data):
	if debug: print data
        n = 2
	sum = int('42',16)+int('4d',16)
        for i in range(0, len(data)-4, n):
            #print data[i:i+n]
	    sum=sum+int(data[i:i+n],16)
	versum = int(data[40]+data[41]+data[42]+data[43],16)
	if debug: print sum
        if debug: print versum
	if sum == versum:
	    print "data correct"
	
    def read_data(self):
        data = self.serial.read(22)
        data_hex=data.encode('hex')
        if debug: self.vertify_data(data_hex)
        pm1_cf=int(data_hex[4]+data_hex[5]+data_hex[6]+data_hex[7],16)
        pm25_cf=int(data_hex[8]+data_hex[9]+data_hex[10]+data_hex[11],16)
        pm10_cf=int(data_hex[12]+data_hex[13]+data_hex[14]+data_hex[15],16)
        pm1=int(data_hex[16]+data_hex[17]+data_hex[18]+data_hex[19],16)
        pm25=int(data_hex[20]+data_hex[21]+data_hex[22]+data_hex[23],16)
        pm10=int(data_hex[24]+data_hex[25]+data_hex[26]+data_hex[27],16)
        if debug: print "pm1_cf: "+str(pm1_cf)
        if debug: print "pm25_cf: "+str(pm25_cf)
        if debug: print "pm10_cf: "+str(pm10_cf)
        if debug: print "pm1: "+str(pm1)
        if debug: print "pm25: "+str(pm25)
        if debug: print "pm10: "+str(pm10)
        data = [pm1, pm25, pm10]
    	self.serial.close()
        return data

    def read(self, argv):
        tty=argv[0:]
        self.conn_serial_port(tty)
        if self.check_keyword() == True:
            self.data = self.read_data()
            if debug: print self.data
            return self.data

def upload(pmdata): # Function to upload data to the Hologram Cloud


	# Grab readings for BME280
    temp = sensor.read_temperature()
    pressure = sensor.read_pressure()
    humidity = sensor.read_humidity()
 
    # Connect to cellular network
    result = hologram.network.connect()
    if result == False:
        print ' Failed to connect to cell network'

	# Format data into a JSON style message
    message = "{{\"PM1\": {},\"PM25\": {},\"PM10\": {},\"Temperature\": {},\"Humidity\": {},\"Pressure\": {}}}".format(pmdata[0], pmdata[1], pmdata[2], temp, humidity, pressure)
    # Send message with the topic PMS
    response_code = hologram.sendMessage(str(message), topics = ["PMS"])
    print hologram.getResultString(response_code) # Prints 'Message sent successfully'.

    # Disconnect from cellular network
    hologram.network.disconnect()

if __name__ == '__main__':
    air=g3sensor()
    while True:
        pmdata=0
        try:
            pmdata=air.read("/dev/ttyS0") # You may need to change ttyS0 to ttyAMA0
        except: 
            next
        if pmdata != 0:
            print pmdata
            upload(pmdata)
            time.sleep(300) # Set this to change frequency of data collection
            

PMS Test Program

Test your PMS sensor using this program by Thomas Tsai. You only need g3.py.

Adafruit_Python_BME280

Test your BME280 sensor with this driver. Adafruit_BME280.py required for particulater.py

Credits

Oliver Crask

Oliver Crask

1 project • 14 followers
Thanks to Thomas Tsai and Adafruit.

Comments