This IoT project uses Azure IoT Hub and a Raspberry Pi 2 device to process sensor data for real-time alerts. Azure IoT Hub collects temperature sensor data from the Raspberry Pi and, with the help of Azure Stream Analytics, Event Hub, and Cloud Services, notifies any device or mobile app when the temperature crosses a set limit. The same approach can be applied to any type of data.
Prerequisites:
Hardware
Raspberry Pi2 Model B
BMP280 Barometric Pressure & Altitude Sensor
Breadboard
10K Ohm Resistor
LED (generic)
Female to Male Jumper Wires
Male to Male Jumper Wires
Software
Windows 10 PC
Visual Studio 2015 Community Edition or Enterprise Edition Update2
Azure SDK 2.9
Windows 10 IoT Core
Device Explorer
Azure Subscription
IoT Hub
Event Hub
Azure Storage Account
Stream Analytics
Cloud Service
Fritzing Image of the Project
High-Level Architectural Diagram
If you have already set up your PC and Raspberry Pi2, you can skip step 1 and step 2.
Step 1: Set up Your PC
- First you need a Windows 10 PC running the public release of Windows 10 (version 10.0.10586) or later.
- Install Visual Studio Community 2015 or Visual Studio Professional 2015 or Visual Studio Enterprise 2015 with Update 1.
- Install Windows IoT Core Project Templates from here.
- Enable developer mode on your Windows 10 device by following https://msdn.microsoft.com/library/windows/apps/xaml/dn706236.aspx
Step 2: Set up Your Raspberry Pi 2
- 5v Micro USB power supply with at least 1.0A current. If you plan on using several power-hungry USB peripherals, use a higher current power supply instead (>2.0A).
- 8GB Micro SD card - class 10 or better.
- HDMI cable and monitor (Optional)
- Ethernet Cable
- Micro SD card reader - due to an issue with most internal micro SD card readers, we suggest an external USB micro SD card reader.
- Install the Windows 10 IoT Core tools:
- Download a Windows 10 IoT Core image from http://ms-iot.github.io/content/en-US/Downloads.htm and save the ISO to a local folder.
- Double-click the ISO (Iot Core RPi.iso). It will automatically mount itself as a virtual drive so you can access the contents.
- Install Windows_10_IoT_Core_RPi2.msi. When installation is complete, flash.ffu will be located at C:\Program Files (x86)\Microsoft IoT\FFU\RaspberryPi2.
- Eject the virtual CD when installation is complete. To do this, navigate to the top folder of File Explorer, right-click the virtual drive, and select "Eject", as shown in figure 2.
- Insert a Micro SD Card into your SD card reader.
- Use IoTCoreImageHelper.exe to flash the SD card. Search for "WindowsIoT" from start menu and select the shortcut "WindowsIoTImageHelper".
- Launch IoTCoreImageHelper.exe, then select your SD card and the flash.ffu file installed earlier.
- Once the process has completed, you are ready to run Windows 10 IoT Core on your Raspberry Pi 2.
NOTE: IoTCoreImageHelper.exe is the recommended tool to flash the SD card. However, instructions are available for using DISM directly: http://ms-iot.github.io/content/en-US/win10/samples/DISM.htm
- Safely remove your USB SD card reader by clicking on "Safely Remove Hardware" in your task tray, or by finding the USB device in File Explorer, right clicking, and choosing "Eject". Failing to do this can cause corruption of the image.
- Hook up your board
- Insert the micro SD card you prepared into your Raspberry Pi 2.
- Connect a network cable from your local network to the Ethernet port on the board. Make sure your development PC is on the same network.
NOTE: If you don't have a local wired network, see http://ms-iot.github.io/content/en-US/win10/ConnectToDevice.htm for additional connection options.
- Connect an HDMI (High-Definition Multimedia Interface) monitor to the HDMI port on the board (Optional).
- Connect the power supply to the micro USB port on the board.
Step 3: Create and configure IoT Hub in Azure
- Log in to the Azure portal (https://portal.azure.com), click Browse, select IoT Hub, and click the Add icon.
- Enter a name for the IoT Hub, select a pricing tier, enter the resource name and region, and click Create.
- Copy the host name; it is used in the connection string when registering the device identity.
- Click All Settings -> Shared Access Policies -> iothubowner, then copy the primary access key, which is used in later steps.
Device Identity Registry
The main purpose of the device identity registry is to allow access to the device-facing endpoints. For each device, it creates resources in the service, such as a queue of in-flight cloud-to-device messages.
You can create a device identity in several ways. Here I explain how to do it with a console application and with Device Explorer; either one is sufficient.
Using Console Application:
Create a console application and add a reference to Microsoft.Azure.Devices, which is available from the NuGet package manager.
To create a device identity, you need the connection string of your Azure IoT Hub.
static string connectionString = "HostName=********.azure-devices.net;SharedAccessKeyName=iothubowner;SharedAccessKey=**********";
In the code above, replace the host name and shared access key with the IoT Hub values you saved in the earlier step.
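As a small illustration, the connection string can be assembled from the two values copied from the portal. This is only a sketch: the IoTHubConnection class, its Build method, and any sample host name or key passed to it are hypothetical placeholders, not part of the Azure SDK.

```csharp
using System;

// Hypothetical helper (not part of the Azure SDK): assembles the IoT Hub
// connection string from the host name and primary key copied from the portal.
public static class IoTHubConnection
{
    public static string Build(string hostName, string sharedAccessKey,
                               string policyName = "iothubowner")
    {
        if (string.IsNullOrWhiteSpace(hostName))
            throw new ArgumentException("Host name is required.", nameof(hostName));
        if (string.IsNullOrWhiteSpace(sharedAccessKey))
            throw new ArgumentException("Shared access key is required.", nameof(sharedAccessKey));

        // Same three-part layout as the connection string shown above.
        return string.Format(
            "HostName={0};SharedAccessKeyName={1};SharedAccessKey={2}",
            hostName, policyName, sharedAccessKey);
    }
}
```

The returned string is what gets passed to RegistryManager.CreateFromConnectionString. Failing early on an empty host name or key tends to be easier to diagnose than an authentication error at run time.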
Complete Code of Program.cs file
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.Devices;
using Microsoft.Azure.Devices.Common.Exceptions;
namespace CreateDeviceIdentityConsoleApp
{
class Program
{
static RegistryManager registryManager;
static string connectionString = "HostName=IoTHub******.azure-devices.net;SharedAccessKeyName=iothubowner;SharedAccessKey=**************************";
static void Main(string[] args)
{
registryManager = RegistryManager.CreateFromConnectionString(connectionString);
AddDeviceAsync().Wait();
Console.ReadLine();
}
private async static Task AddDeviceAsync()
{
string deviceId = "astranidevice";
Device device;
try
{
device = await registryManager.AddDeviceAsync(new Device(deviceId));
}
catch (DeviceAlreadyExistsException)
{
device = await registryManager.GetDeviceAsync(deviceId);
}
Console.WriteLine("Generated device key: {0}", device.Authentication.SymmetricKey.PrimaryKey);
}
}
}
The code above creates a device named "astranidevice". The RegistryManager class uses the connection string to create the device identity in the IoT Hub registry. If the device already exists, the code simply retrieves the existing device information.
After creating or retrieving the device, save its device.Authentication.SymmetricKey.PrimaryKey value, which is used in later steps.
Using Device Explorer
Device Explorer is a tool for simulating device-to-cloud and cloud-to-device messages. Available from here.
- Open Device Explorer, enter your IoT Hub connection string and click on Update.
- Now go to Management tab and click on Create button to create a device
- Finally save the device Id and Primary Key values which will be used in later steps.
Step 4: Create a UWP App for Raspberry Pi 2 That Sends Sensor Data to IoT Hub and Receives Alerts from IoT Hub
After setting up the development PC, Raspberry Pi, and Azure IoT Hub, open Visual Studio and create a new project by selecting the Universal -> Blank App template, as shown in the screenshot below.
Now add a reference to Windows 10 IoT Extensions in the newly created project.
Select an appropriate version of the IoT Extensions. Here I am using version 10.0.10586.0, and my Raspberry Pi 2 has the same version of Windows 10 IoT Core installed.
Also add a reference to Microsoft.Azure.Devices.Client from the NuGet package manager.
Reading Temperature values
In this project we read the temperature, pressure, and altitude of a room.
For temperature we used the BMP280 V1 sensor, which comes with the Adafruit starter kit.
The following helper class contains all the functionality needed to initialize the BMP280 sensor and read values from it.
BMP280.cs
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Windows.Devices.Enumeration;
using Windows.Devices.Gpio;
using Windows.Devices.I2c;
namespace SimpleTemperatureAlert
{
public class BMP280_CalibrationData
{
//BMP280 Registers
public UInt16 dig_T1 { get; set; }
public Int16 dig_T2 { get; set; }
public Int16 dig_T3 { get; set; }
public UInt16 dig_P1 { get; set; }
public Int16 dig_P2 { get; set; }
public Int16 dig_P3 { get; set; }
public Int16 dig_P4 { get; set; }
public Int16 dig_P5 { get; set; }
public Int16 dig_P6 { get; set; }
public Int16 dig_P7 { get; set; }
public Int16 dig_P8 { get; set; }
public Int16 dig_P9 { get; set; }
}
public class BMP280
{
//The BMP280 register addresses according to the datasheet: http://www.adafruit.com/datasheets/BST-BMP280-DS001-11.pdf
const byte BMP280_Address = 0x77;
const byte BMP280_Signature = 0x58;
enum eRegisters : byte
{
BMP280_REGISTER_DIG_T1 = 0x88,
BMP280_REGISTER_DIG_T2 = 0x8A,
BMP280_REGISTER_DIG_T3 = 0x8C,
BMP280_REGISTER_DIG_P1 = 0x8E,
BMP280_REGISTER_DIG_P2 = 0x90,
BMP280_REGISTER_DIG_P3 = 0x92,
BMP280_REGISTER_DIG_P4 = 0x94,
BMP280_REGISTER_DIG_P5 = 0x96,
BMP280_REGISTER_DIG_P6 = 0x98,
BMP280_REGISTER_DIG_P7 = 0x9A,
BMP280_REGISTER_DIG_P8 = 0x9C,
BMP280_REGISTER_DIG_P9 = 0x9E,
BMP280_REGISTER_CHIPID = 0xD0,
BMP280_REGISTER_VERSION = 0xD1,
BMP280_REGISTER_SOFTRESET = 0xE0,
BMP280_REGISTER_CAL26 = 0xE1, // R calibration stored in 0xE1-0xF0
BMP280_REGISTER_CONTROLHUMID = 0xF2,
BMP280_REGISTER_CONTROL = 0xF4,
BMP280_REGISTER_CONFIG = 0xF5,
BMP280_REGISTER_PRESSUREDATA_MSB = 0xF7,
BMP280_REGISTER_PRESSUREDATA_LSB = 0xF8,
BMP280_REGISTER_PRESSUREDATA_XLSB = 0xF9, // bits
BMP280_REGISTER_TEMPDATA_MSB = 0xFA,
BMP280_REGISTER_TEMPDATA_LSB = 0xFB,
BMP280_REGISTER_TEMPDATA_XLSB = 0xFC, // bits
BMP280_REGISTER_HUMIDDATA_MSB = 0xFD,
BMP280_REGISTER_HUMIDDATA_LSB = 0xFE,
};
//String for the friendly name of the I2C bus
const string I2CControllerName = "I2C1";
//Create an I2C device
private I2cDevice bmp280 = null;
//Create new calibration data for the sensor
BMP280_CalibrationData CalibrationData;
//Variable to check if device is initialized
bool init = false;
//Method to initialize the BMP280 sensor
public async Task Initialize()
{
Debug.WriteLine("BMP280::Initialize");
try
{
//Instantiate the I2CConnectionSettings using the device address of the BMP280
I2cConnectionSettings settings = new I2cConnectionSettings(BMP280_Address);
//Set the I2C bus speed of connection to fast mode
settings.BusSpeed = I2cBusSpeed.FastMode;
//Use the I2CBus device selector to create an advanced query syntax string
string aqs = I2cDevice.GetDeviceSelector(I2CControllerName);
//Use the Windows.Devices.Enumeration.DeviceInformation class to create a collection using the advanced query syntax string
DeviceInformationCollection dis = await DeviceInformation.FindAllAsync(aqs);
//Instantiate the BMP280 I2C device using the device id of the I2CBus and the I2CConnectionSettings
bmp280 = await I2cDevice.FromIdAsync(dis[0].Id, settings);
//Check if device was found
if (bmp280 == null)
{
Debug.WriteLine("Device not found");
}
}
catch (Exception e)
{
Debug.WriteLine("Exception: " + e.Message + "\n" + e.StackTrace);
throw;
}
}
private async Task Begin()
{
Debug.WriteLine("BMP280::Begin");
byte[] WriteBuffer = new byte[] { (byte)eRegisters.BMP280_REGISTER_CHIPID };
byte[] ReadBuffer = new byte[] { 0xFF };
//Read the device signature
bmp280.WriteRead(WriteBuffer, ReadBuffer);
Debug.WriteLine("BMP280 Signature: " + ReadBuffer[0].ToString());
//Verify the device signature
if (ReadBuffer[0] != BMP280_Signature)
{
Debug.WriteLine("BMP280::Begin Signature Mismatch.");
return;
}
//Set the initialized flag to true
init = true;
//Read the coefficients table
CalibrationData = await ReadCoefficeints();
//Write control register
await WriteControlRegister();
//Write humidity control register
await WriteControlRegisterHumidity();
}
//Method to write 0x03 to the humidity control register
private async Task WriteControlRegisterHumidity()
{
byte[] WriteBuffer = new byte[] { (byte)eRegisters.BMP280_REGISTER_CONTROLHUMID, 0x03 };
bmp280.Write(WriteBuffer);
await Task.Delay(1);
return;
}
//Method to write 0x3F to the control register
private async Task WriteControlRegister()
{
byte[] WriteBuffer = new byte[] { (byte)eRegisters.BMP280_REGISTER_CONTROL, 0x3F };
bmp280.Write(WriteBuffer);
await Task.Delay(1);
return;
}
//Method to read a 16-bit value from a register and return it in little endian format
private UInt16 ReadUInt16_LittleEndian(byte register)
{
UInt16 value = 0;
byte[] writeBuffer = new byte[] { 0x00 };
byte[] readBuffer = new byte[] { 0x00, 0x00 };
writeBuffer[0] = register;
bmp280.WriteRead(writeBuffer, readBuffer);
int h = readBuffer[1] << 8;
int l = readBuffer[0];
value = (UInt16)(h + l);
return value;
}
//Method to read an 8-bit value from a register
private byte ReadByte(byte register)
{
byte value = 0;
byte[] writeBuffer = new byte[] { 0x00 };
byte[] readBuffer = new byte[] { 0x00 };
writeBuffer[0] = register;
bmp280.WriteRead(writeBuffer, readBuffer);
value = readBuffer[0];
return value;
}
//Method to read the calibration data from the registers
private async Task<BMP280_CalibrationData> ReadCoefficeints()
{
// 16 bit calibration data is stored as Little Endian, the helper method will do the byte swap.
CalibrationData = new BMP280_CalibrationData();
// Read temperature calibration data
CalibrationData.dig_T1 = ReadUInt16_LittleEndian((byte)eRegisters.BMP280_REGISTER_DIG_T1);
CalibrationData.dig_T2 = (Int16)ReadUInt16_LittleEndian((byte)eRegisters.BMP280_REGISTER_DIG_T2);
CalibrationData.dig_T3 = (Int16)ReadUInt16_LittleEndian((byte)eRegisters.BMP280_REGISTER_DIG_T3);
// Read pressure calibration data
CalibrationData.dig_P1 = ReadUInt16_LittleEndian((byte)eRegisters.BMP280_REGISTER_DIG_P1);
CalibrationData.dig_P2 = (Int16)ReadUInt16_LittleEndian((byte)eRegisters.BMP280_REGISTER_DIG_P2);
CalibrationData.dig_P3 = (Int16)ReadUInt16_LittleEndian((byte)eRegisters.BMP280_REGISTER_DIG_P3);
CalibrationData.dig_P4 = (Int16)ReadUInt16_LittleEndian((byte)eRegisters.BMP280_REGISTER_DIG_P4);
CalibrationData.dig_P5 = (Int16)ReadUInt16_LittleEndian((byte)eRegisters.BMP280_REGISTER_DIG_P5);
CalibrationData.dig_P6 = (Int16)ReadUInt16_LittleEndian((byte)eRegisters.BMP280_REGISTER_DIG_P6);
CalibrationData.dig_P7 = (Int16)ReadUInt16_LittleEndian((byte)eRegisters.BMP280_REGISTER_DIG_P7);
CalibrationData.dig_P8 = (Int16)ReadUInt16_LittleEndian((byte)eRegisters.BMP280_REGISTER_DIG_P8);
CalibrationData.dig_P9 = (Int16)ReadUInt16_LittleEndian((byte)eRegisters.BMP280_REGISTER_DIG_P9);
await Task.Delay(1);
return CalibrationData;
}
//t_fine carries fine temperature as global value
Int32 t_fine = Int32.MinValue;
//Method to return the temperature in DegC as a double (for example, 51.23 means 51.23 DegC).
private double BMP280_compensate_T_double(Int32 adc_T)
{
double var1, var2, T;
//The temperature is calculated using the compensation formula in the BMP280 datasheet
var1 = ((adc_T / 16384.0) - (CalibrationData.dig_T1 / 1024.0)) * CalibrationData.dig_T2;
var2 = ((adc_T / 131072.0) - (CalibrationData.dig_T1 / 8192.0)) * ((adc_T / 131072.0) - (CalibrationData.dig_T1 / 8192.0)) * CalibrationData.dig_T3;
t_fine = (Int32)(var1 + var2);
T = (var1 + var2) / 5120.0;
return T;
}
//Method to return the pressure in Pa, in Q24.8 format (24 integer bits and 8 fractional bits).
//Output value of “24674867” represents 24674867/256 = 96386.2 Pa = 963.862 hPa
private Int64 BMP280_compensate_P_Int64(Int32 adc_P)
{
Int64 var1, var2, p;
//The pressure is calculated using the compensation formula in the BMP280 datasheet
var1 = t_fine - 128000;
var2 = var1 * var1 * (Int64)CalibrationData.dig_P6;
var2 = var2 + ((var1 * (Int64)CalibrationData.dig_P5) << 17);
var2 = var2 + ((Int64)CalibrationData.dig_P4 << 35);
var1 = ((var1 * var1 * (Int64)CalibrationData.dig_P3) >> 8) + ((var1 * (Int64)CalibrationData.dig_P2) << 12);
var1 = (((((Int64)1 << 47) + var1)) * (Int64)CalibrationData.dig_P1) >> 33;
if (var1 == 0)
{
Debug.WriteLine("BMP280_compensate_P_Int64 Jump out to avoid / 0");
return 0; //Avoid exception caused by division by zero
}
//Perform calibration operations as per datasheet: http://www.adafruit.com/datasheets/BST-BMP280-DS001-11.pdf
p = 1048576 - adc_P;
p = (((p << 31) - var2) * 3125) / var1;
var1 = ((Int64)CalibrationData.dig_P9 * (p >> 13) * (p >> 13)) >> 25;
var2 = ((Int64)CalibrationData.dig_P8 * p) >> 19;
p = ((p + var1 + var2) >> 8) + ((Int64)CalibrationData.dig_P7 << 4);
return p;
}
public async Task<float> ReadTemperature()
{
//Make sure the I2C device is initialized
if (!init) await Begin();
//Read the MSB, LSB and bits 7:4 (XLSB) of the temperature from the BMP280 registers
byte tmsb = ReadByte((byte)eRegisters.BMP280_REGISTER_TEMPDATA_MSB);
byte tlsb = ReadByte((byte)eRegisters.BMP280_REGISTER_TEMPDATA_LSB);
byte txlsb = ReadByte((byte)eRegisters.BMP280_REGISTER_TEMPDATA_XLSB); // bits 7:4
//Combine the values into a 32-bit integer
Int32 t = (tmsb << 12) + (tlsb << 4) + (txlsb >> 4);
//Convert the raw value to the temperature in degC
double temp = BMP280_compensate_T_double(t);
//Return the temperature as a float value
return (float)temp;
}
public async Task<float> ReadPreasure()
{
//Make sure the I2C device is initialized
if (!init) await Begin();
//Read the temperature first to load the t_fine value for compensation
if (t_fine == Int32.MinValue)
{
await ReadTemperature();
}
//Read the MSB, LSB and bits 7:4 (XLSB) of the pressure from the BMP280 registers
byte tmsb = ReadByte((byte)eRegisters.BMP280_REGISTER_PRESSUREDATA_MSB);
byte tlsb = ReadByte((byte)eRegisters.BMP280_REGISTER_PRESSUREDATA_LSB);
byte txlsb = ReadByte((byte)eRegisters.BMP280_REGISTER_PRESSUREDATA_XLSB); // bits 7:4
//Combine the values into a 32-bit integer
Int32 t = (tmsb << 12) + (tlsb << 4) + (txlsb >> 4);
//Convert the raw value to the pressure in Pa
Int64 pres = BMP280_compensate_P_Int64(t);
//Return the pressure in Pa as a float value
return ((float)pres) / 256;
}
//Method to take the sea level pressure in Hectopascals(hPa) as a parameter and calculate the altitude using current pressure.
public async Task<float> ReadAltitude(float seaLevel)
{
//Make sure the I2C device is initialized
if (!init) await Begin();
//Read the pressure first
float pressure = await ReadPreasure();
//Convert the pressure to Hectopascals(hPa)
pressure /= 100;
//Calculate and return the altitude using the international barometric formula
return 44330.0f * (1.0f - (float)Math.Pow((pressure / seaLevel), 0.1903f));
}
}
public class BMP280SensorData
{
public float Temperature { get; set; }
public float Pressure { get; set; }
public float Altitude { get; set; }
}
}
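The decoding steps inside the helper class can be tried without any hardware. The sketch below is a stand-alone illustration, not the author's class: Bmp280Math and its method names are hypothetical, and the sample register bytes and calibration values in the usage note are assumed for illustration. It shows the little-endian calibration read, the 20-bit raw value assembly, and the floating-point temperature compensation. Note that the datasheet formula squares the second difference term.

```csharp
using System;

// Stand-alone sketch of the decoding steps used by the BMP280 helper class.
// No I2C access is needed; all inputs are passed in as plain numbers.
public static class Bmp280Math
{
    // Calibration words are stored little-endian: low byte first.
    public static ushort ToUInt16LittleEndian(byte low, byte high)
    {
        return (ushort)((high << 8) | low);
    }

    // Temperature and pressure are 20-bit values spread over three registers:
    // MSB holds bits 19:12, LSB bits 11:4, and the top nibble of XLSB bits 3:0.
    public static int Combine20Bit(byte msb, byte lsb, byte xlsb)
    {
        return (msb << 12) | (lsb << 4) | (xlsb >> 4);
    }

    // Floating-point temperature compensation following the datasheet formula.
    // Returns degrees Celsius.
    public static double CompensateTemperature(int adcT, ushort digT1, short digT2, short digT3)
    {
        double var1 = (adcT / 16384.0 - digT1 / 1024.0) * digT2;
        double delta = adcT / 131072.0 - digT1 / 8192.0;
        double var2 = delta * delta * digT3;
        return (var1 + var2) / 5120.0;
    }
}
```

For example, the register bytes 0x7E, 0xED, 0x00 combine to the raw value 519888, and with the assumed calibration values dig_T1 = 27504, dig_T2 = 26435, and dig_T3 = -1000 the compensation works out to roughly 25.08 degrees Celsius.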
Below is the code for reading the sensor values through the helper class.
private async Task<BMP280SensorData> ReadBMP280SensorData()
{
var sensorData = new BMP280SensorData();
try
{
//Create a constant for pressure at sea level.
//This is based on your local sea level pressure (Unit: Hectopascal)
const float seaLevelPressure = 1013.25f;
sensorData.Temperature =(int)Math.Ceiling(await BMP280.ReadTemperature());
sensorData.Pressure = await BMP280.ReadPreasure();
sensorData.Altitude = await BMP280.ReadAltitude(seaLevelPressure);
}
catch (Exception ex)
{
Debug.WriteLine(ex.Message);
}
return sensorData;
}
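The unit conversions behind the pressure and altitude readings can be checked in isolation. The following is a minimal sketch under stated assumptions: PressureMath and its method names are hypothetical, and it only reproduces the Q24.8 scaling and the barometric formula that the helper class applies, using double precision instead of float.

```csharp
using System;

// Hypothetical helper illustrating the pressure and altitude conversions.
public static class PressureMath
{
    // The integer compensation routine returns pressure in Q24.8 fixed point,
    // so dividing by 256 yields pascals.
    public static double Q24_8ToPascal(long q24_8)
    {
        return q24_8 / 256.0;
    }

    // International barometric formula; both pressures in hPa, result in metres.
    public static double AltitudeMeters(double pressureHpa, double seaLevelHpa)
    {
        return 44330.0 * (1.0 - Math.Pow(pressureHpa / seaLevelHpa, 0.1903));
    }
}
```

A reading equal to the sea-level reference gives an altitude of 0 m, and a lower pressure gives a positive altitude. Because the true sea-level pressure changes with the weather, a fixed reference of 1013.25 hPa yields only an approximate altitude.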
Sending sensor data to IoT Hub
The application reads data from the sensors and sends it to IoT Hub periodically. The following timer event code reads the sensor data and sends it to IoT Hub.
private async void Timer_Tick(object sender, object e)
{
//Reading Temperature/Pressure/Altitude Information from BMP280 Sensor
var BMP280SensorData = await ReadBMP280SensorData();
//Check for null before using the reading
if (BMP280SensorData != null)
{
Debug.WriteLine("BMP280 Sensor data\nTemperature:{0}, \nPressure:{1}, \nAltitude:{2}",
BMP280SensorData.Temperature, BMP280SensorData.Pressure, BMP280SensorData.Altitude);
//Sending Message to IoT Hub
SendDeviceToCloudMessagesAsync(BMP280SensorData);
}
}
private async void SendDeviceToCloudMessagesAsync(BMP280SensorData bMP280SensorData)
{
SimpleTemperatureAlertData simpleTemperatureAlertData = new SimpleTemperatureAlertData()
{
DeviceId = deviceName,
Time = DateTime.UtcNow.ToString("o"),
RoomTemp =(int)Math.Ceiling(bMP280SensorData.Temperature),
RoomPressure = bMP280SensorData.Pressure.ToString(),
RoomAlt = bMP280SensorData.Altitude.ToString(),
};
var jsonString = JsonConvert.SerializeObject(simpleTemperatureAlertData);
var jsonStringInBytes = new Microsoft.Azure.Devices.Client.Message(Encoding.ASCII.GetBytes(jsonString));
await deviceClient.SendEventAsync(jsonStringInBytes);
Debug.WriteLine("{0} > Sending message: {1}", DateTime.UtcNow, jsonString);
}
Device side code for receiving alerts from IoT Hub
The timer method sends the temperature data at every timer interval. After starting the timer, call the following method to receive alerts from IoT Hub.
public static async Task<string> ReceiveCloudToDeviceMessageAsync()
{
while (true)
{
var receivedMessage = await deviceClient.ReceiveAsync();
if (receivedMessage != null)
{
var messageData = Encoding.ASCII.GetString(receivedMessage.GetBytes());
await deviceClient.CompleteAsync(receivedMessage);
return messageData;
}
await Task.Delay(TimeSpan.FromSeconds(1));
}
}
Deploy the UWP app to the Raspberry Pi
Set the project build type to Debug, the device architecture to ARM, and the device to Remote Machine.
In the Remote Machine Configuration window, enter the unique name or IP address of your device.
The unique name is the name you chose while setting up the device.
If you have trouble identifying the IP address of your Raspberry Pi, use Windows IoT Core Watcher or the Windows 10 IoT Core Dashboard app. Either tool automatically detects the Raspberry Pi on your network and displays its name and IP address. Make sure your Raspberry Pi and your development PC are connected to the same network; otherwise the Pi will not appear.
Or
By default, Visual Studio automatically detects the devices connected to your network, as shown below; here the device name is 'minwinpc'.
Select your device.
Now click the Remote Machine play button at the top of Visual Studio to start deploying the app directly to your Raspberry Pi 2.
Step 5: Set Up Azure Stream Analytics to Filter IoT Hub Data and Send It to Event Hub
To create a Stream Analytics job in Azure, log in to the Azure classic portal. Select Stream Analytics from the list on the left and click the New (plus) icon at the bottom of the page. Select Data Services -> Stream Analytics -> Quick Create.
Enter a job name, then select a region and a storage account in that region. If a storage account already exists, it is selected automatically; otherwise, create a new one by specifying a name for it. Finally, click Create Stream Analytics Job at the bottom of the New dialog.
Select the newly created Stream Analytics job, click the Inputs tab, and select the Add Input option on the page.
In the Add an Input popup window, select Data Stream as the input and click Next.
Select IoT Hub as the data stream input and click Next.
Enter an input alias, which is used in later steps, select the subscription, choose the IoT Hub created in Step 3 (Set Up Azure IoT Hub), select iothubowner as the shared access policy name, and click Next.
Select JSON as the event serialization format and UTF8 as the encoding, then click Create.
Here temperaturealertsjob is my Stream Analytics job name and bmpsensordata is the alias for my IoT Hub input.
Now add an output to the Stream Analytics job so that the data coming from the input stream can be processed and sent to one of the supported outputs.
Here we use Event Hub as the output of the Stream Analytics job.
Enter an output alias, an event hub name, and the namespace under which you want to create the Event Hub. Leave the remaining fields at their default values and click Next.
On the next page, leave all input fields at their default values and click the tick mark at the bottom of the popup. This creates a Service Bus namespace with an Event Hub in it.
Now go to the Query tab and write a query that filters the data coming from the IoT Hub input stream and passes it on to Event Hub.
The following query selects readings with a temperature greater than 50 degrees. This is where you can apply your business rules/logic to the IoT data.
SELECT
Time,
DeviceId,
RoomTemp,
RoomPressure,
RoomAlt
INTO
eventhub
FROM
bmpsensordata
WHERE RoomTemp>50
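To try out a threshold rule locally before deploying it, the effect of the query above can be mimicked in C# with LINQ. This is a rough sketch, not how Stream Analytics runs the query: the Reading class and the AlertFilter helper are hypothetical names, and only a subset of the telemetry fields is modeled.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the telemetry record; field names follow the query.
public class Reading
{
    public string Time { get; set; }
    public string DeviceId { get; set; }
    public int RoomTemp { get; set; }
}

public static class AlertFilter
{
    // Equivalent of "WHERE RoomTemp > 50": keeps only readings above the threshold.
    public static List<Reading> Above(IEnumerable<Reading> readings, int threshold = 50)
    {
        return readings.Where(r => r.RoomTemp > threshold).ToList();
    }
}
```

The comparison is strict, so a reading of exactly 50 is not forwarded, which matches the query.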
Now click the Start icon at the bottom of the screen. If the Stream Analytics job fails to run, go to the job's Dashboard and select Operational Logs, located on the right side of the page under Management Service.
Select the log and click the Details icon at the bottom of the page.
This newly created Stream Analytics job sends temperature data to Event Hub whenever the temperature is greater than 50 degrees. Next we create a cloud service that reads this data from Event Hub and sends it back to IoT Hub, which in turn delivers it to the device (Raspberry Pi). To achieve this, we need to configure a Cloud Service.
Configure a Cloud Service
Select Cloud Services from the left hamburger menu in the Azure classic portal.
Select the New option at the bottom of the page and enter a name for the service.
Select the affinity group where all your resources will be stored and select Create Cloud Service.
Now create a new project of type Cloud Service in Visual Studio; this requires Visual Studio 2015 Update 2 and Azure SDK 2.9.
Select Worker Role from the list on the left and add it to the list on the right. Then click OK.
After the solution is created, notice that it contains two projects: one for the worker role library and another for the cloud service that will be published to Azure.
Update the WorkerRole.cs code to read data from Event Hub and send the data back to IoT Hub.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.WindowsAzure;
using Microsoft.Azure.Devices;
using Microsoft.ServiceBus.Messaging;
using System.Configuration;
using Microsoft.WindowsAzure.ServiceRuntime;
namespace events_forwarding
{
public class WorkerRole : RoleEntryPoint
{
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);
private static string connectionString;
private static string eventHubName;
public static ServiceClient iotHubServiceClient { get; private set; }
public static EventHubClient eventHubClient { get; private set; }
public override void Run()
{
Trace.TraceInformation("EventsForwarding Run()...\n");
try
{
this.RunAsync(this.cancellationTokenSource.Token).Wait();
}
finally
{
this.runCompleteEvent.Set();
}
}
public override bool OnStart()
{
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
// For information on handling configuration changes
// see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.
bool result = base.OnStart();
Trace.TraceInformation("EventsForwarding OnStart()...\n");
connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
eventHubName = ConfigurationManager.AppSettings["Microsoft.ServiceBus.EventHubName"];
string storageAccountName = ConfigurationManager.AppSettings["AzureStorage.AccountName"];
string storageAccountKey = ConfigurationManager.AppSettings["AzureStorage.Key"];
string storageAccountString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}",
storageAccountName, storageAccountKey);
string iotHubConnectionString = ConfigurationManager.AppSettings["AzureIoTHub.ConnectionString"];
iotHubServiceClient = ServiceClient.CreateFromConnectionString(iotHubConnectionString);
eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
var defaultConsumerGroup = eventHubClient.GetDefaultConsumerGroup();
string eventProcessorHostName = "SensorEventProcessor";
EventProcessorHost eventProcessorHost = new EventProcessorHost(eventProcessorHostName, eventHubName, defaultConsumerGroup.GroupName, connectionString, storageAccountString);
var epo = new EventProcessorOptions
{
MaxBatchSize = 100,
PrefetchCount = 10,
ReceiveTimeOut = TimeSpan.FromSeconds(20),
InitialOffsetProvider = (name) => DateTime.Now.AddDays(-7)
};
epo.ExceptionReceived += OnExceptionReceived;
eventProcessorHost.RegisterEventProcessorAsync<SensorEventProcessor>(epo).Wait();
Trace.TraceInformation("Receiving events...\n");
return result;
}
public override void OnStop()
{
Trace.TraceInformation("EventsForwarding is OnStop()...");
this.cancellationTokenSource.Cancel();
this.runCompleteEvent.WaitOne();
base.OnStop();
Trace.TraceInformation("EventsForwarding has stopped");
}
private async Task RunAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
//Trace.TraceInformation("EventsToCommmandsService running...\n");
await Task.Delay(1000);
}
}
public static void OnExceptionReceived(object sender, ExceptionReceivedEventArgs args)
{
Trace.TraceInformation("Event Hub exception received: {0}", args.Exception.Message);
}
}
}
We use app settings to store the Event Hub, storage account, and IoT Hub keys, and read them in the WorkerRole class through the ConfigurationManager class.
app.config
<appSettings>
<!-- Service Bus specific app settings for messaging connections -->
<add key="Microsoft.ServiceBus.ConnectionString" value="<Enter Your Connection String>" />
<add key="Microsoft.ServiceBus.EventHubName" value="eventhubforconnectingdots" />
<add key="AzureStorage.AccountName" value="temperaturealertstorage" />
<add key="AzureStorage.Key" value="<Enter Your Key>" />
<add key="AzureIoTHub.ConnectionString" value="<Enter Your Connection String>" />
</appSettings>
Add two new classes to the WorkerRole project: SensorEventProcessor.cs and the SimpleTemperatureAlertData.cs model class.
SensorEventProcessor.cs
using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace events_forwarding
{
class SensorEventProcessor : IEventProcessor
{
Stopwatch checkpointStopWatch;
PartitionContext partitionContext;
public async Task CloseAsync(PartitionContext context, CloseReason reason)
{
Trace.TraceInformation(string.Format("EventProcessor Shutting Down. Partition '{0}', Reason: '{1}'.", this.partitionContext.Lease.PartitionId, reason.ToString()));
if (reason == CloseReason.Shutdown)
{
await context.CheckpointAsync();
}
}
public Task OpenAsync(PartitionContext context)
{
Trace.TraceInformation(string.Format("Initializing EventProcessor: Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset));
this.partitionContext = context;
this.checkpointStopWatch = new Stopwatch();
this.checkpointStopWatch.Start();
return Task.FromResult<object>(null);
}
public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
Trace.TraceInformation("\n");
Trace.TraceInformation("........ProcessEventsAsync........");
//string commandParameterNew = "{\"Name\":\"AlarmThreshold\",\"Parameters\":{\"SensorId\":\"" + "Hello World" + "\"}}";
//await WorkerRole.iotHubServiceClient.SendAsync("astranidevice", new Microsoft.Azure.Devices.Message(Encoding.UTF8.GetBytes(commandParameterNew)));
foreach (EventData eventData in messages)
{
try
{
string jsonString = Encoding.UTF8.GetString(eventData.GetBytes());
Trace.TraceInformation(string.Format("Message received at '{0}'. Partition: '{1}'",
eventData.EnqueuedTimeUtc.ToLocalTime(), this.partitionContext.Lease.PartitionId));
Trace.TraceInformation(string.Format("-->Raw Data: '{0}'", jsonString));
var newSensorEvent = this.DeserializeEventData("[" + jsonString.Replace("}", "},").Remove(jsonString.Length - 1, 1) + "]");
// SimpleTemperatureAlertData newSensorEvent = Jso(jsonString);
//Trace.TraceInformation(string.Format("-->Serialized Data: '{0}', '{1}', '{2}', '{3}', '{4}'",
// newSensorEvent.Time, newSensorEvent.RoomTemp, newSensorEvent.RoomPressure, newSensorEvent.RoomAlt, newSensorEvent.DeviceId));
foreach (var eventhubdata in newSensorEvent)
{
// Issuing alarm to device.
string commandParameterNew = "{\"Name\":\"Room Temperature\",\"Parameters\":{\"SensorId\":\"" + eventhubdata.RoomTemp + "\"}}";
Trace.TraceInformation("Issuing Eventhub data to device: '{0}', from sensor: '{1}'", eventhubdata.DeviceId, eventhubdata.RoomTemp);
Trace.TraceInformation("New Command Parameter: '{0}'", commandParameterNew);
await WorkerRole.iotHubServiceClient.SendAsync(eventhubdata.DeviceId, new Microsoft.Azure.Devices.Message(Encoding.UTF8.GetBytes(commandParameterNew)));
}
}
catch (Exception ex)
{
Trace.TraceInformation("Error in ProcessEventsAsync -- {0}\n", ex.Message);
}
}
await context.CheckpointAsync();
}
private SimpleTemperatureAlertData[] DeserializeEventData(string eventDataString)
{
return JsonConvert.DeserializeObject<SimpleTemperatureAlertData[]>(eventDataString);
}
}
}
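ProcessEventsAsync builds the alert command by string concatenation. As an alternative sketch, the same JSON shape can be produced and read back with a serializer, which takes care of quoting and escaping. The AlertCommand class and its method names are hypothetical, and the example assumes System.Text.Json is available to the project; Json.NET's JsonConvert.SerializeObject, which the article already uses, should accept the same anonymous object.

```csharp
using System;
using System.Text.Json;

// Hypothetical helper for the cloud-to-device alert command.
public static class AlertCommand
{
    // Builds the same JSON shape as the concatenated string in ProcessEventsAsync.
    public static string Build(int roomTemp)
    {
        var command = new
        {
            Name = "Room Temperature",
            Parameters = new { SensorId = roomTemp.ToString() }
        };
        return JsonSerializer.Serialize(command);
    }

    // Reads the temperature back out of a received command string.
    public static int ParseRoomTemp(string json)
    {
        using (JsonDocument doc = JsonDocument.Parse(json))
        {
            string value = doc.RootElement
                .GetProperty("Parameters")
                .GetProperty("SensorId")
                .GetString();
            return int.Parse(value);
        }
    }
}
```

On the device side, a method like ParseRoomTemp could turn the received message text into a number before deciding how to react to the alert.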
SimpleTemperatureAlertData.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Runtime.Serialization;
namespace events_forwarding
{
[DataContract]
class SimpleTemperatureAlertData
{
[DataMember]
public string Time { get; set; }
[DataMember]
public string DeviceId { get; set; }
[DataMember]
public int RoomTemp { get; set; }
[DataMember]
public string RoomPressure { get; set; }
[DataMember]
public string RoomAlt { get; set; }
}
}
Publish Cloud Service
Now build the code and publish it to Azure. Right-click the Cloud Service project and select the Publish option. In the Publish window, log in with your Azure account, select the appropriate subscription, then click Next.
Make sure to select the cloud service that was created in the previous step. Keep all other settings at their pre-populated values.
Now click Advanced Settings and check that the selected storage account is the same one specified in app.config. Then click Next.
Now click Publish.
To check whether the cloud service is running correctly, select your cloud service in Azure and go to its dashboard. Look for the Operational Logs option, where you can see all logs related to your service.
If you want to debug the code locally, set the Cloud Service project as the startup project in Visual Studio and press F5. The cloud service then runs and starts the WorkerRole Run method. If your Event Hub is receiving data from Stream Analytics, you can step through the SensorEventProcessor class methods to identify any issues in the code.
In Step 4, under "Device side code for receiving alerts from IoT Hub", update the method to light an LED when you receive an alert from IoT Hub.
public async Task<string> ReceiveCloudToDeviceMessageAsync()
{
var deviceClient = DeviceClient.CreateFromConnectionString(connectionString, TransportType.Amqp);
while (true)
{
var receivedMessage = await deviceClient.ReceiveAsync();
if (receivedMessage != null)
{
led.Write(GpioPinValue.Low);
var messageData = Encoding.ASCII.GetString(receivedMessage.GetBytes());
await deviceClient.CompleteAsync(receivedMessage);
return messageData;
}
else
{
led.Write(GpioPinValue.High);
}
await Task.Delay(TimeSpan.FromSeconds(1));
}
}