Kutluhan Aktar
Published © CC BY

IoT | Telegram Fingerprint Door Lock and Surveillance Camera

Via Telegram, get apprised of every attempt to lock or unlock the door w/ surveillance footage by 5MP night vision camera or USB webcam.

ExpertFull instructions provided4 hours5,876

Things used in this project

Hardware components

Raspberry Pi 3 Model B+
Raspberry Pi 3 Model B+
Raspberry Pi 3B+ or 4
×1
Raspberry Pi 4 Model B
Raspberry Pi 4 Model B
Raspberry Pi 3B+ or 4
×1
Fingerprint Sensor
DFRobot Fingerprint Sensor
×1
USB-to-Serial Converter (CP2102)
×1
DFRobot 5MP Night Vision Camera for Raspberry Pi
×1
USB Webcam
×1
DFRobot Electromagnetic Lock
×1
SparkFun 2-Channel Relay Module
×1
12V Battery
×1

Software apps and online services

Raspbian
Raspberry Pi Raspbian
Visual Studio 2017
Microsoft Visual Studio 2017
Telegram Bot API

Hand tools and fabrication machines

Soldering iron (generic)
Soldering iron (generic)
Solder Wire, Lead Free
Solder Wire, Lead Free
Hot glue gun (generic)
Hot glue gun (generic)

Story

Read more

Custom parts and enclosures

Telegram_Surveillance_System.zip

Schematics

Schematic-1

Schematic-2

Code

surveillance_system.py

Python
# IoT | Telegram Fingerprint Door Lock and Surveillance Camera w/ Night Vision
#
# Raspberry Pi 3 Model B+ or 4
#
# By Kutluhan Aktar
#
# Via Telegram, get apprised of every attempt to lock or unlock the door w/ surveillance footage
# by night vision camera or USB Webcam.
#
# For more information:
# https://www.theamplituhedron.com/projects/IoT-Telegram-Fingerprint-Door-Lock-and-Surveillance-Camera-w-Night-Vision/

from picamera import PiCamera
import json
from time import sleep
import datetime
from subprocess import call 
import requests
import RPi.GPIO as GPIO
import adafruit_fingerprint
import serial

# Set up BCM GPIO numbering
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
# Set up Relay pins:
lock = 4
GPIO.setup(lock, GPIO.OUT)
GPIO.output(lock, GPIO.HIGH)

# Create the Surveillance System class with the required settings:
class Surveillance_System:
    def __init__(self, server, file_location):
        # Define the fingerprint sensor settings (USB/serial converter).
        uart = serial.Serial("/dev/ttyUSB0", baudrate=57600, timeout=1)
        self.finger = adafruit_fingerprint.Adafruit_Fingerprint(uart)
        # Define the server (Telegram Bot) and the file location (captured).
        self.server = server
        self.file_location = file_location
        # Define the Night Vision Camera Settings.
        self.night_cam = PiCamera()
        self.night_cam.resolution = (640, 480)
        self.night_cam.framerate = 15
        # Define the default camera type setting (USB Webcam).
        self.camera_type = "USB"
        self.surveillance_request = "default"
    # Get a fingerprint image, template it, and see if it matches.
    def detect_fingerprint(self):
        if self.finger.get_image() != adafruit_fingerprint.OK:
            sleep(1)
            return "Waiting"
        print("Templating...")
        if self.finger.image_2_tz(1) != adafruit_fingerprint.OK:
            return "Not Reading"
        print("Searching...")
        if self.finger.finger_search() != adafruit_fingerprint.OK:
            return "Not Found"
        print("Detected #", self.finger.finger_id, "with confidence", self.finger.confidence)
        return "Found"
    # Get updates from the Telegram Bot via the PHP web application (telegram_surveillance_bot).
    def get_updates_from_web_app(self):
        data = requests.get(self.server+"?data=ok");
        # If incoming data:
        if(data.text == "Waiting new commands..."):
            pass
        else:
            self.commands = json.loads(data.text)
            self.camera_type = self.commands["camera"]
            self.surveillance_request = self.commands["surveillance"]
    # According to the selected camera type (Night Vision or USB Webcam), capture the recent attempt to lock or unlock the door.
    def capture_last_attempt(self, _date, file_name, camera_type):
        file_name = self.file_location + file_name
        # Night Vision Camera:
        if(camera_type == "night"):
            # Add date as timestamp on the generated files.
            self.night_cam.annotate_text = _date
            # Capture an image as the thumbnail.
            sleep(2)
            self.night_cam.capture(file_name+".jpg")
            print("\r\nRasp_Pi => Image Captured!\r\n")
            # Record a 5 seconds video.
            self.night_cam.start_recording(file_name+".h264")
            sleep(10)
            self.night_cam.stop_recording()
            print("Rasp_Pi => Video Recorded! \r\n")
            # Convert the H264 format to the MP4 format.
            command = "MP4Box -add " + file_name + ".h264" + " " + file_name + ".mp4"
            call([command], shell=True)
            print("\r\nRasp_Pi => Video Converted! \r\n")
        # USB Webcam:
        elif (camera_type == "USB"):
            # Capture an image with Fswebcam module.
            width = "640"
            height = "480"
            command_capture = "fswebcam -D 2 -S 20 -r " + width + "x" + height + " " + file_name + ".jpg"
            call([command_capture], shell=True)
            print("\r\nRasp_Pi => Image Captured!\r\n")
            # Record a 5 seconds video with FFmpeg.
            command_record = "ffmpeg -f video4linux2 -r 20 -t 5 -s " + width + "x" + height + " -i /dev/video0 " + file_name + ".avi"
            call([command_record], shell=True)
            print("Rasp_Pi => Video Recorded! \r\n")
            # Convert the AVI format to the MP4 format.
            command = "MP4Box -add " + file_name + ".avi" + " " + file_name + ".mp4"
            call([command], shell=True)
            print("\r\nRasp_Pi => Video Converted! \r\n")
            sleep(3)
    # Send the recently captured files to the server (web app).
    def send_last_attempt(self, _file_name):
        file_name = self.file_location + _file_name
        # Files:
        files = {'rasp_video': open(file_name+".mp4", 'rb'), 'rasp_capture': open(file_name+".jpg", 'rb')}
        # Last Entry:
        data = {'access': _file_name}
        # Make an HTTP Post Request to the server to send the files.
        request = requests.post(self.server, files=files, data=data)
        # Print the response from the server.
        print("Rasp_Pi => Files Transferred!\r\n")
        print(request.text+"\r\n")

# Define a new class object named 'surveillance':
surveillance = Surveillance_System("https://www.theamplituhedron.com/telegram_surveillance_bot/", "/home/pi/Telegram_Surveillance_System_w_Fingerprint/captured/") # Change with your settings.

while True:
    # Get updates from the Telegram Bot via the PHP web app.
    surveillance.get_updates_from_web_app()
    sleep(5)
    # If surveillance footage requested without triggering the fingerprint sensor:
    if(surveillance.surveillance_request == "footage"):
        surveillance.surveillance_request = "default"
        print("Bot => Footage Requested!\r\n")
        date = datetime.datetime.now().strftime("%m-%d-%y_%H-%M-%S")
        surveillance.capture_last_attempt(date, "requested_"+date, surveillance.camera_type)
        surveillance.send_last_attempt("requested_"+date)
    # Detect whether the fingerprint is found or not.
    fingerprint_sta = surveillance.detect_fingerprint()
    if(fingerprint_sta == "Waiting"):
        print("Waiting for image...")
    elif(fingerprint_sta == "Found"):
        # Lock or unlock:
        if(GPIO.input(lock)):
            GPIO.output(lock, GPIO.LOW)
        else:
            GPIO.output(lock, GPIO.HIGH)
        print("Fingerprint => Detected!")
        date = datetime.datetime.now().strftime("%m-%d-%y_%H-%M-%S")
        surveillance.capture_last_attempt(date, "access_"+date, surveillance.camera_type)
        surveillance.send_last_attempt("access_"+date)
    elif(fingerprint_sta == "Not Found"):
        print("Fingerprint => Not Found!")
        date = datetime.datetime.now().strftime("%m-%d-%y_%H-%M-%S")
        surveillance.capture_last_attempt(date, "failed_"+date, surveillance.camera_type)
        surveillance.send_last_attempt("failed_"+date)

fingerprint_sensor_settings.py

Python
# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
# SPDX-License-Identifier: MIT

import time
import adafruit_fingerprint
import serial

# If using with Linux/Raspberry Pi and hardware UART:
#uart = serial.Serial("/dev/ttyS0", baudrate=57600, timeout=1)

# If using with a computer such as Linux/RaspberryPi, Mac, Windows with USB/serial converter:
uart = serial.Serial("/dev/ttyUSB0", baudrate=57600, timeout=1)

finger = adafruit_fingerprint.Adafruit_Fingerprint(uart)

##################################################


def get_fingerprint():
    """Get a finger print image, template it, and see if it matches!"""
    print("Waiting for image...")
    while finger.get_image() != adafruit_fingerprint.OK:
        pass
    print("Templating...")
    if finger.image_2_tz(1) != adafruit_fingerprint.OK:
        return False
    print("Searching...")
    if finger.finger_search() != adafruit_fingerprint.OK:
        return False
    return True


# pylint: disable=too-many-branches
def get_fingerprint_detail():
    """Get a finger print image, template it, and see if it matches!
    This time, print out each error instead of just returning on failure"""
    print("Getting image...", end="", flush=True)
    i = finger.get_image()
    if i == adafruit_fingerprint.OK:
        print("Image taken")
    else:
        if i == adafruit_fingerprint.NOFINGER:
            print("No finger detected")
        elif i == adafruit_fingerprint.IMAGEFAIL:
            print("Imaging error")
        else:
            print("Other error")
        return False

    print("Templating...", end="", flush=True)
    i = finger.image_2_tz(1)
    if i == adafruit_fingerprint.OK:
        print("Templated")
    else:
        if i == adafruit_fingerprint.IMAGEMESS:
            print("Image too messy")
        elif i == adafruit_fingerprint.FEATUREFAIL:
            print("Could not identify features")
        elif i == adafruit_fingerprint.INVALIDIMAGE:
            print("Image invalid")
        else:
            print("Other error")
        return False

    print("Searching...", end="", flush=True)
    i = finger.finger_fast_search()
    # pylint: disable=no-else-return
    # This block needs to be refactored when it can be tested.
    if i == adafruit_fingerprint.OK:
        print("Found fingerprint!")
        return True
    else:
        if i == adafruit_fingerprint.NOTFOUND:
            print("No match found")
        else:
            print("Other error")
        return False


# pylint: disable=too-many-statements
def enroll_finger(location):
    """Take a 2 finger images and template it, then store in 'location'"""
    for fingerimg in range(1, 3):
        if fingerimg == 1:
            print("Place finger on sensor...", end="", flush=True)
        else:
            print("Place same finger again...", end="", flush=True)

        while True:
            i = finger.get_image()
            if i == adafruit_fingerprint.OK:
                print("Image taken")
                break
            if i == adafruit_fingerprint.NOFINGER:
                print(".", end="", flush=True)
            elif i == adafruit_fingerprint.IMAGEFAIL:
                print("Imaging error")
                return False
            else:
                print("Other error")
                return False

        print("Templating...", end="", flush=True)
        i = finger.image_2_tz(fingerimg)
        if i == adafruit_fingerprint.OK:
            print("Templated")
        else:
            if i == adafruit_fingerprint.IMAGEMESS:
                print("Image too messy")
            elif i == adafruit_fingerprint.FEATUREFAIL:
                print("Could not identify features")
            elif i == adafruit_fingerprint.INVALIDIMAGE:
                print("Image invalid")
            else:
                print("Other error")
            return False

        if fingerimg == 1:
            print("Remove finger")
            time.sleep(1)
            while i != adafruit_fingerprint.NOFINGER:
                i = finger.get_image()

    print("Creating model...", end="", flush=True)
    i = finger.create_model()
    if i == adafruit_fingerprint.OK:
        print("Created")
    else:
        if i == adafruit_fingerprint.ENROLLMISMATCH:
            print("Prints did not match")
        else:
            print("Other error")
        return False

    print("Storing model #%d..." % location, end="", flush=True)
    i = finger.store_model(location)
    if i == adafruit_fingerprint.OK:
        print("Stored")
    else:
        if i == adafruit_fingerprint.BADLOCATION:
            print("Bad storage location")
        elif i == adafruit_fingerprint.FLASHERR:
            print("Flash storage error")
        else:
            print("Other error")
        return False

    return True


##################################################


def get_num():
    """Use input() to get a valid number from 1 to 127. Retry till success!"""
    i = 0
    while (i > 127) or (i < 1):
        try:
            i = int(input("Enter ID # from 1-127: "))
        except ValueError:
            pass
    return i


while True:
    print("----------------")
    if finger.read_templates() != adafruit_fingerprint.OK:
        raise RuntimeError("Failed to read templates")
    print("Fingerprint templates:", finger.templates)
    print("e) enroll print")
    print("f) find print")
    print("d) delete print")
    print("----------------")
    c = input("> ")

    if c == "e":
        enroll_finger(get_num())
    if c == "f":
        if get_fingerprint():
            print("Detected #", finger.finger_id, "with confidence", finger.confidence)
        else:
            print("Finger not found")
    if c == "d":
        if finger.delete_model(get_num()) == adafruit_fingerprint.OK:
            print("Deleted!")
        else:
            print("Failed to delete")

index.php (web app)

PHP
<?php

// Define the telegram_surveillance class and its functions:
class telegram_surveillance {
	public $token, $web_path, $conn, $table;
	
	public function __init__($token, $server, $conn, $table){
		$this->token = $token;
		$this->web_path = $server.$token;
		$this->conn = $conn;
		$this->table = $table;
	}
	// Telegram:
	public function send_message($id, $string){
		$new_message = $this->web_path."/sendMessage?chat_id=".$id."&text=".urlencode($string);
		file_get_contents($new_message);
	}
	
	public function send_photo($id, $photo, $caption){
	    $new_photo = $this->web_path."/sendPhoto?chat_id=".$id."&photo=".$photo."&caption=".$caption;
	    file_get_contents($new_photo);
	}

	public function send_video($id, $video, $caption){
	    $new_video = $this->web_path."/sendVideo?chat_id=".$id."&video=".$video."&caption=".$caption;
	    file_get_contents($new_video);
	}
	
	// Database:
	public function update_id($chat_id){
		$sql = "UPDATE `$this->table` SET `id`='$chat_id' LIMIT 1";
		mysqli_query($this->conn, $sql);
	}

	public function update_access($access){
		$sql = "UPDATE `$this->table` SET `access`='$access' LIMIT 1";
		mysqli_query($this->conn, $sql);
	}

	public function update_camera($camera, $status){
		$sql = "UPDATE `$this->table` SET `camera`='$camera', `status`='$status' LIMIT 1";
		mysqli_query($this->conn, $sql);
	}

	public function update_surveillance($surveillance, $status){
		$sql = "UPDATE `$this->table` SET `surveillance`='$surveillance', `status`='$status' LIMIT 1";
		mysqli_query($this->conn, $sql);
	}
	
	// Fetch:
	public function get_last_access(){
		$sql = "SELECT * FROM `$this->table` LIMIT 1";
		$result = mysqli_query($this->conn, $sql);
		if($row = mysqli_fetch_assoc($result)){
			return $row["access"];
		}
	}
	
	public function get_entry_log(){
		$entries = "";
		foreach(glob("*captured/*.jpg*") as $entry){
			$entries .= explode(".", explode("/", $entry)[1])[0]."\n";
		}
		return $entries;
	}
	
	public function get_chat_id(){
		$sql = "SELECT * FROM `$this->table` LIMIT 1";
		$result = mysqli_query($this->conn, $sql);
		if($row = mysqli_fetch_assoc($result)){
			return $row["id"];
		}
	}
	
	// Print:
	public function print_and_manage_data(){
		$sql = "SELECT * FROM `$this->table` LIMIT 1";
		$result = mysqli_query($this->conn, $sql);
		if($row = mysqli_fetch_assoc($result)){
			if($row["status"] == "default"){
				echo "Waiting new commands...";
			}else if($row["status"] == "changed"){
				$data = array(
					"camera" => $row["camera"],
					"surveillance" => $row["surveillance"]
				);
				// Set variables to default.
				$this->update_surveillance("default", "default");
				echo json_encode($data);
			}
		}
	}
}

// Define database and server settings:
$server = array(
	"name" => "localhost",
	"username" => "<_username_>",
	"password" => "<_password_>",
	"database" => "telegramsurveillance",
	"table" => "entries"

);

$conn = mysqli_connect($server["name"], $server["username"], $server["password"], $server["database"]);

// Define the new 'surveillance' object:
$surveillance = new telegram_surveillance();
$surveillance->__init__("<_bot_token_>", "https://api.telegram.org/bot", $conn, $server["table"]); // e.g., 123456:ABC-DEF1234ghIkl-zyx57W2v1u123ew11

// Get updates from the Telegram Bot API.
$updates = json_decode(file_get_contents('php://input'), TRUE); 
if($updates['update_id']){
	$chat_id =   $updates['message']['chat']['id'];
	$message = $updates['message']['text'];
    
	if($updates["message"]["photo"]){
		$surveillance->send_message($chat_id, "Thank you for sending me a photo but I cannot process it yet ");
	}else if($updates["message"]["video"]){
		$surveillance->send_message($chat_id, "Thank you for sending me a video but I cannot process it yet  ");
	}else if($updates["message"]["document"]){
		$surveillance->send_message($chat_id, "Thank you for sending me a file but I cannot process it yet  ");
	}else{
		// Commands:
		switch($message){
		  case '/start':
		  $surveillance->update_id($chat_id); // Register the chat ID to send messages without an update by the bot. 
		  $surveillance->send_message($chat_id, "Chat ID has been successfully registered to the database. Now, you can receive surveillance footage directly from Raspberry Pi if the fingerprint sensor is triggered. Or, you can request surveillance footage without triggering the fingerprint sensor.\n\nEnter /help to view all available commands.");
		  break;	
		  case '/enable_night_vision':
		  $surveillance->update_camera("night", "changed");
		  $surveillance->send_message($chat_id, " Night Vision Camera => Activated!");
		  break;	
		  case '/disable_night_vision':
		  $surveillance->update_camera("USB", "changed");
		  $surveillance->send_message($chat_id, " USB Webcam => Activated!");
		  break;
		  case '/status_check':
		  $surveillance->update_surveillance("footage", "changed");
		  $surveillance->send_message($chat_id, " Footage => Requested!");
		  break;		  
		  case '/last_access':
		  $access = $surveillance->get_last_access();
		  $surveillance->send_message($chat_id, " Last Access: \n\n$access");
		  break;
		  case '/entry_log':
		  $entries = $surveillance->get_entry_log();
		  $surveillance->send_message($chat_id, " Entry Log: \n\n$entries");
	      break;
		  case '/help':
		  $surveillance->send_message($chat_id, "/enable_night_vision - activate the 5MP Night Vision Camera\n/disable_night_vision - activate the USB Webcam (default)\n/status_check - run the surveillance system without triggered by the fingerprint sensor\n/last_access - display the preceding entry to the fingerprint sensor\n/entry_log - list all attempts to open the fingerprint smart lock");
		  break;	  
	    }
	}
}

// If requested, print information to update Raspberry Pi.
if(isset($_GET["data"])){
	$surveillance->print_and_manage_data();
}

// Save the captured photo and video transferred by Raspberry Pi. And, send them via Telegram Bot API.
if(!empty($_FILES["rasp_video"]["name"]) && !empty($_FILES["rasp_capture"]["name"])){
	// Update the last access (entry).
	$access = (isset($_POST['access'])) ? $_POST['access'] : "Not Detected!";
	$surveillance->update_access($access);
	// Get properties of the uploaded files.
	$video_properties = array(
	    "name" => $_FILES["rasp_video"]["name"],
	    "tmp_name" => $_FILES["rasp_video"]["tmp_name"],
		"size" => $_FILES["rasp_video"]["size"],
		"extension" => pathinfo($_FILES["rasp_video"]["name"], PATHINFO_EXTENSION)
	);
	$capture_properties = array(
	    "name" => $_FILES["rasp_capture"]["name"],
	    "tmp_name" => $_FILES["rasp_capture"]["tmp_name"],
		"size" => $_FILES["rasp_capture"]["size"],
		"extension" => pathinfo($_FILES["rasp_capture"]["name"], PATHINFO_EXTENSION)
	);
	// Check whether the uploaded file extensions are in allowed formats.
	$allowed_formats = array('jpg', 'png', 'mp4');
	if(!in_array($video_properties["extension"], $allowed_formats) || !in_array($capture_properties["extension"], $allowed_formats)){
		echo 'SERVER RESPONSE => File Format Not Allowed!\r\n';
	}else{
		// Check whether the uploaded file sizes exceed the data limit - 5MB.
		if($video_properties["size"] > 5000000 || $capture_properties["size"] > 5000000){
			echo 'SERVER RESPONSE => File size cannot exceed 5MB!\r\n';
		}else{
			$URL = "<_save_files_to_>"; // e.g., https://www.theamplituhedron.com/telegram_surveillance_bot/captured/
			$capture_path = $URL.$capture_properties["name"];
			$video_path = $URL.$video_properties["name"];
			
			// Upload files:
		    move_uploaded_file($video_properties["tmp_name"], "./captured/".$video_properties["name"]);
		    move_uploaded_file($capture_properties["tmp_name"], "./captured/".$capture_properties["name"]);
		    echo "SERVER RESPONSE => Files Uploaded Successfully!\r\n";
			
			// Send the recently uploaded files to the Telegram Bot with the registered chat ID:
			$chat_id = $surveillance->get_chat_id();
			$surveillance->send_photo($chat_id, $capture_path, $capture_properties["name"]);
			$surveillance->send_video($chat_id, $video_path, $video_properties["name"]);
		}
	}
}

?>

Credits

Kutluhan Aktar

Kutluhan Aktar

81 projects • 307 followers
AI & Full-Stack Developer | @EdgeImpulse | @Particle | Maker | Independent Researcher

Comments