Valentin Gîscă
Published

Line follower machine

Have you ever wanted your own toy machine that you can control remotely? I did, so I built one.

Intermediate · Full instructions provided · 2 hours · 37
Line follower machine

Things used in this project

Hardware components

Raspberry Pi Pico W
Raspberry Pi Pico W
×1
HC-05 Bluetooth Module
HC-05 Bluetooth Module
×1
CHQ1838
×1
IR receiver (generic)
×3
IR transmitter (generic)
×3
28BYJ-48
×2
ULN2003A
×2

Story

Read more

Schematics

CHQ1838 schematic

bluetooth schematic

Stepper motor schematic

Line follower driver schematic

Code

modules_follower.py

MicroPython
The line follower code
import utime
from machine import Pin, ADC
# Motor Pins
# GPIO numbers used for the left and right motors (four outputs each),
# listed in the order the drive table below is applied to them.
pinsLeft = [15, 14, 13, 12]
pinsRight = [19, 18, 17, 16]

# One output Pin object per entry of the lists above, in the same order.
phasesRight = [Pin(pin, Pin.OUT) for pin in pinsRight]
phasesLeft = [Pin(pin, Pin.OUT) for pin in pinsLeft]

# Drive sequence: one row per step, one column per motor output.
# NOTE(review): every row switches on two adjacent outputs, which is usually
# called full-step (two-phase-on) drive rather than wave drive -- confirm
# whether the name or the table reflects the intent.
wave_drive = [
    [1, 1, 0, 0],
    [0, 1, 1, 0],
    [0, 0, 1, 1],
    [1, 0, 0, 1]
]

def go_forward(phase):
    """Apply one drive step to both motors.

    phase: sequence of 0/1 output levels (one row of the drive table).
    Each level is written to the matching left output, then the right one.
    """
    motor_pairs = zip(phasesLeft, phasesRight)
    for pair, level in zip(motor_pairs, phase):
        for output in pair:
            output.value(level)
        
def go_right(phase):
    """Advance only the left motor by one drive step (turn right).

    phase: sequence of 0/1 output levels (one row of the drive table).

    Every right-motor output is switched off for this step. The earlier
    version passed the list ``[0, 0, 0, 0]`` to ``Pin.value()``; that method
    converts its argument to a boolean, and a non-empty list is true, so all
    right-motor outputs were driven high instead of low.
    """
    for pinL, pinR, value in zip(phasesLeft, phasesRight, phase):
        pinL.value(value)
        pinR.value(0)
        
def go_left(phase):
    """Advance only the right motor by one drive step (turn left).

    phase: sequence of 0/1 output levels (one row of the drive table).

    Every left-motor output is switched off for this step. The earlier
    version passed the list ``[0, 0, 0, 0]`` to ``Pin.value()``; that method
    converts its argument to a boolean, and a non-empty list is true, so all
    left-motor outputs were driven high instead of low.
    """
    for pinL, pinR, value in zip(phasesLeft, phasesRight, phase):
        pinL.value(0)
        pinR.value(value)

# ADC with Pull-Up Class
class ADCwithPullUp(ADC):
    """ADC input whose GPIO is also configured as an input with pull-up.

    Args:
        gpio: GPIO number to sample.
        adc_vref: reference voltage used to scale raw readings to volts.
    """
    def __init__(self, gpio, adc_vref=3.3):
        self.gpio = gpio
        self.adc_vref = adc_vref
        adc_pin = Pin(gpio, mode=Pin.IN, pull=Pin.PULL_UP)
        super().__init__(adc_pin)
        # The pin is configured a second time after the ADC has been set up.
        # NOTE(review): presumably ADC initialisation resets the pad's pull
        # setting and this line restores the pull-up -- confirm on hardware
        # before removing the apparent duplicate.
        adc_pin = Pin(gpio, mode=Pin.IN, pull=Pin.PULL_UP)
        
    def sample(self):
        """Read the ADC once and return the reading scaled to volts.

        The raw 16-bit reading and the computed voltage are also stored on
        the instance as ``adc_value`` and ``voltage``.
        """
        self.adc_value = self.read_u16()
        # Convert the ADC value to voltage
        self.voltage = (self.adc_value / 65535) * self.adc_vref
        return self.voltage

# Setup sensors and LED
# One pulled-up ADC input per line sensor, on GPIO 28, 27 and 26 (in that order).
adcs = list(map(ADCwithPullUp, [28, 27, 26]))
# Output on GPIO 22 that read_sensors() switches on only while sampling.
LED_Ir = Pin(22, mode=Pin.OUT)
LED_Ir.off()

def read_sensors():
    """Sample every line sensor while the IR LED output is on.

    Returns a list with one entry per ADC in ``adcs``: 1 when the sampled
    voltage is below 1.0 V, otherwise 0.
    """
    LED_Ir.on()
    voltages = [channel.sample() for channel in adcs]
    LED_Ir.off()
    readings = []
    for volts in voltages:
        readings.append(int(volts < 1.0))
    return readings

# Line Following Algorithm

def follow_line(speed):
    """Read the sensors once, then run one pass over the drive table.

    speed: delay in seconds slept after every step of the pass.
    The sensor pattern selects the movement used for the whole pass;
    the pattern [0, 0, 0] moves nothing but still keeps the step timing.
    """
    readings = read_sensors()
    print(readings)
    # Choose the movement once; the readings do not change during the pass.
    if readings in ([1, 1, 0], [1, 0, 0]):
        step = go_right
    elif readings in ([0, 1, 1], [0, 0, 1]):
        step = go_left
    elif readings == [0, 0, 0]:
        step = None
    else:
        step = go_forward
    for phase in wave_drive:
        if step is not None:
            step(phase)
        utime.sleep(speed)
    

modules_bluetooth.py

MicroPython
The Bluetooth modules required for the project.
from machine import UART, Pin, ADC
import modules_shared

import time

def crc8(data):
    """
    Return the 8-bit CRC of *data* using the One-Wire CRC8 algorithm.

    data: iterable of integer byte values.
    Bits are consumed least-significant first with the reflected
    polynomial 0x8C (X8 + X5 + X4 + 1), starting from 0x00.
    """
    polynomial = 0x8C
    remainder = 0x00
    for octet in data:
        remainder ^= octet
        bits_left = 8
        while bits_left:
            lsb_set = remainder & 0x01
            remainder >>= 1
            if lsb_set:
                remainder ^= polynomial
            bits_left -= 1
    return remainder

def format_message(message, message_counter, source_type, source_id, destination_id):
    """
    Build one framed packet from a "key:value;key:value" string.

    Frame layout: four preamble bytes, payload length, 0xFF minus that
    length, the source type byte(s), source id, destination id, message
    counter, the payload, a CRC8 over everything after the preamble, and a
    trailing CR LF. Each payload entry is encoded as
    <key character code><value length><value bytes>; segments without a ':'
    are skipped. Returns the frame as bytes.
    """
    header = [0x55, 0x55, 0x55, 0x33]

    # Encode every "key:value" segment; only the first ':' separates the two.
    payload = []
    for field in message.split(';'):
        colon = field.find(':')
        if colon < 0:
            continue
        name = field[:colon]
        text = field[colon + 1:]
        payload.append(ord(name))
        payload.append(len(text))
        payload.extend(text.encode())

    # Everything from the length field onwards is covered by the CRC.
    size = len(payload)
    body = [size, 0xFF - size]
    body += list(source_type)
    body += [source_id, destination_id, message_counter]
    body += payload

    checksum = crc8(body)
    return bytes(header + body + [checksum, 0x0D, 0x0A])

def decode_message(received_message):
    """
    Extract the first key/value entry from a received frame.

    The key is the byte at index 10, the value length the byte at index 11,
    and the value the ASCII bytes that follow. Returns a one-entry dict
    ``{key character: value string}``, or None (after printing the error)
    when the frame cannot be decoded.
    """
    key_at = 10
    value_at = 12
    try:
        tag, length = received_message[key_at], received_message[key_at + 1]
        raw_value = received_message[value_at:value_at + length]
        decoded = {chr(tag): raw_value.decode('ascii')}
    except Exception as e:
        print(f"Error decoding message: {e}")
        decoded = None
    return decoded

class ATUART(UART):
    """UART link that exchanges framed key/value messages with a remote peer.

    Incoming frames with key '1' set the shared action; key '2' with the
    value 'slow' or 'fast' adjusts the shared speed. Outgoing frames report
    the current speed (key '2') and action (key '1').
    """

    # Addressing fields placed in the header of every outgoing frame.
    source_type = 'R'.encode()
    source_id = 0x32
    destination_id = 0x44
    # Incremented per frame sent and wrapped at 256 so it fits in one byte.
    message_counter = 0

    def req(self, echo=True):
        """Read one line from the UART and apply the command it carries.

        Args:
            echo: when true the line is decoded and acted upon; when false
                it is only recorded in ``self.responses``.

        Returns:
            The raw line as returned by ``readline()`` (falsy when nothing
            was received).
        """
        self.responses = []
        ln = self.readline()
        if not ln:
            return ln

        # Keep a stripped text copy when the line is plain ASCII, otherwise
        # keep the raw bytes.
        try:
            self.responses.append(ln.decode('ascii').strip())
        except UnicodeError:
            self.responses.append(ln)

        if not echo:
            return ln

        decoded_data = decode_message(ln)
        if not decoded_data:
            # decode_message() returns None for frames it cannot parse.
            # Previously that None was used as a dict and the resulting
            # AttributeError ended the whole receiver loop.
            return ln

        key, value = list(decoded_data.items())[0]
        if key == '1':
            with modules_shared.variable_lock:
                modules_shared.action = value
        elif key == '2':
            # NOTE(review): 'slow' lowers the shared value and 'fast' raises
            # it; confirm this matches how the value is used by the driver.
            with modules_shared.variable_lock:
                if value == 'slow' and modules_shared.speed > 0.003:
                    modules_shared.speed = modules_shared.speed - 0.001
                elif value == 'fast' and modules_shared.speed < 0.01:
                    modules_shared.speed = modules_shared.speed + 0.001
        return ln

    def _send(self, text):
        """Frame *text*, write it to the UART and advance the counter."""
        frame = format_message(text, self.message_counter, self.source_type,
                               self.source_id, self.destination_id)
        self.write(frame)
        self.message_counter = (self.message_counter + 1) % 256

    def res(self):
        """Send the current speed and action, one frame each.

        Speed is sent under key '2' and action under key '1', mirroring the
        keys req() accepts. The action used to be sent under key '2' as
        well, which made the two frames indistinguishable by key.
        Errors are printed and otherwise ignored.
        """
        try:
            self._send(f"2:{modules_shared.speed}")
            self._send(f"1:{modules_shared.action}")
        except Exception as e:
            print(f"Error sending message: {e}")

    def shell(self):
        """Run one receive/transmit cycle on this UART, then sleep 1 second.

        (Original note, translated: opens an interactive communication
        terminal over the UART connection associated with the instance.)
        """
        self.req()
        self.res()
        time.sleep(1)

modules_drivers.py

MicroPython
Driver loop that runs the line follower.
from modules_follower import follow_line
import modules_shared
def run_machine():
    """Drive loop: follow the line while the shared action is 'go'.

    Runs until ``modules_shared.need_to_stop`` becomes true or an exception
    is raised, then switches every motor output off.
    """
    # Only follow_line is imported at module level. The cleanup below also
    # needs the motor pins; without this import it raised NameError and the
    # outputs were never switched off.
    from modules_follower import phasesLeft, phasesRight

    try:
        while not modules_shared.need_to_stop:
            if modules_shared.action == 'go':
                follow_line(modules_shared.speed)
            # Any other action (e.g. 'stop') leaves the motors untouched.
    except Exception as e:
        print("Exception:", e)
    finally:
        for pin in phasesRight + phasesLeft:
            pin.value(0)
        print("Application exited cleanly")

modules_ir_receiver.py

MicroPython
IR Receiver code
import time
from machine import Pin, freq, PWM, Timer
from array import array
from utime import ticks_us, ticks_diff
import modules_shared

class IR_RX:
    """Base class for IR receivers: timestamps pin edges and triggers decoding.

    Every edge on the input pin is timestamped in an interrupt handler. The
    first edge of a burst starts a one-shot timer; when it fires, the
    subclass's ``decode`` method is called to interpret the stored times.
    """
    Timer_id = -1  # Software timer but enable override
    # Result/error codes
    # Repeat button code
    REPEAT = -1
    # Error codes
    BADSTART = -2
    BADBLOCK = -3
    BADREP = -4
    OVERRUN = -5
    BADDATA = -6
    BADADDR = -7

    def __init__(self, pin, nedges, tblock, callback, *args):  # Optional args for callback
        """Attach the edge interrupt to *pin* and prepare the timing buffer.

        Args:
            pin: input Pin the receiver output is connected to.
            nedges: number of edges in one complete block.
            tblock: period given to the one-shot timer started on the first
                edge; decoding runs when it expires.
            callback: called as callback(cmd, addr, ext, *args) on success.
            *args: extra arguments appended to every callback call.
        """
        self._pin = pin
        self._nedges = nedges
        self._tblock = tblock
        self.callback = callback
        self.args = args
        self._errf = lambda _: None  # Default error handler ignores the code
        self.verbose = False

        self._times = array("i", (0 for _ in range(nedges + 1)))  # +1 for overrun
        pin.irq(handler=self._cb_pin, trigger=(Pin.IRQ_FALLING | Pin.IRQ_RISING))
        self.edge = 0  # Index of the next slot in _times to fill
        self.tim = Timer(self.Timer_id)  # Default is software timer
        self.cb = self.decode  # decode() is provided by the subclass

    # Pin interrupt. Save time of each edge for later decode.
    def _cb_pin(self, line):
        t = ticks_us()
        # On overrun ignore pulses until software timer times out
        if self.edge <= self._nedges:  # Allow 1 extra pulse to record overrun
            if not self.edge:  # First edge received
                self.tim.init(period=self._tblock, mode=Timer.ONE_SHOT, callback=self.cb)
            self._times[self.edge] = t
            self.edge += 1

    def do_callback(self, cmd, addr, ext, thresh=0):
        """Reset the edge counter and report a decode result.

        Results with cmd >= thresh go to the user callback; anything lower
        is passed to the registered error function.
        """
        self.edge = 0
        if cmd >= thresh:
            self.callback(cmd, addr, ext, *self.args)
        else:
            self._errf(cmd)

    def error_function(self, func):
        """Register *func* to be called with the error code of failed decodes."""
        self._errf = func

    def close(self):
        """Detach the pin interrupt handler and stop the timer."""
        self._pin.irq(handler=None)
        self.tim.deinit()

class NEC_ABC(IR_RX):
    """Decoder for NEC-style blocks built on the edge capture of IR_RX.

    Args:
        pin: input Pin, passed to IR_RX.
        extended: when true, an address whose check byte does not match is
            accepted and reported as a 16-bit address instead of an error.
        samsung: selects the shorter minimum leader width.
        callback, *args: passed to IR_RX.
    """
    def __init__(self, pin, extended, samsung, callback, *args):
        # Block lasts <= 80ms (extended mode) and has 68 edges
        super().__init__(pin, 68, 80, callback, *args)
        self._extended = extended
        self._addr = 0  # Last valid address, reused for repeat codes
        # Minimum accepted leader width, compared against ticks_us differences
        self._leader = 2500 if samsung else 4000  # 4.5ms for Samsung else 9ms

    def decode(self, _):
        """Timer callback: decode the captured edge times and report the result.

        Failures are raised internally as RuntimeError carrying one of the
        class's negative codes; the code then takes the place of the command
        passed to do_callback().
        """
        try:
            if self.edge > 68:
                raise RuntimeError(self.OVERRUN)
            width = ticks_diff(self._times[1], self._times[0])
            if width < self._leader:  # 9ms leading mark for all valid data
                raise RuntimeError(self.BADSTART)
            width = ticks_diff(self._times[2], self._times[1])
            if width > 3000:  # 4.5ms space for normal data
                if self.edge < 68:  # Haven't received the correct number of edges
                    raise RuntimeError(self.BADBLOCK)
                # Time spaces only (marks are always 562.5µs)
                # Space is 1.6875ms (1) or 562.5µs (0)
                # Skip last bit which is always 1
                val = 0
                # 32 iterations: each shifts val right and sets the top bit
                # for a long space, so the first bit received ends up in bit 0.
                for edge in range(3, 68 - 2, 2):
                    val >>= 1
                    if ticks_diff(self._times[edge + 1], self._times[edge]) > 1120:
                        val |= 0x80000000
            elif width > 1700: # 2.5ms space for a repeat code. Should have exactly 4 edges.
                raise RuntimeError(self.REPEAT if self.edge == 4 else self.BADREP)  # Treat REPEAT as error.
            else:
                raise RuntimeError(self.BADSTART)
            # Byte layout of val: addr, addr check, cmd, cmd check (low to high)
            addr = val & 0xff  # 8 bit addr
            cmd = (val >> 16) & 0xff
            if cmd != (val >> 24) ^ 0xff:
                raise RuntimeError(self.BADDATA)
            if addr != ((val >> 8) ^ 0xff) & 0xff:  # 8 bit addr doesn't match check
                if not self._extended:
                    raise RuntimeError(self.BADADDR)
                addr |= val & 0xff00  # pass assumed 16 bit address to callback
            self._addr = addr
        except RuntimeError as e:
            cmd = e.args[0]
            addr = self._addr if cmd == self.REPEAT else 0  # REPEAT uses last address
        # Set up for new data burst and run user callback
        # thresh=REPEAT: repeat codes reach the user callback, lower codes
        # go to the error function.
        self.do_callback(cmd, addr, 0, self.REPEAT)

class NEC_8(NEC_ABC):
    """NEC decoder with extended addressing and Samsung timing both disabled."""

    def __init__(self, pin, callback, *args):
        extended = False
        samsung = False
        super().__init__(pin, extended, samsung, callback, *args)
        
def callback(data, addr, ctrl):
    """Apply one decoded IR remote command to the shared state.

    Args:
        data: command code from the decoder. Negative values are decoder
            status codes and are ignored.
        addr: unused.
        ctrl: unused.

    Command codes handled: 22 sets the action to 'go', 13 sets it to
    'stop', 82 lowers the speed value by 0.001 while it is above 0.003, and
    24 raises it by 0.001 while it is below 0.5. Handled or not, every
    non-negative code is printed together with the current action and speed.

    The limit checks are made while holding the lock so that the test and
    the update see the same value. The former ``global action, speed``
    statement was removed: nothing in the function assigned those names.
    NOTE(review): the upper limit here (0.5) differs from the 0.01 used by
    the Bluetooth handler -- confirm which one is intended.
    """
    if data < 0:
        return
    with modules_shared.variable_lock:
        if data == 22:
            modules_shared.action = 'go'
        elif data == 13:
            modules_shared.action = 'stop'
        elif data == 82 and modules_shared.speed > 0.003:
            modules_shared.speed = modules_shared.speed - 0.001
        elif data == 24 and modules_shared.speed < 0.5:
            modules_shared.speed = modules_shared.speed + 0.001
    print(data)
    print(modules_shared.action)
    print(modules_shared.speed)
"""
from machine import Pin
import time

# Define the GPIO pin where the CHQ1838 OUT pin is connected
ir_pin = Pin(16, Pin.IN)

# Flag to indicate if "UUU3" sequence is detected
uuu3_detected = False
message_size = 0
# Buffer to store the next 11 bits after "UUU3" sequence
buffer = []

def ir_callback(pin):
    global uuu3_detected, buffer, message_size

    byteValue = pin.value()
    buffer.append(byteValue)
    if len(buffer) == 1 and buffer[0] == ['U']:
        pass
    elif len(buffer) == 2 and buffer[1] == ['U']:
        pass
    elif len(buffer) == 3 and buffer[2] == ['U']:
        pass
    elif len(buffer) == 4 and buffer[3] == ['3']:
        uuu3_detected = True
    elif len(buffer) <= 4:
        buffer = []
    elif uuu3_detected and len(buffer) == 12:
        message_size = buffer[11]
    elif uuu3_detected and len(buffer) == 13 + message_size:
        print(f"Received {14+message_size} bits: ", buffer)
        uuu3_detected = False
        buffer = []

time.sleep(0.1)
ir_pin.irq(trigger=Pin.IRQ_FALLING | Pin.IRQ_RISING, handler=ir_callback)

print("IR Receiver Ready. Waiting for signals...")

# Main loop
while True:
    time.sleep(1)  # Sleep to reduce CPU usage
"""

modules_receivers.py

MicroPython
The code that runs the receivers.
from modules_bluetooth import ATUART
from modules_ir_receiver import NEC_8, callback
from machine import Pin
import modules_shared

def run_receivers():
    """Start the IR receiver and service the UART link until told to stop.

    The IR receiver works from interrupts once constructed; this function
    then repeatedly runs the UART receive/transmit cycle until
    ``modules_shared.need_to_stop`` is set or Ctrl+C is pressed. The IR pin
    interrupt and its timer are released on the way out, whatever the
    reason for leaving.
    """
    pin_ir = Pin(5, Pin.IN)
    ir = NEC_8(pin_ir, callback)
    try:
        ser = ATUART(0, 9600, timeout=100, timeout_char=1)  # tx=6, rx=7 GND=8
        print('Automatic message sending over UART started! (use Ctrl+C to terminate!)')
        while not modules_shared.need_to_stop:
            ser.shell()
    except KeyboardInterrupt:
        print('Shell terminated!')
    finally:
        # Previously the handler and timer stayed active after this returned.
        ir.close()

modules_shared.py

MicroPython
Global variables
import _thread

#Shared variables and a lock
# action: current command; the drive loop follows the line while it is 'go'.
action = 'go'
# speed: delay in seconds slept after each motor step.
speed = 0.003
# need_to_stop: set to True to make the drive and receiver loops exit.
need_to_stop = False
# variable_lock: held by writers while they update the values above.
variable_lock = _thread.allocate_lock()

machine_combined.py

MicroPython
The main code
from modules_receivers import run_receivers
from modules_drivers import run_machine
import modules_shared
import _thread

# Start the drive loop on a second thread, then run the receivers on this one.
try:
    # NOTE(review): despite its name, this holds the result of starting the
    # run_machine (drive) thread, not the receivers.
    receivers_thread = _thread.start_new_thread(run_machine, ())
    run_receivers()
except Exception as e:
        print(f"Error: {e}")
finally:
    # Ask the drive loop to leave its while-loop so it can run its cleanup.
    with modules_shared.variable_lock:
        modules_shared.need_to_stop = True
    print("Successful exit!")

Credits

Valentin Gîscă
1 project • 0 followers
Contact
Thanks to Peter Hinch.

Comments

Please log in or sign up to comment.