Hardware components | ||||||
![]() |
| × | 1 | |||
![]() |
| × | 1 | |||
| × | 1 | ||||
| × | 3 | ||||
| × | 3 | ||||
| × | 2 | ||||
| × | 2 |
As kid I’ve always wanted to have a remote-controlled machine, but I’ve never had one. Now it was the time I made my own machine that I can customize the way I want.
import utime
from machine import Pin, ADC
# Motor Pins
pinsLeft = [15, 14, 13, 12]
pinsRight = [19, 18, 17, 16]
phasesRight = [Pin(pin, Pin.OUT) for pin in pinsRight]
phasesLeft = [Pin(pin, Pin.OUT) for pin in pinsLeft]
wave_drive = [
[1, 1, 0, 0],
[0, 1, 1, 0],
[0, 0, 1, 1],
[1, 0, 0, 1]
]
def go_forward(phase):
for pinL, pinR, value in zip(phasesLeft, phasesRight, phase):
pinL.value(value)
pinR.value(value)
def go_right(phase):
for pinL, pinR, value in zip(phasesLeft, phasesRight, phase):
pinL.value(value)
pinR.value([0, 0, 0, 0])
def go_left(phase):
for pinL, pinR, value in zip(phasesLeft, phasesRight, phase):
pinL.value([0, 0, 0, 0])
pinR.value(value)
# ADC with Pull-Up Class
class ADCwithPullUp(ADC):
def __init__(self, gpio, adc_vref=3.3):
self.gpio = gpio
self.adc_vref = adc_vref
adc_pin = Pin(gpio, mode=Pin.IN, pull=Pin.PULL_UP)
super().__init__(adc_pin)
adc_pin = Pin(gpio, mode=Pin.IN, pull=Pin.PULL_UP)
def sample(self):
self.adc_value = self.read_u16()
# Convert the ADC value to voltage
self.voltage = (self.adc_value / 65535) * self.adc_vref
return self.voltage
# Setup sensors and LED
adcs = list(map(ADCwithPullUp, [28, 27, 26]))
LED_Ir = Pin(22, mode=Pin.OUT)
LED_Ir.off()
def read_sensors():
LED_Ir.on()
sensor_values = list([adc.sample() for adc in adcs])
LED_Ir.off()
return list([1 if value < 1.0 else 0 for value in sensor_values])
# Line Following Algorithm
def follow_line(speed):
sensor_values = read_sensors()
print(sensor_values)
# Basic line following logic based on sensor readings
for phase in wave_drive:
if (sensor_values == [1, 1, 0]) or (sensor_values == [1, 0, 0]):
go_right(phase)
elif (sensor_values == [0, 1, 1]) or (sensor_values == [0, 0, 1]):
go_left(phase)
elif sensor_values == [0, 0, 0]:
pass
else:
go_forward(phase)
utime.sleep(speed)
from machine import UART, Pin, ADC
import modules_shared
import time
def crc8(data):
"""
Compute CRC8 using the One-Wire CRC8 algorithm.
"""
crc = 0x00
for byte in data:
crc ^= byte
for _ in range(8):
if (crc & 0x01):
crc = (crc >> 1) ^ 0x8C # X8 + X5 + X4 + 1
else:
crc >>= 1
return crc
def format_message(message, message_counter, source_type, source_id, destination_id):
"""
Format the message according to the specified format.
"""
preamble = [0x55, 0x55, 0x55, 0x33]
# Convert the message into key-value pairs (example: "key1:value1;key2:value2")
pairs = []
for pair in message.split(';'):
if ':' in pair:
key, value = pair.split(':', 1)
pairs.append(ord(key)) # Convert key to ASCII
pairs.append(len(value))
pairs.extend(value.encode()) # Append value
# Calculate lengths
content_length = len(pairs)
length_negated = 0xFF - content_length
# Construct the message
msg = preamble + [content_length, length_negated] + list(source_type) + [source_id, destination_id, message_counter] + pairs
# Calculate CRC8 and append
crc = crc8(msg[4:]) # CRC8 calculation starts from the length field
msg.append(crc)
msg.extend(map(ord, '\r\n'))
return bytes(msg)
def decode_message(received_message):
"""
Decode the received message and extract key-value pairs.
"""
try:
key = received_message[10]
message_size = received_message[11]
message_offset = 12 + message_size
message = received_message[12:message_offset]
return {chr(key): message.decode('ascii')}
except Exception as e:
print(f"Error decoding message: {e}")
return None
class ATUART(UART):
source_type = 'R'.encode()
source_id = 0x32
destination_id = 0x44
message_counter = 0
def req(self, echo=True):
self.responses = []
ln = self.readline()
if ln:
self.responses.append(ln)
try:
self.responses[-1] = self.responses[-1].decode('ascii').strip()
except UnicodeError:
pass
if echo:
decoded_data = decode_message(ln)
key, value = list(decoded_data.items())[0]
if key == '1':
with modules_shared.variable_lock:
modules_shared.action = value
if key == '2':
if value == 'slow':
with modules_shared.variable_lock:
if modules_shared.speed > 0.003:
modules_shared.speed = modules_shared.speed - 0.001
elif value == 'fast':
with modules_shared.variable_lock:
if modules_shared.speed < 0.01:
modules_shared.speed = modules_shared.speed + 0.001
return ln
def res(self):
"""Send a message over the UART connection."""
try:
speed_message = format_message(f"2:{modules_shared.speed}", self.message_counter, self.source_type, self.source_id, self.destination_id)
self.write(speed_message)
self.message_counter = (self.message_counter + 1) % 256
action_message = format_message(f"2:{modules_shared.action}", self.message_counter, self.source_type, self.source_id, self.destination_id)
self.write(action_message)
self.message_counter = (self.message_counter + 1) % 256
except Exception as e:
print(f"Error sending message: {e}")
def shell(self): # deschide un terminal interactiv de comunicare prin conexiunea UART asociata instantei
self.req()
self.res()
time.sleep(1)
from modules_follower import follow_line
import modules_shared
def run_machine():
try:
while not modules_shared.need_to_stop:
if modules_shared.action == 'go':
follow_line(modules_shared.speed)
elif modules_shared.action == 'stop':
pass
except Exception as e:
print("Exception:", e)
finally:
for pin in phasesRight + phasesLeft:
pin.value(0)
print("Application exited cleanly")
import time
from machine import Pin, freq, PWM, Timer
from array import array
from utime import ticks_us, ticks_diff
import modules_shared
class IR_RX:
Timer_id = -1 # Software timer but enable override
# Result/error codes
# Repeat button code
REPEAT = -1
# Error codes
BADSTART = -2
BADBLOCK = -3
BADREP = -4
OVERRUN = -5
BADDATA = -6
BADADDR = -7
def __init__(self, pin, nedges, tblock, callback, *args): # Optional args for callback
self._pin = pin
self._nedges = nedges
self._tblock = tblock
self.callback = callback
self.args = args
self._errf = lambda _: None
self.verbose = False
self._times = array("i", (0 for _ in range(nedges + 1))) # +1 for overrun
pin.irq(handler=self._cb_pin, trigger=(Pin.IRQ_FALLING | Pin.IRQ_RISING))
self.edge = 0
self.tim = Timer(self.Timer_id) # Defaul is sofware timer
self.cb = self.decode
# Pin interrupt. Save time of each edge for later decode.
def _cb_pin(self, line):
t = ticks_us()
# On overrun ignore pulses until software timer times out
if self.edge <= self._nedges: # Allow 1 extra pulse to record overrun
if not self.edge: # First edge received
self.tim.init(period=self._tblock, mode=Timer.ONE_SHOT, callback=self.cb)
self._times[self.edge] = t
self.edge += 1
def do_callback(self, cmd, addr, ext, thresh=0):
self.edge = 0
if cmd >= thresh:
self.callback(cmd, addr, ext, *self.args)
else:
self._errf(cmd)
def error_function(self, func):
self._errf = func
def close(self):
self._pin.irq(handler=None)
self.tim.deinit()
class NEC_ABC(IR_RX):
def __init__(self, pin, extended, samsung, callback, *args):
# Block lasts <= 80ms (extended mode) and has 68 edges
super().__init__(pin, 68, 80, callback, *args)
self._extended = extended
self._addr = 0
self._leader = 2500 if samsung else 4000 # 4.5ms for Samsung else 9ms
def decode(self, _):
try:
if self.edge > 68:
raise RuntimeError(self.OVERRUN)
width = ticks_diff(self._times[1], self._times[0])
if width < self._leader: # 9ms leading mark for all valid data
raise RuntimeError(self.BADSTART)
width = ticks_diff(self._times[2], self._times[1])
if width > 3000: # 4.5ms space for normal data
if self.edge < 68: # Haven't received the correct number of edges
raise RuntimeError(self.BADBLOCK)
# Time spaces only (marks are always 562.5µs)
# Space is 1.6875ms (1) or 562.5µs (0)
# Skip last bit which is always 1
val = 0
for edge in range(3, 68 - 2, 2):
val >>= 1
if ticks_diff(self._times[edge + 1], self._times[edge]) > 1120:
val |= 0x80000000
elif width > 1700: # 2.5ms space for a repeat code. Should have exactly 4 edges.
raise RuntimeError(self.REPEAT if self.edge == 4 else self.BADREP) # Treat REPEAT as error.
else:
raise RuntimeError(self.BADSTART)
addr = val & 0xff # 8 bit addr
cmd = (val >> 16) & 0xff
if cmd != (val >> 24) ^ 0xff:
raise RuntimeError(self.BADDATA)
if addr != ((val >> 8) ^ 0xff) & 0xff: # 8 bit addr doesn't match check
if not self._extended:
raise RuntimeError(self.BADADDR)
addr |= val & 0xff00 # pass assumed 16 bit address to callback
self._addr = addr
except RuntimeError as e:
cmd = e.args[0]
addr = self._addr if cmd == self.REPEAT else 0 # REPEAT uses last address
# Set up for new data burst and run user callback
self.do_callback(cmd, addr, 0, self.REPEAT)
class NEC_8(NEC_ABC):
def __init__(self, pin, callback, *args):
super().__init__(pin, False, False, callback, *args)
def callback(data, addr, ctrl):
global action, speed
if data < 0:
pass
else:
if data == 22:
with modules_shared.variable_lock:
modules_shared.action = 'go'
elif data == 13:
with modules_shared.variable_lock:
modules_shared.action = 'stop'
elif data == 82 and modules_shared.speed > 0.003:
with modules_shared.variable_lock:
modules_shared.speed = modules_shared.speed - 0.001
elif data == 24 and modules_shared.speed < 0.5:
with modules_shared.variable_lock:
modules_shared.speed = modules_shared.speed + 0.001
print(data)
print(modules_shared.action)
print(modules_shared.speed)
"""
from machine import Pin
import time
# Define the GPIO pin where the CHQ1838 OUT pin is connected
ir_pin = Pin(16, Pin.IN)
# Flag to indicate if "UUU3" sequence is detected
uuu3_detected = False
message_size = 0
# Buffer to store the next 11 bits after "UUU3" sequence
buffer = []
def ir_callback(pin):
global uuu3_detected, buffer, message_size
byteValue = pin.value()
buffer.append(byteValue)
if len(buffer) == 1 and buffer[0] == ['U']:
pass
elif len(buffer) == 2 and buffer[1] == ['U']:
pass
elif len(buffer) == 3 and buffer[2] == ['U']:
pass
elif len(buffer) == 4 and buffer[3] == ['3']:
uuu3_detected = True
elif len(buffer) <= 4:
buffer = []
elif uuu3_detected and len(buffer) == 12:
message_size = buffer[11]
elif uuu3_detected and len(buffer) == 13 + message_size:
print(f"Received {14+message_size} bits: ", buffer)
uuu3_detected = False
buffer = []
time.sleep(0.1)
ir_pin.irq(trigger=Pin.IRQ_FALLING | Pin.IRQ_RISING, handler=ir_callback)
print("IR Receiver Ready. Waiting for signals...")
# Main loop
while True:
time.sleep(1) # Sleep to reduce CPU usage
"""
from modules_bluetooth import ATUART
from modules_ir_receiver import NEC_8, callback
from machine import Pin
import modules_shared
def run_receivers():
pin_ir = Pin(5, Pin.IN)
ir = NEC_8(pin_ir, callback)
ser = ATUART(0, 9600, timeout=100, timeout_char=1) # tx=6, rx=7 GND=8
print('Automatic message sending over UART started! (use Ctrl+C to terminate!)')
try:
while not modules_shared.need_to_stop:
ser.shell()
except KeyboardInterrupt:
print('Shell terminated!')
import _thread
#Shared variables and a lock
action = 'go'
speed = 0.003
need_to_stop = False
variable_lock = _thread.allocate_lock()
from modules_receivers import run_receivers
from modules_drivers import run_machine
import modules_shared
import _thread
try:
receivers_thread = _thread.start_new_thread(run_machine, ())
run_receivers()
except Exception as e:
print(f"Error: {e}")
finally:
with modules_shared.variable_lock:
modules_shared.need_to_stop = True
print("Successful exit!")
Thanks to Peter Hitch.
Comments
Please log in or sign up to comment.