Sumit Kumar
Published © GPL3+

Telegram Anomaly Alert of Continuous Temperature Change

A device that sends an alert when the temperature changes or when any anomaly occurs. It also predicts future data on the basis of the data history.

Intermediate · Full instructions provided · 3 hours · 330
Telegram Anomaly Alert of Continuous Temperature Change

Things used in this project

Hardware components

Bolt WiFi Module
Bolt IoT Bolt WiFi Module
×1
Temperature Sensor
Temperature Sensor
×1
Male/Female Jumper Wires
Male/Female Jumper Wires
×1

Software apps and online services

Bolt Cloud
Bolt IoT Bolt Cloud
Python

Story

Read more

Code

Full Code

Python
import json, time, statistics, math , requests
from boltiot import Bolt

# Bolt Cloud credentials (placeholders; replace with real values before running).
API_KEY = "XXXXXXXX"
DEVICE_ID = "XXXXXXXX"

# Telegram destination chat and bot identifier (placeholders).
telegram_chat_id = "@XXXXXXX"
telegram_bot_id = "botXXXXXXX"

# Fixed alert thresholds, compared against the converted temperature reading.
max_temp = 15
min_temp = 4

# Anomaly-detection tuning: number of readings kept in the history window,
# and the multiplier applied to the window's spread when computing bounds.
frame_size = 10
mul_factor = 5

# Endpoint used to post alert messages for the configured bot.
url = "https://api.telegram.org/{}/sendMessage".format(telegram_bot_id)

# Sends a Telegram alert message when one of the alert conditions is met.

def telegram(msg):
    """Send ``msg`` as a Telegram message to the configured chat.

    Posts to the module-level ``url`` with ``telegram_chat_id`` as the
    recipient.

    Args:
        msg: Text of the alert to send.

    Returns:
        True if the request completed with a successful HTTP status,
        False otherwise. Failures are printed, not raised, so the caller's
        monitoring loop keeps running.
    """
    data = {
        "chat_id": telegram_chat_id,
        "text": msg,
    }
    try:
        # A timeout keeps a stalled connection from blocking the caller forever.
        response = requests.request("POST", url, params=data, timeout=10)
        # Without this check a rejected request (bad token, unknown chat, ...)
        # would still be reported as sent.
        response.raise_for_status()
    except requests.RequestException as e:
        print("An error occurred in sending the alert message via Telegram")
        print(e)
        return False

    # NOTE: the request URL is deliberately no longer printed; it embeds the
    # bot token and would leak it into console output and logs.
    print("Message Sent Successfully...")
    return True

# Calculates the Z-score value and defines the boundaries for anomaly detection.

def compute_bounds(history_data, frame_size, factor):
    """Return the (high, low) anomaly bounds around the latest reading.

    The bounds are the most recent value in the window plus/minus ``factor``
    times the population standard deviation of the last ``frame_size``
    readings.

    Args:
        history_data: List of numeric readings, oldest first. It is trimmed
            IN PLACE so that only the newest ``frame_size`` entries remain.
        frame_size: Number of readings that make up the window.
        factor: Multiplier applied to the window's standard deviation.

    Returns:
        A ``(high_bound, low_bound)`` tuple, or ``None`` when fewer than
        ``frame_size`` readings are available.
    """
    surplus = len(history_data) - frame_size
    if surplus < 0:
        # Not enough readings collected yet.
        return None
    if surplus > 0:
        # Drop the oldest readings; the caller's list is modified on purpose.
        del history_data[:surplus]

    centre = statistics.mean(history_data)

    # Accumulate squared deviations one by one (kept as a plain running
    # total so floating-point results match term-by-term accumulation).
    squared_dev_total = 0
    for reading in history_data:
        squared_dev_total += math.pow(reading - centre, 2)

    spread = factor * math.sqrt(squared_dev_total / frame_size)
    latest = history_data[frame_size - 1]
    return (latest + spread, latest - spread)


# Main monitoring loop: polls pin A0 through the Bolt API, converts the raw
# reading to a temperature, and sends Telegram alerts when the value crosses
# the fixed thresholds or falls outside the bounds from compute_bounds().
mybolt = Bolt(API_KEY, DEVICE_ID)
history_data = []  # recent readings; trimmed in place by compute_bounds()

while True:
    response = mybolt.analogRead('A0')
    data = json.loads(response)
    if data['success'] != 1:
        print("There was an error while retriving the data.")
        # str() so a non-string error payload cannot raise TypeError here.
        print("This is the error:" + str(data['value']))
        time.sleep(3)
        continue

    try:
        # Scale the raw reading: value * 100 / 1024.
        sensor_value = (int(data['value']) * 100) / 1024
    except (KeyError, TypeError, ValueError) as e:
        # The original `except e:` referenced an undefined name, so any parse
        # failure crashed the script with NameError instead of being reported.
        print("There was an error while parsing the response: ", e)
        # Pause before retrying so a persistently bad response does not turn
        # into a tight loop of API calls.
        time.sleep(3)
        continue
    print("The temperature is: " + str(sensor_value) + ' celsius')

    bound = compute_bounds(history_data, frame_size, mul_factor)
    if not bound:
        # Still filling the history window; record the reading and poll again.
        required_data_count = frame_size - len(history_data)
        print("Not enough data to compute Z-score. Need ",required_data_count," more data points")
        history_data.append(sensor_value)
        time.sleep(3)
        continue

    # Check the alert conditions: fixed thresholds first, then the anomaly bounds.
    try:
        if sensor_value > max_temp:
            text = 'Your temperature is too HIGH, please respond quickly'
            print('Temp crossed threshold....')
            telegram(text)
        if sensor_value < min_temp:
            text = 'Your temperature is too LOW, please respond quickly'
            print('Temp crossed threshold.....')
            telegram(text)

        high_bound, low_bound = bound
        # The comparison is done on integer-truncated values, as in the
        # original script, so sub-degree excursions do not trigger an alert.
        if int(sensor_value) not in range(int(low_bound), int(high_bound) + 1):
            print('Someone has opened the fridge door...')
            text = 'Someone Just opened refrigerator..'
            telegram(text)

        history_data.append(sensor_value)
    except Exception as e:
        # Last-resort guard so one failed iteration does not stop monitoring.
        print ("Error",e)

    time.sleep(5)

Credits

Sumit Kumar
1 project • 1 follower
Contact

Comments

Please log in or sign up to comment.