This tutorial is about handling data streams published over MQTT. We'll use an Arduino MKR 1000 board to generate the data, but if you already have sensors sending data to a messaging broker, you can simply add another subscriber to your existing setup. Whether or not you're starting from scratch, adding a subscriber via SA Studio in your web browser lets you build your own analysis rather than relying on visualisations that someone else has designed for you. One of the easiest ways of creating an advanced analysis is to use Stream Analyze technology, either on your computer, on a cloud server or on an edge device, depending on your needs. SA Engine has built-in support for both plain MQTT over TCP and MQTT secured with TLS (Transport Layer Security). The last example in the article covers just that: how to use TLS with SA Studio.
The objective of this tutorial is to learn how to subscribe to MQTT topics using the Stream Analyze software SA Studio, and how a single-board computer, here the Arduino MKR 1000, can be used to publish via an MQTT broker. We'll walk through examples of using SA Studio to subscribe to an MQTT topic, apply some preliminary functions and graph the results.
Creating the data stream
Enclosed in this tutorial you'll find sample code for sending MQTT data on the Arduino platform. It is based on an example available in the public domain on the Arduino.cc web site, to which we've added some basic waveform-generating functions for the first example. For your MKR 1000 hardware to connect properly, don’t forget to add the ssid/pass for your WiFi network in the arduino_secrets.h tab. In the first example, we revisit a previous article covering how to send data over the serial port from an Arduino. We’ll be generating three waves: a sine wave with values oscillating around zero, a sawtooth wave starting at value 97 with an amplitude of 20 and, in between, a square wave periodically switching between 40 and 80, so that we’ll be able to view them side by side in the same graph view.
We’re sending an array package “pkt” of eight bytes: a designated start byte of 0x55, followed by two empty bytes reserved for future use, then four bytes carrying the values of a square wave (byte 4), a sawtooth wave (byte 5) and a sine wave (two bytes, 6 and 7), and finally an eighth, designated stop byte of 0xAA. Bytes are counted from one here, so byte 4 is pkt[3] in the code.
pkt[0] = 0x55; // designated start byte
pkt[1] = 0x00; // not in use
pkt[2] = 0x00; // not in use
pkt[3] = square + height * 20; // square wave 60 ± 20 alternating
pkt[4] = 97 + i; //sawtooth wave, for 0 ≤ i < 20
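int16_t sine = (int16_t)(y); // sine wave, y = 20*sin(x*pi/180), with the angle x given in degrees
memcpy(&pkt[5], &sine, sizeof(sine)); // stored in 2 bytes; memcpy avoids an unaligned 16-bit store at the odd offset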
pkt[7] = 0xAA; // designated stop byte
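To make the layout concrete, here is a minimal, desktop-compilable sketch of how such a packet could be filled. It is not the code from the attached files: the helper name buildPacket, the step counter and the choice of flipping the square wave every 20 steps are assumptions made for illustration. The sine value is written low byte first, which is the order in which the little-endian MKR 1000 stores a 16-bit integer.

```cpp
#include <cmath>
#include <cstdint>

// Hypothetical helper (not from the attached sketch): fills the 8-byte
// packet described above for a given step counter.
void buildPacket(uint32_t step, uint8_t pkt[8]) {
    const double kPi = 3.14159265358979323846;

    const uint8_t i = static_cast<uint8_t>(step % 20);  // sawtooth index, 0 <= i < 20
    const bool high = (step / 20) % 2 == 1;             // assumed: square wave flips every 20 steps
    const double x = static_cast<double>(step % 360);   // angle in degrees
    const int16_t y =
        static_cast<int16_t>(std::lround(20.0 * std::sin(x * kPi / 180.0)));
    const uint16_t raw = static_cast<uint16_t>(y);      // same bits, unsigned for shifting

    pkt[0] = 0x55;                                // designated start byte
    pkt[1] = 0x00;                                // not in use
    pkt[2] = 0x00;                                // not in use
    pkt[3] = high ? 80 : 40;                      // square wave, 60 +/- 20
    pkt[4] = static_cast<uint8_t>(97 + i);        // sawtooth wave, 97..116
    pkt[5] = static_cast<uint8_t>(raw & 0xFF);    // sine wave, low byte
    pkt[6] = static_cast<uint8_t>(raw >> 8);      // sine wave, high byte
    pkt[7] = 0xAA;                                // designated stop byte
}
```

Writing the two sine bytes explicitly keeps the byte order fixed regardless of the machine the code is compiled on, which makes the packet easier to reason about on the receiving side.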
We’re publishing to the open broker op-en.se and have chosen a topic formed by test/sa/ followed by the pseudorandom string trial314159. If you aren’t already running another MQTT broker with your own topics defined, you can select your own random topic under test/sa/*. See the entire code in the attached file(s).
Subscribing to MQTT feeds
In SA Studio, you can add more functionality through what we call modules. These include connectors for technologies like MQTT, Kafka, CAN bus and Azure Event Hub, as well as several modules for advanced analytics, AI and machine learning functions. Just log in to SA Studio and open a new OSQL editor tab to load the MQTT module:
loadsystem(startup_dir()+"../extenders/sa.mqtt","mqtt.osql");
Set up the parameters for the connection, including a random client ID. Note the ntoa() function for converting numbers to strings, and the randomizer, which creates a random number between 0 and 10e16 (that is, 10^17):
set :mqtt_connect_opts = {
  "qos": 1,
  "connection": "tcp://op-en.se:1883",
  "clientid": "client" + sha256(ntoa(rand(10e16)))
};
// and register the client:
mqtt:register_client("mqtt",:mqtt_connect_opts);
Subscribe to your desired topic on the broker with the following query, which runs until stopped:
select charstring(b)
from binary b
where b in subscribe("mqtt:test/sa/YOUR-TOPIC-HERE");
We can then exploit the compact packet format when picking up the data in SA Studio, unpacking it to visualize the three waveforms:
select unpack(charstring(b), "z24u08u08i16")
from binary b
where b in subscribe("mqtt:test/sa/trial314159");
So, what’s actually happening in this compact but useful OSQL query? We’re subscribing to the MQTT topic “test/sa/trial314159”, reading the incoming binary data as a string, and then applying the unpack() function with these formatting options (z, u and i):
z16 -> skips 16 bits
u08 -> reads 8 bits as an unsigned integer, values 0-255
u16 -> reads 16 bits as an unsigned integer, values 0 to 65,535
i16 -> reads 16 bits as a signed integer, values -32,768 to 32,767
So we’re basically ignoring the start byte along with the two unused bytes (z24, or three bytes), while the fourth and fifth bytes are read as unsigned integers (u08), and bytes six and seven are read together as a signed 16-bit value (i16).
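For readers who want to see the same layout outside SA Studio, the sketch below decodes a packet in plain C++. The names WaveSample and decodePacket are hypothetical, and the low-byte-first order of the sine value is an assumption based on how the little-endian MKR 1000 stores a 16-bit integer; it is not a description of how unpack() works internally. Unlike the query, it also checks the start and stop bytes.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical container for the three decoded wave values.
struct WaveSample {
    uint8_t square;    // byte 4 of the packet
    uint8_t sawtooth;  // byte 5 of the packet
    int16_t sine;      // bytes 6 and 7 of the packet
};

// Hypothetical decoder mirroring the "z24u08u08i16" layout.
// Returns false if the packet is not 8 bytes long or is badly framed.
bool decodePacket(const uint8_t* data, std::size_t len, WaveSample& out) {
    if (len != 8) return false;
    if (data[0] != 0x55 || data[7] != 0xAA) return false;  // start/stop bytes

    // Bytes 1..3 are skipped (z24); the next two are unsigned bytes (u08 u08).
    out.square = data[3];
    out.sawtooth = data[4];

    // Assumed little-endian: low byte first, then high byte (i16).
    const uint16_t raw = static_cast<uint16_t>(data[5] | (data[6] << 8));
    out.sine = static_cast<int16_t>(raw);
    return true;
}
```

Checking the framing bytes is optional, but it is a cheap way to discard truncated or foreign messages that happen to arrive on the same topic.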
You can force SA Studio to display the result in text format, or choose “automatic” or line graph to see the results:
Reading off the Arduino's analog port
In our second example, instead of generating waveforms, we’re reading off the analog A0 port of the Arduino. Reading off a port with nothing attached will create a near-random 10-bit data stream. If you already have sensors to attach, feel free to do so. Just replace the loop() section of the first example with this shorter code block, which swaps the generated waveforms for readouts from the analog port A0.
void loop()
{
  mqttClient.poll(); // keep alive
  unsigned long currentMillis = millis();
  if (currentMillis - previousMillis >= interval)
  {
    // save the last time a message was sent
    previousMillis = currentMillis;
    int analogValue = analogRead(A0);
    Serial.print("Sending message to topic: ");
    Serial.println(topic);
    Serial.println(analogValue);
    mqttClient.beginMessage(topic);
    mqttClient.print(analogValue);
    mqttClient.endMessage();
    Serial.println();
  }
}
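The comparison currentMillis - previousMillis >= interval relies on unsigned arithmetic, so it keeps working when millis() wraps around to zero, which on a 32-bit counter happens after roughly 49.7 days. The sketch below isolates that check so it can be tried on a desktop compiler. The helper name intervalElapsed is hypothetical, and uint32_t is used so the behaviour matches the 32-bit unsigned long of the MKR 1000.

```cpp
#include <cstdint>

// Hypothetical helper: true when at least `interval` milliseconds have
// passed since `previous`, even if the counter wrapped around in between.
bool intervalElapsed(uint32_t now, uint32_t previous, uint32_t interval) {
    // Unsigned subtraction is modular, so a wrapped `now` still yields
    // the correct elapsed time as long as it is shorter than 2^32 ms.
    return static_cast<uint32_t>(now - previous) >= interval;
}
```

Comparing timestamps directly, for example now >= previous + interval, would misfire around the wrap, which is why the subtraction form is the usual choice.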
Our third example originates from an existing MQTT feed published from my country house, where a single-board computer is connected to the HAN port (home area network) of my electric meter and already publishes to a messaging topic on MQTT. The topic “/dsmr/reading/electricity_currently_delivered” follows a format originating from the DSMR project (Dutch Smart Meter Requirements Reader), and I got the device from https://smartgateways.nl/ (no affiliation of mine; it was inexpensive and they sent it really quickly).
After connecting to the broker, you can issue this query statement in the OSQL editor:
select json:unstringify(charstring(b))
from binary b
where b in subscribe("mqtt:/dsmr/reading/electricity_currently_delivered");
Note the unstringify() function, which prunes undesired characters like quotes and other messy non-payload characters, simplifying the automatic graph drawing in SA Studio.
Before we finish up, we’ll have a look at how to run MQTT over TLS, using Mosquitto's public MQTT broker and the corresponding CA certificate issued by the Mosquitto organization. In SA Studio, the file is downloaded with the http module and stored in the temporary folder for the duration of the session:
http:download_file(
"http://assets.streamanalyze.com/docs/mqtt/mosquitto.org.crt",
{}, temp_folder() + "mosquitto.org.crt");
We’ll also need to expand the options of our MQTT connection with a pointer to the CA file and a random client ID, which some MQTT brokers require:
set :mqtt_secure_connect_opts = {
  "qos": 1,
  "connection": "ssl://test.mosquitto.org:8883",
  "cafile": temp_folder() + "mosquitto.org.crt",
  "clientid": "client" + sha256(ntoa(rand(10e16)))
};
When connecting, we’ll need to register the secure connection under the name “mqtt-secure” instead of the “mqtt” name used in the first example:
mqtt:register_client("mqtt-secure",:mqtt_secure_connect_opts);
Publishing in SA Studio via MQTT is easy as pie. Just run the following commands in the OSQL editor:
loadsystem(startup_dir()+"../extenders/sa.mqtt", "mqtt.osql");
http:download_file(
"http://assets.streamanalyze.com/docs/mqtt/mosquitto.org.crt",
{}, temp_folder() + "mosquitto.org.crt");
set :mqtt_secure_connect_opts = {
  "qos": 1,
  "connection": "ssl://test.mosquitto.org:8883",
  "cafile": temp_folder() + "mosquitto.org.crt",
  "clientid": "client" + sha256(ntoa(rand(10e16)))
};
mqtt:register_client("mqtt-secure",:mqtt_secure_connect_opts);
publish(streamof("My first secure message!"), "mqtt-secure:my/experimental");
If you want to subscribe to the published feed from the command line, you’ll need to download the CA file to your computer so you can pass it along. It’s the same file we’re using when connecting via SA Studio: https://test.mosquitto.org/ssl/mosquitto.org.crt
Then run the mosquitto_sub binary from your command line prompt (rather than in SA Studio) before publishing from SA Studio:
mosquitto_sub -h test.mosquitto.org -p 8883 --cafile ./mosquitto.org.crt -t my/experimental
In SA Studio, a feed like the electric meter readings from the third example can be presented as a simple line graph, for monitoring your electric meter in real time!
We hope you've enjoyed these examples and now have something to bring into the mix of your own projects. As always, Happy Coding!