Jacky Jiang, Maaz Zuberi, Adiv Ahsan, Burak Kilic

AutonomiX Innovators: Autonomous RC Car

Our project AutonomiX Innovators presents a smart car that expertly self-navigates for a safer, seamless driving experience.

Intermediate · Full instructions provided · 221

Things used in this project

Hardware components

Webcam, Logitech® HD Pro
×1
Portable Battery
×1
Optical Speed Encoder
×1
Portable Charger
×1
Raspberry Pi 4 Model B
×1

Software apps and online services

OpenCV

Story


Code

main.py

Python
import cv2
import numpy as np
import math
import sys
import time
import RPi.GPIO as GPIO
import board
import busio
import adafruit_mcp4728
import matplotlib.pyplot as plt
import csv


i2c = busio.I2C(board.SCL, board.SDA)
mcp4728 = adafruit_mcp4728.MCP4728(i2c, 0x64)


#channel b is steering, channel a is power
#int(65535/4) should be straight for steering
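# Note: the MCP4728 is a 4-channel I2C DAC; the Adafruit CircuitPython driver
# exposes each channel's output as a 16-bit value (0-65535) and scales it to the
# DAC's 12-bit resolution internally, so the fractions of 65535 used below are
# fractions of the DAC's full-scale output voltage.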

# GPIO.setwarnings(False)

#throttle
# throttlePin = 25 # Physical pin 22
# in3 = 23 # physical Pin 16
# in4 = 24 # physical Pin 18
#additional throttle value??

#Steering
# steeringPin = 22 # Physical Pin 15
# in1 = 17 # Physical Pin 11
# in2 = 27 # Physical Pin 13


# GPIO.setmode(GPIO.BCM)
# GPIO.setup(in1,GPIO.OUT)
# GPIO.setup(in2,GPIO.OUT)
# GPIO.setup(in3,GPIO.OUT)
# GPIO.setup(in4,GPIO.OUT)

# GPIO.setup(throttlePin,GPIO.OUT)
# GPIO.setup(steeringPin,GPIO.OUT)
go_faster_tick_delay = 80
go_faster_tick = 0
current_speed = int(65535/2)
# Steering
# in1 = 1 and in2 = 0 -> Left
# GPIO.output(in1,GPIO.LOW)
# GPIO.output(in2,GPIO.LOW)
# steering = GPIO.PWM(steeringPin,1000)
# steering.stop()

# Throttle
# in3 = 1 and in4 = 0 -> Forward
# GPIO.output(in3,GPIO.HIGH)
# GPIO.output(in4,GPIO.LOW)
# throttle = GPIO.PWM(throttlePin,1000)
# throttle.stop()


#the value to speed up/ slow down


def getRedFloorBoundaries():
    """
    Gets the hsv boundaries and success boundaries indicating if the floor is red
    :return: [[lower color and success boundaries for red floor], [upper color and success boundaries for red floor]]
    """
    return getBoundaries("redboundaries.txt")

def getBoundaries(filename):
    """
    Reads the boundaries from the file filename
    Format:
        [0] lower: [H, S, V, lower percentage for classification of success]
        [1] upper: [H, S, V, upper percentage for classification of success]
    :param filename: file containing boundary information as above
    :return: [[lower color and success boundaries], [upper color and success boundaries]]
    """
    default_lower_percent = 50
    default_upper_percent = 100
    with open(filename, "r") as f:
        boundaries = f.readlines()
        lower_data = [val for val in boundaries[0].split(",")]
        upper_data = [val for val in boundaries[1].split(",")]

        if len(lower_data) >= 4:
            lower_percent = float(lower_data[3])
        else:
            lower_percent = default_lower_percent

        if len(upper_data) >= 4:
            upper_percent = float(upper_data[3])
        else:
            upper_percent = default_upper_percent

        lower = [int(x) for x in lower_data[:3]]
        upper = [int(x) for x in upper_data[:3]]
        boundaries = [lower, upper]
        percentages = [lower_percent, upper_percent]
    return boundaries, percentages
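
# Example redboundaries.txt contents (illustrative values only -- tune the HSV
# range and success percentages for your own floor and lighting):
#   170, 50, 50, 1.0
#   180, 255, 255, 100.0
# Line 1 is the lower HSV bound plus the lower success percentage; line 2 is the
# upper HSV bound plus the upper success percentage.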

def isMostlyColor(image, boundaries):
    """
    Detects whether or not the majority of a color on the screen is a particular color
    :param image:
    :param boundaries: [[color boundaries], [success boundaries]]
    :return: boolean if image satisfies provided boundaries, and an image used for debugging
    """
    #Convert to HSV color space
    hsv_img = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

    #parse out the color boundaries and the success boundaries
    color_boundaries = boundaries[0]
    percentage = boundaries[1]

    lower = np.array(color_boundaries[0])
    upper = np.array(color_boundaries[1])
    mask = cv2.inRange(hsv_img, lower, upper)
    output = cv2.bitwise_and(hsv_img, hsv_img, mask=mask)

    #Calculate what percentage of image falls between color boundaries
    percentage_detected = np.count_nonzero(mask) * 100 / np.size(mask)
    # print("percentage_detected " + str(percentage_detected) + " lower " + str(lower) + " upper " + str(upper))
    # If percentage_detected is between the success boundaries, result is True; otherwise False
    result = percentage[0] < percentage_detected <= percentage[1]
    if result:
        # print(percentage_detected)
        pass
    return result, output

def stop():
    """
    Stops the car by zeroing both DAC channels
    """
    mcp4728.channel_a.value = 0  # zero the throttle channel
    mcp4728.channel_b.value = 0  # zero the steering channel
    return
    
def go():
    """
    Starts the car: sets the throttle channel and centers the steering channel
    """
    mcp4728.channel_a.value = int(65535/1.6)  # start going forward
    mcp4728.channel_b.value = int(65535/2.7)  # go straight
    return
    
def speed_up():
    # Increase speed by nudging the throttle (channel A) value upward
    current_value = mcp4728.channel_a.value
    if current_value < 65535:
        new_value = min(65535, current_value + 1000)  # Increase speed by 1000 units
        mcp4728.channel_a.value = int(new_value)
        print(f"Speed increased. New value: {new_value}")
    else:
        print("Maximum speed reached.")
    
def slow_down():
    # Decrease speed by nudging the throttle (channel A) value downward
    current_value = mcp4728.channel_a.value
    if current_value > 0:
        new_value = max(0, current_value - 1000)  # Decrease speed by 1000 units
        mcp4728.channel_a.value = int(new_value)
        print(f"Speed decreased. New value: {new_value}")
    else:
        print("Car stopped.")
    

#adapt this to the correct color scheme
# def detect_orange(frame, threshold=1000):
#     '''
#     Detect orange color within a given threshold.
#     '''
#     hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
#     lower_orange = np.array([5, 100, 100], dtype="uint8")  # Define lower orange range
#     upper_orange = np.array([15, 255, 255], dtype="uint8")  # Define upper orange range
#     mask = cv2.inRange(hsv, lower_orange, upper_orange)
#     # cv2.imshow("Orange_color", mask)
#     return mask.sum() > threshold
def isRedFloorVisible(frame):
    """
    Detects whether or not the floor is red
    :param frame: Image
    :return: [(True if the camera sees red on the floor, False otherwise), video output]
    """
    print("Checking for floor stop")
    boundaries = getRedFloorBoundaries()
    return isMostlyColor(frame, boundaries)
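
# Lane-following pipeline overview: detect_edges -> region_of_interest ->
# detect_line_segments -> average_slope_intercept -> get_steering_angle.
# Each frame is reduced to the blue lane edges, cropped to the lower half,
# fit with Hough line segments, averaged into at most two lane lines, and
# finally converted into a steering angle.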

def detect_edges(frame):
    # filter for blue lane lines
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    #cv2.imshow("HSV",hsv)
    lower_blue = np.array([90, 120, 0], dtype = "uint8")
    upper_blue = np.array([150, 255, 255], dtype="uint8")
    mask = cv2.inRange(hsv,lower_blue,upper_blue)
    #cv2.imshow("mask",mask)
    
    # detect edges
    edges = cv2.Canny(mask, 50, 100)
    #cv2.imshow("edges",edges)
    
    return edges

def region_of_interest(edges):
    height, width = edges.shape
    mask = np.zeros_like(edges)

    # only focus on the lower half of the screen
    polygon = np.array([[
        (0, height),
        (0,  height/2),
        (width , height/2),
        (width , height),
    ]], np.int32)
    
    cv2.fillPoly(mask, polygon, 255)
    
    cropped_edges = cv2.bitwise_and(edges, mask)
    cv2.imshow("roi",cropped_edges)
    
    return cropped_edges

def detect_line_segments(cropped_edges):
    rho = 1               # distance resolution of the accumulator, in pixels
    theta = np.pi / 180   # angular resolution of the accumulator, in radians
    min_threshold = 10    # minimum number of votes needed to accept a line

    line_segments = cv2.HoughLinesP(cropped_edges, rho, theta, min_threshold,
                                    np.array([]), minLineLength=5, maxLineGap=150)

    return line_segments

def average_slope_intercept(frame, line_segments):
    lane_lines = []
    
    if line_segments is None:
        print("no line segments detected")
        return lane_lines

    height, width,_ = frame.shape
    left_fit = []
    right_fit = []

    boundary = 1/3
    left_region_boundary = width * (1 - boundary)
    right_region_boundary = width * boundary
    
    for line_segment in line_segments:
        for x1, y1, x2, y2 in line_segment:
            if x1 == x2:
                print("skipping vertical lines (slope = infinity)")
                continue

            slope = (y2 - y1) / (x2 - x1)
            intercept = y1 - (slope * x1)
            
            if slope < 0:
                if x1 < left_region_boundary and x2 < left_region_boundary:
                    left_fit.append((slope, intercept))
            else:
                if x1 > right_region_boundary and x2 > right_region_boundary:
                    right_fit.append((slope, intercept))

    if len(left_fit) > 0:
        left_fit_average = np.average(left_fit, axis=0)
        lane_lines.append(make_points(frame, left_fit_average))

    if len(right_fit) > 0:
        right_fit_average = np.average(right_fit, axis=0)
        lane_lines.append(make_points(frame, right_fit_average))

    return lane_lines

def make_points(frame, line):
    height, width, _ = frame.shape
    
    slope, intercept = line
    
    y1 = height  # bottom of the frame
    y2 = int(y1 / 2)  # make points from middle of the frame down
    
    if slope == 0:
        slope = 0.1
        
    x1 = int((y1 - intercept) / slope)
    x2 = int((y2 - intercept) / slope)
    
    return [[x1, y1, x2, y2]]

def display_lines(frame, lines, line_color=(0, 255, 0), line_width=6):
    line_image = np.zeros_like(frame)
    
    if lines is not None:
        for line in lines:
            for x1, y1, x2, y2 in line:
                cv2.line(line_image, (x1, y1), (x2, y2), line_color, line_width)
                
    line_image = cv2.addWeighted(frame, 0.8, line_image, 1, 1)
    
    return line_image


def display_heading_line(frame, steering_angle, line_color=(0, 0, 255), line_width=5 ):
    heading_image = np.zeros_like(frame)
    height, width, _ = frame.shape
    
    steering_angle_radian = steering_angle / 180.0 * math.pi
    
    x1 = int(width / 2)
    y1 = height
    x2 = int(x1 - height / 2 / math.tan(steering_angle_radian))
    y2 = int(height / 2)
    
    cv2.line(heading_image, (x1, y1), (x2, y2), line_color, line_width)
    heading_image = cv2.addWeighted(frame, 0.8, heading_image, 1, 1)
    
    return heading_image
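
# Steering-angle convention used below: 90 degrees means "straight ahead";
# values below 90 steer left and values above 90 steer right. The angle is
# derived from the horizontal offset between the detected lane midpoint and
# the frame center, measured against half the frame height.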

def get_steering_angle(frame, lane_lines):
    
    height,width,_ = frame.shape
    
    if len(lane_lines) == 2:
        _, _, left_x2, _ = lane_lines[0][0]
        _, _, right_x2, _ = lane_lines[1][0]
        mid = int(width / 2)
        x_offset = (left_x2 + right_x2) / 2 - mid
        y_offset = int(height / 2)
        
    elif len(lane_lines) == 1:
        x1, _, x2, _ = lane_lines[0][0]
        x_offset = x2 - x1
        y_offset = int(height / 2)
        
    elif len(lane_lines) == 0:
        x_offset = 0
        y_offset = int(height / 2)
        
    angle_to_mid_radian = math.atan(x_offset / y_offset)
    angle_to_mid_deg = int(angle_to_mid_radian * 180.0 / math.pi)  
    steering_angle = angle_to_mid_deg + 90
    
    return steering_angle
def plot_pd(p_vals, d_vals, error, show_img=False):
    fig, ax1 = plt.subplots()
    t_ax = np.arange(len(p_vals))
    ax1.plot(t_ax, p_vals, '-', label="P values")
    ax1.plot(t_ax, d_vals, '-', label="D values")
    ax2 = ax1.twinx()
    ax2.plot(t_ax, error, '--r', label="Error")

    ax1.set_xlabel("Frames")
    ax1.set_ylabel("PD Value")
    ax2.set_ylim(-90, 90)
    ax2.set_ylabel("Error Value")

    plt.title("PD Values over time")
    fig.legend()
    fig.tight_layout()
    plt.savefig("pd_plot.png")

    if show_img:
        plt.show()
    plt.clf()


def plot_pwm(speed_pwms, turn_pwms, error, show_img=False):
    fig, ax1 = plt.subplots()
    t_ax = np.arange(len(speed_pwms))
    ax1.plot(t_ax, speed_pwms, '-', label="Speed PWM")
    ax1.plot(t_ax, turn_pwms, '-', label="Steering PWM")
    ax2 = ax1.twinx()
    ax2.plot(t_ax, error, '--r', label="Error")

    ax1.set_xlabel("Frames")
    ax1.set_ylabel("PWM Values")
    ax2.set_ylabel("Error Value")

    plt.title("PWM Values over time")
    fig.legend()
    plt.savefig("pwm_plot.png")

    if show_img:
        plt.show()
    plt.clf()
video = cv2.VideoCapture(0)
video.set(cv2.CAP_PROP_FRAME_WIDTH,320)
video.set(cv2.CAP_PROP_FRAME_HEIGHT,240)

#wait for vid to load
time.sleep(1)

##fourcc = cv2.VideoWriter_fourcc(*'XVID')
##out = cv2.VideoWriter('Original15.avi',fourcc,10,(320,240))
##out2 = cv2.VideoWriter('Direction15.avi',fourcc,10,(320,240))

# speed = 8
# lastTime = 0
# lastError = 0

# kp = 0.4
# kd = kp * 0.65
# PD variables
#kp = 0.095
kp = 0.2
kd = kp * 0.1
lastTime = 0
lastError = 0
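
# Discrete PD control (see the main loop below): each frame,
#   turn_amt = base_turn + kp * error + kd * (error - lastError) / dt
# where error is the negative deviation of the detected steering angle from 90 degrees.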

# counter for number of ticks
counter = 0
MAX_ITERATIONS = 2000  # Define the maximum number of loop iterations

# start the engines
go()

# arrays for making the final graphs
p_vals = []
d_vals = []
err_vals = []
speed_pwm = []
steer_pwm = []


stopSignCheck = 1
sightDebug = False
isStopSignBool = False
passedFirstStopSign = False
atStopLight = False

secondStopSignTick = 0




while counter < 3000:
    print(counter)
    print("SPEED", mcp4728.channel_a.value)
    print("STEER", mcp4728.channel_b.value)
    speed_value = mcp4728.channel_a.value
    steer_value = mcp4728.channel_b.value

    ret,original_frame = video.read()
    # frame = cv2.flip(frame,-1)
    frame = cv2.resize(original_frame, (160, 120))
    # The frame is downscaled to 160x120: large enough for OpenCV to pick out the
    # blue lane lines and their slopes, but small enough that per-frame processing
    # stays fast on the Raspberry Pi.
    
    # cv2.imshow("original",frame)
    
    if sightDebug:
        cv2.imshow("Resized Frame", frame)
    # reading the encoder data and changing the speed   
    time_diff = 7
    
    with open("/sys/module/gpiod_driver/parameters/elapsed_ms", "r") as filetoread:
        time_diff = int(filetoread.read())
    
    print("Time diff", time_diff)  
    
    # Encoder-based speed control: if the time between encoder ticks is too long,
    # speed up; if the ticks come too quickly, slow down
    if time_diff >= 120:
        speed_up()
    elif 7 < time_diff <= 110:
        slow_down()
    
    # check for stop sign/traffic light every couple ticks
    if ((counter + 1) % stopSignCheck) == 0:
        # check for the first stop sign
        if not passedFirstStopSign:
            isStopSignBool, floorSight = isRedFloorVisible(frame)
            if sightDebug:
                cv2.imshow("floorSight", floorSight)
            if isStopSignBool:
                print("detected first stop sign, stopping")
                stop()
                time.sleep(2)
                passedFirstStopSign = True
                # this is used to not check for the second stop sign until many frames later
                secondStopSignTick = counter + 200
                # now check for stop sign less frequently
                stopSignCheck = 3
                # add a delay to calling go faster
                go_faster_tick = counter + go_faster_tick_delay
                print("first stop finished!")
                mcp4728.channel_a.value = int(65535/1.6)
                #speed_up()
                #speed_up()
        # check for the second stop sign
        elif passedFirstStopSign and counter > secondStopSignTick:
            isStop2SignBool, _ = isRedFloorVisible(frame)
            if isStop2SignBool:
                # last stop sign detected, exits while loop
                print("detected second stop sign, stopping")
                stop()
                break
    # makes car go faster, helps it have enough speed to get to the end of the course
   # if isStopSignBool and counter == go_faster_tick:
    #    print("Going FASTER")
     #   speed_up()
     #   current_speed += .01 

        
    edges = detect_edges(frame)
    roi = region_of_interest(edges)
    line_segments = detect_line_segments(roi)
    lane_lines = average_slope_intercept(frame,line_segments)
    lane_lines_image = display_lines(frame,lane_lines)
    steering_angle = get_steering_angle(frame, lane_lines)
    #heading_image = display_heading_line(lane_lines_image,steering_angle)
    #cv2.imshow("heading line",heading_image)

    now = time.time()
    dt = now - lastTime
    if sightDebug:
        cv2.imshow("Cropped sight", roi)
    deviation = steering_angle - 90
    # error = abs(deviation)
    
    print("steering angle", steering_angle)
    #constants for steering range
    left_range = int(65535/4)
    right_range = int(65535/1.2)
    left = 8.8
    right = 6.2
    
    # PD code
    error = -deviation
    base_turn = 7.75
    proportional = kp * error
    derivative = kd * (error - lastError) / dt
    
     # take values for graphs
    p_vals.append(proportional)
    d_vals.append(derivative)
    err_vals.append(error)
    
     # determine actual turn to do
    turn_amt = base_turn + proportional + derivative
    print("TURN AMNT", turn_amt)
    
    # Cap the turn amount and map it to the steering DAC range
    if 7.2 < turn_amt < 7.8:
        turn_amt = 7.75
        mcp4728.channel_b.value = int(65535/2.7)  # go straight
    elif turn_amt > left:
        turn_amt = left
        print("TURN LEFT")
        mcp4728.channel_b.value = int(65535/15)
    elif turn_amt < right:
        turn_amt = right
        print("TURN RIGHT")
        mcp4728.channel_b.value = int(65535/1.5)

    # take values for graphs
    steer_pwm.append(turn_amt)
    speed_pwm.append(current_speed)

    # update PD values for next loop
    lastError = error
    lastTime = time.time()
    # derivative = kd * (error - lastError) / dt
    # proportional = kp * error
    # PD = int(speed + derivative + proportional)
    # spd = abs(PD)

    # if spd > 25:
    #     spd = 25
        
    # throttle.start(spd)

    # lastError = error
    # lastTime = time.time()
        
##    out.write(frame)
##    out2.write(heading_image)

    key = cv2.waitKey(1)
    if key == 27:
        plot_pd(p_vals, d_vals, err_vals, True)
        break



    counter += 1
    
video.release()
##out.release()
##out2.release()
cv2.destroyAllWindows()
stop()

plot_pd(p_vals, d_vals, err_vals, True)
#plot_pwm(speed_pwm, steer_pwm, err_vals, True)
# GPIO.output(in1,GPIO.LOW)
# GPIO.output(in2,GPIO.LOW)
# GPIO.output(in3,GPIO.LOW)
# GPIO.output(in4,GPIO.LOW)
# throttle.stop()
# steering.stop()

gpiod_driver.c

C/C++
#include <linux/module.h>
#include <linux/of_device.h>
#include <linux/kernel.h>
#include <linux/gpio/consumer.h>
#include <linux/platform_device.h>
#include <linux/interrupt.h>
#include <linux/init.h>
#include <linux/fs.h>
#include <linux/device.h>
#include <linux/uaccess.h>
#include <asm/div64.h>

#define DEVICE_NAME "carencoder"
#define CLASS_NAME "carclass"

unsigned int irq_number;
struct gpio_desc *led_gpio;
struct gpio_desc *encoder_gpio;

static int elapsed_ms = 0;
ktime_t start_time, old_time, elapsed;

// Exposes elapsed_ms read-only at /sys/module/gpiod_driver/parameters/elapsed_ms
module_param(elapsed_ms, int, S_IRUGO);
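// The Python controller (main.py) polls this parameter every loop iteration
// to decide whether to speed the car up or slow it down.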

static irq_handler_t encoder_irq_handler(unsigned int irq, void *dev_id, struct pt_regs *regs) {
    // Calculates the elapsed time by getting the current time and subtracting the
    // old time from it
    ktime_t new_time = ktime_get();
    elapsed = ktime_sub(new_time, old_time);
    old_time = new_time;
    // Note: dividing nanoseconds by 100000 stores the value in units of 0.1 ms,
    // the same units the thresholds in main.py are written against
    elapsed_ms = ktime_divns(ktime_to_ns(elapsed), 100000);

    printk("Elapsed: %d\n", elapsed_ms);

    return (irq_handler_t) IRQ_HANDLED;
}


static int led_probe(struct platform_device *pdev) {
    
    printk("Setting up encoder IRQ.\n");
    old_time = ktime_get();

    encoder_gpio = devm_gpiod_get(&pdev->dev, "userbutton", GPIOD_IN);
    gpiod_set_debounce(encoder_gpio, 1000000);
    irq_number = gpiod_to_irq(encoder_gpio);
    //Checks if the irq request was successful
    if (request_irq(irq_number, (irq_handler_t)encoder_irq_handler, IRQF_TRIGGER_FALLING, "button_irq", NULL) !=0) {
        printk(KERN_INFO "Failed to request\n");
        return -1;
    }

    printk("Successfully requested IRQ.\n");

    return 0;
}


static int led_remove(struct platform_device *pdev) {
    // Free the IRQ using the same dev_id (NULL) that was passed to request_irq
    free_irq(irq_number, NULL);
    printk("IRQ Freed!\n");
    return 0;
}


static struct of_device_id encoder_gpio_match[] = {
    {
    .compatible = "encoder-gpio",
    },
    {/* leave alone - keep this here (end node) */},
};

// Encoder GPIO driver
static struct platform_driver encoder_gpio_driver = {
    .probe = led_probe,
    .remove = led_remove,
    .driver = {
        .name = "this doesnt even matter",
        .owner = THIS_MODULE,
        .of_match_table = encoder_gpio_match,
    },
};

// Module stuff and things
module_platform_driver(encoder_gpio_driver);
MODULE_DESCRIPTION("GPIO encoder driver that reports the elapsed time between encoder pulses via a module parameter");
MODULE_AUTHOR("Maaz Zuberi");
MODULE_LICENSE("GPL v2");
MODULE_ALIAS("platform:encoder_gpio_driver");

dt_overlay.dts

Plain text
/dts-v1/;
/plugin/;

/ {
    fragment@0 {
        target-path  = "/";
        __overlay__ {
            encoder_device {
                // compatible must match the driver's of_match_table ("encoder-gpio")
                compatible = "encoder-gpio";
                led-gpios = <&gpio 5 0>;  // GPIO 5
                // devm_gpiod_get(..., "userbutton", ...) looks for "userbutton-gpios"
                userbutton-gpios = <&gpio 6 0>;  // encoder signal on GPIO 6
            };
        };
    };
};

Credits

Jacky Jiang
1 project • 0 followers
Maaz Zuberi
1 project • 0 followers
Adiv Ahsan
0 projects • 0 followers
Burak Kilic
0 projects • 0 followers
