Welcome to Hackster!
Hackster is a community dedicated to learning hardware, from beginner to pro. Join us, it's free!
Arjun KannanNoah GilesAlex TorresPrahalad ChariThanh Nguyen
Published

Autonomous RC Car - ELEC 424 Final Project

The team worked on programming an autonomous RC/toy car that follows a lane and stops at a red box stop.

IntermediateFull instructions provided88
Autonomous RC Car - ELEC 424 Final Project

Things used in this project

Hardware components

Webcam, Logitech® HD Pro
Webcam, Logitech® HD Pro
×1
RPi 4 Model B
×1
Optical speed encoder
×1
Portable charger
×1

Software apps and online services

OpenCV
OpenCV

Hand tools and fabrication machines

Drill / Driver, Cordless
Drill / Driver, Cordless
File
Soldering iron (generic)
Soldering iron (generic)

Story

Read more

Code

final_code.py

Python
Final project code, contains all start/stop and lane-keeping functionality.
import cv2
import numpy as np
import math
import sys
import time
import os
import subprocess
import board
import adafruit_mcp4728
import matplotlib.pyplot as plt

'''
The following code is based on: 

User raja_961, “Autonomous Lane-Keeping Car Using Raspberry
Pi and OpenCV”. Instructables. URL:
https://www.instructables.com/Autonomous-Lane-Keeping-Car-U
sing-Raspberry-Pi-and/ 

As well as the following hackster project:

https://www.hackster.io/beagle-bone-baja-blast/beagle-bone-baja-blast-eae48e

Which in turn based its code on this hackster project:

https://www.hackster.io/really-bad-idea/autonomous-path-following-car-6c4992
'''

# Need this
passedFirstStopSign = False

# wait function for stop logic
def wait(wait_time):
    start_time = time.perf_counter()
    end_time = start_time + wait_time
    print('waiting... ' + str(end_time) + 'seconds')
    while (time.perf_counter() < end_time):
        pass
    return


# initializes drive control system
MCP4728A4_DEFAULT_ADDRESS = 0x64  # weird address that only works on our board
i2c = board.I2C()  # uses board.SCL and board.SDA
mcp4728 = adafruit_mcp4728.MCP4728(i2c, adafruit_mcp4728.MCP4728A4_DEFAULT_ADDRESS)

'''
These are our default speed and steering variables. Initialized to their midpoint
'''
# speed is on a 0-2 scale where 1-2 is positive and 0-1 is negative.
# steering is on 0 - 2, where 0-1 is left, and 1-2 is right
speed = 1.009
base_turn = 1.175

mcp4728.channel_a.value = int(65535 / 2)  # neutral throttle
mcp4728.channel_b.value = int(65535 / 2)  # steering


# we might putting camera stuff in this was well
def initialize_car():
    mcp4728.channel_a.value = int(65535 / 2)  # Voltage = VDD
    mcp4728.channel_b.value = int(base_turn * (65535 / 2))  # car by default doesn't steer straight, constant is applied
    # to make it as straight as possible


def move(speed):
    mcp4728.channel_a.value = int(speed * (65535 / 2))  # Voltage = VDD
    # speed = speed - not sure why we have this line, commented out but will uncomment worst case


# stop function sets the throttle 0
def stop():
    mcp4728.channel_a.value = int(65535 / 2)


# steep < 1 = left, steep > 1 -- right
def turn(steepness):
    mcp4728.channel_b.value = int(base_turn * steepness * (65535 / 2))

# re-centers car (car's turn was initially off-center)
def re_center():
    mcp4728.channel_b.value = int(base_turn * 65535 / 2)


# based on: https://www.instructables.com/Autonomous-Lane-Keeping-Car-Using-Raspberry-Pi-and/
# and https://www.hackster.io/really-bad-idea/autonomous-path-following-car-6c4992

# adapted from hackster, changed to adhere to red HSV color segments
def isRedFloorVisible(image):
    """
    Detects whether or not the majority of a color on the screen is a particular color
    :param image:
    :param boundaries: [[color boundaries], [success boundaries]]
    :return: boolean if image satisfies provided boundaries, and an image used for debugging
    """
    # Convert to HSV color space
    hsv_img = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

    cv2.imwrite("redfloor.jpg", hsv_img)

    # parse out the color boundaries and the success boundaries
    # percentage was adjusted
    percentage = 25

    # lower and upper range for the lower part of red
    lower_red1 = np.array([0, 40, 60], dtype="uint8")
    upper_red1 = np.array([10, 255, 255], dtype="uint8")

    # lower and upper range for the upper part of red
    lower_red2 = np.array([170, 40, 60], dtype="uint8")
    upper_red2 = np.array([180, 255, 255], dtype="uint8")

    # create two masks to capture both ranges of red
    mask1 = cv2.inRange(hsv_img, lower_red1, upper_red1)
    mask2 = cv2.inRange(hsv_img, lower_red2, upper_red2)

    # combining the masks
    mask = cv2.bitwise_or(mask1, mask2)

    # applying the mask
    output = cv2.bitwise_and(hsv_img, hsv_img, mask=mask)

    # save the output image
    cv2.imwrite("redfloormask.jpg", output)

    # calculating what percentage of image falls between color boundaries
    percentage_detected = np.count_nonzero(mask) * 100 / np.size(mask)
    # if the percentage percentage_detected is betweeen the success boundaries, we return true,
    # otherwise false for result
    result = percentage < percentage_detected
    if result:
        print(percentage_detected)
    return result, output

# finds edges on track and writes to images, adapted from raja_961
def detect_edges(frame):
    # filter for blue lane lines
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    cv2.imwrite("hsv.jpg", hsv)
    lower_blue = np.array([90, 50, 50], dtype="uint8")
    upper_blue = np.array([130, 255, 255], dtype="uint8")
    mask = cv2.inRange(hsv, lower_blue, upper_blue)
    cv2.imwrite("mask.jpg", mask)

    # detect edges
    blur = cv2.GaussianBlur(mask, (5, 5), 0)
    edges = cv2.Canny(blur, 50, 100)
    cv2.imwrite("edges.jpg", edges)

    return edges

# determines regions of interest for lane-keeping, adapted from raja_961
def region_of_interest(edges):
    height, width = edges.shape
    mask = np.zeros_like(edges)

    # only focus lower half of the screen
    polygon = np.array([[
        (0, height),
        (0, 3 * height / 4),
        (width, 3 * height / 4),
        (width, height),
    ]], np.int32)

    cv2.fillPoly(mask, polygon, 255)

    cropped_edges = cv2.bitwise_and(edges, mask)
    cv2.imwrite("roi.jpg", cropped_edges)

    return cropped_edges

# adapted from raja_961
def detect_line_segments(cropped_edges):
    rho = 1
    theta = np.pi / 180
    min_threshold = 20

    line_segments = cv2.HoughLinesP(cropped_edges, rho, theta, min_threshold,
                                    np.array([]), minLineLength=10, maxLineGap=150)

    return line_segments

# takes the frame under processing and lane segments detected using Hough transform
# and returns the average slope and intercept of two lane lines, adapted from raja_961
def average_slope_intercept(frame, line_segments):
    lane_lines = []
    print("line_segments", line_segments)

    if line_segments is None:
        print("no line segments detected")
        return lane_lines

    height, width, _ = frame.shape
    left_fit = []
    right_fit = []

    boundary = 0
    left_region_boundary = width * (1 - boundary)
    right_region_boundary = width * boundary

    for line_segment in line_segments:
        for x1, y1, x2, y2 in line_segment:
            if x1 == x2:
                print("skipping vertical lines (slope = infinity")
                continue

            fit = np.polyfit((x1, x2), (y1, y2), 1)
            slope = (y2 - y1) / (x2 - x1)
            intercept = y1 - (slope * x1)

            if slope < 0:
                if x1 < left_region_boundary and x2 < left_region_boundary:
                    left_fit.append((slope, intercept))
            else:
                if x1 > right_region_boundary and x2 > right_region_boundary:
                    right_fit.append((slope, intercept))

    left_fit_average = np.average(left_fit, axis=0)
    if len(left_fit) > 0:
        lane_lines.append(make_points(frame, left_fit_average))

    right_fit_average = np.average(right_fit, axis=0)
    if len(right_fit) > 0:
        lane_lines.append(make_points(frame, right_fit_average))

    return lane_lines

# helper function for average_slope_intercept() function which returns
# the bounded coordinates of the lane lines (from the bottom to the middle of the frame)
# adapted from raja_961
def make_points(frame, line):
    height, width, _ = frame.shape

    slope, intercept = line

    y1 = height  # bottom of the frame
    y2 = int(y1 / 2)  # make points from middle of the frame down

    if slope == 0:
        slope = 0.1

    x1 = int((y1 - intercept) / slope)
    x2 = int((y2 - intercept) / slope)

    return [[x1, y1, x2, y2]]

# adapted from raja_961
def display_lines(frame, lines, line_color=(0, 255, 0), line_width=6):
    line_image = np.zeros_like(frame)

    if lines is not None:
        for line in lines:
            for x1, y1, x2, y2 in line:
                cv2.line(line_image, (x1, y1), (x2, y2), line_color, line_width)

    line_image = cv2.addWeighted(frame, 0.8, line_image, 1, 1)

    if line_image is not None:
        cv2.imwrite("line_image.jpg", line_image)
    else:
        print("no line image")

    return line_image

# adapted from raja_961
def display_heading_line(frame, steering_angle, line_color=(0, 0, 255), line_width=5):
    heading_image = np.zeros_like(frame)
    height, width, _ = frame.shape

    steering_angle_radian = steering_angle / 180.0 * math.pi

    x1 = int(width / 2)
    y1 = height
    x2 = int(x1 - height / 2 / math.tan(steering_angle_radian))
    y2 = int(height / 2)

    cv2.line(heading_image, (x1, y1), (x2, y2), line_color, line_width)
    heading_image = cv2.addWeighted(frame, 0.8, heading_image, 1, 1)

    return heading_image

# adapted from raja_961
def get_steering_angle(frame, lane_lines):
    height, width, _ = frame.shape

    if len(lane_lines) == 2:
        _, _, left_x2, _ = lane_lines[0][0]
        _, _, right_x2, _ = lane_lines[1][0]
        mid = int(width / 2)
        x_offset = (left_x2 + right_x2) / 2 - mid
        y_offset = int(height / 2)

    elif len(lane_lines) == 1:
        x1, _, x2, _ = lane_lines[0][0]
        x_offset = x2 - x1
        y_offset = int(height / 2)

    elif len(lane_lines) == 0:
        x_offset = 0
        y_offset = int(height / 2)

    angle_to_mid_radian = math.atan(x_offset / y_offset)
    angle_to_mid_deg = int(angle_to_mid_radian * 180.0 / math.pi)
    steering_angle = angle_to_mid_deg + 90
    print("steer angle", steering_angle)

    return steering_angle

# function reads from kernel log to adjust speed based on speed encoder
# time readings
def get_kernel_last_line():
    try:
        result = subprocess.run(['tail', '-n', '1', '/var/log/kern.log'], check=True, text=True, capture_output=True)
        return result.stdout.strip()  # Remove leading/trailing whitespaces and newlines
    except subprocess.CalledProcessError as e:
        print(f"Error executing 'tail': {e}")
        return None


# Main Code

# set up the car throttle and steering PWMs
initialize_car()

# set up video
video = cv2.VideoCapture(0)
cv2.waitKey(3)
video.set(cv2.CAP_PROP_FRAME_WIDTH, 320)
video.set(cv2.CAP_PROP_FRAME_HEIGHT, 240)

# wait for video to load
wait(1)

# PD variables
kp = 0.1 / 2.5
kd = 0.3 * kp
lastTime = 0
lastError = 0

# counter for number of ticks
counter = 0

# start the engines
move(speed)

# Max number of loops
max_ticks = 2000

# Booleans for handling stop light
passed_floor_stop = False
atStopLight = False
at_end = False

# check every 3 frames
stopSignCheck = 3
sightDebug = True
isStopSignBool = False

# initialized to take values for plots
prop_arr = []
deriv_arr = []
error_arr = []
steer_v_arr = []
speed_v_arr = []
frames = []

while counter < max_ticks:
    ret, original_frame = video.read()
    frame = cv2.resize(original_frame, (176, 144))

    speed_msg = get_kernel_last_line()
    # string will be of similar form to "Dec  5 19:55:26 raspberrypi kernel: [ 1562.508644] Time elapsed: 14444"
    # adjusting speed based on speed encoder time delta
    try:
        encoder_time_delta = int(speed_msg.split("Time elapsed: ")[1])
    except Exception as e:
        encoder_time_delta = 2000000

    if encoder_time_delta > 2000000:
        speed += 0.001
    else:
        speed -= 0.001

    # bounding speed
    if speed > 1.011:
        speed = 1.009

    if speed < 1.007:
        speed = 1.009

    # adjusting speed based on encoder
    move(speed)

    print("Speed is", speed)

    # check for floor stop every couple ticks
    if ((counter + 1) % stopSignCheck) == 0:
        # check for the first stop sign
        if not passedFirstStopSign:
            isStopSignBool, floorSight = isRedFloorVisible(frame)
            if sightDebug:
                cv2.imwrite("floorSight.jpg", floorSight)
            if isStopSignBool:
                print("detected a stop sign, stopping")
                stop()
                wait(3)
                move(speed)
                passedFirstStopSign = True
                # this is used to not check for the second stop sign until many frames later
                secondStopSignTick = counter + 200
                # add a delay to calling go faster
                go_faster_tick = counter + go_faster_tick_delay
                print("first stop finished!")
        # check for the second stop sign
        elif passedFirstStopSign and counter > secondStopSignTick:
            isStop2SignBool, _ = isRedFloorVisible(frame)
            print("is a floor stop: ", isStopSignBool)
            if isStop2SignBool:
                # last stop sign detected, exits while loop
                print("detected second stop sign, stopping")
                stop()
                break

    # process the frame to determine the desired steering angle
    edges = detect_edges(frame)
    roi = region_of_interest(edges)
    line_segments = detect_line_segments(roi)

    lane_lines = average_slope_intercept(frame, line_segments)
    lane_lines_image = display_lines(frame, lane_lines)
    steering_angle = get_steering_angle(frame, lane_lines)
    heading_image = display_heading_line(lane_lines_image, steering_angle)
    cv2.imwrite("heading line.jpg", heading_image)

    # calculate changes for PD
    now = time.time()
    dt = now - lastTime
    if sightDebug:
        cv2.imwrite("Cropped_sight.jpg", roi)
    deviation = steering_angle - 90

    # PD Code
    error = -deviation
    proportional = kp * error
    derivative = kd * (error - lastError) / dt

    # determine actual turn to take
    turn_amt = proportional + derivative
    turn_amt = 1 / (1 + np.exp(turn_amt)) * 2 / base_turn # bounding turn values

    # taking values for plots
    prop_arr.append(proportional)
    deriv_arr.append(derivative)
    error_arr.append(error)
    steer_v_arr.append(turn_amt * (3.3 / 2))
    speed_v_arr.append(speed * (3.3 / 2))
    frames.append(counter)

    # turning based on PD calculations
    turn(turn_amt)

    # update PD values for next loop
    lastError = error
    lastTime = time.time()

    key = cv2.waitKey(1)
    if key == 27:
        break

    counter += 1

# creating plots
plt.figure()
plt.plot(frames, prop_arr, '--g', label="P values", linewidth=1)
plt.plot(frames, deriv_arr, '--b', label="D values", linewidth=1)
plt.plot(frames, error_arr, '--r', label="Error values", linewidth=1)

plt.title("PD & error values over time")
plt.xlabel("Frames")

plt.legend()
plt.savefig('plotPDE.png')
plt.close()

plt.figure()
plt.title("P values over time")
plt.plot(frames, prop_arr, 'g', linewidth=0.8)
plt.xlabel("Frames")
plt.ylabel("P values")
plt.savefig('plotP.png')
plt.close()

plt.figure()
plt.title("D values over time")
plt.plot(frames, deriv_arr, 'b', linewidth=0.8)
plt.xlabel("Frames")
plt.ylabel("D values")
plt.savefig('plotD.png')
plt.close()

plt.figure()
plt.title("Error values over time")
plt.plot(frames, error_arr, 'r', linewidth=0.8)
plt.xlabel("Frames")
plt.ylabel("Error values")
plt.savefig('plotE.png')
plt.close()

plt.figure()
plt.title("Speed voltage over time")
plt.plot(frames, speed_v_arr, 'y', linewidth=0.8)
plt.xlabel("Frames")
plt.ylabel("Speed Voltage (V)")
plt.savefig('plotSpeed.png')
plt.close()

plt.figure()
plt.title("Steering voltage over time")
plt.plot(frames, steer_v_arr, 'tab:orange', linewidth=0.8)
plt.xlabel("Frames")
plt.ylabel("Steer Voltage (V)")
plt.savefig('plotSteer.png')
plt.close()

plt.figure()
plt.plot(frames, speed_v_arr, 'y', label="Speed voltage values", linewidth=0.8)
plt.plot(frames, steer_v_arr, 'tab:orange', label="Steer voltage values", linewidth=0.8)
plt.plot(frames, error_arr, 'r', label="Error values", linewidth=0.8)

plt.title("Speed, steer, & error values over time")
plt.xlabel("Frames")

plt.legend()
plt.savefig('plotSSE.png')
plt.close()

video.release()
cv2.destroyAllWindows()

speed_encoder_driver.c

C/C++
Encoder driver for kernel-level delta time readings necessary for speed adjustment.
#include <linux/module.h>
#include <linux/of_device.h>
#include <linux/kernel.h>
#include <linux/gpio/consumer.h>
#include <linux/platform_device.h>
#include <linux/interrupt.h>
#include <linux/init.h>
#include <linux/gpio.h>
#include <linux/ktime.h>

ktime_t start, end, elapsed_time;

//int to handle irq and int to store led value.
int irq_num;
int led_val;
//Declare gpio desc for button and led.
struct gpio_desc *button_ptr;
struct gpio_desc *led_ptr;
unsigned long long ns_elapsed_time;

/**
 * @brief Interrupt service routine is called, when interrupt is triggered
 */
static irq_handler_t gpiod_irq_handler(unsigned int irq, void *dev_id, struct
pt_regs *regs) {
 //Print that the interrupt occured.
printk("gpiod_irq: Interrupt was triggered and ISR was called!\n");

  end = ktime_get();
  elapsed_time = ktime_sub(end, start);
  start = end;

  ns_elapsed_time =  ktime_to_ns(elapsed_time);
  
  printk("Time elapsed: %llu \n", ns_elapsed_time);
  
 //get the led value and set the led value to the opposite of the current value.
 led_val = gpiod_get_value(led_ptr);
 gpiod_set_value(led_ptr, !led_val);
return (irq_handler_t) IRQ_HANDLED;
}
static int led_probe(struct platform_device *pdev)
{

 struct device *dev = &pdev->dev;
 //initialize led gpio
 led_ptr = devm_gpiod_get(dev, "led", GPIOD_OUT_LOW);

 //set the led direction as an output.
 //initially pass 0 as an output to led_ptr.
 if (gpiod_direction_output(led_ptr, 0)) {
 printk("Error setting the led output direction.\n");
 }
 //initialize button gpio
 button_ptr = devm_gpiod_get(dev, "button", GPIOD_IN);
 //set the button to an input.
 if (gpiod_direction_input(button_ptr)) {
 printk("Error setting button ptr direction\n");

  //free the button pointer if the gpiod direction fails.
 gpio_free(desc_to_gpio(button_ptr));
 //gpio_free is used because gpiod_free is not found during linking.
 return -1;
 }
 //set the irq num corresponding to the gpiod device.
 //I think this works basically like a file descriptor.
 irq_num = gpiod_to_irq(button_ptr);

 if(request_irq(irq_num, (irq_handler_t) gpiod_irq_handler, IRQF_TRIGGER_FALLING,
"fakedev", NULL) != 0){
printk("Error!\nCan not request interrupt nr.: %d\n", irq_num);
return -1;
}
printk("Done setting up button and irq! Module is inserted!\n");
return 0;
}
// remove function
static int led_remove(struct platform_device *pdev)
{
 //Free the irq and print that the module is removed.
 free_irq(irq_num, NULL);
 printk("Module is removed\n");
return 0;
}
static struct of_device_id matchy_match[] = {
{.compatible = "fake",},
{/* leave alone - keep this here (end node) */},
};
// platform driver object
static struct platform_driver adam_driver = {
.probe = led_probe,
.remove = led_remove,
.driver = {
.name = "The Rock: this name doesn't even matter",
.owner = THIS_MODULE,
.of_match_table = matchy_match,
},
};
//module platform initializes and exits the module as necessary.
module_platform_driver(adam_driver);
MODULE_DESCRIPTION("424\'s finest");
MODULE_AUTHOR("GOAT");
MODULE_LICENSE("GPL v2");
MODULE_ALIAS("platform:adam_driver");

Credits

Arjun Kannan
2 projects • 4 followers
Contact
Noah Giles
1 project • 1 follower
Contact
Alex Torres
1 project • 1 follower
Contact
Prahalad Chari
1 project • 1 follower
Contact
Thanh Nguyen
1 project • 1 follower
Contact

Comments

Please log in or sign up to comment.