We all deal with traffic every day. It wastes our time, pollutes the environment, expends our natural resources, creates unnecessary stress in our lives and in every other way just plain sucks. Having a plethora of real-time data now available with crowd sourced traffic information, the high availability of RFID vehicle identification systems, machine learning, stream analytics and the power of the cloud, why haven't we solved such a prevalent problem?
When I saw that Hackster.io was hosting a contest for an enterprise class IoT concept, I couldn't resist. I have built this out as a proof of concept model to promote the topic and hopefully get some creative minds tickled.
Essentially, I have modeled a basic 4-way intersection. Three of the lamps are simulated with LED's on the breadboard (South, East and West) and one full lamp is modeled with 50W halogen flood lights (North). The whole project is mounted on a scrap piece of pine board I had laying around.
I settled on a Raspberry Pi3 for the computer, Windows 10 IoT Core and a headless UWP app for the on-board software and, of course, Azure for the cloud computing, communications and web hosting. Go Microsoft or go home. lol.
So before y'all get bored, let's check out a video. Probably a good time to introduce some ubiquitous language as well. As the board fires up, it exercises each element in a Power On Self Test. In the video you will see it perform this test. It will test each of the 12 bulbs, then it puts the bulbs together into 4 lamps. You will see it test those as it assembles them. Then the lamps are assembled into lamp sets. In a real-world scenario there would likely be multiple lamps facing each direction. I have called that a " lamp set". In this model we only have one lamp per lamp set. Next, the lamp sets are assigned to routes. Think of a normal 2-lane road running north and south as a route. The lamp or lamps facing north would be one lamp set and the lamp or lamps facing south would be the other. As the routes are assembled from the lamp sets, you will see it transition the route from go through caution to stop. Finally the routes are assembled in to an intersection. In the last few seconds of the video you will see the intersection transition from east/west route having the right of way to holding while the north/south route transitions from holding to right of way.
Okay, cool. So that's the 10,000 foot hardware overview. We'll dive deeper later down the page. First, let's take a look at the software.
The Pi streams out large volumes of events from the intersection through an Azure IoT hub. It streams telemetry in the form of state change events for each component and usage updates to track the usage of the relays and bulbs in order to predict component failure. Here is some sample content...
You can also send commands from the cloud to the intersection. The commands I built in for this model include starting and stopping the intersection, shutting down the on-board app completely, requesting status updates for the various components, and perhaps most importantly, update the routing preference in real time. By adjusting the preference metric for the routes, the controller can work in concert with the rest of the intersections in town to alleviate congested areas. If a sporting even, for instance, lets out downtown and this intersection is south of downtown, it could be set to prefer the north/south route over the east/west route for the next couple of hours while the mob of south bound traffic comes through. But let's not get ahead of ourselves.
Events... So these events can also drive your remote monitoring tools. I have created my own micro view to remote monitor this particular intersection, but later on we will connect it to Azure Remote Monitoring to illustrate what a more macro, city-wide view would look like. For the sake of this simulation, I can also send commands to *fake* a broken bulb or current sensor. Here is a screenshot of my remote monitoring *micro view* page.
Clicking on the state labels sends a status request to the Pi and allows you to send the break/fix commands for the bulbs. You can see the North Lamp Set status report in the lower left.
When you click a bulb status label you get the opportunity to break stuff... In this screenshot I have busted the yellow bulb in the south lamp and the current sensor in the north lamp.
Here's what it looks like all running together. I had to mute the bulbs a bit for the camera. Remember that the break out, real bulbs, represent the north facing lamp and on the breadboard, a little harder to see, are the other three lamps oriented in alignment with the compass. West mid left, East mid right and South at the bottom.
Project DetailsOkay, hold on to you butt's... There's A LOT to cover. I'm going to try to break it down best I can.
Assembling the ThingFirst let's look at the 3 simulated lamps. This is actually very straight forward. It's just 9 basic LED's and a 220-Ohm resistor on the 3.3v bus. For reference, here are the GPIO's I used:
West Lamp:
- Red - GPIO 4
- Yellow - GPIO 5
- Green - GPIO 6
East Lamp:
- Red - GPIO 23
- Yellow - GPIO 24
- Green - GPIO 25
South Lamp
- Red - GPIO 16
- Yellow - GPIO 20
- Green - GPIO 21
Here's your fritz!
Easy, right?
Now the break out North Lamp. This is a 5v circuit. It operates 3 - 5v relays running 110v through 3 ACS712 hall effect current sensors to 3 bulbs. The analog sensors are fed through an MCP3208 ADC so we can read the values. I am using the sensors to identify if a bulb is out. Obviously if it is instructed to turn on and does not draw any current, it (or the relay) has failed. Here is the routing I chose for this lamp:
North Lamp:
- Red - GPIO 13 to Relay 2 & ADC channel 1
- Yellow - GPIO 19 to Relay 3 & ADC channel 2
- Green - GPIO 26 to Relay 4 & ADC channel 3
I also wired channels 7 and 8 to ground and Vre to try to get the readings more accurate by calibrating the top and bottom in real time when taking the readings. I am only using the Pi for the power, so it is really noisy. It was still tough to register the less than half an amp load the bulbs were drawing reliably. I hooked up a clothes iron to the circuit, though and that created enough draw to validate the wiring, concept and code.
And the fritz:
So that covers the hardware design. On to the software!
Code code code!Hopefully if you have followed along thus far, you have gone ahead and pulled down the solution from github HERE. I think this will be much easier to follow if you have spent a minute or 2 looking at the source tree. If you want to just connect it to your Azure account and see it work, go straight to the TrafficManager.Devices.Configuration.InMemoryConfigService and put in the information for your Azure IoT hub and update any GPIO routing that differs from the setup above.
Let's dive in.
It was my intent to model this guy as it might look as a real enterprise class solution. As such, I started out by modeling the problem domain in...
TrafficManager.DomainYou will see that this project is a UWP class library. It contains the base building blocks for our solution.
Let's look at how our hardware solution is logically defined by the domain. All the devices are going to inherit our IDomainDevice interface which simply contains a StateChange event and Guid Id:
public interface IDomainDevice
{
event StateChangedEvent StateChanged;
Guid Id { get; }
}
So let's define a bulb. ValueTypes\IBulb.cs
public interface IBulb : IDomainDevice
{
event BulbCycledEvent BulbCycled;
BulbTypeEnum BulbType { get; }
ICurrentSensor MyCurrentSensor { get; }
Task<BulbStateEnum> GetState();
Task<bool> TransitionToState(BulbStateEnum state);
void MarkInOp(bool broken);
}
Notice that the bulb owns an ICurrentSensor:
public interface ICurrentSensor : IDomainDevice
{
Task<CurrentSensorStateEnum> GetState();
void MarkInOp(bool broken);
}
Package up some bulbs to make an ILamp:
public interface ILamp : IDomainDevice
{
ICollection<IBulb> Bulbs { get; }
Task<LampStateEnum> GetState();
Task<DeviceSummary> GetSummary();
Task<bool> TransitionToState(LampStateEnum requestedState);
}
Package up some Lamps into an ILampSet:
public interface ILampSet : IDomainDevice
{
bool HasRightOfWay { get; }
int Facing { get; }
ICollection<ILamp> Lamps { get; }
Task<RightOfWayStateEnum> GetState();
Task<bool> SwitchRightOfWay();
Task<DeviceSummary> GetSummary();
}
Put a few LampSets in to an ITrafficRoute. Notice the PreferenceMetric here. This is the mechanism to tune the route preference. In my implementation I simply add this number in seconds to the base number of seconds to stay green. The higher this number the longer this route stays in the RightOfWay state:
public interface ITrafficRoute : IDomainDevice
{
int PreferenceMetric { get; }
bool HasRightOfWay { get; }
ICollection<ILampSet> LampSets { get; }
Task<RightOfWayStateEnum> GetState();
Task<DeviceSummary> GetSummary();
Task UpdatePreferenceMetric(int newMetric);
Task<bool> TransitionRightOfWay();
}
And finally create our IIntersection from a number of TrafficRoutes:
public interface IIntersection : IDomainDevice, IDisposable
{
event RightOfWayChangedEvent RightOfWayChanged;
event InternalAnomalyEvent InternalAnomalyOccurred;
int TransitionBaseValue { get; set; }
ICollection<ITrafficRoute> TrafficRoutes { get; }
void Start();
void Stop();
Task<Guid?> GetRightOfWayRoute();
Task<bool> TransitionRightOfWay();
Task<bool> UpdateRoutePreference(Guid routeId, int newMetric);
Task<ICollection<DeviceSummary>> GetSummaries();
}
There is a lot more in there, but those are the highlights.
The event handlers folder has all our events defined in there.
namespace TrafficManager.Domain.EventHandlers
{
public delegate void BulbCycledEvent(object sender, BulbCycledEventArgs args);
public delegate void StateChangedEvent(object sender, StateChangedEventArgs args);
public delegate void InternalAnomalyEvent(object sender, InternalAnomalyEventArgs args);
public delegate void RightOfWayChangedEvent(object sender, RightOfWayChangedEventArgs args);
public delegate void CommandReceivedEvent(object sender, CommandReceivedEventArgs args);
}
The Models folder has models that will be going over the wire. There are essentially 2 sets of models in there. At first I had the different events each having their own model. This is less than ideal for the Azure Iot Suite, though, so I refactored to an "AllInOne" model.
public abstract class IotHubModelBase
{
public int EventStream { get; set; }
public DateTime Timestamp { get; set; }
}
public class AllInOneModel : IotHubModelBase
{
public Guid? DeviceId { get; set; }
public string DeviceType { get; set; }
public string DeviceName { get; set; }
public Guid? IntersectionId { get; set; }
public string Function { get; set; }
public string Description { get; set; }
public Guid? ParentDeviceId { get; set; }
public bool IsError { get; set; }
public string Message { get; set; }
public string OldState { get; set; }
public string CurrentState { get; set; }
public decimal UsageFactorOne { get; set; }
public decimal UsageFactorTwo { get; set; }
public override string ToString()
{
int cs;
int.TryParse(CurrentState, out cs);
if (cs > 0)
{
if (Enum.IsDefined(typeof(BulbStateEnum), cs))
CurrentState = ((BulbStateEnum)cs).ToString();
if (Enum.IsDefined(typeof(CurrentSensorStateEnum), cs))
CurrentState = ((CurrentSensorStateEnum)cs).ToString();
if (Enum.IsDefined(typeof(IntersectionStateEnum), cs))
CurrentState = ((IntersectionStateEnum)cs).ToString();
if (Enum.IsDefined(typeof(LampStateEnum), cs))
CurrentState = ((LampStateEnum)cs).ToString();
if (Enum.IsDefined(typeof(RightOfWayStateEnum), cs))
CurrentState = ((RightOfWayStateEnum)cs).ToString();
}
var stream = (EventStreamEnum)EventStream;
var errorNote = IsError ? " ***ERROR*** " : "";
var msg =
$"{Timestamp.ToString("yyyy-MM-dd HH:mm:ss")}Z - {errorNote}{stream}: {DeviceType}({DeviceId ?? IntersectionId})";
if (!string.IsNullOrWhiteSpace(Function)) msg = string.Concat(msg, $" - Function: {Function}");
if (!string.IsNullOrWhiteSpace(Description)) msg = string.Concat(msg, $" - Description: {Description}");
if (ParentDeviceId.HasValue) msg = string.Concat(msg, $" - ParentDeviceId: {ParentDeviceId}");
if (!string.IsNullOrWhiteSpace(Message)) msg = string.Concat(msg, $" - Message: {Message}");
if (!string.IsNullOrWhiteSpace(OldState)) msg = string.Concat(msg, $" - OldState: {OldState}");
if (!string.IsNullOrWhiteSpace(CurrentState)) msg = string.Concat(msg, $" - CurrentState: {CurrentState}");
if (UsageFactorOne != 0) msg = string.Concat(msg, $" - UsageFactorOne: {UsageFactorOne}");
if (UsageFactorTwo != 0) msg = string.Concat(msg, $" - UsageFactorTwo: {UsageFactorTwo}");
return msg;
}
}
There is also the SystemCommandModel in there that was built to communicate seamlessly with the Azure IoT Remote Monitoring:
public class SystemCommandModel
{
public string Name { get; set; }
public Guid MessageId { get; set; }
public DateTime CreatedTime { get; set; }
public ICollection<KeyValuePair<string, object>> Parameters { get; set; }
}
Finally, there are some models to identify our device to Azure IoT Suite
public class AzureDeviceInfo
{
public AzureDeviceInfo()
{
}
public AzureDeviceInfo(string deviceName)
{
ObjectType = "DeviceInfo";
Version = "1.0";
IsSimulatedDevice = false;
DeviceProperties = new AzureDeviceProperties
{
DeviceID = deviceName,
HubStateEnabled = true,
DeviceState = "normal",
FirmwareVersion = "0.1",
InstalledRam = "8gb",
Manufacturer = "RyanMack",
ModelNumber = "RiPi4Way",
Platform = "Windows IoT Core",
Processor = "ARM",
SerialNumber = "12345",
Latitude = 30.324608,
Longitude = -81.398250
};
Commands = new List<AzureCommandInfo>
{
new AzureCommandInfo { Name = "BringOnline" },
new AzureCommandInfo { Name = "TakeOffline" },
new AzureCommandInfo { Name = "Shutdown" }
};
}
public string ObjectType { get; set; }
public string Version { get; set; }
public bool IsSimulatedDevice { get; set; }
public ICollection<KeyValuePair<string, object>> DeviceProperties { get; set; }
public ICollection<AzureCommandInfo> Commands { get; set; }
}
public class AzureDeviceProperties
{
public string DeviceID { get; set; }
public bool HubStateEnabled { get; set; }
public string DeviceState { get; set; }
public string Manufacturer { get; set; }
public string ModelNumber { get; set; }
public string SerialNumber { get; set; }
public string FirmwareVersion { get; set; }
public string Platform { get; set; }
public string Processor { get; set; }
public string InstalledRam { get; set; }
public double Latitude { get; set; }
public double Longitude { get; set; }
}
public class AzureCommandInfo
{
public string Name { get; set; }
public ICollection<AzureCommandParams> Parameters { get; set; }
}
public class AzureCommandParams
{
public string Name { get; set; }
public string Type { get; set; }
}
Next, the references folder contains the args for our events as well as all of our enumerations. You may notice that in the root of the GitHub repo, there is an Excel sheet with all of our enums plotted. With this kind of organization we can identify exactly what enum it is by just the integer value.
Currently the only service I have needed to define is the IEventService. This is what will ultimately be implemented with the IoT Hub bits. Abstracting it in this manner allows us to create multiple implementations for various tasks...
public interface IEventService : IDisposable
{
event CommandReceivedEvent CommandReceived;
void SendOnline();
void UpdateDirectory(Guid deviceId, string deviceType, string deviceName, Guid? parentId);
void SendStateChangeEvent(Guid deviceId, string deviceType, string oldState, string newState, DateTime timestamp);
void SendAnomaly(Guid intersectionId, string function, string desc, Guid? offender, DateTime timestamp);
void SendSummaryUpdates(ICollection<DeviceSummary> summaries);
void SendUsageUpdate(Guid deviceId, decimal factorOne, decimal factorTwo);
void SendRightOfWayChanged(Guid intersectionId, Guid oldRoWRouteId, Guid newRoWRouteId, DateTime timeStamp);
void SendLogMessage(Guid intersectionId, bool containsError, string desc, DateTime timeStamp);
}
I think that about covers the Domain library. Let's move on to some implementations. I have created what I needed for this thing in...
TrafficManager.DevicesMy first thought when thinking about the devices is that I have 2 different implementations of the bulb and the current sensors. For the North Lamp I need real bulbs with real current sensors. For the other three lights I need LED bulbs with fake current sensors that act however we tell them to. Because of this, I packaged the logic that was going to be common to both implementations in to base classes. CurrentSensorBase
public class CurrentSensorBase : ICurrentSensor
{
public event StateChangedEvent StateChanged;
protected decimal Tolerance;
protected decimal CurrentValue;
protected DateTime LastUpdate;
protected TimeSpan InOpTolerance;
protected bool ImBroken;
public CurrentSensorBase(Guid id)
{
Id = id;
}
public Guid Id { get; }
public Task<CurrentSensorStateEnum> GetState()
{
return Task.Run(() =>
{
if (ImBroken || LastUpdate + InOpTolerance <= DateTime.UtcNow)
return CurrentSensorStateEnum.InOperable;
return CurrentValue <= Tolerance
? CurrentSensorStateEnum.Idle
: CurrentSensorStateEnum.Flowing;
});
}
public void MarkInOp(bool broken)
{
ImBroken = broken;
}
protected virtual void OnStateChanged(CurrentSensorStateEnum oldState, CurrentSensorStateEnum newState)
{
StateChanged?.Invoke(this, new StateChangedEventArgs
{
SourceId = Id,
OldState = (int)oldState,
NewState = (int)newState,
SourceTimestamp = DateTime.UtcNow
});
}
}
and BulbBase
public class BulbBase : IBulb
{
public event StateChangedEvent StateChanged;
public event BulbCycledEvent BulbCycled;
protected bool ImBroken;
protected BulbStateEnum LastStateRequest;
private readonly Stopwatch _usageTimer;
public BulbBase(Guid id, BulbTypeEnum bulbType, ICurrentSensor currentSensor)
{
Id = id;
BulbType = bulbType;
MyCurrentSensor = currentSensor;
_usageTimer = new Stopwatch();
}
public Guid Id { get; }
public BulbTypeEnum BulbType { get; }
public ICurrentSensor MyCurrentSensor { get; }
public async Task<BulbStateEnum> GetState()
{
if (ImBroken) return BulbStateEnum.InOperable;
var sensorState = await MyCurrentSensor.GetState();
switch (sensorState)
{
case CurrentSensorStateEnum.InOperable:
return LastStateRequest == BulbStateEnum.Off ? BulbStateEnum.AssumedOff : BulbStateEnum.AssumedOn;
case CurrentSensorStateEnum.Idle:
return BulbStateEnum.Off;
case CurrentSensorStateEnum.Flowing:
return BulbStateEnum.On;
default:
throw new ArgumentOutOfRangeException();
}
}
public virtual Task<bool> TransitionToState(BulbStateEnum newState)
{
throw new NotImplementedException();
}
public void MarkInOp(bool broken)
{
var oldState = GetState().Result;
ImBroken = broken;
var newState = GetState().Result;
if (newState != oldState)
OnStateChanged(oldState, newState);
}
protected virtual void OnStateChanged(BulbStateEnum oldState, BulbStateEnum newState)
{
//start the usage stopwatch to report the stats for predictive maintenance
//Starting when we are going from off to on
if (oldState != BulbStateEnum.On && oldState != BulbStateEnum.AssumedOn &&
(newState == BulbStateEnum.On || newState == BulbStateEnum.AssumedOn))
_usageTimer.Start();
//if we are going from on to off it is time to report the usage cycle
if (oldState == BulbStateEnum.On || oldState == BulbStateEnum.AssumedOn
&& _usageTimer.IsRunning)
{
_usageTimer.Stop();
BulbCycled?.Invoke(this, new BulbCycledEventArgs
{
BulbId = Id,
SecondsOn = _usageTimer.Elapsed.TotalSeconds
});
_usageTimer.Reset();
}
StateChanged?.Invoke(this, new StateChangedEventArgs
{
SourceId = Id,
OldState = (int)oldState,
NewState = (int)newState,
SourceTimestamp = DateTime.UtcNow
});
}
}
This left me very little work to finish out the actual implementations. First a fake current sensor for our LED's and for testing:
public class MockSensor : CurrentSensorBase
{
public MockSensor() : base(Guid.NewGuid())
{
Tolerance = .01m;
InOpTolerance = TimeSpan.FromHours(24);
LastUpdate = DateTime.UtcNow;
CurrentValue = 0;
}
public MockSensor(Guid id) : base(id)
{
Tolerance = .01m;
InOpTolerance = TimeSpan.FromHours(24);
LastUpdate = DateTime.UtcNow;
CurrentValue = 0;
}
public void SetValue(decimal value)
{
var oldState = GetState().Result;
LastUpdate = DateTime.UtcNow;
CurrentValue = value;
var newState = GetState().Result;
if (oldState != newState)
OnStateChanged(oldState, newState);
}
}
Let's go ahead and put that in a fake bulb for testing as well:
public class MockBulb : BulbBase
{
public MockBulb(BulbTypeEnum bulbType) : base(Guid.NewGuid(), bulbType, new MockSensor())
{
}
public override async Task<bool> TransitionToState(BulbStateEnum newState)
{
var oldState = await GetState();
LastStateRequest = newState;
((MockSensor)MyCurrentSensor).SetValue(newState == BulbStateEnum.On ? 1 : 0);
var actualState = await GetState();
if (oldState != newState)
OnStateChanged(oldState, actualState);
return true;
}
}
Now our real current sensor (ACS712)... Oh wait, we will need our MCP3208 for this, too. Let's look at that:
public class Mcp3208SpiDevice
{
private readonly decimal _vre;
private readonly McpChannelByteEnum _commonChannel;
private readonly McpChannelByteEnum _vreChannel;
private readonly SpiDevice _myDevice;
public Mcp3208SpiDevice(int spiChannel, decimal vre = 4.83m, McpChannelByteEnum commonChannel = McpChannelByteEnum.ChannelSeven, McpChannelByteEnum vreChannel = McpChannelByteEnum.ChannelEight)
{
_vre = vre;
_commonChannel = commonChannel;
_vreChannel = vreChannel;
var spiSettings = new SpiConnectionSettings(0)
{
ClockFrequency = 2000000,
Mode = SpiMode.Mode3
};
var spiQuery = SpiDevice.GetDeviceSelector($"SPI{spiChannel}");
var deviceInfo = DeviceInformation.FindAllAsync(spiQuery).AsTask().Result;
if (deviceInfo != null && deviceInfo.Count > 0)
{
_myDevice = SpiDevice.FromIdAsync(deviceInfo[0].Id, spiSettings).AsTask().Result;
}
}
public decimal GetVoltage(McpChannelByteEnum channel)
{
//To get the voltage I'm going to get the percentage and calulate based on expected vre
var pct = GetPercentage(channel, 10);
//This is not as accurate as the percentage becasue I'm using the Pi to power Vre and it's
//not well regulated
return _vre * (pct/100);
}
public decimal GetPercentage(McpChannelByteEnum channel, int decimals = 4)
{
//Since I don't hae a well regulated power supply I'm going to take a reading from
//my Vre channel and use that as the upper bound and try removing the ground rail noise
decimal noise = Read((byte)_commonChannel);
decimal vreTicks = Read((byte)_vreChannel) - noise;
var channelTicks = Read((byte)channel) - noise;
var ret = (channelTicks / vreTicks) * 100;
return Math.Round(ret, decimals);
}
private int Read(byte fromChannel)
{
var transmitBuffer = new byte[] { 1, fromChannel, 0x00 };
var receiveBuffer = new byte[3];
_myDevice.TransferFullDuplex(transmitBuffer, receiveBuffer);
//first byte returned is 0 (00000000),
//second byte returned we are only interested in the last 4 bits 00001111 (mask of &15)
//then shift result 8 bits to make room for the data from the 3rd byte (makes 12 bits total)
//third byte, need all bits, simply add it to the above result
return ((receiveBuffer[1] & 15) << 8) + receiveBuffer[2];
}
}
NOW our ACS712:
public class Acs712CurrentSensor5A : CurrentSensorBase
{
private readonly Mcp3208SpiDevice _mySpi;
private readonly McpChannelByteEnum _myChannel;
public Acs712CurrentSensor5A(Guid id, Mcp3208SpiDevice mySpi, McpChannelByteEnum channel) : base(id)
{
_mySpi = mySpi;
_myChannel = channel;
//the bulb should be checking the current every time it turns on.
//If we haven't recorded a new current reading in 10 minutes we aren't working
InOpTolerance = TimeSpan.FromMinutes(10);
//My 50 watt bulb should draw just under half an amp. I'll say it's on if we are drawing over .2
Tolerance = 0.2m;
TakeReading();
}
public void TakeReading(bool ignoreExceptions = true)
{
try
{
var pct = _mySpi.GetPercentage(_myChannel);
//50% should be no current
pct = pct - 50;
CurrentValue = Math.Abs(5 * (pct/100)); //this sensor does +/- 5Amps
LastUpdate = DateTime.UtcNow;
}
catch
{
if (!ignoreExceptions)
throw;
//We are probably going to ignore exceptions here. The current sensor is not a critical
//component and it will naturally flip to inop if we don't get a successful reading soon.
}
}
}
To the bulbs! First the LED bulb with the fake sensor:
public class LedWithoutSensor : BulbBase
{
private readonly GpioPin _gpioPin;
public LedWithoutSensor(Guid id, BulbTypeEnum bulbType, Guid fakeSensorId, GpioPin gpioPin) : base(id, bulbType, new MockSensor(fakeSensorId))
{
_gpioPin = gpioPin;
_gpioPin.SetDriveMode(GpioPinDriveMode.Output);
//POST reporting
_gpioPin.Write(GpioPinValue.Low);
Task.Delay(50).Wait();
_gpioPin.Write(GpioPinValue.High);
Task.Delay(50).Wait();
_gpioPin.Write(GpioPinValue.Low);
Task.Delay(50).Wait();
_gpioPin.Write(GpioPinValue.High);
Task.Delay(50).Wait();
}
public override async Task<bool> TransitionToState(BulbStateEnum newState)
{
var oldState = await GetState();
LastStateRequest = newState;
_gpioPin.Write(newState == BulbStateEnum.On ? GpioPinValue.Low : GpioPinValue.High);
((MockSensor)MyCurrentSensor).SetValue(newState == BulbStateEnum.On ? 1 : 0);
var actualState = await GetState();
if (oldState != newState)
OnStateChanged(oldState, actualState);
return true;
}
}
Now the real bulb:
public class RealBulbWithSensor : BulbBase
{
private readonly GpioPin _gpioPin;
public RealBulbWithSensor(Guid id, BulbTypeEnum bulbType, ICurrentSensor currentSensor, GpioPin gpioPin) : base(id, bulbType, currentSensor)
{
_gpioPin = gpioPin;
_gpioPin.SetDriveMode(GpioPinDriveMode.Output);
//POST reporting
_gpioPin.Write(GpioPinValue.Low);
Task.Delay(50).Wait();
_gpioPin.Write(GpioPinValue.High);
Task.Delay(50).Wait();
_gpioPin.Write(GpioPinValue.Low);
Task.Delay(50).Wait();
_gpioPin.Write(GpioPinValue.High);
Task.Delay(50).Wait();
}
public override async Task<bool> TransitionToState(BulbStateEnum newState)
{
var oldState = await GetState();
LastStateRequest = newState;
_gpioPin.Write((newState == BulbStateEnum.On) ? GpioPinValue.Low : GpioPinValue.High);
//HACK: Having trouble getting reliable readings from my ACS712 without a steady power supply using 50w bulbs
var areWeMocking = MyCurrentSensor as MockSensor;
areWeMocking?.SetValue(newState == BulbStateEnum.On ? 1 : 0);
var actualState = await GetState();
if (oldState != newState)
OnStateChanged(oldState, actualState);
return true;
}
}
So that's it for hardware, let's look at the logical devices we are building with those parts.
Again, the bulbs go in to a lamp. For our purposes, let's make a simple 3 light lamp. You could easily implement many different lamps that had arrows or flashing yellow/red lights, whatever. I simply built an implementation for our basic 3-bulb lamp. Crap, I see I called the bulbs lights... That's 50 cents in the ubiquitous language jar...
public class ThreeLightLamp : ILamp
{
public event StateChangedEvent StateChanged;
public Guid Id { get; }
public ICollection<IBulb> Bulbs { get; }
private bool _iAmTransitioning;
public ThreeLightLamp(Guid id, ICollection<IBulb> bulbs)
{
Id = id;
Bulbs = bulbs;
}
public Task<LampStateEnum> GetState()
{
return Task.Run(() =>
{
if (_iAmTransitioning) return LampStateEnum.Transitioning;
var t = new[]
{
GetRedLight().GetState(),
GetYellowLight().GetState(),
GetGreenLight().GetState()
};
Task.WaitAll(t);
var stats = new Dictionary<string, BulbStateEnum>
{
{"r" , t[0].Result},
{"y" , t[1].Result},
{"g" , t[2].Result}
};
//If bulbs are broke the lamp is broke
if (stats.Any(s => s.Value == BulbStateEnum.InOperable))
return LampStateEnum.InOperable;
//If a bulb is in transition the lamp is
if (stats.Any(s => s.Value == BulbStateEnum.Transitioning))
return LampStateEnum.Transitioning;
//Critical invalid state if multiple bulbs are on
if (stats.Count(s => s.Value == BulbStateEnum.On || s.Value == BulbStateEnum.AssumedOn) != 1)
return LampStateEnum.CriticalMalfunction;
//Not the job of the lamp to report the broken current sensor on a bulb. Treat assumed values as real
if (stats["r"] == BulbStateEnum.AssumedOn || stats["r"] == BulbStateEnum.On)
return LampStateEnum.Stop;
if (stats["y"] == BulbStateEnum.AssumedOn || stats["y"] == BulbStateEnum.On)
return LampStateEnum.Caution;
if (stats["g"] == BulbStateEnum.AssumedOn || stats["g"] == BulbStateEnum.On)
return LampStateEnum.Go;
//Unhandled state... Critical issue
return LampStateEnum.CriticalMalfunction;
});
}
public Task<DeviceSummary> GetSummary()
{
return Task.Run(() =>
{
var tState = GetState();
var bulbSummaries = new List<DeviceSummary>();
var hasMalfunction = false;
Parallel.ForEach(Bulbs, (bulb, state) =>
{
var badBulb = false;
var theState = bulb.GetState().Result;
switch (theState)
{
case BulbStateEnum.AssumedOff:
case BulbStateEnum.AssumedOn:
case BulbStateEnum.InOperable:
hasMalfunction = true;
badBulb = true;
break;
}
bulbSummaries.Add(new DeviceSummary
{
DeviceId = bulb.Id,
CurrentState = (int)theState,
HasMalfunction = badBulb
});
});
var ret = new DeviceSummary
{
DeviceId = Id,
HasMalfunction = hasMalfunction,
CurrentState = (int)tState.Result,
ChildSummaries = bulbSummaries
};
return ret;
});
}
public async Task<bool> TransitionToState(LampStateEnum requestedState)
{
var ret = true;
//Is this a valid request?
if (requestedState != LampStateEnum.Go && requestedState != LampStateEnum.Caution && requestedState != LampStateEnum.Stop)
throw new Exception("You can only request transitions to Go, Caution and Stop");
var oldState = await GetState();
//If it was transitioning, lets wait a little bit
var i = 0;
while (i++ < 10 && oldState == LampStateEnum.Transitioning)
{
await Task.Delay(TimeSpan.FromMilliseconds(100));
oldState = await GetState();
}
//Are we in a valid state to start a transition?
if (oldState == LampStateEnum.CriticalMalfunction || oldState == LampStateEnum.Transitioning)
ret = false;
//We have to go through caution before we go from green to red
if (oldState == LampStateEnum.Go && requestedState != LampStateEnum.Caution)
ret = false;
_iAmTransitioning = true;
//Lat's restrict to proper transitions. InOperable is likely the bad bulb bubbling up. Try requested transition.
if (ret && (oldState == LampStateEnum.Go || oldState == LampStateEnum.InOperable) && requestedState == LampStateEnum.Caution)
ret = await DoTransitionToCaution();
else if (ret && (oldState == LampStateEnum.Caution || oldState == LampStateEnum.InOperable) && requestedState == LampStateEnum.Stop)
ret = await DoTransitionToStop();
else if (ret && (oldState == LampStateEnum.Stop || oldState == LampStateEnum.InOperable) && requestedState == LampStateEnum.Go)
ret = await DoTransitionToGo();
else
ret = false;
//Let's clean up. finish the transition and raise state change if necessary
_iAmTransitioning = false;
var newState = await GetState();
if (oldState != newState) OnStateChanged(oldState, newState);
return ret;
}
protected virtual void OnStateChanged(LampStateEnum oldState, LampStateEnum newState)
{
StateChanged?.Invoke(this, new StateChangedEventArgs
{
SourceId = Id, OldState = (int) oldState, NewState = (int) newState, SourceTimestamp = DateTime.UtcNow
});
}
private async Task<bool> DoTransitionToGo()
{
var redLight = GetRedLight();
if (await redLight.GetState() == BulbStateEnum.InOperable)
return await GetGreenLight().TransitionToState(BulbStateEnum.On);
if (!await redLight.TransitionToState(BulbStateEnum.Off))
return false;
return await GetGreenLight().TransitionToState(BulbStateEnum.On);
}
private async Task<bool> DoTransitionToStop()
{
var yellowLight = GetYellowLight();
if (await yellowLight.GetState() == BulbStateEnum.InOperable)
return await GetRedLight().TransitionToState(BulbStateEnum.On);
if (!await yellowLight.TransitionToState(BulbStateEnum.Off))
return false;
return await GetRedLight().TransitionToState(BulbStateEnum.On);
}
private async Task<bool> DoTransitionToCaution()
{
var greenLight = GetGreenLight();
if (await greenLight.GetState() == BulbStateEnum.InOperable)
return await GetYellowLight().TransitionToState(BulbStateEnum.On);
if (!await greenLight.TransitionToState(BulbStateEnum.Off))
return false;
return await GetYellowLight().TransitionToState(BulbStateEnum.On);
}
private IBulb GetRedLight()
{
var ret = Bulbs.FirstOrDefault(b => b.BulbType == BulbTypeEnum.Red);
if (ret == null)
throw new Exception("A lamp must at least have a red, yellow and green bulb");
return ret;
}
private IBulb GetGreenLight()
{
var ret = Bulbs.FirstOrDefault(b => b.BulbType == BulbTypeEnum.Green);
if (ret == null)
throw new Exception("A lamp must at least have a red, yellow and green bulb");
return ret;
}
private IBulb GetYellowLight()
{
var ret = Bulbs.FirstOrDefault(b => b.BulbType == BulbTypeEnum.Yellow);
if (ret == null)
throw new Exception("A lamp must at least have a red, yellow and green bulb");
return ret;
}
}
And the lamp goes in to a LampSet:
//Basic lamp set handles only 3-Light lamps
public class BasicLampSet : ILampSet
{
public BasicLampSet(Guid id, ICollection<ILamp> lamps, int facesInDegrees, bool initRoW)
{
Id = id;
Lamps = lamps;
HasRightOfWay = initRoW;
Facing = facesInDegrees;
}
public event StateChangedEvent StateChanged;
public Guid Id { get; }
public bool HasRightOfWay { get; private set; }
public int Facing { get; }
public ICollection<ILamp> Lamps { get; }
private bool _transitioning;
public Task<RightOfWayStateEnum> GetState()
{
return _transitioning
? Task.FromResult(RightOfWayStateEnum.Transitioning)
: Task.FromResult(!HasRightOfWay
? RightOfWayStateEnum.Holding
: RightOfWayStateEnum.RightOfWay);
}
public async Task<bool> SwitchRightOfWay()
{
var oldState = GetState().Result;
var failed = false;
_transitioning = true;
if (HasRightOfWay)
{
//switching from green to red
//do caution first
Parallel.ForEach(Lamps, lamp =>
{
if (!lamp.TransitionToState(LampStateEnum.Caution).Result) failed = true;
});
//HACK: This should be configurable
await Task.Delay(TimeSpan.FromSeconds(3));
//Then to Stop
Parallel.ForEach(Lamps, lamp =>
{
if (!lamp.TransitionToState(LampStateEnum.Stop).Result) failed = true;
});
//hold the right of way flag until we finish the transition
if (!failed)
HasRightOfWay = false;
}
else
{
//Grab the flag before we even start the transition Error on the side of caution: assume lights are green
HasRightOfWay = true;
Parallel.ForEach(Lamps, lamp =>
{
if (!lamp.TransitionToState(LampStateEnum.Go).Result) failed = true;
});
}
_transitioning = false;
//Check for a state change event
var newState = GetState().Result;
if (oldState != newState)
OnStateChanged(oldState, newState);
return !failed;
}
protected virtual void OnStateChanged(RightOfWayStateEnum oldState, RightOfWayStateEnum newState)
{
StateChanged?.Invoke(this, new StateChangedEventArgs
{
SourceId = Id,
OldState = (int)oldState,
NewState = (int)newState,
SourceTimestamp = DateTime.UtcNow
});
}
public Task<DeviceSummary> GetSummary()
{
return Task.Run(() =>
{
var lampSummaries = new List<DeviceSummary>();
var hasMalfunction = false;
var tState = GetState();
Parallel.ForEach(Lamps, (lamp) =>
{
var theSummary = lamp.GetSummary().Result;
if (theSummary.HasMalfunction) hasMalfunction = true;
lampSummaries.Add(theSummary);
});
var ret = new DeviceSummary
{
DeviceId = Id,
CurrentState = (int)tState.Result,
HasMalfunction = hasMalfunction,
ChildSummaries = lampSummaries
};
return ret;
});
}
}
The LampSets go in to a TrafficRoute:
//Basic traffic route is built to handle 2 sets of basic lamp sets
public class BasicTrafficRoute : ITrafficRoute
{
public BasicTrafficRoute(Guid id, int initPreference, bool initRoW, ICollection<ILampSet> lampSets )
{
Id = id;
PreferenceMetric = initPreference;
HasRightOfWay = initRoW;
LampSets = lampSets;
}
public event StateChangedEvent StateChanged;
public Guid Id { get; }
public int PreferenceMetric { get; private set; }
public bool HasRightOfWay { get; private set; }
public ICollection<ILampSet> LampSets { get; }
private bool _transitioning;
public Task<RightOfWayStateEnum> GetState()
{
return _transitioning
? Task.FromResult(RightOfWayStateEnum.Transitioning)
: Task.FromResult(!HasRightOfWay
? RightOfWayStateEnum.Holding
: RightOfWayStateEnum.RightOfWay);
}
public Task UpdatePreferenceMetric(int newMetric)
{
return Task.Run(() =>
{
if (newMetric < 0 || newMetric > 100)
throw new ArgumentOutOfRangeException(nameof(newMetric), "Metric must be between 0 and 100");
PreferenceMetric = newMetric;
});
}
public Task<bool> TransitionRightOfWay()
{
return Task.Run(() =>
{
if (_transitioning) return false;
var oldState = GetState().Result;
var failed = false;
var hadRoW = HasRightOfWay;
_transitioning = true;
if (!HasRightOfWay) HasRightOfWay = true;
Parallel.ForEach(LampSets, lampSet =>
{
if (!lampSet.SwitchRightOfWay().Result) failed = true;
});
if (hadRoW && !failed)
HasRightOfWay = false;
_transitioning = false;
//Check for a state change event
var newState = GetState().Result;
if (oldState != newState)
OnStateChanged(oldState, newState);
return !failed;
});
}
protected virtual void OnStateChanged(RightOfWayStateEnum oldState, RightOfWayStateEnum newState)
{
StateChanged?.Invoke(this, new StateChangedEventArgs
{
SourceId = Id,
OldState = (int)oldState,
NewState = (int)newState,
SourceTimestamp = DateTime.UtcNow
});
}
public Task<DeviceSummary> GetSummary()
{
return Task.Run(() =>
{
var lampSummaries = new List<DeviceSummary>();
var hasMalfunction = false;
var tState = GetState();
Parallel.ForEach(LampSets, (lampSet) =>
{
var theSummary = lampSet.GetSummary().Result;
if (theSummary.HasMalfunction) hasMalfunction = true;
lampSummaries.Add(theSummary);
});
var ret = new DeviceSummary
{
DeviceId = Id,
CurrentState = (int)tState.Result,
HasMalfunction = hasMalfunction,
ChildSummaries = lampSummaries
};
return ret;
});
}
}
And traffic routes make up a what??? An intersection, of course
public class BasicFourWayIntersection : IIntersection {
public event RightOfWayChangedEvent RightOfWayChanged;
public event StateChangedEvent StateChanged;
public event InternalAnomalyEvent InternalAnomalyOccurred;
public BasicFourWayIntersection(Guid id, int transBase, ICollection<ITrafficRoute> routes)
{
Id = id;
TransitionBaseValue = transBase;
TrafficRoutes = routes;
_running = false;
_stopRequested = false;
}
public Guid Id { get; }
public int TransitionBaseValue { get; set; }
public ICollection<ITrafficRoute> TrafficRoutes { get; }
private bool _running;
private bool _stopRequested;
private Task _sequencer;
public void Start()
{
_stopRequested = false;
_sequencer = Task.Run(() =>
{
_running = true;
OnStateChanged(IntersectionStateEnum.Offline, IntersectionStateEnum.Online);
while (!_stopRequested)
{
var success = TransitionRightOfWay().Result;
if (!success)
{
var sumList = GetSummaries().Result;
OnInternalAnomalyOccurred("Sequencer", "Issue arose transitioning intersection", null,
sumList);
}
var row = GetRightOfWayRouteObject().Result;
Task.Delay((TransitionBaseValue + row.PreferenceMetric)*1000).Wait();
}
OnStateChanged(IntersectionStateEnum.Online, IntersectionStateEnum.Offline);
_running = false;
});
}
public void Stop()
{
_stopRequested = true;
}
public async Task<Guid?> GetRightOfWayRoute()
{
var row = await GetRightOfWayRouteObject();
return row?.Id;
}
public async Task<bool> TransitionRightOfWay()
{
var row = await DoTransition();
return row != null;
}
public async Task<bool> UpdateRoutePreference(Guid routeId, int newMetric)
{
var route = TrafficRoutes.FirstOrDefault(r => r.Id == routeId);
if (route == null)
{
var sumList = await GetSummaries();
OnInternalAnomalyOccurred("UpdateRoutePreference", $"Route not found: {routeId}", null,
sumList);
return false;
}
await route.UpdatePreferenceMetric(newMetric);
return true;
}
public Task<ICollection<DeviceSummary>> GetSummaries()
{
return Task.Run(() =>
{
var retList = new List<DeviceSummary>();
Parallel.ForEach(TrafficRoutes, route => retList.Add(route.GetSummary().Result));
return (ICollection<DeviceSummary>) retList;
});
}
private async Task<ITrafficRoute> DoTransition()
{
var currRoW = await GetRightOfWayRouteObject();
var nextRoW = TrafficRoutes.FirstOrDefault(r => r.HasRightOfWay == false);
var success = await currRoW.TransitionRightOfWay();
if (success)
success = await nextRoW.TransitionRightOfWay();
if (!success) return null;
OnRightOfWayChanged(currRoW.Id, nextRoW.Id);
return nextRoW;
}
private async Task<ITrafficRoute> GetRightOfWayRouteObject()
{
var row = TrafficRoutes.Where(r => r.HasRightOfWay).ToList();
if (row.Count != 1)
{
var sumList = await GetSummaries();
OnInternalAnomalyOccurred("GetRightOfWayRoute", $"Expecting 1 route but found {row.Count}", null,
sumList);
}
return row.FirstOrDefault();
}
protected virtual void OnStateChanged(IntersectionStateEnum oldState, IntersectionStateEnum newState)
{
StateChanged?.Invoke(this, new StateChangedEventArgs
{
SourceId = Id,
OldState = (int)oldState,
NewState = (int)newState,
SourceTimestamp = DateTime.UtcNow
});
}
protected virtual void OnRightOfWayChanged(Guid oldRoW, Guid newRoW)
{
RightOfWayChanged?.Invoke(this, new RightOfWayChangedEventArgs
{
IntersectionId = Id,
OldRouteId = oldRoW,
NewRouteId = newRoW,
Timestamp = DateTime.UtcNow
});
}
protected virtual void OnInternalAnomalyOccurred(string funcName, string desc, DeviceSummary suspectDeviceSummary, ICollection<DeviceSummary> otherDeviceSummaries)
{
InternalAnomalyOccurred?.Invoke(this, new InternalAnomalyEventArgs
{
IntersectionId = Id,
PerformingFunction = funcName,
Description = desc,
SuspectDeviceSummary = suspectDeviceSummary,
OtherDeviceSummaries = otherDeviceSummaries,
Timestamp = DateTime.UtcNow
});
}
public void Dispose()
{
Dispose(true);
}
protected void Dispose(bool disposing)
{
Stop();
while (_running)
Task.Delay(100).Wait();
_sequencer = null;
}
}
Now for the part we have all been waiting for...
IoT Hub Event Service ImplementationsI left my first implementation that used purpose built event models. I am not using that anymore, but it is there for your viewing pleasure (IoTHubService.cs). In order to play nice with Azure Stream Processing and the other tools in their IoT Suite, I refactored it in to the AllInOneHubService. Let's take a gander...
public class AllInOneHubService : IEventService
{
public event CommandReceivedEvent CommandReceived;
private static AllInOneHubService _instance;
private readonly Task _listener;
private readonly DeviceClient _myClient;
private readonly string _iotDeviceId;
private readonly List<IAsyncAction> _sendTasks = new List<IAsyncAction>();
private readonly CancellationTokenSource _tSource = new CancellationTokenSource();
private readonly ICollection<Message> _messageBuffer = new List<Message>();
private Task _batchTimer;
private bool _timerRunning;
private static readonly object[] Locker = {};
//This should be injected
private readonly IConfigService _cfgSvc = new InMemoryConfigService();
private AllInOneHubService()
{
var cfg = _cfgSvc.ReadConfig().Result;
var auth = AuthenticationMethodFactory.CreateAuthenticationWithRegistrySymmetricKey(cfg.AzureIoTDeviceId,
cfg.AzureIoTDeviceKey);
_iotDeviceId = cfg.AzureIoTDeviceId;
_myClient = DeviceClient.Create(cfg.AzureIoTHubUri, auth, TransportType.Http1);
_listener = Listener(_tSource);
_batchTimer = Task.Run(() =>
{
while (true)
{
Task.Delay(TimeSpan.FromSeconds(10)).Wait();
_sendTasks.RemoveAll(t => t?.AsTask() == null || t.AsTask().IsCompleted);
}
});
}
public static AllInOneHubService Instance()
{
return _instance ?? (_instance = new AllInOneHubService());
}
public void SendOnline()
{
var serialMsg = JsonConvert.SerializeObject(new AzureDeviceInfo(_iotDeviceId));
var message = new Message(Encoding.UTF8.GetBytes(serialMsg));
_sendTasks.Add(_myClient.SendEventAsync(message));
}
public void UpdateDirectory(Guid deviceId, string deviceType, string deviceName, Guid? parentId)
{
SendMessage(EventStreamEnum.Directory, new AllInOneModel
{
Timestamp = DateTime.UtcNow,
DeviceId = deviceId,
DeviceType = deviceType,
DeviceName = deviceName,
ParentDeviceId = parentId
});
}
public void SendStateChangeEvent(Guid deviceId, string deviceType, string oldState, string newState, DateTime timestamp)
{
SendMessage(EventStreamEnum.StateChange, new AllInOneModel
{
Timestamp = timestamp,
DeviceId = deviceId,
OldState = oldState,
CurrentState = newState,
DeviceType = deviceType
});
}
public void SendAnomaly(Guid intersectionId, string function, string desc, Guid? offender, DateTime timestamp)
{
SendMessage(EventStreamEnum.Anomaly, new AllInOneModel
{
Timestamp = timestamp,
IntersectionId = intersectionId,
DeviceId = offender,
Description = desc,
Function = function
});
}
public void SendSummaryUpdates(ICollection<DeviceSummary> summaries)
{
var flatList = summaries.ToList();
var childList = summaries.SelectMany(s => s.ChildSummaries).ToList();
while (childList.Any())
{
flatList.AddRange(childList);
childList = childList.SelectMany(s => s.ChildSummaries).ToList();
}
ICollection<AllInOneModel> msgList = flatList.Select(s => new AllInOneModel
{
DeviceId = s.DeviceId,
CurrentState = s.CurrentState.ToString(), //TODO: We need a service to get lables from enum ints.
Timestamp = s.Timestamp,
IsError = s.HasMalfunction
}).ToList();
SendMessageBatch(EventStreamEnum.Summary, msgList);
}
public void SendUsageUpdate(Guid deviceId, decimal factorOne, decimal factorTwo)
{
SendMessage(EventStreamEnum.Usage, new AllInOneModel
{
DeviceId = deviceId,
Timestamp = DateTime.UtcNow,
UsageFactorOne = factorOne,
UsageFactorTwo = factorTwo
});
}
public void SendRightOfWayChanged(Guid intersectionId, Guid oldRoWRouteId, Guid newRoWRouteId, DateTime timeStamp)
{
SendMessage(EventStreamEnum.RoWChange, new AllInOneModel
{
Timestamp = timeStamp,
IntersectionId = intersectionId,
OldState = oldRoWRouteId.ToString(),
CurrentState = newRoWRouteId.ToString()
});
}
public void SendLogMessage(Guid intersectionId, bool containsError, string desc, DateTime timeStamp)
{
SendMessage(EventStreamEnum.Log, new AllInOneModel
{
Timestamp = timeStamp,
IntersectionId = intersectionId,
IsError = containsError,
Message = desc
});
}
public void Dispose()
{
Dispose(true);
}
private void SendMessage(EventStreamEnum streamId, AllInOneModel msg)
{
msg.EventStream = (int)streamId;
var serialMsg = JsonConvert.SerializeObject(new AllInOneModelDto(msg, _iotDeviceId));
var message = new Message(Encoding.UTF8.GetBytes(serialMsg));
//_sendTasks.RemoveAll(t => t.AsTask().IsCompleted);
_sendTasks.Add(_myClient.SendEventAsync(message));
/* lock(Locker)
_messageBuffer.Add(message);
StartTimer();
_sendTasks.RemoveAll(t => t.AsTask().IsCompleted);*/
}
private void SendMessageBatch(EventStreamEnum streamId, IEnumerable<AllInOneModel> msgs)
{
var messages = new List<Message>();
foreach (var msg in msgs)
{
msg.EventStream = (int)streamId;
var serialMsg = JsonConvert.SerializeObject(new AllInOneModelDto(msg, _iotDeviceId));
var message = new Message(Encoding.UTF8.GetBytes(serialMsg));
messages.Add(message);
}
_sendTasks.Add(_myClient.SendEventBatchAsync(messages));
//_sendTasks.RemoveAll(t => t.AsTask().IsCompleted);
}
private void StartTimer()
{
//TODO: Need better error checking here!
if (!_timerRunning)
_batchTimer = Task.Run(() =>
{
_timerRunning = true;
Task.Delay(5000).Wait();
List<Message> thisBatch;
lock (Locker)
{
thisBatch = _messageBuffer.ToList();
_messageBuffer.Clear();
}
_sendTasks.Add(_myClient.SendEventBatchAsync(thisBatch));
_timerRunning = false;
// ReSharper disable once InconsistentlySynchronizedField
if (_messageBuffer.Any())
StartTimer();
});
}
private async Task Listener(CancellationTokenSource token)
{
while (!token.IsCancellationRequested)
{
var receivedMessage = await _myClient.ReceiveAsync();
if (receivedMessage == null) continue;
var msg = Encoding.UTF8.GetString(receivedMessage.GetBytes());
var cmd = JsonConvert.DeserializeObject<SystemCommandModel>(msg);
if (cmd != null) OnCommandReceived(cmd);
await _myClient.CompleteAsync(receivedMessage);
}
}
protected virtual void OnCommandReceived(SystemCommandModel theCmd)
{
SystemCommandEnum theEnum;
Enum.TryParse(theCmd.Name, true, out theEnum);
if (theEnum == SystemCommandEnum.None) return;
var paramList = theCmd.Parameters ?? new List<KeyValuePair<string, object>>();
CommandReceived?.Invoke(this, new CommandReceivedEventArgs
{
Command = theEnum,
Parameters = paramList
});
}
private void Dispose(bool disposing)
{
if (!disposing) return;
_tSource.Cancel();
_listener.Wait();
_batchTimer.Wait();
Task.WaitAll(_sendTasks.Select(t => t.AsTask()).ToArray());
_instance = null;
}
}
If you look closely, you will notice that this service processes all the outgoing events async. This was vital with the amount of events that little pi guy is streaming out. There is also a batch mode that I was using when running in debug. Even with them going async it was killing it. Take a look at the SendMessage method. The batch mode is currently just commented out there.
Lastly in the Devices library there is some configuration stuff. This is not vital to the problem domain itself, so that's why it is all self-contained here in the devices library. As I mentioned earlier, the configuration all lives in that InMemoryConfigService. You could easily re-implement the IConfigService to take configuration from the web or get it from a database or whatever. For my purposes I just did this. Important that you know it's here, though, because all the GPIO routing and credentials for Azure live in it.
public class InMemoryConfigService : IConfigService
{
private ConfigurationSettings _myConfig;
public InMemoryConfigService()
{
//Default Configuration
_myConfig = new ConfigurationSettings
{
AzureIoTHubUri = "PieceOfPiHub.azure-devices.net",
AzureIoTDeviceId = "PieceOfPiDevice",
AzureIoTDeviceKey = "{your key here}",
IntersectionId = new Guid("f6f9747d-f68f-4e1c-a4b9-d9e85c41ba97"),
EastLampId = new Guid("5555111f-d639-4b35-8de9-14b58b3e420c"),
EastLampSetId = new Guid("5581584e-3dae-4aa6-b690-f9d43bbf2b08"),
EastRedBulbId = new Guid("3e48ed9c-813d-4cef-93d2-216f4d32fd1d"),
EastRedBulbPinId = 23,
EastRedBulbSensorId = new Guid("735752fa-2928-405b-b50f-995231d3b792"),
EastYellowBulbId = new Guid("0bd1fbe0-ded5-410d-8cb5-887111c01630"),
EastYellowBulbPinId = 24,
EastYellowBulbSensorId = new Guid("029ae0ae-2171-4362-91a3-a814765ee8a4"),
EastGreenBulbId = new Guid("c5ef56ca-a594-41bc-8c24-2655944a752b"),
EastGreenBulbPinId = 25,
EastGreenBulbSensorId = new Guid("0035ee67-d89f-4a9e-be41-91bdac838a78"),
EastWestRouteId = new Guid("bfbe2082-4be5-427b-9f2c-fb5e4692ceec"),
NorthSouthRouteId = new Guid("47365541-2882-4814-b5f1-94361c09d58c"),
NorthLampId = new Guid("d22c0c30-35a3-439d-b505-2b395b87104e"),
NorthLampSetId = new Guid("0fc50ef3-ad24-4d63-b152-b712903e8607"),
NorthRedBulbId = new Guid("bdd4c02b-11d3-4ed5-8d34-6ef2e85c804b"),
NorthRedBulbPinId = 13,
NorthRedBulbSensorId = new Guid("26ebd133-7fa6-46b0-8a7f-5734622cc70c"),
NorthYellowBulbId = new Guid("01478f65-93c7-46d9-be64-a55f383296f9"),
NorthYellowBulbPinId = 19,
NorthYellowBulbSensorId = new Guid("b2f1e17d-52e7-480d-862a-639e98066f29"),
NorthGreenBulbId = new Guid("8b6bd075-7576-43a2-adc8-4ec36f914297"),
NorthGreenBulbPinId = 26,
NorthGreenBulbSensorId = new Guid("145b1009-099c-4456-93d8-34f4eb670ebb"),
SouthLampId = new Guid("a2742869-6852-4e03-8c1a-32673ea1c13f"),
SouthLampSetId = new Guid("edf14820-c265-44af-a8b3-b00be4ea4106"),
SouthRedBulbId = new Guid("16a97bd3-0661-47d4-a314-e7ab62162fca"),
SouthRedBulbPinId = 16,
SouthRedBulbSensorId = new Guid("3b47fc0c-9d01-4cdb-9162-8cf4508f3909"),
SouthYellowBulbId = new Guid("a8fbf9ca-aab6-4f84-95f4-f8c1134cacb5"),
SouthYellowBulbPinId = 20,
SouthYellowBulbSensorId = new Guid("352243bb-837d-41f0-be4f-0e210d4ce4a5"),
SouthGreenBulbId = new Guid("249f351b-b43d-4864-bad7-d9f19cd7c161"),
SouthGreenBulbPinId = 21,
SouthGreenBulbSensorId = new Guid("6191baaf-bb76-4664-b293-1040b223151c"),
WestLampId = new Guid("3c8c9db3-cdc6-4a76-8b0a-06ba80a03932"),
WestLampSetId = new Guid("be894b4e-454b-4a2e-b729-8d2c422e9dff"),
WestRedBulbId = new Guid("dcce1d40-45c3-4edf-9023-73b5d9261886"),
WestRedBulbPinId = 4,
WestRedBulbSensorId = new Guid("13dad334-0db5-4f55-94a0-4e34c20859f1"),
WestYellowBulbId = new Guid("1b743ee7-d375-44a7-924f-9b7b0e070a95"),
WestYellowBulbPinId = 5,
WestYellowBulbSensorId = new Guid("33db4064-522f-4068-bd4e-e4d929e48016"),
WestGreenBulbId = new Guid("03d9e044-6aee-46df-994b-0fe3fe5fefc5"),
WestGreenBulbPinId = 6,
WestGreenBulbSensorId = new Guid("00f26339-b3de-4630-bca2-4b9f24cc132c")
};
}
public Task<ConfigurationSettings> ReadConfig()
{
return Task.FromResult(_myConfig);
}
public Task WriteConfig(ConfigurationSettings updatedConfig)
{
return Task.FromResult(_myConfig = updatedConfig);
}
}
Still with me??? That's great! It's all about to come together to make the on-board software work!
Take a look at
TrafficManager.FlowProAppIt is a UWP headless app that deploys to your ARM device. In my case, the Pi. It is actually very skinny. It really just instantiates all those devices we have been looking at and let's them do their thing. You will notice that it is simply a single IBackgroundTask that looks like this:
public sealed class StartupTask : IBackgroundTask
{
private readonly IConfigService _cfgSvc = new InMemoryConfigService();
//Bulbs for north lamp (real bulbs)
private readonly ICollection<IBulb> _nBulbs = new List<IBulb>();
//Bulbs for south lamp
private readonly ICollection<IBulb> _sBulbs = new List<IBulb>();
//Bulbs for east lamp
private readonly ICollection<IBulb> _eBulbs = new List<IBulb>();
//Bulbs for west lamp
private readonly ICollection<IBulb> _wBulbs = new List<IBulb>();
private ILamp _nLamp;
private ILamp _sLamp;
private ILamp _eLamp;
private ILamp _wLamp;
private ILampSet _nSet;
private ILampSet _sSet;
private ILampSet _eSet;
private ILampSet _wSet;
private ITrafficRoute _nsRoute;
private ITrafficRoute _ewRoute;
private IIntersection _theIntersection;
private IEventService _eventor;
private Mcp3208SpiDevice _mcp3208;
private CancellationTokenSource _tSource;
public async void Run(IBackgroundTaskInstance taskInstance)
{
_tSource = new CancellationTokenSource();
var deferral = taskInstance.GetDeferral();
_eventor = AllInOneHubService.Instance();
var cfg = await _cfgSvc.ReadConfig();
_eventor.SendOnline();
InitGpio(cfg).Wait(_tSource.Token);
BindEvents();
_eventor.CommandReceived += EventorOnCommandReceived;
_theIntersection.Start();
while (!_tSource.Token.IsCancellationRequested)
await Task.Delay(TimeSpan.FromHours(1), _tSource.Token);
_eventor.Dispose();
_theIntersection.Stop();
_theIntersection.Dispose();
_eventor.Dispose();
deferral.Complete();
}
private void EventorOnCommandReceived(object sender, CommandReceivedEventArgs args)
{
Guid? targetId = null;
if (args.Parameters.Any(p => p.Key == "targetId"))
targetId = new Guid(args.Parameters.First(p => p.Key == "targetId").Value.ToString());
_eventor.SendLogMessage(_theIntersection.Id, false,
$"Received {args.Command} command for {targetId}", DateTime.UtcNow);
var workBulb = _nBulbs.FirstOrDefault(b => b.Id == targetId)
?? _sBulbs.FirstOrDefault(b => b.Id == targetId)
?? _eBulbs.FirstOrDefault(b => b.Id == targetId)
?? _wBulbs.FirstOrDefault(b => b.Id == targetId);
ITrafficRoute workRoute;
switch (args.Command)
{
case SystemCommandEnum.None:
break;
case SystemCommandEnum.BringOnline:
_theIntersection.Start();
break;
case SystemCommandEnum.RequestStatus:
var lamp = _nSet.Lamps.FirstOrDefault(l => l.Id == targetId)
?? _sSet.Lamps.FirstOrDefault(l => l.Id == targetId)
?? _eSet.Lamps.FirstOrDefault(l => l.Id == targetId)
?? _wSet.Lamps.FirstOrDefault(l => l.Id == targetId);
if (lamp != null)
{
lamp.GetSummary().ContinueWith(r =>
_eventor.SendSummaryUpdates(new List<DeviceSummary> { r.Result})
).Wait();
return;
}
var workSet = _nsRoute.LampSets.FirstOrDefault(s => s.Id == targetId)
?? _ewRoute.LampSets.FirstOrDefault(s => s.Id == targetId);
if (workSet != null)
{
workSet.GetSummary().ContinueWith(r =>
_eventor.SendSummaryUpdates(new List<DeviceSummary> { r.Result })
).Wait();
return;
}
workRoute = _theIntersection.TrafficRoutes.FirstOrDefault(r => r.Id == targetId);
if (workRoute != null)
{
workRoute.GetSummary().ContinueWith(r =>
_eventor.SendSummaryUpdates(new List<DeviceSummary> { r.Result })
).Wait();
return;
}
if (_theIntersection.Id == targetId)
_theIntersection.GetSummaries().ContinueWith(r =>
_eventor.SendSummaryUpdates(r.Result)
).Wait();
break;
case SystemCommandEnum.UpdateRoutePreference:
workRoute = _theIntersection.TrafficRoutes.FirstOrDefault(r => r.Id == targetId);
if (workRoute == null) return;
int? newPreference;
if (args.Parameters.Any(p => p.Key == "preference"))
newPreference = (int) args.Parameters.First(p => p.Key == "preference").Value;
else
return;
workRoute.UpdatePreferenceMetric(newPreference.Value);
break;
case SystemCommandEnum.ReplaceBulb:
if (workBulb == null) return;
workBulb.MarkInOp(false);
break;
case SystemCommandEnum.ReplaceSensor:
if (workBulb == null) return;
workBulb.MyCurrentSensor.MarkInOp(false);
break;
case SystemCommandEnum.SimulateBulbFailure:
if (workBulb == null) return;
workBulb.MarkInOp(true);
break;
case SystemCommandEnum.SimulateSensorFailure:
if (workBulb == null) return;
workBulb.MyCurrentSensor.MarkInOp(true);
break;
case SystemCommandEnum.TakeOffline:
_theIntersection.Stop();
break;
case SystemCommandEnum.Shutdown:
_tSource.Cancel(false);
break;
default:
throw new ArgumentOutOfRangeException();
}
}
private async Task InitGpio(ConfigurationSettings cfg)
{
try
{
_mcp3208 = new Mcp3208SpiDevice(0);
var gpio = GpioController.GetDefault();
// Show an error if there is no GPIO controller
if (gpio == null)
{
_eventor.SendLogMessage(cfg.IntersectionId, true, "There is no GPIO controller on this device.", DateTime.UtcNow);
return;
}
//Construct the bulbs
await BuildBulbs(gpio, cfg);
//Construct the Lamps
await BuildLamps(cfg);
//Put them in sets (only one lamp sets for this case...)
_nSet = new BasicLampSet(cfg.NorthLampSetId, new List<ILamp> { _nLamp }, 0, false);
_sSet = new BasicLampSet(cfg.SouthLampSetId, new List<ILamp> { _sLamp }, 180, false);
_eSet = new BasicLampSet(cfg.EastLampSetId, new List<ILamp> { _eLamp }, 90, true);
_wSet = new BasicLampSet(cfg.WestLampSetId, new List<ILamp> { _wLamp }, 270, true);
//Establish the lamp sets on traffic routes
_nsRoute = new BasicTrafficRoute(cfg.NorthSouthRouteId, 5, false, new List<ILampSet> { _nSet, _sSet });
_nsRoute.TransitionRightOfWay().Wait();
await Task.Delay(1000);
_nsRoute.TransitionRightOfWay().Wait();
await Task.Delay(1000);
_ewRoute = new BasicTrafficRoute(cfg.EastWestRouteId, 0, true, new List<ILampSet> { _eSet, _wSet });
_ewRoute.TransitionRightOfWay().Wait();
await Task.Delay(1000);
_ewRoute.TransitionRightOfWay().Wait();
await Task.Delay(1000);
//Put the intersection together
_theIntersection = new BasicFourWayIntersection(cfg.IntersectionId, 3, new List<ITrafficRoute>
{
_nsRoute, _ewRoute
});
_eventor.SendLogMessage(cfg.IntersectionId, false, "POST completed.", DateTime.UtcNow);
}
catch (Exception ex)
{
_eventor.SendLogMessage(cfg.IntersectionId, true, ex.ToString(), DateTime.UtcNow);
throw;
}
}
private async Task BuildBulbs(GpioController gpio, ConfigurationSettings cfg)
{
await Task.Run(() =>
{
//My *north* lamp is the real bulb breakout... use different sensors by changing the commented line
_nBulbs.Add(new RealBulbWithSensor(cfg.NorthRedBulbId, BulbTypeEnum.Red, new MockSensor(cfg.NorthRedBulbSensorId),
//new Acs712CurrentSensor5A(cfg.NorthRedBulbSensorId, _mcp3208, McpChannelByteEnum.ChannelOne),
gpio.OpenPin(cfg.NorthRedBulbPinId)));
_nBulbs.Add(new RealBulbWithSensor(cfg.NorthYellowBulbId, BulbTypeEnum.Yellow, new MockSensor(cfg.NorthYellowBulbSensorId),
//new Acs712CurrentSensor5A(cfg.NorthYellowBulbSensorId, _mcp3208, McpChannelByteEnum.ChannelOne),
gpio.OpenPin(cfg.NorthYellowBulbPinId)));
_nBulbs.Add(new RealBulbWithSensor(cfg.NorthGreenBulbId, BulbTypeEnum.Green, new MockSensor(cfg.NorthGreenBulbSensorId),
//new Acs712CurrentSensor5A(cfg.NorthGreenBulbSensorId, _mcp3208, McpChannelByteEnum.ChannelOne),
gpio.OpenPin(cfg.NorthGreenBulbPinId)));
//The rest of the lamps are simulated on the breadboard with LEDs
_sBulbs.Add(new LedWithoutSensor(cfg.SouthRedBulbId, BulbTypeEnum.Red, cfg.SouthRedBulbSensorId,
gpio.OpenPin(cfg.SouthRedBulbPinId)));
_sBulbs.Add(new LedWithoutSensor(cfg.SouthYellowBulbId, BulbTypeEnum.Yellow, cfg.SouthYellowBulbSensorId,
gpio.OpenPin(cfg.SouthYellowBulbPinId)));
_sBulbs.Add(new LedWithoutSensor(cfg.SouthGreenBulbId, BulbTypeEnum.Green, cfg.SouthGreenBulbSensorId,
gpio.OpenPin(cfg.SouthGreenBulbPinId)));
_eBulbs.Add(new LedWithoutSensor(cfg.EastRedBulbId, BulbTypeEnum.Red, cfg.EastRedBulbSensorId,
gpio.OpenPin(cfg.EastRedBulbPinId)));
_eBulbs.Add(new LedWithoutSensor(cfg.EastYellowBulbId, BulbTypeEnum.Yellow, cfg.EastYellowBulbSensorId,
gpio.OpenPin(cfg.EastYellowBulbPinId)));
_eBulbs.Add(new LedWithoutSensor(cfg.EastGreenBulbId, BulbTypeEnum.Green, cfg.EastGreenBulbSensorId,
gpio.OpenPin(cfg.EastGreenBulbPinId)));
_wBulbs.Add(new LedWithoutSensor(cfg.WestRedBulbId, BulbTypeEnum.Red, cfg.WestRedBulbSensorId,
gpio.OpenPin(cfg.WestRedBulbPinId)));
_wBulbs.Add(new LedWithoutSensor(cfg.WestYellowBulbId, BulbTypeEnum.Yellow, cfg.WestYellowBulbSensorId,
gpio.OpenPin(cfg.WestYellowBulbPinId)));
_wBulbs.Add(new LedWithoutSensor(cfg.WestGreenBulbId, BulbTypeEnum.Green, cfg.WestGreenBulbSensorId,
gpio.OpenPin(cfg.WestGreenBulbPinId)));
var t = new[]
{
_nBulbs.First(b => b.BulbType == BulbTypeEnum.Red).TransitionToState(BulbStateEnum.On), _sBulbs.First(b => b.BulbType == BulbTypeEnum.Red).TransitionToState(BulbStateEnum.On), _wBulbs.First(b => b.BulbType == BulbTypeEnum.Green).TransitionToState(BulbStateEnum.On), _eBulbs.First(b => b.BulbType == BulbTypeEnum.Green).TransitionToState(BulbStateEnum.On)
};
Task.WaitAll(t);
});
}
private async Task BuildLamps(ConfigurationSettings cfg)
{
//My *north* lamp is the real bulb breakout
_nLamp = new ThreeLightLamp(cfg.NorthLampId, _nBulbs);
_nLamp.TransitionToState(LampStateEnum.Go).Wait();
await Task.Delay(200);
_nLamp.TransitionToState(LampStateEnum.Caution).Wait();
await Task.Delay(200);
_nLamp.TransitionToState(LampStateEnum.Stop).Wait();
await Task.Delay(200);
//The rest of the lamps are simulated on the breadboard with LEDs
_sLamp = new ThreeLightLamp(cfg.SouthLampId, _sBulbs);
_sLamp.TransitionToState(LampStateEnum.Go).Wait();
await Task.Delay(200);
_sLamp.TransitionToState(LampStateEnum.Caution).Wait();
await Task.Delay(200);
_sLamp.TransitionToState(LampStateEnum.Stop).Wait();
await Task.Delay(200);
_eLamp = new ThreeLightLamp(cfg.EastLampId, _eBulbs);
_eLamp.TransitionToState(LampStateEnum.Caution).Wait();
await Task.Delay(200);
_eLamp.TransitionToState(LampStateEnum.Stop).Wait();
await Task.Delay(200);
_eLamp.TransitionToState(LampStateEnum.Go).Wait();
await Task.Delay(200);
_wLamp = new ThreeLightLamp(cfg.WestLampId, _wBulbs);
_wLamp.TransitionToState(LampStateEnum.Caution).Wait();
await Task.Delay(200);
_wLamp.TransitionToState(LampStateEnum.Stop).Wait();
await Task.Delay(200);
_wLamp.TransitionToState(LampStateEnum.Go).Wait();
}
//This is where events bubble up from the underlying hardware on their way through the _eventor
//interface where we will be using our AzureIoT implementation to push them on to the serviec bus
private void BindEvents()
{
var routes = _theIntersection.TrafficRoutes.ToList();
var sets = routes.SelectMany(r => r.LampSets).ToList();
var lamps = sets.SelectMany(ls => ls.Lamps).ToList();
var bulbs = lamps.SelectMany(l => l.Bulbs).ToList();
var sensors = bulbs.Select(b => b.MyCurrentSensor).ToList();
sensors.ForEach(s => s.StateChanged += (sender, args) =>
{
//Only report inop
if (args.NewState != 9999 && args.OldState != 9999) return;
var oldS = (CurrentSensorStateEnum)args.OldState;
var newS = (CurrentSensorStateEnum)args.NewState;
_eventor.SendStateChangeEvent(args.SourceId, "Sensor", oldS.ToString(), newS.ToString(), args.SourceTimestamp);
});
bulbs.ForEach(s => s.StateChanged += (sender, args) =>
{
var oldS = (BulbStateEnum)args.OldState;
var newS = (BulbStateEnum)args.NewState;
_eventor.SendStateChangeEvent(args.SourceId, "Bulb", oldS.ToString(), newS.ToString(), args.SourceTimestamp);
});
//For bulb usage factor one is the time it was on and factor 2 is the cycle count.
//Considering that turning a bulb on is hard on the materials it shoudld factor in to MTBF
bulbs.ForEach(s => s.BulbCycled += (sender, args) =>
_eventor.SendUsageUpdate(args.BulbId, Convert.ToDecimal(args.SecondsOn), 1));
lamps.ForEach(s => s.StateChanged += (sender, args) =>
{
var oldS = (LampStateEnum)args.OldState;
var newS = (LampStateEnum)args.NewState;
_eventor.SendStateChangeEvent(args.SourceId, "Lamp", oldS.ToString(), newS.ToString(), args.SourceTimestamp);
});
sets.ForEach(s => s.StateChanged += (sender, args) =>
{
var oldS = (RightOfWayStateEnum)args.OldState;
var newS = (RightOfWayStateEnum)args.NewState;
_eventor.SendStateChangeEvent(args.SourceId, "LampSet", oldS.ToString(), newS.ToString(), args.SourceTimestamp);
});
routes.ForEach(s => s.StateChanged += (sender, args) =>
{
var oldS = (RightOfWayStateEnum)args.OldState;
var newS = (RightOfWayStateEnum)args.NewState;
_eventor.SendStateChangeEvent(args.SourceId, "Route", oldS.ToString(), newS.ToString(), args.SourceTimestamp);
});
_theIntersection.StateChanged += (sender, args) =>
{
var oldS = (IntersectionStateEnum)args.OldState;
var newS = (IntersectionStateEnum)args.NewState;
_eventor.SendStateChangeEvent(args.SourceId, "Intersection", oldS.ToString(), newS.ToString(), args.SourceTimestamp);
};
_theIntersection.InternalAnomalyOccurred += (sender, args) =>
_eventor.SendAnomaly(args.IntersectionId, args.PerformingFunction, args.Description, args.SuspectDeviceSummary?.DeviceId, args.Timestamp);
_theIntersection.RightOfWayChanged += (sender, args) =>
_eventor.SendRightOfWayChanged(args.IntersectionId, args.OldRouteId, args.NewRouteId, args.Timestamp);
}
}
I'm going to call that a wrap for the software side of things. Of course the solution on GitHub includes the monitoring web app, too. It's a relatively basic MVC web app using SignalR to relay the IoT hub messages to the web clients. Here is the meat of that logic in
TrafficManager.Dashboard.Transporter.cs public class Transporter : ITransporter
{
private readonly IRepoDeviceMetadata _deviceRepo;
private readonly CancellationTokenSource _tokenSrc = new CancellationTokenSource();
private readonly ServiceClient _serviceClt;
private readonly List<Task> _sbTasks = new List<Task>();
public Transporter(IRepoDeviceMetadata deviceRepo)
{
_deviceRepo = deviceRepo;
//HACK: Get this in to config!
const string connectionString = "HostName=FloPro.azure-devices.net;SharedAccessKeyName=service;SharedAccessKey=ESqz5/K6toejWVXAYb5dpffFg/Fwb4zHlY40o30O1mw=";//"HostName=PieceOfPiHub.azure-devices.net;SharedAccessKeyName=service;SharedAccessKey=jIUi1GLea8dDnwSu1j5N5fM/aJN7E4ubKxoRxUgUbGo=";
const string iotHubToClientEndpoint = "messages/events";
_serviceClt = ServiceClient.CreateFromConnectionString(connectionString);
var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, iotHubToClientEndpoint);
foreach (var partition in eventHubClient.GetRuntimeInformation().PartitionIds)
{
//While debugging I found it helpful to backup the receiver a little to keep from having to constantly run the board
//This allowed me to fire up the board every 15 minutes or as needed while developing the web
var receiver = eventHubClient
.GetDefaultConsumerGroup()
.CreateReceiver(partition, DateTime.Now.AddMinutes(-15));
_sbTasks.Add(Listen(receiver));
}
}
public void SendCommand(string deviceId, SystemCommandModel cmd)
{
var msg = JsonConvert.SerializeObject(cmd);
_sbTasks.Add(_serviceClt.SendAsync(deviceId, new Message(Encoding.UTF8.GetBytes(msg))));
}
private async Task Listen(EventHubReceiver receiver)
{
while (!_tokenSrc.IsCancellationRequested)
{
_sbTasks.RemoveAll(x => x.IsCompleted);
//Every 30 seconds, let's check for a cancellation. As far as I could tell, there is not a listen method that
//has native cancellation support. There is for the normal Azure service bus, but guess it hasn't made it to
//the IoT hub libraries.
var eventData = await receiver.ReceiveAsync(TimeSpan.FromSeconds(30));
if (eventData == null) continue;
var ctx = GlobalHost.ConnectionManager.GetHubContext<BusRHub>();
var data = Encoding.UTF8.GetString(eventData.GetBytes());
var theEvent = JsonConvert.DeserializeObject<AllInOneModelDto>(data).ToFullModel() as AllInOneModel;
//Send the event
if (theEvent == null)
{
ctx.Clients.All.eventReceived(data);
continue;
}
var stream = (EventStreamEnum)theEvent.EventStream;
ctx.Clients.All.eventReceived(theEvent.ToString(_deviceRepo));
//If this is a summary event, trigger that method
if (stream == EventStreamEnum.Summary)
{
ctx.Clients.All.summaryUpdate(theEvent.ToString(_deviceRepo));
continue;
}
//If it's a state change
if (stream != EventStreamEnum.StateChange) continue;
//Let's get some more friendly device information
var dev = _deviceRepo.GetByDeviceId(theEvent.DeviceId ?? theEvent.IntersectionId ?? Guid.Empty);
//and trigger the stateChange method for our clients
ctx.Clients.All.stateChange(dev.DeviceId, theEvent.CurrentState);
//Finally the bulbChange method when appropriate to update the graphical UI
if (dev?.DeviceType == "Bulb")
ctx.Clients.All.bulbChange(dev.DeviceId, theEvent.CurrentState == "On" || theEvent.CurrentState == "AssumedOn");
}
}
public void Dispose()
{
Dispose(true);
}
protected void Dispose(bool disposing)
{
if (!disposing) return;
_tokenSrc.Cancel(false);
Task.WaitAll(_sbTasks.ToArray());
}
}
I'm not going to do a deep dive in to the rest of the website. After all, we are going to hook this bad boy up to Azure IoT Suite and let Microsoft do the heavy lifting there! woot! One thing I *will* mention, though... You can't reference your Windows Universal Libraries in your MVC web app. I ended up having to copy my pertinent models in to the web app itself. Ironically, not a very *universal* library...
Let's remote monitor! Starting at https://www.azureiotsuite.com/ go ahead and provision a new solution. I called mine Traffic Manager.
BOOM! Dangit. Provisioning remote monitoring failed...
Before we go for a troubleshoot, let's try one more time with a different name maybe. FloPro, seemed to like that. Second time was a charm:
Looking at my Azure portal here is what it created:
When you go to the site it will create your website account for you with your MS login:
And just like that, we have a dashboard!
Okay, let's add our device as a Custom Device:
Grab the info that it generates and apply it to the TrafficManager.Devices.Configuration.InMemoryConfigServcie.cs in the GitHub solution:
Notice that our device now shows up in the list as "Pending". It is pending because it has not received the DeviceInfo message from the PI. Rebuild and deploy to the Pi. It is expecting to see something like this:
Conveniently that is exactly how our TrafficManager.Domain.Models.AzureDeviceInfo model serializes to JSON.
After a successful rebuild and deploy, fire the app up and let it send it's Startup message containing the DeviceInfo. This will turn the device to running and register the commands:
For my purposes I want to disable the default rules:
And remove them:
Cool, let's go get some telemetry data configured. We'll keep this pretty simple. Let's say if a bulb is on for more than 30 seconds we want to raise an alarm. Also if anything goes InOp, we should alarm on that, too. So that's the goal. Remember that we will be using the Dto object model here because we are pulling it right off the wire. The bulb usage comes across in UsageFactorOne which is u1 in the Dto. The state will come over in CurrentState which is cs. We will alarm on a state of 9999 (InOp). For reference, this is the Dto object model:
namespace TrafficManager.Domain.Models
{
public class AllInOneModelDto : IotHubModelBaseDto
{
public AllInOneModelDto() { }
public AllInOneModelDto(AllInOneModel fullModel, string iotDeviceId)
{
DeviceId = iotDeviceId;
sn = fullModel.EventStream;
ts = fullModel.Timestamp;
id = fullModel.DeviceId;
dt = fullModel.DeviceType;
iid = fullModel.IntersectionId;
fn = fullModel.Function;
dsc = fullModel.Description;
cs = fullModel.CurrentState;
pid = fullModel.ParentDeviceId;
err = fullModel.IsError;
msg = fullModel.Message;
os = fullModel.OldState;
u1 = fullModel.UsageFactorOne;
u2 = fullModel.UsageFactorTwo;
}
public string DeviceId { get; set; }
public Guid? id { get; set; }
public string dt { get; set; }
public Guid? iid { get; set; }
public string fn { get; set; }
public string dsc { get; set; }
public Guid? pid { get; set; }
public bool err { get; set; }
public string msg { get; set; }
public string os { get; set; }
public string cs { get; set; }
public decimal u1 { get; set; }
public decimal u2 { get; set; }
public override IotHubModelBase ToFullModel()
{
return new AllInOneModel
{
EventStream = sn,
Timestamp = ts,
DeviceId = id,
DeviceType = dt,
IntersectionId = iid,
Function = fn,
Description = dsc,
ParentDeviceId = pid,
IsError = err,
Message = msg,
OldState = os,
CurrentState = cs,
UsageFactorOne = u1,
UsageFactorTwo = u2
};
}
}
}
So in our Azure portal we need to find that telemetry stream processing job and modify some SQL. So Stop the job, first of all:
And update the query:
Restart the stream processing and kick off the Pi to push in some events.
Notice also that the device is showing up at a proper intersection here in town... Cool, telemetry data as well. Notice that the max, min and avg graphs are populated too. Unfortunately the titles appear to be hard coded in the demo site.
I was also hoping that the alarm system was smart enough to pick up on my new telemetry fields. Alas, it is not. The Temp and Humidity fields must be part of the demo site as well.
At this point, rather than going back through the azure portal to manually create alarms and what-not, I feel I have exercised the demo well enough. What we can do, however, is review the underlying infrastructure to learn how we might extend the demo or hack and slash it in to a highly customized solution.
Exploring Azure Remote Monitoring - Under the hoodIoT HubThe hub is root of the data stream. It facilitates the communication between the devices and Azure. You will notice that our sample devices as well as our real device appear in there. This is a good place to start troubleshooting. If you do not see that "Messages" number in the Usage tile increasing your device is NOT talking to Azure. Nothing beyond this will function correctly until you get the events flowing in here.
NOTE: The key values are here, too, under the properties of the device
Next on the hit list are the databases. First lets look at
DocumentDb: DevMgmtDB - DevMgmtCollectionThis is where the DeviceInfo is going to end up. If we look in mine we can find the document that was the result of my online message with some additional info
The telemetry data makes its way to some blobs created under the storage account. There are 6 of them and we will see them again when we start looking at the stream processing:
- actionmappings - used on the dashboard. It maps the alarms to actions
- devicerules - used on dash stores the rules that are associated with devices
- devicetelemetry - Stores the results of the stream processing jobs
- flopro-ehout - EventHub storage - update dashboard when devices register
- flopro-ehruleout - EventHub storage Used to update the dashboard when alarms trigger
- rulesoutput - This is historical storage of alarms actually triggering. Stored as csv
Okay, so how to those things get populated?
Stream Jobs - DeviceInfoThe input is our IoT Hub:
And the output is our event hub (flopro-ehout):
This stream job simply pushes IotHub events that are "DeviceInfo" over to the event hub for the dashboard to update. Here is the SQL:
One input, the IotHub, two outputs. They both output to the device telemetry blob container. This job creates all your data points. This is really the meat of the event processing from the devices. This is the one that we edited above to get the metrics from our custom data points.
As the name suggests, this is the alarm processing job. This job combines two inputs and writes out to 2 outputs. The first input is built by input from the dashboard. It lives in the devicerules blob and contains the configured alarms for each device.
This information combined with the events rolling in on the IoTHub will allow the job to understand when an alert should be triggered. When an alarm is raised it will write it to both outputs, the first is the event hub to update the dashboard and the second is the rulesoutput blob (csv file pictured earlier) for persisted storage.
The SQL for this job looks like this:
WITH AlarmsData AS
(
SELECT
Stream.IoTHub.ConnectionDeviceId AS DeviceId,
'Temperature' as ReadingType,
Stream.Temperature as Reading,
Ref.Temperature as Threshold,
Ref.TemperatureRuleOutput as RuleOutput,
Stream.EventEnqueuedUtcTime AS [Time]
FROM IoTTelemetryStream Stream
JOIN DeviceRulesBlob Ref ON Stream.IoTHub.ConnectionDeviceId = Ref.DeviceID
WHERE
Ref.Temperature IS NOT null AND Stream.Temperature > Ref.Temperature
UNION ALL
SELECT
Stream.IoTHub.ConnectionDeviceId AS DeviceId,
'Humidity' as ReadingType,
Stream.Humidity as Reading,
Ref.Humidity as Threshold,
Ref.HumidityRuleOutput as RuleOutput,
Stream.EventEnqueuedUtcTime AS [Time]
FROM IoTTelemetryStream Stream
JOIN DeviceRulesBlob Ref ON Stream.IoTHub.ConnectionDeviceId = Ref.DeviceID
WHERE
Ref.Humidity IS NOT null AND Stream.Humidity > Ref.Humidity
)
SELECT *
INTO DeviceRulesMonitoring
FROM AlarmsData
SELECT *
INTO DeviceRulesHub
FROM AlarmsData
Besides that you have
- FloPro-Map - The Bing map subscription for the dashboard
- FloPro-plan for the web app hosting plan
- FloPro-jobsplan for the web jobs hosting plan
- FloPro web app
- FloPro-jobhost - host for the webjobs (simulated devices)
- FloPro service bus namespace
And that does it. Remote Monitoring inside and out...
Forward ThinkingI believe that there is no limit to where this could go. Integrating cloud connected intersections with real-time traffic info and machine learning could revolutionize how efficiently traffic systems managed traffic in a smart city. Beyond just recognizing where masses of traffic where migrating to and from, it could be integrated with RFID vehicle identification systems to preference emergency vehicles or even public transportation that was running behind schedule. City managers could coordinate detour routes around work areas or emergency scenes with the push of a button. Beyond even that, as vehicles get smarter and gps routing more efficient, route planning software for delivery vehicles or even just the daily driver could be empowered to understand the traffic signal phases along a given route in order to plan to maximize the chances of seeing green lights.
EpilogueAt the beginning of this project I set out to create a device that went beyond relaying measurements to the cloud. It was my intention to create a higher level device that, itself, operated and understood some child devices and just as well could become part of a larger mesh that looked at *the intersection* as its most finite part. So rather than relaying the values read form the current sensors to the cloud, I wanted my device to understand what that mean in relation to the state of its internals and report to the cloud as an intersection. The next step would be the intersections wrapping up to *the city* and then the city and its surrounding cities to *the State* and so on. I think that it is important to look at each task or problem and understand where the *understanding* needs to take place in order maintain scalabiliity. If a lamp understands when a bulb is out and doesn't need to push that logic up stream, and the Intersection is smart enough to take, maybe a compass heading, and preference the appropriate route for an approaching mass of traffic, and the city is smart enough to take geo-fence coordinates and avoid that area, and the State... And so on... This is an architecture that can scale on and on indefinitely. Again, worth repeating, the fundamental key to all that, in my mind, is assigning the responsibility and including the appropriate logic at the appropriate levels. This includes "containing" the actual measurements that go in to making the decision from polluting the model past where they will be used to make the decision.
Comments