Zhenya Liu, Karthik Goli, BT7, Lei Xia
Published

Lane-Keeping RC car

An RC car with a Raspberry Pi running onboard vision and PID control for autonomous speed control and lane assist.

Intermediate · Full instructions provided · 5 hours · 108 views

Things used in this project

Hardware components

Webcam, Logitech® HD Pro
×1
Portable charger
×1
RC car, batteries, and charger
×1
Raspberry Pi 4 Model B
×1
Optical speed encoder
×1

Software apps and online services

OpenCV – Open Source Computer Vision Library

Hand tools and fabrication machines

Scissor, Electrician
Tape

Story


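The pipeline, shown in full in the Code section below, works frame by frame: detect_edges() masks blue lane markings in HSV and runs Canny edge detection, region_of_interest() keeps only the lower part of the image, detect_line_segments() finds Hough line segments, and get_steering_angle() turns the averaged left/right lane lines into a target angle. A PD controller then maps the deviation from straight into a servo duty cycle, red patches on the floor act as stop markers, and an optical encoder exposed through a small kernel module (encoder.c) closes the loop on speed.

As a quick orientation before the full listing, here is a minimal sketch of just the PD steering step, using the same gains and duty-cycle constants as autodrive.py (pwm_serv is assumed to be a 50 Hz RPi.GPIO PWM channel driving the steering servo):

import time

kp = 0.09           # proportional gain (same value as autodrive.py)
kd = kp * 0.5       # derivative gain
base_turn = 7.25    # servo duty cycle for straight steering

last_error = 0
last_time = time.time()

def pd_steering_update(steering_angle, pwm_serv):
    # Map the deviation from straight (90 degrees) to a servo duty cycle
    global last_error, last_time
    now = time.time()
    dt = max(now - last_time, 1e-3)         # guard against a zero time step
    error = steering_angle - 90
    turn_amt = base_turn + kp * error + kd * (error - last_error) / dt
    pwm_serv.ChangeDutyCycle(turn_amt)
    last_error, last_time = error, now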
Code

autodrive.py

Python
'''
ELEC 553 - Final Project
Spring 2025
Team Members: Lei Xia, Bruce Tian, Karthik, Yaqi

Reference:
https://www.instructables.com/Autonomous-Lane-Keeping-Car-Using-Raspberry-Pi-and/
https://www.hackster.io/team-elec424-dreamhouse/team-elec424-dreamhouse-c5ad62
'''

# Import Packages
import cv2
import numpy as np
import matplotlib.pyplot as plt
import math
import sys
import time
import os
import RPi.GPIO as GPIO

#Setup GPIO Interface
GPIO.setmode(GPIO.BCM)
GPIO.setup(18, GPIO.OUT)
GPIO.setup(19, GPIO.OUT)
pwm_esc = GPIO.PWM(18, 50)
pwm_serv = GPIO.PWM(19, 50)

pwm_esc.start(7.5)                          # puts DC motor on neutral
pwm_serv.start(7)                           # sets servo motor straight

# Encoder Setup
speed_encode = True
encoder_path = "/sys/module/encoder/parameters/encoder_val"
encoder_max_rotation = 304700000            # Upper bound on pulse interval, in ns
encoder_min_rotation = 304600000            # Lower bound on pulse interval, in ns

# Camera Setup
frame_width = 160
frame_height = 120
cam_idx = 0

# PD variables
kp = 0.09                                   # Proportional gain
kd = kp * 0.5                               # Derivative gain

# Speed Values
zero_speed = 7.5                            # Car in neutral
base_speed = 8.13                           # Initial speed when the car is moving
speed_change = 0.01
global_enc_vals = []

# Max number of loop iterations
max_ticks = 4000

# Exception for missing root privileges (GPIO access requires sudo)
class NotSudo(Exception):
    pass

# set speed to zero and steering to neutral
def reset_car():
    pwm_esc.ChangeDutyCycle(7.5)            # Puts motor on neutral
    pwm_serv.ChangeDutyCycle(7)             # Straight Steering
    print("Car stopped & straighted")       # Debug message

# Set speed to base speed and steering to neutral
def start_car():
    print("Car Initialized.")
    pwm_esc.ChangeDutyCycle(base_speed)     # Sets the motor to base speed
    pwm_serv.ChangeDutyCycle(7)             # Straight Steering

# Adjust the speed based on the speed encoder
def manage_speed():
    # Read the encoder data
    f = open(encoder_path, "r")
    enc = int(f.readline())                 # Get value frome encoder
    enc = abs(enc)
    f.close()
    ret = enc                               # Probably redundant

    global_enc_vals.append(enc)
    # If within bounds (wait for car to start / stop), then do not change speed
    ret = 0

    # Part to adjust speed base on encider return
    if enc >= encoder_max_rotation:
        ret = -(speed_change)
    elif enc <= encoder_min_rotation:
        ret = (speed_change)
    return ret

# Wait for a period of time
def wait(wait_time):
    start_time = time.perf_counter()
    end_time = start_time + wait_time
    # Loop until finished
    while (time.perf_counter() < end_time):
        pass
    return

# Detect the blue lines
def detect_edges(frame):
    # Filter for the blue lane markings
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

    lower_blue = np.array([90, 50, 50], dtype="uint8")
    upper_blue = np.array([130, 255, 255], dtype="uint8")

    mask = cv2.inRange(hsv, lower_blue, upper_blue)         # make mask
    edges = cv2.Canny(mask, 50, 100)                        # Get edges
    return edges

# Setup region of interest we want to look for
def region_of_interest(edges, roi_ratio=0.8):
    # Mask off everything above the ROI line
    h, w = edges.shape
    mask = np.zeros_like(edges)

    top_y = int(h * roi_ratio)
    polygon = np.array([[
        (0,   h),
        (0,   top_y),
        (w,   top_y),
        (w,   h),
    ]], np.int32)

    cv2.fillPoly(mask, polygon, 255)
    return cv2.bitwise_and(edges, mask)

# Find line segments
def detect_line_segments(cropped_edges):
    # Set constants
    rho = 1
    theta = np.pi / 180
    min_threshold = 10
    
    # Get lines
    line_segments = cv2.HoughLinesP(cropped_edges, rho, theta, min_threshold,
                                    np.array([]), minLineLength=5, maxLineGap=150)
    return line_segments

# Fit lines to the segments and average them into lane lines
def average_slope_intercept(frame, line_segments):
    lane_lines = []
    if line_segments is None:
        print("No line segments detected")
        return lane_lines

    # Set boundaries
    height, width, _ = frame.shape
    left_fit = []
    right_fit = []
    boundary = 1 / 3
    left_region_boundary = width * (1 - boundary)
    right_region_boundary = width * boundary

    # Go through line segments and get line of best fit
    for line_segment in line_segments:
        for x1, y1, x2, y2 in line_segment:
            if x1 == x2:
                continue

            fit = np.polyfit((x1, x2), (y1, y2), 1)
            slope = (y2 - y1) / (x2 - x1)
            intercept = y1 - (slope * x1)

            if slope < 0:
                if x1 < left_region_boundary and x2 < left_region_boundary:
                    left_fit.append((slope, intercept))
            else:
                if x1 > right_region_boundary and x2 > right_region_boundary:
                    right_fit.append((slope, intercept))
    
    # Average the fits on each side into a single lane line
    if len(left_fit) > 0:
        left_fit_average = np.average(left_fit, axis=0)
        lane_lines.append(make_points(frame, left_fit_average))

    if len(right_fit) > 0:
        right_fit_average = np.average(right_fit, axis=0)
        lane_lines.append(make_points(frame, right_fit_average))

    return lane_lines

# For getting visual of lines
def make_points(frame, line):
    height, width, _ = frame.shape
    slope, intercept = line

    y1 = height                     # Bottom of the frame image
    y2 = int(y1 / 2)                # Make points from middle of the frame down

    if slope == 0:
        slope = 0.1

    x1 = int((y1 - intercept) / slope)
    x2 = int((y2 - intercept) / slope)

    return [[x1, y1, x2, y2]]

# Get visual of lines
def display_lines(frame, lines, line_color=(0, 255, 0), line_width=6):
    line_image = np.zeros_like(frame)

    if lines is not None:
        for line in lines:
            for x1, y1, x2, y2 in line:
                cv2.line(line_image, (x1, y1), (x2, y2), line_color, line_width)

    line_image = cv2.addWeighted(frame, 0.8, line_image, 1, 1)

    return line_image

# Show a heading line
def display_heading_line(frame, steering_angle, line_color=(0, 0, 255), line_width=5):
    heading_image = np.zeros_like(frame)
    height, width, _ = frame.shape

    steering_angle_radian = steering_angle / 180.0 * math.pi

    x1 = int(width / 2)
    y1 = height
    x2 = int(x1 - height / 2 / math.tan(steering_angle_radian))
    y2 = int(height / 2)

    cv2.line(heading_image, (x1, y1), (x2, y2), line_color, line_width)
    heading_image = cv2.addWeighted(frame, 0.8, heading_image, 1, 1)

    return heading_image

# Process the lane detection situations
def get_steering_angle(frame, lane_lines):
    height, width, _ = frame.shape

    # Both lanes present
    if len(lane_lines) >= 2:
        _, _, left_x2, _ = lane_lines[0][0]
        _, _, right_x2, _ = lane_lines[1][0]
        mid = int(width / 2)
        x_offset = (left_x2 + right_x2) / 2 - mid
        y_offset = int(height * 0.3)

    # Only one lane detected
    elif len(lane_lines) == 1:
        x1, _, x2, _ = lane_lines[0][0]
        x_avg = (x1 + x2) / 2
        mid = int(width / 2)
        y_offset = int(height * 0.3)

        # Only one line visible: assume the other line is ~100 px away
        if x_avg < mid:
            x_offset = ((x_avg + (x_avg + 100)) / 2 - mid) - 1.4    # Line on the left, aim at assumed lane center
        else:
            x_offset = (((x_avg - 100) + x_avg) / 2 - mid)          # Line on the right
    else:
        x_offset = 0                                                # Straight
        y_offset = int(height * 0.75)

    angle_to_mid_radian = math.atan(x_offset / y_offset)
    angle_to_mid_deg = int(angle_to_mid_radian * 180.0 / math.pi)
    steering_angle = angle_to_mid_deg + 88      # 88 instead of 90 adds a small steering trim

    return steering_angle

# Plot the proportional, derivative and error
def plot_pd(p_vals, d_vals, error, show_img=False):
    fig, ax1 = plt.subplots()
    t_ax = np.arange(len(p_vals))
    ax1.plot(t_ax, p_vals, '-', label="Proportional values")
    ax1.plot(t_ax, d_vals, '-', label="Derivative values")
    ax2 = ax1.twinx()
    ax2.plot(t_ax, error, '--r', label="Error")

    ax1.set_xlabel("Frames")
    ax1.set_ylabel("PD_Value")
    ax2.set_ylim(-90, 90)
    ax2.set_ylabel("Error_Value")

    plt.title("PD and Error Value with time")
    fig.legend()
    fig.tight_layout()
    plt.savefig("PDError.png")

    if show_img:
        plt.show()
    plt.clf()

# Plot the speed, steering and the error
def plot_pwm(speed_pwms, turn_pwms, error, show_img=False):
    fig, ax1 = plt.subplots()
    t_ax = np.arange(len(speed_pwms))
    ax1.plot(t_ax, speed_pwms, '-', label="Speed")
    ax1.plot(t_ax, turn_pwms, '-', label="Steering")
    ax2 = ax1.twinx()

    ax1.plot(t_ax, np.array(error) / np.max(error), '--r', label="Error")      # Normalize error to share the axis
    ax1.set_xlabel("Frames")
    ax1.set_ylabel("Speed & Steer Duty Cycle")
    ax2.set_ylabel("Error Value")

    plt.title("Speed & Steering value with time")
    fig.legend()
    plt.savefig("SpeedSteeringError.png")

    if show_img:
        plt.show()
    plt.clf()

# Detect whether a red stop marker is visible on the floor
def isRedFloorVisible(image):
    hsv_img = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)                            # Convert to HSV color space

    cv2.imwrite("redfloor.jpg", hsv_img)
    percentage = 25                                                             # Parse out the color boundaries and the success boundaries

    # lower and upper range for the lower part of red
    lower_red1 = np.array([0, 40, 60], dtype="uint8")
    upper_red1 = np.array([10, 255, 255], dtype="uint8")

    # lower and upper range for the upper part of red
    lower_red2 = np.array([170, 40, 60], dtype="uint8")
    upper_red2 = np.array([180, 255, 255], dtype="uint8")

    # create two masks to capture both ranges of red
    mask1 = cv2.inRange(hsv_img, lower_red1, upper_red1)
    mask2 = cv2.inRange(hsv_img, lower_red2, upper_red2)

    mask = cv2.bitwise_or(mask1, mask2)                                         # Combining the masks
    output = cv2.bitwise_and(hsv_img, hsv_img, mask=mask)                       # Applying the mask
    cv2.imwrite("redfloormask.jpg", output)                                     # Save the output image
    percentage_detected = np.count_nonzero(mask) * 100 / np.size(mask)          # calculating what percentage of image falls between color boundaries

    # If percentage_detected exceeds the threshold, report the red floor as visible
    result = percentage < percentage_detected
    if result:
        print(percentage_detected)
    return result, output

def main():
    # Setup Base Variables
    lastTime = 0
    lastError = 0
    SecondStopTick = 0

    # Declare arrays for making the graphs
    p_vals = []             # Proportional
    d_vals = []             # Derivative
    err_vals_1 = []         # Error
    speed_vals = []         # Speed values
    steer_vals = []         # Steering values

    # set up video feed
    video = cv2.VideoCapture(cam_idx, cv2.CAP_V4L2)
    video.set(cv2.CAP_PROP_FRAME_WIDTH, frame_width)
    video.set(cv2.CAP_PROP_FRAME_HEIGHT, frame_height)

    #Start the car
    start_car()
    curr_speed = base_speed
    counter = 0

    passedFirstStopSign = False
    
    while counter < max_ticks:
           print("Beginning of the Measurement")
           pwm_esc.ChangeDutyCycle(zero_speed)
           # Read the image feed and resize to 160x120
           ret, original_frame = video.read()
           if not ret:                                          # Camera read failed
                break
           frame = cv2.resize(original_frame, (160, 120))

           # Process the frame to determine the desired steering angle
           edges = detect_edges(frame)
           cv2.imshow("edges", edges)
           roi = region_of_interest(edges, roi_ratio=0.9)

           line_segments = detect_line_segments(roi)
           lane_lines = average_slope_intercept(frame, line_segments)
           lane_lines_image = display_lines(frame, lane_lines)

           steering_angle = get_steering_angle(frame, lane_lines)
           heading_image = display_heading_line(lane_lines_image, steering_angle)

           # calculate changes for PD
           now = time.time()
           dt = now - lastTime
           deviation = steering_angle - 90

           # PD Code
           error = deviation
           base_turn = 7.25
           proportional = kp * error
           derivative = kd * (error - lastError) / dt

           # Take values for graphs
           p_vals.append(proportional)
           d_vals.append(derivative)
           err_vals_1.append(error)
           speed_vals.append(curr_speed)

           # determine actual turn behavior
           turn_amt = base_turn + proportional + derivative
           steer_vals.append(turn_amt)
           pwm_serv.ChangeDutyCycle(turn_amt)                   # Applies the steering command

           # Red Box detection
           if counter % 20 == 0:                                # Check every 20 ticks
                if not passedFirstStopSign:
                    isStopSign, floorSight = isRedFloorVisible(frame)

                    # Sees the red box
                    if isStopSign:
                        wait(3)
                        passedFirstStopSign = True
                        SecondStopTick = counter
                
                # See the second box
                elif passedFirstStopSign and counter > SecondStopTick+100:
                    isStopSign, _ = isRedFloorVisible(frame)
                    if isStopSign:
                        time.sleep(6)
                        pwm_esc.ChangeDutyCycle(zero_speed)
                        print("Reached the Final Stop, program exit.")
                        break                                   # Stop forever

           # speed encoding
           if speed_encode:
                if counter % 3 == 0:                            # Check every 3 ticks
                    # Adjust speed
                    temp_speed = manage_speed() + curr_speed
                    if temp_speed != curr_speed:
                        curr_speed = temp_speed                 # New duty cycle is applied just below

           pwm_esc.ChangeDutyCycle(curr_speed)                  # Drives the DC motor (forward) #w/o encoder, curr_speed is base speed
           wait(0.023)                                          # Run at speed for a short amount of time

           key = cv2.waitKey(1)
           if key == 27:
                break
           counter += 1

    # Reset car and close
    reset_car()
    video.release()
    cv2.destroyAllWindows()

    plot_pd(p_vals, d_vals, err_vals_1)                         # Save the PD plot
    plot_pwm(speed_vals, steer_vals, err_vals_1)                # Save the speed & steering plot

if __name__ == "__main__":
    main()

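manage_speed() in the script above reads the pulse interval that the following kernel module publishes as a module parameter. Assuming the module has been built and loaded as encoder.ko, a quick sanity check from userspace is a plain file read (the path matches encoder_path in autodrive.py):

# Read the last pulse interval, in nanoseconds, reported by the encoder module
with open("/sys/module/encoder/parameters/encoder_val") as f:
    interval_ns = abs(int(f.readline()))
print(f"Last pulse interval: {interval_ns} ns ({interval_ns / 1e6:.2f} ms)")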
encoder.c

C/C++
#include <linux/module.h>
#include <linux/of_device.h>
#include <linux/kernel.h>
#include <linux/gpio/consumer.h>
#include <linux/platform_device.h>
#include <linux/interrupt.h>
#include <linux/timekeeping.h>
#include <linux/ktime.h>
#include <linux/moduleparam.h>
#include <linux/hrtimer.h>
#include <linux/of.h>

/**
* ELEC 553 - Final Project
* Spring 2025
* Team Member: Lei Xia, Bruce Tian, Karthik Goli, Zhenya Liu
*
* Reference:
* https://www.hackster.io/paul-walker/autonomous-rc-car-eebfb4
* Interrupt code:
* https://github.com/Johannes4Linux/Linux_Driver_Tutorial/blob/main/11_gpio_irq/gpio_irq.c
**/

// Init variables
unsigned int irq_number;
static struct gpio_desc *my_enc;
static volatile u64 prev_time;
static volatile u64 curr_time;

static int encoder_val;
module_param(encoder_val, int, 0644);

// Interrupt Handler
static irq_handler_t gpio_irq_handler(unsigned int irq, void *dev_id, struct pt_regs *regs) {

	// Read the pin; ignore edges less than 1 ms apart (software debounce)
	int enc_val = gpiod_get_value(my_enc); 
	curr_time = ktime_get_ns();
	unsigned long elapsed_time = curr_time - prev_time;
	
	if (enc_val == 1 && elapsed_time > 1000000) {
		prev_time = curr_time;
		encoder_val = elapsed_time;
		printk(KERN_INFO "irq_handler - timing detected: %lu", elapsed_time);
	}
	
	return (irq_handler_t) IRQ_HANDLED; 
}

// Probe Function 
static int enc_probe(struct platform_device *pdev) {
	struct device *dev = &pdev->dev;
	
	printk("enc_probe - RUNNING\n");

	// Looks up the "encoder-gpios" property from the device tree overlay
	my_enc = gpiod_get(dev, "encoder", GPIOD_IN);
	if(IS_ERR(my_enc)) {
		printk("enc_probe - could not gpiod_get the encoder GPIO\n");
		return -1;
	}
	
	// Set up the interrupt and a 1 ms debounce (argument is in microseconds)
	irq_number = gpiod_to_irq(my_enc);
	gpiod_set_debounce(my_enc, 1000);
	
	if(request_irq(irq_number, (irq_handler_t) gpio_irq_handler, IRQF_TRIGGER_RISING, "my_gpio_irq", NULL) != 0){
		printk("Error! Cannot request interrupt nr.: %d\n", irq_number);
		return -1;
	}
	prev_time = ktime_get_ns();
	
	printk("Speed Encoder Insert Successful.\n");
	return 0;
}

// Remove encoder
static void enc_remove(struct platform_device *pdev) {
	free_irq(irq_number, NULL);
	gpiod_put(my_enc);              // Release the GPIO descriptor
	printk("Speed Encoder Removed.\n");
}

static struct of_device_id matchy_match[] = {
    { .compatible = "custom,speed_encoder_driver", },
	{},
};

// Platform driver object
static struct platform_driver speed_encoder = {
	.probe	 = enc_probe,
	.remove	 = enc_remove,
	.driver	 = {
	       .name  = "The Rock: this name doesn't even matter",
	       .owner = THIS_MODULE,
	       .of_match_table = matchy_match,
	},
};
module_platform_driver(speed_encoder);

MODULE_DESCRIPTION("ELEC 553 Final Project");
MODULE_AUTHOR("Team 7");
MODULE_LICENSE("GPL v2");
MODULE_ALIAS("platform:speed_encoder");

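The interrupt handler stores the time between consecutive rising edges, in nanoseconds, so a larger encoder_val means a slower wheel; autodrive.py compares this raw interval against its encoder_min_rotation/encoder_max_rotation band rather than converting it to a true speed. If an actual rotation rate were needed, a sketch like the following would work; PULSES_PER_REV is a placeholder for the slot count of the encoder disc, which this project does not specify:

PULSES_PER_REV = 20     # assumed slot count, not a project constant

def interval_to_rev_per_sec(interval_ns: int) -> float:
    # One pulse arrives every interval_ns nanoseconds
    if interval_ns <= 0:
        return 0.0
    pulses_per_sec = 1e9 / interval_ns
    return pulses_per_sec / PULSES_PER_REV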
encoder_overlay.dts

C/C++
/dts-v1/;
/plugin/;

/ {
    compatible = "brcm,bcm2835";
    fragment@0 {
        target-path = "/";
        __overlay__ {
            speed_encoder {
                // Must match the of_match_table in encoder.c
                compatible = "custom,speed_encoder_driver";
                // Looked up by gpiod_get(dev, "encoder", ...) in enc_probe
                encoder-gpios = <&gpio 4 0>;
            };
        };
    };

};

image_recognition.py

Python
import cv2
import torch
import time
import warnings
warnings.filterwarnings("ignore")

# Load YOLOv5 from PyTorch hub
model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)

# Open the webcam (likely the first camera connected)
cap = cv2.VideoCapture(0)

# Check if the camera opened successfully
if not cap.isOpened():
    print("Error: Could not open camera.")
    exit()

# Loop to acquire frames from webcam
prev_time = time.perf_counter()
while True:
    # Acquire frame
    ret, frame = cap.read()

    # If frame is read correctly, ret is True
    if not ret:
        print("Error: Failed to read frame.")
        break

    # Run YOLOv5 model on frame
    img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = model(img_rgb, size=320)

    # Parse results as pandas DataFrame
    detections = results.pandas().xyxy[0]  # xmin, ymin, xmax, ymax, confidence, class, name

    # Draw detected objects on frame
    for _, row in detections.iterrows():
        x1, y1, x2, y2 = map(int, [row['xmin'], row['ymin'], row['xmax'], row['ymax']])
        label = f"{row['name']} {row['confidence']:.2f}"

        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(frame, label, (x1, y1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    # Display the resulting frame
    cv2.imshow('Camera Feed', frame)

    # Calculate frame rate
    curr_time = time.perf_counter()
    elapsed_time = curr_time - prev_time
    prev_time = curr_time
    if elapsed_time > 0:
        fps = 1.0 / elapsed_time
        print(f"Frame Rate: {fps:.4f} fps")
    else:
        print("Elapsed time too short to measure frame rate")


    # Exit the loop when 'q' is pressed
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Release the camera and close windows
cap.release()
cv2.destroyAllWindows()

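The loop above draws every COCO class that YOLOv5s reports. If only one class matters for driving (COCO includes "stop sign", for instance), the pandas DataFrame can be filtered before drawing; a small sketch of that filter, meant to sit inside the loop after detections is computed:

# Keep only confident detections of a single COCO class (hypothetical filter)
stop_signs = detections[
    (detections["name"] == "stop sign") & (detections["confidence"] > 0.5)
]
for _, row in stop_signs.iterrows():
    print(f"Stop sign at ({row['xmin']:.0f}, {row['ymin']:.0f}), "
          f"confidence {row['confidence']:.2f}")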
Credits

Zhenya Liu
Karthik Goli
BT7
Lei Xia
