Anomaly detection is the process of identifying unexpected items or events in data sets, that is, detecting deviations from a dataset's expected pattern. It’s like having “spidey senses” for your apps that can tell when there’s danger or something is not right. Attend this session to learn about using anomaly detection in ML.NET, Azure Stream Analytics, and the Cognitive Services API, then become a superhero and save the day.
Here are the presentation slides:
Presentation SpideySense-Anomaly
- Data Entry Errors: 100,000 vs 1,000,000 (fat-fingered)
- Measurement Error: common
- Experimental Error: starting late in a sprint
- Intentional Outlier: underreporting alcohol consumption
- Data Processing Error: extraction errors
- Sampling Error: reporting height for all athletes when the sample includes a few basketball players
- Natural Outlier: when it's not artificial
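To make the sampling-error case concrete, here is a minimal sketch of one common way to flag outliers in a static dataset, the interquartile range (IQR) rule. The function name and the height values are made up for illustration; this is not the method used by any of the services discussed later.

```python
import statistics

def iqr_outliers(values, k=1.5):
    """Return the values lying outside [Q1 - k*IQR, Q3 + k*IQR]."""
    q1, _, q3 = statistics.quantiles(values, n=4)
    iqr = q3 - q1
    low, high = q1 - k * iqr, q3 + k * iqr
    return [v for v in values if v < low or v > high]

# Hypothetical heights in cm: typical athletes plus two basketball players.
heights_cm = [170, 172, 175, 178, 180, 174, 176, 171, 179, 173, 211, 216]
print(iqr_outliers(heights_cm))
```

With this sample, only the two tallest entries fall outside the fences. The multiplier `k=1.5` is the conventional default; a larger value flags fewer points.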
Applications of Anomaly Detection
Time series data is what we typically get from an IoT device. The sensor is doing fine, then out of nowhere there's a spike like this:
Sample source code is located on my GitHub page; click here.
Anomaly Detection using Cognitive Services Anomaly Detector API
Try it out:
Click on the link to run on Binder
It should start a container running the program
Plug in the subscription key and endpoint, which you get by going to the Anomaly Detector page and signing up for the API
# To start sending requests to the Anomaly Detector API, paste the subscription key you received after creating the Anomaly Detector resource.
subscription_key = ''
# Use the endpoint you received from the Overview section of the Anomaly Detector resource you created.
# The endpoint looks like https://westus2.api.cognitive.microsoft.com/ and differs by region; append anomalydetector/v1.0/timeseries/entire/detect to it.
#endpoint = 'https://westus2.api.cognitive.microsoft.com/anomalydetector/v1.0/timeseries/entire/detect'
endpoint = ''
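Since the full URL is the regional base endpoint plus a fixed path, a small helper can do the concatenation and avoid doubled or missing slashes. This is only a sketch; `build_detect_url` and `DETECT_PATH` are hypothetical names, not part of the sample repository.

```python
# Path taken from the commented-out endpoint above.
DETECT_PATH = 'anomalydetector/v1.0/timeseries/entire/detect'

def build_detect_url(base_endpoint):
    """Join a regional base endpoint and the batch-detection path."""
    return base_endpoint.rstrip('/') + '/' + DETECT_PATH

print(build_detect_url('https://westus2.api.cognitive.microsoft.com/'))
```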
The test data looks like this:
{
    "granularity": "minutely",
    "period": 15,
    "sensitivity": 80,
    "series": [
        {
            "value": 22.37410220507436,
            "timestamp": "2020-07-11T01:43:00.000Z"
        },
        {
            "value": 28.375182989711604,
            "timestamp": "2020-07-11T01:44:00.000Z"
        },
        {
            "value": 21.960680058249352,
            "timestamp": "2020-07-11T01:45:00.000Z"
        },
        {
            "value": 25.65734886985868,
            "timestamp": "2020-07-11T01:46:00.000Z"
        }
    ]
}
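If you want to build a request body like this from your own readings rather than from a file, something along these lines should work. It is a sketch under the assumption that readings arrive one minute apart; `build_request` is a hypothetical helper, and the rounded values are only placeholders.

```python
from datetime import datetime, timedelta, timezone

def build_request(values, start, period=15, sensitivity=80):
    """Build a request body with one point per minute starting at `start`."""
    series = []
    for i, value in enumerate(values):
        ts = start + timedelta(minutes=i)
        series.append({
            'value': value,
            'timestamp': ts.strftime('%Y-%m-%dT%H:%M:%S.000Z'),
        })
    return {
        'granularity': 'minutely',
        'period': period,
        'sensitivity': sensitivity,
        'series': series,
    }

start = datetime(2020, 7, 11, 1, 43, tzinfo=timezone.utc)
payload = build_request([22.37, 28.38, 21.96, 25.66], start)
print(payload['series'][1])
```

The service enforces a minimum series length, so a four-point sample like this excerpt would need to be extended before sending; check the API reference for the exact limits.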
Here's how it calls the API from Python:
import json
import requests

def detect(endpoint, subscription_key, request_data):
    # Send the series to the Anomaly Detector API and return the parsed JSON response
    headers = {'Content-Type': 'application/json', 'Ocp-Apim-Subscription-Key': subscription_key}
    response = requests.post(endpoint, data=json.dumps(request_data), headers=headers)
    if response.status_code == 200:
        return json.loads(response.content.decode("utf-8"))
    else:
        print(response.status_code)
        raise Exception(response.text)

def build_figure(sample_data, sensitivity):
    # Override the sensitivity, then run detection on the whole series
    sample_data['sensitivity'] = sensitivity
    result = detect(endpoint, subscription_key, sample_data)
    return result

sample_data = json.load(open('./testdata.json'))
sample_data['granularity'] = 'minutely'
sample_data['period'] = 15
# 95 sensitivity
build_figure(sample_data, 95)
Run all cells
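Once the cells have run, the response can be lined up against the request to see which points were flagged. The sketch below assumes the batch response contains parallel `isAnomaly` and `expectedValues` arrays, one entry per input point; the `mock_result` dictionary is hand-written for illustration and is not real service output.

```python
def list_anomalies(request_data, result):
    """Pair each input point with its flag and return only the flagged ones."""
    anomalies = []
    points = request_data['series']
    for point, flagged, expected in zip(points, result['isAnomaly'], result['expectedValues']):
        if flagged:
            anomalies.append((point['timestamp'], point['value'], expected))
    return anomalies

# Hand-written stand-ins for a request and its response (not real API output).
sample_request = {'series': [
    {'value': 22.4, 'timestamp': '2020-07-11T01:43:00.000Z'},
    {'value': 58.0, 'timestamp': '2020-07-11T01:44:00.000Z'},
    {'value': 22.0, 'timestamp': '2020-07-11T01:45:00.000Z'},
]}
mock_result = {'isAnomaly': [False, True, False], 'expectedValues': [22.3, 22.5, 22.1]}

for timestamp, value, expected in list_anomalies(sample_request, mock_result):
    print(f'{timestamp}: got {value}, expected about {expected}')
```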
Anomaly Detection in ML.NET
Open in VS Code with Remote Containers
Use project
mlnet\TempHumidityAnomalyDetection.csproj
Open the project in dev container
Data\testdata.csv looks like this
"messageId","deviceId","temperature","humidity","enqueuedTimeUtc"
"1","Raspberry Pi Web Client","21.533918279089207","62.049337654075785","2020-06-27T11:49:52.583Z"
"2","Raspberry Pi Web Client","27.787553476779234","79.58891052389029","2020-06-27T11:49:53.588Z"
"3","Raspberry Pi Web Client","29.098405839319042","72.16314028869465","2020-06-27T11:49:54.585Z"
"4","Raspberry Pi Web Client","29.131030128595658","74.1662945972788","2020-06-27T11:49:55.584Z"
"5","Raspberry Pi Web Client","23.50802809147398","70.58544235687907","2020-06-27T11:49:56.583Z"
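Note that every field in this file is quoted, so the numbers arrive as strings and need converting before any analysis. As a rough sketch of that step (in Python rather than the C# used by the project, and with a hypothetical `load_readings` helper), the first two rows can be parsed like this:

```python
import csv
import io
from datetime import datetime

CSV_TEXT = '''"messageId","deviceId","temperature","humidity","enqueuedTimeUtc"
"1","Raspberry Pi Web Client","21.533918279089207","62.049337654075785","2020-06-27T11:49:52.583Z"
"2","Raspberry Pi Web Client","27.787553476779234","79.58891052389029","2020-06-27T11:49:53.588Z"
'''

def load_readings(text):
    """Parse the quoted CSV and convert numeric and timestamp columns."""
    readings = []
    for row in csv.DictReader(io.StringIO(text)):
        readings.append({
            'messageId': int(row['messageId']),
            'deviceId': row['deviceId'],
            'temperature': float(row['temperature']),
            'humidity': float(row['humidity']),
            # %z accepts the trailing "Z" on Python 3.7 and later.
            'enqueuedTimeUtc': datetime.strptime(
                row['enqueuedTimeUtc'], '%Y-%m-%dT%H:%M:%S.%f%z'),
        })
    return readings

readings = load_readings(CSV_TEXT)
print(readings[0]['temperature'], readings[0]['enqueuedTimeUtc'])
```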
Run program.cs
Here's how it detects spikes
static void DetectSpike(MLContext mlContext, IDataView tempHumidityData)
{
    Console.WriteLine("Detect temporary changes in pattern");

    // STEP 2: Set the training algorithm
    var iidSpikeEstimator = mlContext.Transforms.DetectIidSpike(
        outputColumnName: nameof(TempHumidityPrediction.Prediction),
        inputColumnName: nameof(TempHumidityData.Temperature),
        confidence: 95, pvalueHistoryLength: 25);

    // STEP 3: Create the transform
    // Create the spike detection transform
    Console.WriteLine("=============== Training the model ===============");
    ITransformer iidSpikeTransform =
        iidSpikeEstimator.Fit(CreateEmptyDataView(mlContext));
    Console.WriteLine("=============== End of training process ===============");

    // Apply data transformation to create predictions.
    IDataView transformedData = iidSpikeTransform.Transform(tempHumidityData);
    var predictions = mlContext.Data.CreateEnumerable<TempHumidityPrediction>(
        transformedData, reuseRowObject: false);

    Console.WriteLine("Alert\tScore\tP-Value");
    foreach (var p in predictions)
    {
        var results = $"{p.Prediction[0]}\t{p.Prediction[1]:f2}\t{p.Prediction[2]:F2}";
        if (p.Prediction[0] == 1)
        {
            results += " <-- Spike detected";
        }
        Console.WriteLine(results);
    }
    Console.WriteLine("");
}
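To get a feel for what the alert, score, and p-value columns mean, here is a deliberately simplified Python sketch of the same idea: keep a sliding window of recent values, estimate how unlikely the next value is, and raise an alert when that probability drops below `1 - confidence/100`. It assumes the recent values are roughly normally distributed, which is a simplification chosen for illustration and is not how ML.NET computes its p-values. The function name, `min_history`, and the readings are hypothetical.

```python
import math
from collections import deque
from statistics import mean, stdev

def detect_spikes(values, confidence=95, history_length=25, min_history=5):
    """Return (alert, score, p_value) per value using a normal-tail estimate."""
    threshold = 1 - confidence / 100
    history = deque(maxlen=history_length)
    results = []
    for value in values:
        if len(history) >= min_history:
            mu, sigma = mean(history), stdev(history)
            if sigma > 0:
                z = abs(value - mu) / sigma
                # Two-sided tail probability under a normal distribution.
                p_value = math.erfc(z / math.sqrt(2))
            else:
                p_value = 1.0 if value == mu else 0.0
        else:
            p_value = 1.0  # not enough history to judge yet
        results.append((int(p_value < threshold), value, p_value))
        history.append(value)
    return results

# Hypothetical temperature readings with one obvious spike.
temps = [22.0, 22.5, 21.8, 22.2, 22.1, 21.9, 22.3, 40.0, 22.0]
print('Alert\tScore\tP-Value')
for alert, score, p_value in detect_spikes(temps):
    print(f'{alert}\t{score:.2f}\t{p_value:.2f}')
```

The spike itself is added to the history after it is scored, so it widens the window's spread for the next few points. That is one reason a longer history length tends to give steadier results.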
Data coming in from IoT Hub can be filtered through Stream Analytics, which can then deliver alerts or update dashboard data.
To test it out:
Open in VS Code with the Azure Stream Analytics Tools extension installed
Use this workspace
spideySenseASAProj\spideySenseASA.code-workspace
Follow the instructions in this tutorial
Open a browser to the Raspberry Pi Azure IoT Online Simulator
- copy and paste rpi-node every5sec.js into the code window
- replace connectionString
Define the transformation query using spideySenseASAProj.asaql instead of this query
Define the live input and live output
Here's what testdata.json looks like
[
    {
        "messageId": 1,
        "deviceId": "Raspberry Pi Web Client",
        "temperature": 21.533918279089207,
        "humidity": 62.049337654075785,
        "enqueuedTimeUtc": "2020-06-27T11:49:52.583Z"
    },
    {
        "messageId": 2,
        "deviceId": "Raspberry Pi Web Client",
        "temperature": 27.787553476779234,
        "humidity": 79.58891052389029,
        "enqueuedTimeUtc": "2020-06-27T11:49:53.588Z"
    },
    {
        "messageId": 3,
        "deviceId": "Raspberry Pi Web Client",
        "temperature": 29.098405839319042,
        "humidity": 72.16314028869465,
        "enqueuedTimeUtc": "2020-06-27T11:49:54.585Z"
    }
]
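For local testing it can be handy to generate a longer file in this shape with a known spike in it. The sketch below is one way to do that; the value ranges (temperature 20 to 30, humidity 60 to 80) are assumptions chosen to match the sample above, the +30 spike is arbitrary, and all function names are hypothetical.

```python
import json
import random
from datetime import datetime, timedelta, timezone

DEVICE_ID = 'Raspberry Pi Web Client'

def make_message(message_id, enqueued, rng, spike=False):
    """Build one telemetry message; optionally push the temperature far out of range."""
    temperature = 20 + rng.random() * 10  # assumed normal range: 20 to 30
    humidity = 60 + rng.random() * 20     # assumed normal range: 60 to 80
    if spike:
        temperature += 30                 # arbitrary injected anomaly
    stamp = enqueued.isoformat(timespec='milliseconds').replace('+00:00', 'Z')
    return {
        'messageId': message_id,
        'deviceId': DEVICE_ID,
        'temperature': temperature,
        'humidity': humidity,
        'enqueuedTimeUtc': stamp,
    }

def make_test_data(count, spike_at=None, seed=0):
    """Generate `count` messages one second apart, with a spike at index `spike_at`."""
    rng = random.Random(seed)
    start = datetime(2020, 6, 27, 11, 49, 52, 583000, tzinfo=timezone.utc)
    return [
        make_message(i + 1, start + timedelta(seconds=i), rng, spike=(i == spike_at))
        for i in range(count)
    ]

messages = make_test_data(30, spike_at=20)
print(json.dumps(messages[:2], indent=2))
```

Writing `messages` out with `json.dump` gives a file that can be used as local input, and the known spike position makes it easy to check whether the query flags it.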
Here's what asaql looks like
WITH AnomalyDetectionStep AS
(
    SELECT
        EVENTENQUEUEDUTCTIME AS time,
        CAST(temperature AS float) AS temp,
        AnomalyDetection_SpikeAndDip(CAST(temperature AS float), 80, 120, 'spikesanddips')
            OVER(LIMIT DURATION(second, 120)) AS SpikeAndDipScores
    FROM IoTHubInput
)
SELECT
    time,
    temp,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'Score') AS float) AS SpikeAndDipScore,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'IsAnomaly') AS bigint) AS IsSpikeAndDipAnomaly
INTO BlobOutput
FROM AnomalyDetectionStep

/* Syntax:
AnomalyDetection_SpikeAndDip(
    <scalar_expression>,
    <confidence>,
    <historySize>,
    <mode>)
OVER ([PARTITION BY <partition key>]
    LIMIT DURATION(<unit>, <length>)
    [WHEN boolean_expression]) */
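Downstream of the query, turning the output into alerts is mostly a matter of filtering on the anomaly flag. The sketch below assumes the output is written as line-separated JSON and that field names match the aliases in the query above (`time`, `temp`, `SpikeAndDipScore`, `IsSpikeAndDipAnomaly`); both depend on how the output is configured. The sample lines are hand-written, not real job output.

```python
import json

def anomalies_from_output(lines):
    """Return the records whose IsSpikeAndDipAnomaly flag is set."""
    alerts = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines between records
        record = json.loads(line)
        if record.get('IsSpikeAndDipAnomaly') == 1:
            alerts.append(record)
    return alerts

# Hand-written sample output lines for illustration only.
sample_output = [
    '{"time": "2020-06-27T11:49:52.583Z", "temp": 21.5, "SpikeAndDipScore": 0.42, "IsSpikeAndDipAnomaly": 0}',
    '{"time": "2020-06-27T11:49:53.588Z", "temp": 57.8, "SpikeAndDipScore": 0.0001, "IsSpikeAndDipAnomaly": 1}',
    '',
]

for alert in anomalies_from_output(sample_output):
    print(f"Spike or dip at {alert['time']}: temp={alert['temp']}")
```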
As developers,
If you find this useful, please click the "thumbs up" button and follow me. It's always great to hear from others how this project was able to help them understand the technology.