Hackster is hosting Hackster Holidays, Ep. 6: Livestream & Giveaway Drawing. Watch previous episodes or stream live on Monday!Stream Hackster Holidays, Ep. 6 on Monday!
Stefan Ciobanu
Published

Urban Air Quality & Street Cleanliness Monitor

Detecting airborne particles, enables you to assess the cleanliness of streets and public spaces.

Intermediate4 hours305

Things used in this project

Hardware components

AVR IoT Mini Cellular Board
Microchip AVR IoT Mini Cellular Board
×1
PMS5003 Sensing Module
×1
BME280 5V Digital Sensor
×1
Electric box
×1

Software apps and online services

VS Code
Microsoft VS Code
PlatformIO IDE
PlatformIO IDE
ThingSpeak API
ThingSpeak API
Hivemq

Hand tools and fabrication machines

Solder Wire, Lead Free
Solder Wire, Lead Free
Wire Stripper & Cutter, 18-10 AWG / 0.75-4mm² Capacity Wires
Wire Stripper & Cutter, 18-10 AWG / 0.75-4mm² Capacity Wires
Female to female jumper wire
Soldering iron (generic)
Soldering iron (generic)

Story

Read more

Schematics

Diagram

Code

credentials.h

C Header File
#ifndef credentials_h
#define credentials_h

// Credentials for establishing mqtt connection
#define MQTT_BROKER     "xxxxxxxxxx.hivemq.cloud"
#define HIVEMQ_USERNAME "xxxxxxxxxx"
#define HIVEMQ_PASSWORD "xxxxxxxxxx"

// Credentials for establishing connection to thingspeak
#define THINGSPEAK_KEY "xxxxxxxxxxxx"

#endif

particle.h

C Header File
#ifndef particle_h
#define particle_h

#include <Arduino.h>

// Structure used for handling of single type particle sensor data
typedef struct {
    uint16_t value = 0; // the last value read
    uint16_t max_value = 0; // the maximul value read
    uint16_t min_value = UINT16_MAX; // the minimum value read
    uint16_t filtered = 0; // the filtered value
} pm_value_t;

// Structure used for handling all particle sensor data
typedef struct {
   pm_value_t pm1;
   pm_value_t pm2;
   pm_value_t pm10;
} pm_data_t;

// Function prototypes
void particle_setup(void);
void particle_loop_task(uint16_t delta_time);
void particle_slow_task(uint16_t delta_time);
void particle_reset_minmax(void);
pm_data_t particle_get_data(void);

#endif

thingspeak.h

C Header File
#ifndef thingspeak_h
#define thingspeak_h

#include <Arduino.h>

// Function prototypes
boolean ts_config_client(void);
void ts_add_data(uint8_t id, float value);
void ts_send_data(void);

#endif

time.h

C Header File
#ifndef time_h
#define time_h

#include <Arduino.h>
#include <TimeLib.h>

// Function prototypes
void time_setup(void);
void time_cyclic_s(uint16_t diff_s);
String time_get_time_string(void);
time_t time_get_time_unix(void);
String time_time_to_string(time_t unixTime);

#endif

platformio.ini

INI
; PlatformIO Project Configuration File
;
;   Build options: build flags, source filter
;   Upload options: custom upload port, speed and extra flags
;   Library options: dependencies, extra library storages
;   Advanced options: extra scripting
;
; Please visit documentation for the other options and examples
; https://docs.platformio.org/page/projectconf.html

[env:AVR128DB48]
platform = atmelmegaavr
board = AVR128DB48
framework = arduino
ib_deps = https://github.com/microchip-pic-avr-solutions/avr-iot-cellular-arduino-library
build_flags = -DTWI_USING_WIRE1
lib_deps = 
    microchip-pic-avr-solutions/AVR-IoT-Cellular@^1.3.9
    Wire
    adafruit/Adafruit BME280 Library@^2.2.4
    adafruit/Adafruit Unified Sensor@^1.1.14
    adafruit/Adafruit BusIO@^1.15.0
    megunolink/MegunoLink@^1.42
    bblanchon/ArduinoJson@^7.0.2
    paulstoffregen/Time@^1.6.1
    mathworks/ThingSpeak@^2.0.0
    SPI

main.cpp

C/C++
#include <lte.h>
#include <led_ctrl.h>
#include <log.h>
#include <Adafruit_BME280.h>
#include <mcp9808.h>
#include <ArduinoJson.h>
#include <Arduino.h>
#include "Filter.h"

#include "main.h"
#include "mqtt.h"
#include "particle.h"
#include "time.h"
#include "thingspeak.h"

#define TAKS_1s 1000
#define TAKS_5s 5000

// The size of the buffer where data is stored when the sending of the data
// is not possible due to LTE connection or mqtt connection
#define PENDING_DATA_BUFFER_SIZE 12

// Structure used for handling the data from bme280 sensor
typedef struct 
{
  float temp = 0;
  float pressure = 0;
  float humidity = 0;
} bme280_data_t;

// Structure that will contain data from all sensors
typedef struct {
  pm_data_t pm;
  bme280_data_t bme;
  time_t time;
  int16_t board_temp;
} sensors_data_t;

// Circular buffer structure
typedef struct {
  sensors_data_t data[PENDING_DATA_BUFFER_SIZE];
  int readIndex;
  int writeIndex;
  int count;
} CircularBuffer;

// Timers used for handling tasks
uint32_t task_loop_time = 0u;
uint32_t task_1s_time = 0u;
uint32_t task_5s_time = 0u;

// Timeout between LTE connection attempts
uint32_t p_lte_conection_time = 150; // seconds 

// Timer used in LTE connection attempts
uint32_t lte_conection_timer = 0;

// Interval between sensor data publishing
uint16_t p_publish_intervals = 300; // seconds

// Timer used in data publishing
uint16_t publish_intervals_timer = 0;

// Object used to handle the bme280 sensor
Adafruit_BME280 bme280;

// Variable used to store the bme280 sensor data
bme280_data_t bme280_data;

// Filters used for fltering bme280 sensor data
ExponentialFilter<float> bme280_filter_temp(20, 20);
ExponentialFilter<float> bme280_filter_pres(20, 1000);
ExponentialFilter<float> bme280_filter_humi(20, 50);

// Variable used to store all sensor data
sensors_data_t sensors_data;

// Buffer used to store data until is send
CircularBuffer pending_data_buffer;

// Filter for board temperature sesnor
ExponentialFilter<float> board_temp_filter(20, 0);

// Function prototypes
void initCircularBuffer(CircularBuffer* buffer);
void writeCircularBuffer(CircularBuffer* buffer, sensors_data_t value);
void readCircularBuffer(CircularBuffer* buffer, sensors_data_t** ptrToValue);
void readCircularBufferValid(CircularBuffer* buffer);
void send_data_to_mqtt(void);
void send_data_to_thingspeak(void);
void board_cyclic_s(uint16_t delta_time);


/**************************************************************************/
/*!
@brief  Cyclic function in which bme280 sensor data is read and filtered.

The temperature, humidity and atmospheric pressure are read from the bme280
sensor. If the values are valid will be filtered and stored into the 
bme280_data variable, if are not value an invalid value will be stored.
The invalid value will be define as the maximum value of the data type 
used for storing the value.

*/
/**************************************************************************/
void bme280_cyclic_s(void)
{

    float bme280_temp = bme280.readTemperature();
    if (bme280_temp != NAN) {
        bme280_filter_temp.Filter(bme280_temp);
        bme280_data.temp = bme280_filter_temp.Current();
    } else {
        bme280_data.temp = INT32_MAX;
    }
    
    float bme280_humi = bme280.readHumidity();
    if (bme280_humi != NAN) {
        bme280_filter_humi.Filter(bme280_humi);
        bme280_data.pressure = bme280_filter_pres.Current();
    } else {
        bme280_data.pressure = INT32_MAX;
    }
    
    float bme280_pres = bme280.readPressure();
    if (bme280_pres != NAN) {
        bme280_filter_pres.Filter(bme280_pres / 100);
        bme280_data.humidity = bme280_filter_humi.Current();
    } else {
        bme280_data.humidity = INT32_MAX;
    }
    
    Log.info("bme280 - temp: " + String(bme280_data.temp));
    Log.info("bme280 - humidity: " + String(bme280_data.humidity));
    Log.info("bme280 - pressure: " + String(bme280_data.pressure));
    
}

/**************************************************************************/
/*!
@brief  Function cyclically called to check the LTE connection.

In this function we check if the LTE connection is establish. If is not,
we will try to connect. If the establishing of the conection failed we will
retry after some parametrisable time.

@param[in] delta_time  Time between two function calls.
*/
/**************************************************************************/
void check_lte_connection(uint16_t delta_time)
{
    if (lte_conection_timer >= delta_time)
    {
        lte_conection_timer -= delta_time;
    } else
    {
        lte_conection_timer = 0;
    }

    // Check if LTE connection is established
    if (Lte.isConnected() == false)
    {
        if (lte_conection_timer == 0)
        {
            Log.info(">> start LTE connection\r\n");
            // Establish LTE connection
            if (!Lte.begin(p_lte_conection_time * 1000)) 
            {
                Log.error(">> LTE connect fail\r\n");
                Lte.end();
                lte_conection_timer = p_lte_conection_time * 2;
            }
            else 
            {
                Log.info(">> LTE connect\r\n");
            }
        }
    }
}

/**************************************************************************/
/*!
@brief  This function initializes all sensors, sets up the time and performe
        necessary setup for the MQTT connection.
*/
/**************************************************************************/
void setup() {

    Log.begin(115200);
    
    LedCtrl.begin();

    // Initialize the circular buffer
    initCircularBuffer(&pending_data_buffer);

    // Setup the mqtt conection
    mqtt_setup();
    // Setup the time
    time_setup();
    // Setup the PMS5003 sensor
    particle_setup();

    // Initialize MCP9808 library
    Mcp9808.begin();

    // Initialize BME280 sensor
    uint8_t sensor_status = bme280.begin(0x76,&Wire1);   // use TWI1 interface
    if (sensor_status) {
        // ToDo
    } else {
        // ToDo
    }

    // Init the timer here in order to not trigger
    // the update in the first cyclic function call
    publish_intervals_timer = p_publish_intervals;

}

/**************************************************************************/
/*!
@brief  Within this function, all other cyclic functions are invoked.
*/
/**************************************************************************/
void loop() {

    uint32_t delta_time;
    uint32_t current_time;
    
    current_time = millis();
    delta_time = task_loop_time - current_time;
    if (delta_time > UINT16_MAX)
    {
        delta_time = UINT16_MAX;
    }

    particle_loop_task(delta_time);

    // 1s task
    current_time = millis();
    delta_time = current_time - task_1s_time;
    if (delta_time >= TAKS_1s)
    {
        if (delta_time > UINT16_MAX)
        {
            delta_time = UINT16_MAX;
        }

        task_1s_time = current_time;
        // No function is called for the moment in this task
    }

    // 5s task
    current_time = millis();
    delta_time = current_time - task_5s_time;
    if (delta_time >= TAKS_5s)
    {   
        // Convert to seconds
        delta_time = delta_time / 1000; 
        if (delta_time > UINT16_MAX)
        {
            delta_time = UINT16_MAX;
        }

        task_5s_time = current_time;
        
        mqtt_cyclic_s(delta_time);
        time_cyclic_s(delta_time);
        particle_slow_task(delta_time);
        board_cyclic_s(delta_time);
        bme280_cyclic_s();

        check_lte_connection(delta_time);

        if (publish_intervals_timer >= delta_time)
        {
            publish_intervals_timer -= delta_time;
        } else
        {
            publish_intervals_timer = 0;
        }

        if (publish_intervals_timer == 0)
        {
            Log.info("Add data to buffer");
            sensors_data.pm = particle_get_data();
            sensors_data.bme = bme280_data;
            sensors_data.time = time_get_time_unix();
            sensors_data.board_temp = board_temp_filter.Current();

            // Send data to thingspeak
            if (Lte.isConnected() == true) {
                send_data_to_thingspeak();
            }

            // Write the values into the circular buffer.
            // The values will be send to mqtt server from send_data_to_mqtt function.
            writeCircularBuffer(&pending_data_buffer, sensors_data);
            
            // Reste the min/max values
            particle_reset_minmax();
            publish_intervals_timer = p_publish_intervals;
        }
        if (Lte.isConnected() == true) {
            // Try to send the data to the mqtt server.
            send_data_to_mqtt();
        }

    }

}

/**************************************************************************/
/*!
@brief  This function sends the sensor data to thingspeak.
*/
/**************************************************************************/
void send_data_to_thingspeak(void) {

    if (ts_config_client() == true) {

        // Add data to every channel
        ts_add_data(1, sensors_data.pm.pm1.filtered);
        ts_add_data(2, sensors_data.pm.pm2.filtered);
        ts_add_data(3, sensors_data.pm.pm10.filtered);
        ts_add_data(4, sensors_data.bme.temp);
        ts_add_data(5, sensors_data.bme.humidity);
        ts_add_data(6, sensors_data.bme.pressure);
        // Trigger sending of data
        ts_send_data();
    }
}

/**************************************************************************/
/*!
@brief  This function checks if any data that needs to be send is available
        and try to send them.
*/
/**************************************************************************/
void send_data_to_mqtt(void)
{
    Log.info(">>> send_data_to_mqtt");
    // Create a pointer to the structure
    sensors_data_t* ptrToMyValue = NULL;

    // Publish data from buffer until is empty and if publishing is succesflu
    do {
        bool mqtt_publish_status = false;
        // Read data from the circular buffer using a pointer
        readCircularBuffer(&pending_data_buffer, &ptrToMyValue);

        // Check if ptrToMyValue is NULL (buffer empty) or not
        if (ptrToMyValue == NULL) {
            Log.info(">>> Buffer is empty");
        } else {
            DynamicJsonDocument out_message(1024);

            // Access data through ptrToMyValue
            out_message["time"]["formated"] = time_time_to_string(ptrToMyValue->time);
            out_message["time"]["unix"] = ptrToMyValue->time;
            out_message["pm"]["pm1"] = ptrToMyValue->pm.pm1.filtered;
            out_message["pm"]["pm2.5"] = ptrToMyValue->pm.pm2.filtered;
            out_message["pm"]["pm10"] = ptrToMyValue->pm.pm10.filtered;
            out_message["bme"]["temp"] = ptrToMyValue->bme.temp;
            out_message["bme"]["humidity"] = ptrToMyValue->bme.humidity;
            out_message["bme"]["pressure"] = ptrToMyValue->bme.pressure;

            char mqtt_message[200];
            serializeJson(out_message, mqtt_message);

            // Try to send sensor data to mqtt server
            mqtt_publish_status = mqtt_publish_message(mqtt_message);

            if (mqtt_publish_status > 0u)
            {   
                // If the data was send mark the last data send as 'read'
                readCircularBufferValid(&pending_data_buffer);
            } else
            {   
                // If the publishing failed do to different reasons abord the sending
                // and try to send data next time when send_data_to_mqtt is called
                ptrToMyValue = NULL;
            }
        }

    } while (ptrToMyValue != NULL);

}

/**************************************************************************/
/*!
@brief  This function is used to initialize the circular buffer.
*/
/**************************************************************************/
void initCircularBuffer(CircularBuffer* buffer) {
  buffer->readIndex = 0;
  buffer->writeIndex = 0;
  buffer->count = 0;
}

/**************************************************************************/
/*!
@brief  This function is used to write into the circular buffer.

@param[in] buffer  Represents a pointer to a CircularBuffer object. 

@param[in] value  Represents the sensor data that will be written into 
                  the buffer.

*/
/**************************************************************************/
void writeCircularBuffer(CircularBuffer* buffer, sensors_data_t value) {
  buffer->data[buffer->writeIndex] = value;
  buffer->writeIndex = (buffer->writeIndex + 1) % PENDING_DATA_BUFFER_SIZE;

  if (buffer->count < PENDING_DATA_BUFFER_SIZE) {
    buffer->count++;
  } else {
    buffer->readIndex = (buffer->readIndex + 1) % PENDING_DATA_BUFFER_SIZE;  // Overwrite oldest data
  }
}

/**************************************************************************/
/*!
@brief  This function is used to read data from the circular buffer.

This function is designed to read data from the circular buffer,
but it doesn't mark the data as read within the buffer itself.
The function focuses solely on reading data without altering 
the buffer's state regarding the read status of the data.
There's a separate function (readCircularBufferValid) for marking the data as read.

@param[in] buffer  Represents a pointer to a CircularBuffer object. 

@param[out] ptrToValue  Represents a pointer in which the read data will be copied.

*/
/**************************************************************************/
void readCircularBuffer(CircularBuffer* buffer, sensors_data_t** ptrToValue) {
    if (buffer->count > 0) {
        *ptrToValue = &buffer->data[buffer->readIndex];
        // buffer->readIndex = (buffer->readIndex + 1) % PENDING_DATA_BUFFER_SIZE;
        // buffer->count--;
    } else {
        *ptrToValue = NULL;  // If the buffer is empty, set the pointer to NULL
    }
}


/**************************************************************************/
/*!
@brief This function is used to mark the last data read by 
        the readCircularBuffer function as 'read' within the buffer itself.

@param[in] buffer  Represents a pointer to a CircularBuffer object.
*/
/**************************************************************************/
void readCircularBufferValid(CircularBuffer* buffer) {
    buffer->readIndex = (buffer->readIndex + 1) % PENDING_DATA_BUFFER_SIZE;
    buffer->count--;
}

/**************************************************************************/
/*!
@brief  Cyclic function in which board temperature sensor data is read and filtered.

@param[in] delta_time  Time between two function calls.
*/
/**************************************************************************/
void board_cyclic_s(uint16_t delta_time)
{
    board_temp_filter.Filter(Mcp9808.readTempC());
    Log.info("board temperature: " + String(board_temp_filter.Current()));
}

mqtt.cpp

C/C++
#include <log.h>
#include <lte.h>
#include <mqtt_client.h>
#include <ArduinoJson.h>
#include "sequans_controller.h"

#include "mqtt.h"
#include "particle.h"
#include "credentials.h"


// Credentials for establishing mqtt connection
#define MQTT_THING_NAME    "avr_iot"
// #define MQTT_BROKER        "xxx.hivemq.cloud"
#define MQTT_PORT          8883
#define MQTT_USE_TLS       true
#define MQTT_KEEPALIVE     60
#define MQTT_USE_ECC       false
// #define HIVEMQ_USERNAME "xxx"
// #define HIVEMQ_PASSWORD "xxx"
#define MQTT_SUB_TOPIC "to_node_avr"
#define MQTT_PUB_TOPIC "from_node_avr"

// Size of the buffer used to store the received messages
#define MQTT_CALLBACK_BUFFER 2048

// Parameters
uint16_t p_mqtt_reconect_attempt_interval = 60; // s
uint8_t p_mqtt_reconect_attempts = 10;
uint8_t p_mqtt_publis_attempts = 10;

// Variables
uint16_t mqtt_reconect_attempt_timer; // ms
uint8_t mqtt_reconect_count;
uint8_t mqtt_publis_failed_count;
uint16_t mqtt_message_length;

// Function prototypes
void mqtt_check_connection(uint16_t task_time);
void mqtt_check_for_incomming_messages(void);
void mqtt_tls_config(void);
void mqtt_callback(const char* topic, const uint16_t message_length, const int32_t message_id);


/**************************************************************************/
/*!
@brief  This function performe necessary setup for the MQTT connection.
*/
/**************************************************************************/
void mqtt_setup() {
    Log.info(F(">>> mqtt_setup"));

    mqtt_tls_config();

    MqttClient.onReceive(mqtt_callback);
    mqtt_reconect_attempt_timer = 0;
    mqtt_reconect_count = 0;
    mqtt_publis_failed_count = 0;
    mqtt_message_length = 0;
}


/**************************************************************************/
/*!
@brief  Cyclic function in which the connection/reconnection to mqtt server
        is established.

@param[in] delta_time  Time between two function calls.
*/
/**************************************************************************/
void mqtt_cyclic_s(uint16_t delta_time) {
#if DEBUG_ENABLED > 0
    Log.info(">>> mqtt_cyclic_s");
#endif
    mqtt_check_connection(delta_time);
    mqtt_check_for_incomming_messages();
}

/**************************************************************************/
/*!
@brief  This function is for setting up security profile for MQTT TLS.
*/
/**************************************************************************/
void mqtt_tls_config(void)
{
 
    SequansController.begin();
  
    Log.info(">> Setting up security profile for MQTT TLS\r\n");
    const char* cmd1 = "AT+SQNSPCFG=2,2,\"\",0,1"; 

    SequansController.writeBytes((uint8_t*)cmd1, strlen(cmd1), true);
    if (!SequansController.waitForURC("SQNSPCFG", NULL, 0, 4000)) {
        Log.infof(">> Error: Failed to set security profile\r\n");
        return;
    }  
    Log.info(">> TLS config complete\r\n");
    SequansController.end();

}

/**************************************************************************/
/*!
@brief  Cyclic function is used as call back for receiving MQTT messages.

In this function we will save the length of the message. 
The lenght will also act as a flag to know that we have to process 
a received MQTT message.
The message itself is processed in mqtt_check_for_incomming_messages function.

@param[in] topic  Topic of the received message
@param[in] message_length Lenght of the received message
@param[in] message_id The ID of the received message
*/
/**************************************************************************/
void mqtt_callback(const char* topic, const uint16_t message_length, const int32_t message_id)
{
    Log.info(">> mqtt_callback\r\n");
    Log.info(topic);
    Log.info(">> message_length: " + String(message_length));
    Log.info(">> message_id: " + String(message_id));

    mqtt_message_length = message_length;
}

/**************************************************************************/
/*!
@brief  This function is used to send MQTT messages.

@param[in] mqtt_message  The message that shall be send.

@return Returns true if the message was sent, otherwise returns false.
*/
/**************************************************************************/
bool mqtt_publish_message(const char* mqtt_message)
{   
    if (Lte.isConnected() == true)
    {
        uint8_t pub_status = MqttClient.publish(MQTT_PUB_TOPIC, mqtt_message);
        if(pub_status)
        {
            Log.info(F(">>> mqtt_publish_data - message send successful"));
            return true;
        }
        else
        {
            Log.info(F(">>> mqtt_publish_data - message send failed"));
            mqtt_publis_failed_count++;

            if (mqtt_publis_failed_count >= p_mqtt_publis_attempts)
            {   
                mqtt_publis_failed_count = 0;
                // End connection in order to reste it
                MqttClient.end();
            }

            return false;
        }
    }
    else
    {
        return false;
    }
}

/**************************************************************************/
/*!
@brief  Function cyclically called to check the MQTT connection.

The function will check if the MQTT connection is established. If not it 
will try to connect to the mqtt server. If the establishing will fail it
will wait for a parametrizable time for a new attempt.
After a certain number of retries in this function we will terminate the 
Lte connection because even if the status of the Lte connection it says 
that Lte is connected sometimes is not correct.

@param[in] delta_time  Time between two function calls.
*/
/**************************************************************************/
void mqtt_check_connection(uint16_t task_time) {

    if (mqtt_reconect_attempt_timer >= task_time) 
    {
        mqtt_reconect_attempt_timer -= task_time;
    } else 
    {
        mqtt_reconect_attempt_timer = 0u;
    }

    if ((mqtt_reconect_attempt_timer == 0) 
        && (Lte.isConnected() == true) 
        && (MqttClient.isConnected() == false))
    {   
        // Connect to the mqtt broker
        MqttClient.begin(MQTT_THING_NAME,
                        MQTT_BROKER,
                        MQTT_PORT,
                        MQTT_USE_TLS,
                        MQTT_KEEPALIVE,
                        MQTT_USE_ECC,
                        HIVEMQ_USERNAME,
                        HIVEMQ_PASSWORD);
        
        if (MqttClient.isConnected() == false)
        {
            mqtt_reconect_attempt_timer = p_mqtt_reconect_attempt_interval;
            mqtt_reconect_count++;
            Log.info(F(">>> mqtt_check_connection - failed connect"));
        } else {
            mqtt_reconect_count = 0;

            Log.info(F(">>> mqtt_check_connection - connected"));
            MqttClient.subscribe(MQTT_SUB_TOPIC, AT_LEAST_ONCE);
            
            DynamicJsonDocument out_message(1024);
            out_message["status"] = "connected";
            char mqtt_message[128];
            serializeJson(out_message, mqtt_message);
            uint8_t pub_status = MqttClient.publish(MQTT_PUB_TOPIC, mqtt_message);
        }
    }

    if (mqtt_reconect_count >= p_mqtt_reconect_attempts)
    {   
        mqtt_reconect_count = 0;
        // Close Lte connection in order to reset it because sometimes
        // we are unable to connect to mqtt server do some problems in the Lte connection.
        Lte.end();
    }

}

/**************************************************************************/
/*!
@brief  Function cyclically called to check for incoming MQTT messages.

*/
/**************************************************************************/
void mqtt_check_for_incomming_messages(void)
{
    if (mqtt_message_length > 0)
    {
        String message = MqttClient.readMessage(MQTT_SUB_TOPIC,
                                                mqtt_message_length);

        // Check if message was really received
        if (message != "") 
        {
            Log.infof(F("New mqtt message received: %s\r\n"), message.c_str());

            // Deserialize JSON
            DynamicJsonDocument doc(1024); // Adjust the size as per your JSON message size
            DeserializationError error = deserializeJson(doc, message);

            if (error == DeserializationError::Ok) 
            {
                if (doc.containsKey("info")) 
                {
                    Log.info(">> Info Requested\r\n");
                }
            } else 
            {
                // The error can be logged
            }
        }

        // Reste the message lenght
        mqtt_message_length = 0;
    }
}

particle.cpp

C/C++
#include <log.h>
#include "Filter.h"

#include "mqtt.h"
#include "time.h"
#include "particle.h"

// Size of the buffer used to store serial data 
// received form PMS5003 particle sensor
#define BUFFER_SIZE 31     // 0x42 + 31 bytes equal to 32 bytes

// Timer used for delaying the comparison of min/max values
uint16_t start_timer;

// Buffer used to store serial data received form PMS5003 particle sensor
unsigned char particle_buffer[BUFFER_SIZE];

// Variable used to store processed data from PMS5003 particle sensor
pm_data_t pm_data;

// Filters used for filtering the data received from PMS5003 particle sensor
ExponentialFilter<long> filter1(20, 0);
ExponentialFilter<long> filter2(20, 0);
ExponentialFilter<long> filter10(20, 0);

// Function prototypes
char checkValue(unsigned char *thebuf, char buffer_size);
uint16_t extract_pm10(unsigned char *thebuf);
uint16_t extract_pm2(unsigned char *thebuf);
uint16_t extract_pm1(unsigned char *thebuf);


/**************************************************************************/
/*!
@brief  This function performe necessary setup for the PMS5003 
        particle sensor connection.
*/
/**************************************************************************/
void particle_setup()
{
    Serial2.swap(1);
    Serial2.begin(9600);
    start_timer = 30;
}

/**************************************************************************/
/*!
@brief  Cyclic function in which the data from the PMS5003 particle
        sensor is read.

@param[in] delta_time  Time between two function calls.
*/
/**************************************************************************/
void particle_loop_task(uint16_t delta_time)
{   
    // Start to read when detect 0x42
    if (Serial2.find(0x42))
    {
        // Read the data from the sensor
        Serial2.readBytes(particle_buffer, BUFFER_SIZE);

        // Check if the read data is correct, if is correct process it
        if ((particle_buffer[0] == 0x4d) && (checkValue(particle_buffer, BUFFER_SIZE)))
        {   
            // Extract the values
            pm_data.pm1.value = extract_pm1(particle_buffer);
            pm_data.pm2.value = extract_pm2(particle_buffer);
            pm_data.pm10.value = extract_pm10(particle_buffer);

            // Filter the values
            filter1.Filter(pm_data.pm1.value);
            filter2.Filter(pm_data.pm2.value);
            filter10.Filter(pm_data.pm10.value);

            // Add the filtered values to the main particle structure
            pm_data.pm1.filtered = filter1.Current();
            pm_data.pm2.filtered = filter2.Current();
            pm_data.pm10.filtered = filter10.Current();

            // Compare after start time expired to be sure that 
            // the values from the sensor are stable
            if (start_timer == 0)
            {
                if (pm_data.pm1.value > pm_data.pm1.max_value)
                {
                    pm_data.pm1.max_value = pm_data.pm1.value;
                }
                if (pm_data.pm2.value > pm_data.pm2.max_value)
                {
                    pm_data.pm2.max_value = pm_data.pm2.value;
                }
                if (pm_data.pm10.value > pm_data.pm10.max_value)
                {
                    pm_data.pm10.max_value = pm_data.pm10.value;
                }
                if (pm_data.pm1.value < pm_data.pm1.min_value)
                {
                    pm_data.pm1.min_value = pm_data.pm1.value;
                }
                if (pm_data.pm2.value < pm_data.pm2.min_value)
                {
                    pm_data.pm2.min_value = pm_data.pm2.value;
                }
                if (pm_data.pm10.value < pm_data.pm10.min_value)
                {
                    pm_data.pm10.min_value = pm_data.pm10.value;
                }
            }
        }
    }
}

/**************************************************************************/
/*!
@brief  Cyclic function in which the the start time is decremented.

@param[in] delta_time  Time between two function calls.
*/
/**************************************************************************/
void particle_slow_task(uint16_t delta_time)
{   

    if (start_timer >= delta_time)
    {
        start_timer -= delta_time;
    } else
    {
        start_timer = 0;
    }

    Log.info("\n\nPM1.0: \nvalue: " + String(pm_data.pm1.value) + " ug/m3 \n" 
    + "max: " + String(pm_data.pm1.max_value) + " ug/m3 \n"
    + "min: " + String(pm_data.pm1.min_value) + " ug/m3 \n"
    + "filtered: " + String(pm_data.pm1.filtered) + " ug/m3 \n"
    + "\nPM2.5: \nvalue: " + String(pm_data.pm2.value) + " ug/m3 \n" 
    + "max: " + String(pm_data.pm2.max_value) + " ug/m3 \n"
    + "min: " + String(pm_data.pm2.min_value) + " ug/m3 \n"
    + "filtered: " + String(pm_data.pm2.filtered) + " ug/m3 \n"
    + "\nPM10: \nvalue: " + String(pm_data.pm10.value) + " ug/m3 \n" 
    + "max: " + String(pm_data.pm10.max_value) + " ug/m3 \n"
    + "min: " + String(pm_data.pm10.min_value) + " ug/m3 \n"
    + "filtered: " + String(pm_data.pm10.filtered) + " ug/m3 \n");

}

/**************************************************************************/
/*!
@brief  This function is used to extract PM1.0 data.

@param[in] thebuf Pointer to the data received from PMS5003 particle sensor.

@return Returns the PM1.0 data.
*/
/**************************************************************************/
uint16_t extract_pm1(unsigned char *thebuf)
{
    uint16_t pm1_value;
    pm1_value = ((thebuf[3]<<8) + thebuf[4]);
    return pm1_value;
}

/**************************************************************************/
/*!
@brief  This function is used to extract PM2.5 data.

@param[in] thebuf Pointer to the data received from PMS5003 particle sensor.

@return Returns the PM2.5 data.
*/
/**************************************************************************/
uint16_t extract_pm2(unsigned char *thebuf)
{
    uint16_t pm2_value;
    pm2_value = ((thebuf[5]<<8) + thebuf[6]);
    return pm2_value;
}

/**************************************************************************/
/*!
@brief  This function is used to extract PM10 data.

@param[in] thebuf Pointer to the data received from PMS5003 particle sensor.

@return Returns the PM10 data.
*/
/**************************************************************************/
uint16_t extract_pm10(unsigned char *thebuf)
{
    uint16_t pm10_value;
    pm10_value = ((thebuf[7] << 8) + thebuf[8]);
    return pm10_value;
}

/**************************************************************************/
/*!
@brief  This function verify the integrity of data received from the 
        PMS5003 sensor.

@param[in] thebuf Pointer to the buffer where data from the sensor is stored.
@param[in] buffer_size Size of the buffer.

@return Returns if the received data is ok and can be used.
*/
/**************************************************************************/
char checkValue(unsigned char *thebuf, char buffer_size)
{  
    char receiveflag = 0;
    int receiveSum = 0;
    
    for(int i=0; i<(buffer_size - 2); i++){
        receiveSum = receiveSum+thebuf[i];
    }
    receiveSum = receiveSum + 0x42;
    
    // Check the serial data
    if(receiveSum == ((thebuf[buffer_size - 2] << 8) + thebuf[buffer_size - 1])) 
    {
        receiveSum = 0;
        receiveflag = 1;
    }
    return receiveflag;
}

/**************************************************************************/
/*!
@brief  This function is used to reset the min and max values form the 
        main structure
*/
/**************************************************************************/
void particle_reset_minmax(void)
{
    pm_data.pm1.max_value = 0u;
    pm_data.pm2.max_value = 0u;
    pm_data.pm10.max_value = 0u;
    pm_data.pm1.min_value = UINT16_MAX;
    pm_data.pm2.min_value = UINT16_MAX;
    pm_data.pm10.min_value = UINT16_MAX;
}

/**************************************************************************/
/*!
@brief  This function is used for providing the PMS5003 sensor 
        processed data to other units.

@return Returns the main structure with PMS5003 sensor processed data.
*/
/**************************************************************************/
pm_data_t particle_get_data(void)
{
    return pm_data;
}

thingspeak.cpp

C/C++
#include <http_client.h>
#include <lte.h>
#include <log.h>

#include "thingspeak.h"
#include "credentials.h"


#define THINGSPEAK_URL "api.thingspeak.com"
// #define THINGSPEAK_KEY "xxx"
#define MAX_FIELDS 8

struct ts_filed_data_t
{
    float value = 0;
    bool valid = false;
};

ts_filed_data_t ts_fields_data[MAX_FIELDS];

/**************************************************************************/
/*!
@brief  This function is used configure the Http Client in order to be able 
        to establish a connection to thingspeak.

@return Returns true if the configuration was successful.
*/
/**************************************************************************/
boolean ts_config_client(void)
{
    if (!HttpClient.configure(THINGSPEAK_URL, 80, false)) 
    {
        Log.errorf(F("Failed to configure HTTP for the domain %s\r\n"),
            THINGSPEAK_URL);
        return false;
    } else {
        return true;
    }
}

/**************************************************************************/
/*!
@brief  This function is used to add data to the 'ts_fields_data' buffer.

Because the requests to thingspeak are limited first we will add all
the values that will be send to thingspeak to a buffer and after we 
will send the data to thingspeak using 'ts_send_data' function.

@param[in] id  Field id from thingspeak.
@param[in] value  Value that will be send for the selected field ID .

*/
/**************************************************************************/
void ts_add_data(uint8_t id, float value) {
    ts_fields_data[id - 1].value = value;
    ts_fields_data[id - 1].valid = true;
}

/**************************************************************************/
/*!
@brief  This function sent all data from the buffer to thingspeak.
*/
/**************************************************************************/
void ts_send_data(void)
{
    HttpResponse response;
    String url = "/update?api_key=" + String(THINGSPEAK_KEY);

    int i;

    for (i = 0; i < MAX_FIELDS; i++) 
    {
        if (ts_fields_data[i].valid == true)
        {
            url = url + "&field" + String(i + 1) + "=" + String(ts_fields_data[i].value);
            ts_fields_data[i].valid = false;
        }
    }

    // Convert String to const char*
    const char* url_c = url.c_str();

    Log.info("Url: " + url);

    response = HttpClient.get(url_c);
    if (response.status_code != HttpClient.STATUS_OK) {
        Log.errorf(
            F("Error when performing a GET request on %s. Got HTTP status"
            "code = %d. Exiting...\r\n"),
            THINGSPEAK_URL,
            response.status_code);
    } else
    {
        Log.info("Data send to Thingspeak");
    }
}

time.cpp

C/C++
#include <http_client.h>
#include <lte.h>
#include <log.h>

#include "time.h"


#define TIMEZONE_URL "worldtimeapi.org"
#define TIMEZONE_URI "/api/timezone/Europe/Bucharest.txt"

// This parameter determin the interval at witch the time will be updated,
// the value is in seconds. Value 0 deactivate the update
uint16_t p_cyclic_update = 3600; // seconds

// This variable stores the difference between time retrived from worldtimeapi
// and internal time (retrived with millis())
unsigned long server_time_delta;
// Flag that triggers the update of time
bool request_update;
// Flag that tell if the time was updated
bool is_time_updated;
// Timer used for updating the time at a parameterisable interval
uint16_t cyclic_update_timer;

// Function prototypes
void time_update_dif_time();
uint32_t time_getTimeFromResponse(String* resp);

/**************************************************************************/
/*!
@brief  This function sets the init values for variables.
*/
/**************************************************************************/
void time_setup(void) {
    server_time_delta = 0u;
    cyclic_update_timer = 0u;
    request_update = false;
    is_time_updated = false;
}

/**************************************************************************/
/*!
@brief  Cyclic function in which the update of time is triggered

@param[in] delta_time  Time between two function calls.
*/
/**************************************************************************/
void time_cyclic_s(uint16_t diff_s) {
    if(p_cyclic_update > 0) {
        if (cyclic_update_timer == 0u) {
            request_update = true;
            cyclic_update_timer = p_cyclic_update;
        } else {
            if (cyclic_update_timer >= diff_s) {
                cyclic_update_timer -= diff_s;
            } else {
                cyclic_update_timer = 0u;
            }
        }
    }

    if(request_update) {
        time_update_dif_time();
    }

}

/**************************************************************************/
/*!
@brief  This function will process the http response and extract the time.

@param[in] resp Response string received from http request.

@return Returns the time form http response.
*/
/**************************************************************************/
uint32_t time_getTimeFromResponse(String* resp) {
    uint16_t unix_time_index    = resp->indexOf(String("unixtime: "));
    uint16_t utx_datetime_index = resp->indexOf(String("utc_datetime"));
    uint16_t offset_index       = resp->indexOf(String("raw_offset: "));

    // Extract Unix time as a string
    String unixTimeString = resp->substring(unix_time_index + 10, utx_datetime_index - 1);

    uint32_t unixTime = unixTimeString.toInt();

    // Extract the offset as a string
    String offsetString = resp->substring(offset_index + 12, offset_index + 17);

    // Convert the offset string to an integer (assuming it's in minutes)
    uint32_t offset = offsetString.toInt();

    // Convert Unix time string to uint32_t
    return unixTime + offset;
}

/**************************************************************************/
/*!
@brief  This function will update the time (server_time_delta).

In this function a http request will be performed in order to get the time. 
The http response will be processed and if no error will occur the time 
will be saved by saving the delta between the internal time and the recived
time.

*/
/**************************************************************************/
void time_update_dif_time() {

    if(Lte.isConnected() == true) {
    
        unsigned long server_seconds;
        unsigned long internal_seconds;

        if (!HttpClient.configure(TIMEZONE_URL, 80, false)) {
            Log.errorf(F("Failed to configure HTTP for the domain %s\r\n"),
                    TIMEZONE_URL);
            return;
        }

        Log.info("--- Configured to HTTP ---");

        HttpResponse response;
        response = HttpClient.get(TIMEZONE_URI);
        if (response.status_code != HttpClient.STATUS_OK) {
            Log.errorf(
                F("Error when performing a GET request on %s%s. Got HTTP status"
                "code = %d. Exiting...\r\n"),
                TIMEZONE_URL,
                TIMEZONE_URI,
                response.status_code);
        } else
        {
            Log.infof(F("Successfully performed GET request. HTTP status code = %d, Size = %d\r\n"),
            response.status_code,
            response.data_size);

            String body = HttpClient.readBody(512);
            Log.info("HTTP received data: " + body);

            if (body == "") 
            {
                Log.errorf(
                    F("The returned body from the GET request is empty. Something "
                    "went wrong. Exiting...\r\n"));
            } else 
            {
                server_seconds = time_getTimeFromResponse(&body);
                Log.infof(F("Got the time (unixtime) %lu\r\n"), server_seconds);

                // Check if the received time is valid
                if (server_seconds > 0u)
                {
                    internal_seconds = millis() / 1000;
                    if (server_seconds > internal_seconds) 
                    {
                        server_time_delta = server_seconds - internal_seconds;
                    } else {
                        server_time_delta = 0;
                    }

                    request_update = false;
                    is_time_updated = true;
                }
            }
        }
    }
}

/**************************************************************************/
/*!
@brief  This function is used for getting the time in seconds.

@return Returns time in seconds.
*/
/**************************************************************************/
time_t time_get_time_unix(void) {
    time_t time = (millis() / 1000) + server_time_delta;

    return time;
}

/**************************************************************************/
/*!
@brief  This function is used for getting the current time in string format.

@return Returns a string that contain the current time.
*/
/**************************************************************************/
String time_get_time_string(void)
{
    time_t unixTime = time_get_time_unix();
    return time_time_to_string(unixTime);
}

/**************************************************************************/
/*!
@brief  This function is used to convert the time from numeric to string.

@return Returns a string that contain the time.
*/
/**************************************************************************/
String time_time_to_string(time_t unixTime)
{
    // Buffer to store the formatted time string
    char buffer[25];

    sprintf(buffer, "%04d-%02d-%02d %02d:%02d:%02d",
          year(unixTime), month(unixTime), day(unixTime),
          hour(unixTime), minute(unixTime), second(unixTime));

    return String(buffer);
}

main.h

C Header File
#ifndef main_h
#define main_h

#endif

mqtt.h

C Header File
#ifndef mqtt_h
#define mqtt_h

#include <Arduino.h>
// Function prototypes
void mqtt_cyclic_s(uint16_t delta_time);
void mqtt_setup();
bool mqtt_publish_message(const char* mqtt_message);

#endif

Credits

Stefan Ciobanu

Stefan Ciobanu

1 project • 1 follower

Comments