Kutluhan Aktar
Published © CC BY

AI-driven IoT 3D Printer Motion & Status Tracker w/ Telegram

Track lateral and vertical movements of a 3D printer via AprilTags. Then, get informed of malfunctions related to motion via Telegram.

Expert • Full instructions provided • 4,156

Things used in this project

Hardware components

Raspberry Pi Pico ×1
WIZnet Ethernet HAT ×1
DFRobot HuskyLens AI Camera ×1
Raspberry Pi 3 Model B+ (Raspberry Pi 3B+ or 4) ×1
Raspberry Pi 4 Model B (Raspberry Pi 3B+ or 4) ×1
Creality CR-6 SE 3D Printer ×1
Keyes 10mm RGB LED Module (140C05) ×1
SparkFun Buzzer ×1
LAN Ethernet Cable ×1
Xiaomi 20000 mAh 3 Pro Type-C Power Bank ×1
USB Buck-Boost Converter Board ×1
Mini Breadboard ×2
Jumper wires (generic) ×1

Software apps and online services

Raspberry Pi Raspbian
Thonny
Adafruit CircuitPython
Autodesk Fusion
Ultimaker Cura
Microsoft Visual Studio 2017

Hand tools and fabrication machines

Soldering iron (generic)
Solder Wire, Lead Free
Hot glue gun (generic)

Story

Custom parts and enclosures

3D_printer_movement_and_status_tracker_case_v1.stl

3D_printer_movement_and_status_tracker_cover_v1.stl

telegram_3D_printer_bot.zip

lib.zip

Fritzing File

Schematics

Connections

WIZnet Ethernet HAT Pinout

Code

main.py

Python
# AI-driven IoT 3D Printer Motion & Status Tracker w/ Telegram

# Windows, Linux, or Ubuntu

# By Kutluhan Aktar

# Track lateral and vertical movements of a 3D printer via AprilTags. Then, get informed of malfunctions related to motion via Telegram.

# For more information:
# https://www.theamplituhedron.com/projects/IoT_3D_Printer_Motion_Status_Tracker_w_Telegram/

import board
import busio
import digitalio
import time
import adafruit_requests as requests
from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K
import adafruit_wiznet5k.adafruit_wiznet5k_socket as socket
from huskylib import HuskyLensLibrary
from time import sleep
from machine import Pin, PWM

class IoT_3D_printer_tracker:
    def __init__(self, SPI0_SCK, SPI0_TX, SPI0_RX, SPI0_CSn, W5x00_RSTn):
        # Define the PHP web application path.
        self.App = "http://192.168.1.20/telegram_3D_printer_bot/"
        # Setup the network configuration settings:
        MY_MAC = (0xDE, 0xAD, 0xBE, 0xEF, 0xFE, 0xED)
        IP_ADDRESS = (192, 168, 1, 24)
        SUBNET_MASK = (255, 255, 255, 0)
        GATEWAY_ADDRESS = (192, 168, 1, 1)
        DNS_SERVER = (8, 8, 8, 8)
        # Define the WIZnet Ethernet HAT pin settings:
        ethernetRst = digitalio.DigitalInOut(W5x00_RSTn)
        ethernetRst.direction = digitalio.Direction.OUTPUT
        cs = digitalio.DigitalInOut(SPI0_CSn)
        spi_bus = busio.SPI(SPI0_SCK, MOSI=SPI0_TX, MISO=SPI0_RX)
        # Reset the WIZnet Ethernet HAT.
        ethernetRst.value = False
        sleep(1)
        ethernetRst.value = True    
        # Initialize the Ethernet interface without DHCP.
        self.eth = WIZNET5K(spi_bus, cs, is_dhcp=False, mac=MY_MAC, debug=False)
        # Set the network configuration.
        self.eth.ifconfig = (IP_ADDRESS, SUBNET_MASK, GATEWAY_ADDRESS, DNS_SERVER)
        # Optional: Initialize the Ethernet interface with DHCP instead.
        #self.eth = WIZNET5K(spi_bus, cs, is_dhcp=True, mac=MY_MAC, debug=False)
        # Print the WIZnet Ethernet HAT information.
        print("Chip Version: ", self.eth.chip)
        print("\nMAC Address: ", [hex(i) for i in self.eth.mac_address])
        print()
        # Define HuskyLens settings.
        self.husky_lens = HuskyLensLibrary("I2C", Pin(4), Pin(5), 0x32)
        # Define the RGB LED settings:
        self.red = PWM(Pin(10))
        self.green = PWM(Pin(11))
        self.blue = PWM(Pin(12))
        self.red.freq(1000) 
        self.green.freq(1000)
        self.blue.freq(1000)
        # Define the buzzer:
        self.buzzer = PWM(Pin(13))
        self.buzzer.freq(450)
        self.adjust_color(0, 0, 65025)
    # Detect the 3D printer lateral and vertical motions via tags (AprilTags). Then, transfer the current printer motions to the given web application.
    def detect_and_transfer_motions(self, rest):
        detected_tag = self.husky_lens.command_request_blocks_learned()
        tag_number = len(detected_tag)
        if tag_number == 3:
            self.adjust_color(65025, 65025, 0)
            x_axis = str(detected_tag[0][0]) + "," + str(detected_tag[0][1])
            y_axis = str(detected_tag[1][0]) + "," + str(detected_tag[1][1])
            z_axis = str(detected_tag[2][0]) + "," + str(detected_tag[2][1])
            print()
            print("X-Axis: " + x_axis)
            print("Y-Axis: " + y_axis)
            print("Z-Axis: " + z_axis)
            # Define the parameters to transfer printer movements to the web application.
            parameters = {"x_axis": x_axis, "y_axis": y_axis, "z_axis": z_axis}
            # Initialize a requests object with the Ethernet interface and a socket.
            requests.set_socket(socket, self.eth)
            print()
            # Make an HTTP POST request to the web application.
            r = requests.post(self.App, data=parameters)
            print("-" * 60)
            print("Data Saved and Transferred to the Telegram Bot Successfully!")
            print("-" * 60)
            r.close()
            # Notify the user after sending the current printer motions.
            self.adjust_color(0, 65025, 0)
            self.buzzer.duty_u16(65025)
            sleep(5)
            self.buzzer.duty_u16(0)
            self.adjust_color(65025, 0, 65025)
        else:
            print(detected_tag)
        # Maintain the DHCP lease.
        self.eth.maintain_dhcp_lease()
        sleep(rest)
    # Change the RGB LED's color. 
    def adjust_color(self, red_x, green_x, blue_x):
        self.red.duty_u16(65025-red_x)
        self.green.duty_u16(65025-green_x)
        self.blue.duty_u16(65025-blue_x)
                
# Define the new 'tracker' class object.
tracker = IoT_3D_printer_tracker(board.GP18, board.GP19, board.GP16, board.GP17, board.GP15)
        
while True:
    tracker.detect_and_transfer_motions(30)
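
The form fields that detect_and_transfer_motions posts can be checked on a desktop Python interpreter, without the Pico or the HuskyLens attached. The sketch below is not part of the project code: build_motion_payload is a hypothetical helper name, the sample readings are made up, and the block layout (X center, Y center, width, height, ID) is an assumption about what the HuskyLens returns. As in main.py, the first, second, and third detected tags are taken as the X-axis, Y-axis, and Z-axis markers.

```python
def build_motion_payload(detected_tags):
    """Return the form fields main.py posts, or None unless exactly three tags are visible."""
    if len(detected_tags) != 3:
        return None
    names = ("x_axis", "y_axis", "z_axis")
    # Each field is "x,y": the first two values of the corresponding block.
    return {name: "{},{}".format(tag[0], tag[1])
            for name, tag in zip(names, detected_tags)}


# Hypothetical readings for the three tags (assumed layout: x, y, width, height, ID).
sample = [[160, 120, 40, 40, 1], [85, 200, 38, 38, 2], [300, 60, 42, 42, 3]]
payload = build_motion_payload(sample)
print(payload)
```

Keeping the formatting in a pure function like this would make it possible to test the payload separately from the Ethernet and camera code.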

index.php

PHP
<?php

// Define the IoT_3D_printer_tracker class and its functions:
class IoT_3D_printer_tracker {
	public $token, $web_path, $conn, $table;
	
	public function __init__($token, $server, $conn, $table){
		$this->token = $token;
		$this->web_path = $server.$token;
		$this->conn = $conn;
		$this->table = $table;
	}
	
	// Telegram:
	public function send_message($chat_id, $string){
		$new_message = $this->web_path."/sendMessage?chat_id=".$chat_id."&text=".urlencode($string);
		file_get_contents($new_message);
	}
	
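	public function send_photo($chat_id, $photo, $caption){
		// URL-encode the photo link and the caption so that spaces and special characters do not break the request.
		$new_photo = $this->web_path."/sendPhoto?chat_id=".$chat_id."&photo=".urlencode($photo)."&caption=".urlencode($caption);
		file_get_contents($new_photo);
	}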
	
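	// Database -> Insert Data:
	public function insert_new_data($x_axis, $y_axis, $z_axis, $date){
		// Use a prepared statement so that the posted values cannot alter the SQL query.
		$sql = "INSERT INTO `$this->table`(`x_axis`, `y_axis`, `z_axis`, `date`) VALUES (?, ?, ?, ?)";
		$stmt = mysqli_prepare($this->conn, $sql);
		mysqli_stmt_bind_param($stmt, "ssss", $x_axis, $y_axis, $z_axis, $date);
		mysqli_stmt_execute($stmt);
		mysqli_stmt_close($stmt);
	}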

	// Database -> Retrieve Data:
	public function get_data_from_database(){
		$sql = "SELECT * FROM `$this->table` ORDER BY id DESC LIMIT 1";
		$result = mysqli_query($this->conn, $sql);
		if($row = mysqli_fetch_assoc($result)){
			return $row;
		}
	}
	
	// Database -> Create Table
	public function database_create_table(){
		// Create a new database table.
		$sql_create = "CREATE TABLE `$this->table`(
						id int AUTO_INCREMENT PRIMARY KEY NOT NULL,
						x_axis varchar(255) NOT NULL,
					    y_axis varchar(255) NOT NULL,
						z_axis varchar(255) NOT NULL,
						`date` varchar(255) NOT NULL
					   );";
		if(mysqli_query($this->conn, $sql_create)) echo("<br><br>Database Table Created Successfully!");
		// Insert the default data items as the first row into the given database table.
		$sql_insert = "INSERT INTO `$this->table`(`x_axis`, `y_axis`, `z_axis`, `date`) VALUES ('0,0', '0,0', '0,0', 'default')";
		if(mysqli_query($this->conn, $sql_insert)) echo("<br><br>Default Data Items Inserted Successfully!");
	}
}

// Define database and server settings:
$server = array(
	"name" => "localhost",
	"username" => "root",
	"password" => "bot",
	"database" => "telegram3dprinter",
	"table" => "entries"

);

$conn = mysqli_connect($server["name"], $server["username"], $server["password"], $server["database"]);

// Define the new 'printer_tracker' object:
$printer_tracker = new IoT_3D_printer_tracker();
$bot_token = "<_____________________>"; // e.g., 123456:ABC-DEF1234ghIkl-zyx57W2v1u123ew11
$printer_tracker->__init__($bot_token, "https://api.telegram.org/bot", $conn, $server["table"]); 


// Get updates on the 3D printer's lateral and vertical motions (X-axis, Y-axis, and Z-axis) from Raspberry Pi Pico so as to detect potential malfunctions.
if(isset($_POST["x_axis"]) && isset($_POST["y_axis"]) && isset($_POST["z_axis"])){
	// Obtain the latest printer motions stored in the given database table.
	$row = $printer_tracker->get_data_from_database();
	$previous_motions = [
		"x_axis" => [intval(explode(",", $row["x_axis"])[0]), intval(explode(",", $row["x_axis"])[1])],
		"y_axis" => [intval(explode(",", $row["y_axis"])[0]), intval(explode(",", $row["y_axis"])[1])],
		"z_axis" => [intval(explode(",", $row["z_axis"])[0]), intval(explode(",", $row["z_axis"])[1])]
	];
	// Obtain the current printer motions transferred by Raspberry Pi Pico.
	$motions = [
		"x_axis" => [intval(explode(",", $_POST["x_axis"])[0]), intval(explode(",", $_POST["x_axis"])[1])],
		"y_axis" => [intval(explode(",", $_POST["y_axis"])[0]), intval(explode(",", $_POST["y_axis"])[1])],
		"z_axis" => [intval(explode(",", $_POST["z_axis"])[0]), intval(explode(",", $_POST["z_axis"])[1])]
	];
	
	// Send the current printer motions to the given Telegram bot. 
	$chat_id = ""; // Enter your Telegram chat ID, e.g., 1496498083
	$date = date("Y/m/d__h:i:s A");
	$printer_tracker->send_message($chat_id, " Recent Printer Movements:\n\n  $date\n\n  X-Axis: ".$_POST["x_axis"]."\n\n  Y-Axis: ".$_POST["y_axis"]."\n\n  Z-Axis: ".$_POST["z_axis"]);
	// Check for potential malfunctions related to the printer's lateral or vertical motion.
	// Arrays with the same keys and values compare as equal, so each axis can be compared as a whole.
	if($previous_motions == $motions){
		$printer_tracker->send_message($chat_id, " 3D Printer is not working!");
	}else{
		if($previous_motions["x_axis"] == $motions["x_axis"]){
			$printer_tracker->send_message($chat_id, " X-Axis: Potential Malfunction Detected!");
		}
		if($previous_motions["y_axis"] == $motions["y_axis"]){
			$printer_tracker->send_message($chat_id, " Y-Axis: Potential Malfunction Detected!");
		}
		if($previous_motions["z_axis"] == $motions["z_axis"]){
			$printer_tracker->send_message($chat_id, " Z-Axis: Potential Malfunction Detected!");
		}
	}
	
	// Send a schematic describing 3D printer motions.
	$printer_tracker->send_photo($chat_id, "https://ars.els-cdn.com/content/image/3-s2.0-B9780128145647000031-f03-03-9780128145647.jpg", "3D Printer Motions");
	
	// Save the current printer motions to the given database table.
	$printer_tracker->insert_new_data($_POST["x_axis"], $_POST["y_axis"], $_POST["z_axis"], $date);
	echo("Data Saved and Transferred to the Telegram Bot Successfully!");
	
}else{
	echo("Waiting for new data...");
}

// If requested, create a new database table, including the default data items in the first row.
if(isset($_GET["create_table"]) && $_GET["create_table"] == "OK") $printer_tracker->database_create_table();

?>
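
The malfunction check in index.php compares the newest reading against the last row stored in the database, axis by axis. As a rough illustration only, the same decision logic can be written in Python and tried out with made-up readings. The names parse_axis and check_motions are hypothetical, and unlike PHP's intval, Python's int raises an error on malformed input.

```python
def parse_axis(value):
    """Turn an "x,y" string such as "160,120" into a tuple of two ints."""
    x, y = value.split(",")
    return int(x), int(y)


def check_motions(previous, current):
    """Compare two readings and return the warnings the web application would send."""
    axes = ("x_axis", "y_axis", "z_axis")
    labels = {"x_axis": "X-Axis", "y_axis": "Y-Axis", "z_axis": "Z-Axis"}
    # An axis counts as stalled when its tag has not moved since the last reading.
    stalled = [a for a in axes if parse_axis(previous[a]) == parse_axis(current[a])]
    if len(stalled) == len(axes):
        return ["3D Printer is not working!"]
    return ["{}: Potential Malfunction Detected!".format(labels[a]) for a in stalled]


# Made-up readings: only the Y-axis tag stays in place between the two samples.
previous = {"x_axis": "160,120", "y_axis": "85,200", "z_axis": "300,60"}
current = {"x_axis": "172,120", "y_axis": "85,200", "z_axis": "300,58"}
warnings = check_motions(previous, current)
print(warnings)
```

Because the check only looks at two consecutive samples, an axis that is legitimately idle between them would also be reported, which is why the messages say "potential".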

huskylib.py

Python
#Library credits: RRoy  https://community.dfrobot.com/makelog-310469.html
 
import ubinascii
import time
from machine import UART,I2C,Pin
 
commandHeaderAndAddress = "55AA11"
algorthimsByteID = {
    "ALGORITHM_OBJECT_TRACKING": "0100",
    "ALGORITHM_FACE_RECOGNITION": "0000",
    "ALGORITHM_OBJECT_RECOGNITION": "0200",
    "ALGORITHM_LINE_TRACKING": "0300",
    "ALGORITHM_COLOR_RECOGNITION": "0400",
    "ALGORITHM_TAG_RECOGNITION": "0500",
    "ALGORITHM_OBJECT_CLASSIFICATION": "0600"
}
 
COMMAND_REQUEST_CUSTOMNAMES= 0x2f
COMMAND_REQUEST_TAKE_PHOTO_TO_SD_CARD = 0x30
COMMAND_REQUEST_SAVE_MODEL_TO_SD_CARD = 0x32
COMMAND_REQUEST_LOAD_MODEL_FROM_SD_CARD = 0x33
COMMAND_REQUEST_CUSTOM_TEXT = 0x34
COMMAND_REQUEST_CLEAR_TEXT = 0x35
COMMAND_REQUEST_LEARN_ONECE = 0x36
COMMAND_REQUEST_FORGET = 0x37
COMMAND_REQUEST_SCREENSHOT_TO_SD_CARD = 0x39
COMMAND_REQUEST_FIRMWARE_VERSION = 0x3C
 
class HuskyLensLibrary:
    def __init__(self, proto, SDA, SCL, address):
        self.proto=proto
        self.address=address
        if(self.proto=="SERIAL"):
            self.huskylensSer = UART(2,baudrate=9600,rx=33,tx=32,timeout=100)
        else :
            self.huskylensSer = I2C(0, scl=SCL, sda=SDA, freq=100000)
        self.lastCmdSent = ""
 
    def writeToHuskyLens(self, cmd):
        self.lastCmdSent = cmd
        if(self.proto=="SERIAL"):
            self.huskylensSer.write(cmd)
        else :
            self.huskylensSer.writeto(self.address, cmd)
 
    def calculateChecksum(self, hexStr):
        total = 0
        for i in range(0, len(hexStr), 2):
            total += int(hexStr[i:i+2], 16)
        # Keep only the low byte of the sum and format it as two hex digits.
        return "{:02x}".format(total & 0xFF)
 
    def cmdToBytes(self, cmd):
        return ubinascii.unhexlify(cmd)
 
    def splitCommandToParts(self, str):
        headers = str[0:4]
        address = str[4:6]
        data_length = int(str[6:8], 16)
        command = str[8:10]
        if(data_length > 0):
            data = str[10:10+data_length*2]
        else:
            data = []
        checkSum = str[2*(6+data_length-1):2*(6+data_length-1)+2]
        return [headers, address, data_length, command, data, checkSum]
 
    def getBlockOrArrowCommand(self):
        # Read the 5-byte frame header first; its fourth byte holds the data length.
        if(self.proto=="SERIAL"):
            byteString = self.huskylensSer.read(5)
            byteString += self.huskylensSer.read(int(byteString[3]))
            byteString += self.huskylensSer.read(1)
        else:
            byteString = self.huskylensSer.readfrom(self.address, 5)
            # Read the data bytes plus the trailing checksum byte.
            byteString += self.huskylensSer.readfrom(self.address, byteString[3]+1)
        commandSplit = self.splitCommandToParts(''.join(['%02x' % b for b in byteString]))
        return commandSplit[4]
 
    def processReturnData(self):
        inProduction = True
        if(inProduction):
            try:
                if(self.proto=="SERIAL"):
                    byteString = self.huskylensSer.read(5)
                    byteString += self.huskylensSer.read(int(byteString[3]))
                    byteString += self.huskylensSer.read(1)
                else:
                    byteString  =self.huskylensSer.readfrom(self.address,5)
                    ##print("______")
                    ##print(byteString)
                    ##print(byteString[3])
                    byteString +=self.huskylensSer.readfrom(self.address,byteString[3]+1)
                    ##print("=======")
                    ##print(byteString)
                commandSplit = self.splitCommandToParts(''.join(['%02x' % b for b in byteString]))
                if(commandSplit[3] == "2e"):
                    return "Knock Recieved"
                else:
                    returnData = []
                    numberOfBlocksOrArrow = int(
                        commandSplit[4][2:4]+commandSplit[4][0:2], 16)
                    numberOfIDLearned = int(
                        commandSplit[4][6:8]+commandSplit[4][4:6], 16)
                    frameNumber = int(
                        commandSplit[4][10:12]+commandSplit[4][8:10], 16)
                    for i in range(numberOfBlocksOrArrow):
                        returnData.append(self.getBlockOrArrowCommand())
                    finalData=[]
                    for i in returnData:
                        tmp=[]
                        for q in range(0,len(i),4):
                            # Each value is a little-endian 16-bit word: low byte first, then high byte.
                            tmp.append(int(i[q+2:q+4]+i[q:q+2],16))
                        finalData.append(tmp)
                    return finalData
            except Exception:
                print("Read error")
                return []
                
    def command_request_knock(self):
        cmd = self.cmdToBytes(commandHeaderAndAddress+"002c3c")
        self.writeToHuskyLens(cmd)
        return self.processReturnData()
 
    def command_request(self):
        cmd = self.cmdToBytes(commandHeaderAndAddress+"002030")
        self.writeToHuskyLens(cmd)
        return self.processReturnData()
 
    def command_request_blocks(self):
        cmd = self.cmdToBytes(commandHeaderAndAddress+"002131")
        self.writeToHuskyLens(cmd)
        return self.processReturnData()
 
    def command_request_arrows(self):
        cmd = self.cmdToBytes(commandHeaderAndAddress+"002232")
        self.writeToHuskyLens(cmd)
        return self.processReturnData()
 
    def command_request_learned(self):
        cmd = self.cmdToBytes(commandHeaderAndAddress+"002333")
        self.writeToHuskyLens(cmd)
        return self.processReturnData()
 
    def command_request_blocks_learned(self):
        cmd = self.cmdToBytes(commandHeaderAndAddress+"002434")
        self.writeToHuskyLens(cmd)
        return self.processReturnData()
 
    def command_request_arrows_learned(self):
        cmd = self.cmdToBytes(commandHeaderAndAddress+"002535")
        self.writeToHuskyLens(cmd)
        return self.processReturnData()
 
    def line_tracking_mode(self):
        cmd = self.cmdToBytes(commandHeaderAndAddress+"022d030042")
        self.writeToHuskyLens(cmd)
        return self.processReturnData()
 
    def face_recognition_mode(self):
        cmd = self.cmdToBytes(commandHeaderAndAddress+"022d00003f")
        self.writeToHuskyLens(cmd)
        return self.processReturnData()
 
    def object_tracking_mode(self):
        cmd = self.cmdToBytes(commandHeaderAndAddress+"022d010040")
        self.writeToHuskyLens(cmd)
        return self.processReturnData()
 
    def object_recognition_mode(self):
        cmd = self.cmdToBytes(commandHeaderAndAddress+"022d020041")
        self.writeToHuskyLens(cmd)
        return self.processReturnData()
 
    def color_recognition_mode(self):
        cmd = self.cmdToBytes(commandHeaderAndAddress+"022d040043")
        self.writeToHuskyLens(cmd)
        return self.processReturnData()
 
    def tag_recognition_mode(self):
        cmd = self.cmdToBytes(commandHeaderAndAddress+"022d050044")
        self.writeToHuskyLens(cmd)
        return self.processReturnData()
 
    def command_request_by_id(self, idVal):
        idVal = "{:04x}".format(idVal)
        idVal = idVal[2:]+idVal[0:2]
        cmd = commandHeaderAndAddress+"0226"+idVal
        cmd += self.calculateChecksum(cmd)
        cmd = self.cmdToBytes(cmd)
        self.writeToHuskyLens(cmd)
        return self.processReturnData()
 
    def command_request_blocks_by_id(self, idVal):
        idVal = "{:04x}".format(idVal)
        idVal = idVal[2:]+idVal[0:2]
        cmd = commandHeaderAndAddress+"0227"+idVal
        cmd += self.calculateChecksum(cmd)
        cmd = self.cmdToBytes(cmd)
        self.writeToHuskyLens(cmd)
        return self.processReturnData()
 
    def command_request_arrows_by_id(self, idVal):
        idVal = "{:04x}".format(idVal)
        idVal = idVal[2:]+idVal[0:2]
        cmd = commandHeaderAndAddress+"0228"+idVal
        cmd += self.calculateChecksum(cmd)
        cmd = self.cmdToBytes(cmd)
        self.writeToHuskyLens(cmd)
        return self.processReturnData()
 
    def command_request_algorthim(self, alg):
        if alg in algorthimsByteID:
            cmd = commandHeaderAndAddress+"022d"+algorthimsByteID[alg]
            cmd += self.calculateChecksum(cmd)
            cmd = self.cmdToBytes(cmd)
            self.writeToHuskyLens(cmd)
            return self.processReturnData()
        else:
            print("INCORRECT ALGORITHM NAME")
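
The frames used throughout huskylib.py share one layout: the header and address (55 AA 11), a data-length byte, a command byte, optional data, and a one-byte checksum equal to the low byte of the sum of all preceding bytes. The desktop-Python sketch below rebuilds two of the hard-coded frames from that rule and decodes a data payload, assuming 16-bit values are sent low byte first, as the count fields in processReturnData are. build_command and decode_words are hypothetical helper names, and the sample payload is made up.

```python
import binascii

HEADER_AND_ADDRESS = "55aa11"


def build_command(length_and_command, data=""):
    """Return a complete frame as bytes, with the checksum appended."""
    body = HEADER_AND_ADDRESS + length_and_command + data
    total = sum(int(body[i:i + 2], 16) for i in range(0, len(body), 2))
    # The checksum is the low byte of the sum of every byte before it.
    return binascii.unhexlify(body + "{:02x}".format(total & 0xFF))


def decode_words(hex_data):
    """Decode a hex string of little-endian 16-bit words into a list of ints."""
    return [int(hex_data[i + 2:i + 4] + hex_data[i:i + 2], 16)
            for i in range(0, len(hex_data), 4)]


knock = build_command("002c")             # same frame as command_request_knock
tag_mode = build_command("022d", "0500")  # same frame as tag_recognition_mode
# Made-up block payload with five words; the first word is above 255 on purpose.
block = decode_words("2c01" "7800" "2800" "2800" "0100")
print(binascii.hexlify(knock), binascii.hexlify(tag_mode), block)
```

Since the HuskyLens screen is wider than 255 pixels, coordinates can need both bytes of a word, which is why the byte order matters when decoding.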
            

Credits

Kutluhan Aktar

82 projects • 310 followers
AI & Full-Stack Developer | @EdgeImpulse | @Particle | Maker | Independent Researcher
