# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# ELEC 553: Spring 2025 #
# Final Project: Self-Driving Car #
# Team 2: Charlie Liu, Haibo Li, Lindsey Russ, Kelly Chan #
# #
# Code adapted from: #
# -Users: Anyssa Castorina, Dingding Ye, Jacob L, Ryan Shores, Caroline Spindel, #
# Diet Cyber Truck. #
# Hackster.io. URL: #
# https://www.hackster.io/fish-fear-us/diet-cyber-truck-483a7d #
# #
# -User raja_961, #
# Autonomous Lane-Keeping Car Using Raspberry Pi and OpenCV, #
# Instructables URL: #
# https://www.instructables.com/Autonomous-Lane-Keeping-Car-Using-Raspberry-Pi-and/ #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
import cv2
import numpy as np
import matplotlib
import matplotlib.pyplot as plt # plotting graphs & data visualization
import math
import time
import RPi.GPIO as GPIO
import os
# encoder driver
module_name = "gpiod_driver"
sysfs_path = f"/sys/module/{module_name}/parameters/diff"
time_diff_ms = 0 # Default value if reading fail
min_time_diff = 10 # Example: Fastest rotation corresponds to 10ms between pulses
max_time_diff = 200 # Example: Slowest rotation corresponds to 200ms between pulses
min_duty_cycle = 7.84 # Example: Slowest speed PWM duty cycle
max_duty_cycle = 8.5 # Example: Fastest speed PWM duty cycle
target_time_diff = 12 # Target time difference for the PID controller
def pi_go():
global time_diff_ms
global target_duty_cycle # PWM duty cycle value computed to reach target speed
global min_time_diff
global max_time_diff
global min_duty_cycle
global max_duty_cycle
global target_time_diff # speed target
# use pi control to get the target time
# calculate the duty range
duty_range = max_duty_cycle - min_duty_cycle
# calculate speed range
time_diff_range = max_time_diff - min_time_diff
# calculate P gain
p_gain = (duty_range / time_diff_range) * 3
# set I gain
i_gain = 0.05
# reach target time diff based on the current time diff
if time_diff_ms < target_time_diff:
# calculate the difference
diff = target_time_diff - time_diff_ms
# calculate the duty cycle
target_duty_cycle = min_duty_cycle + (diff * p_gain)
# set the duty cycle
if target_duty_cycle > max_duty_cycle:
target_duty_cycle = max_duty_cycle
elif target_duty_cycle < min_duty_cycle:
target_duty_cycle = min_duty_cycle
else:
# calculate the difference
diff = time_diff_ms - target_time_diff
# calculate the duty cycle
target_duty_cycle = max_duty_cycle - (diff * p_gain)
# set the duty cycle
if target_duty_cycle > max_duty_cycle:
target_duty_cycle = max_duty_cycle
elif target_duty_cycle < min_duty_cycle:
target_duty_cycle = min_duty_cycle
# set the duty cycle
throttle.start(target_duty_cycle)
# print("[INFO] timediff: {}, target duty cycle: {}".format(time_diff_ms, target_duty_cycle))
def get_time_diff():
global time_diff_ms
global sysfs_path
try:
# Attempt to read the current time difference from sysf
with open(sysfs_path, 'r') as f:
time_diff_ms = int(f.read().strip())
except FileNotFoundError:
# If the module file is not found fall back to default value
print(f"Module {module_name} not found. Using default value: {time_diff_ms} ms")
except ValueError:
# If the file contents are invalid use default
print(f"Invalid value read from {sysfs_path}. Using default value: {time_diff_ms} ms")
return time_diff_ms
# Throttle
current_speed = 1600000
# Lists for graphs
speed_list = []
steering_list = []
error_history = []
# stop sign
at_stop_sign = False
stop_start_time = 0
# imwrite switch
imwrite_flag = False
# Max speed value
max_speed = 1650000
# Step size of speed increase/decrease
speed_step = 2500
#Delay of going faster
go_faster_tick_delay = 80
# Do not change this here. Code will set this value after seeing stop sign
go_faster_tick = 0
# Steering
steeringPin = "P9_14"
# Max number of loops
max_ticks = 2000
# motor and servo interfaces
throttle = ''
steering = ''
# function to recognize the 'stop sign'
def getRedFloorBoundaries():
"""
Gets the hsv boundaries and success boundaries indicating if the floor is red
:return: [[lower color and success boundaries for red floor], [upper color and success boundaries for red floor]]
"""
# gets color values from stop sign text file
return getBoundaries("redboundaries.txt")
# function to recognize position wrt stop sign
def isRedFloorVisible(frame):
"""
Detects whether or not the floor is red
:param frame: Image
:return: [(True is the camera sees a red on the floor, false otherwise), video output]
"""
# locates stop sign corners
boundaries = getRedFloorBoundaries()
return isMostlyColor(frame, boundaries)
# function to get most common color in camera
def isMostlyColor(image, boundaries):
"""
Detects whether or not the majority of a color on the screen is a particular color
:param image:
:param boundaries: [[color boundaries], [success boundaries]]
:return: boolean if image satisfies provided boundaries, and an image used for debugging
"""
#Convert to HSV color space
hsv_img = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
#parse out the color boundaries and the success boundaries
color_boundaries = boundaries[0]
percentage = boundaries[1]
lower = np.array(color_boundaries[0])
upper = np.array(color_boundaries[1])
mask = cv2.inRange(hsv_img, lower, upper) # Create binary mask where HSV values fall within bounds
output = cv2.bitwise_and(hsv_img, hsv_img, mask=mask) # Apply mask to original HSV image to isolate target region
#Calculate what percentage of image falls between color boundaries
percentage_detected = np.count_nonzero(mask) * 100 / np.size(mask)
# If the percentage percentage_detected is betweeen the success boundaries, we return true, otherwise false for result
result = percentage[0] < percentage_detected <= percentage[1]
if result:
# print(percentage_detected)
pass
return result, output
# function to get color values from object in camera
def getBoundaries(filename):
"""
Reads the boundaries from the file filename
Format:
[0] lower: [H, S, V, lower percentage for classification of success]
[1] upper: [H, S, V, upper percentage for classification of success]
:param filename: file containing boundary information as above
:return: [[lower color and success boundaries], [upper color and success boundaries]]
"""
default_lower_percent = 50
default_upper_percent = 100
with open(filename, "r") as f:
boundaries = f.readlines() # Read both lines from the boundary file
lower_data = [val for val in boundaries[0].split(",")]
upper_data = [val for val in boundaries[1].split(",")]
# Check if a custom percentage was provided, otherwise use default
if len(lower_data) >= 4:
lower_percent = float(lower_data[3])
else:
lower_percent = default_lower_percent
# Check if a custom percentage was provided, otherwise use default
if len(upper_data) >= 4:
upper_percent = float(upper_data[3])
else:
upper_percent = default_upper_percent
# Extract HSV color bounds
lower = [int(x) for x in lower_data[:3]]
upper = [int(x) for x in upper_data[:3]]
boundaries = [lower, upper]
percentages = [lower_percent, upper_percent]
return boundaries, percentages # Return both sets of data
# function to stop car movement
def stop():
global current_speed # Access unmodified current speed variable
global throttle
global steering
throttle.stop()
steering.stop()
print("[INFO] Car stopped!")
# function to set car speed to 8.1%
def go():
"""
Sends the car forward at a default PWM
:return: none
"""
pi_go()
pass
# Turn the car
def turn(turn_amt):
global steering
# print("[INFO] turn amt: ", turn_amt)
steering.start(turn_amt)
# Make the car go straight
def turn_straight():
global steering
steering.start(7.5)
# print("[INFO] Turn PWM 7.5")
def detect_edges(frame):
# filter for blue lane lines
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) # Convert the image from BGR to HSV color space
if imwrite_flag:
cv2.imwrite('HSV.jpg',hsv)
lower_blue = np.array([80, 50, 50], dtype="uint8")
upper_blue = np.array([160, 255, 255], dtype="uint8")
mask = cv2.inRange(hsv, lower_blue, upper_blue) # Create a binary mask where blue pixels are white
if imwrite_flag:
cv2.imwrite('mask.jpg',mask)
# detect edges
edges = cv2.Canny(mask, 50, 100)
if imwrite_flag:
cv2.imwrite('edges.jpg',edges)
return edges # Return the edge map
def region_of_interest(edges):
# Define our region of interest to be the lower half
height, width = edges.shape
mask = np.zeros_like(edges)
# only focus lower half of the screen
polygon = np.array([[
(0, height),
(0, height / 2), # mid-right
(width, height / 2), # mid-left
(width, height),
]], np.int32)
cv2.fillPoly(mask, polygon, 255) # Fill polygon area with white (255) in the mask
cropped_edges = cv2.bitwise_and(edges, mask)
if imwrite_flag:
cv2.imwrite('roi.jpg',cropped_edges)
return cropped_edges # Return masked edge image
def detect_line_segments(cropped_edges):
# Attempt to dectect line segements
rho = 1 # Distance resolution in pixels
theta = np.pi / 180 # Angle resolution in radians
min_threshold = 10 # Minimum number of intersecting points to detect a line
line_segments = cv2.HoughLinesP(cropped_edges, rho, theta, min_threshold, # Detect line segments in the edge-detected ROI
np.array([]), minLineLength=5, maxLineGap=150)
return line_segments
def average_slope_intercept(frame, line_segments):
# Find the average slope intercept
lane_lines = []
if line_segments is None:
#print("no line segments detected")
return lane_lines
height, width, _ = frame.shape
left_fit = []
right_fit = []
boundary = 1 / 3 # Use outer thirds of the image for left/right lanes
left_region_boundary = width * (1 - boundary)
right_region_boundary = width * boundary
# Iterate through each detected line segment
for line_segment in line_segments:
for x1, y1, x2, y2 in line_segment:
if x1 == x2:
# Skip vertical lines
print("skipping vertical lines (slope = infinity")
continue
fit = np.polyfit((x1, x2), (y1, y2), 1) # Fit line using polyfit for redundancy
slope = (y2 - y1) / (x2 - x1)
intercept = y1 - (slope * x1)
if slope < 0:
# Line falls in left region and is sloped left
if x1 < left_region_boundary and x2 < left_region_boundary:
left_fit.append((slope, intercept))
else:
# Line falls in right region and is sloped right
if x1 > right_region_boundary and x2 > right_region_boundary:
right_fit.append((slope, intercept))
# Average left and right line parameters, if any were found
left_fit_average = np.average(left_fit, axis=0)
if len(left_fit) > 0:
lane_lines.append(make_points(frame, left_fit_average)) # Convert fit to screen coordinates
right_fit_average = np.average(right_fit, axis=0)
if len(right_fit) > 0:
lane_lines.append(make_points(frame, right_fit_average))
return lane_lines # Return list of fitted lane lines as pixel coordinates
def make_points(frame, line):
# Create points
height, width, _ = frame.shape
slope, intercept = line
y1 = height # bottom of the frame
y2 = int(y1 / 2) # make points from middle of the frame down
if slope == 0:
slope = 0.1 # Prevent division by zero for horizontal lines
# Calculate x coordinates using y = mx + b rearranged as x = (y - b) / m
x1 = int((y1 - intercept) / slope)
x2 = int((y2 - intercept) / slope)
return [[x1, y1, x2, y2]] # Return coordinates as a line segment
def display_lines(frame, lines, line_color=(0, 255, 0), line_width=6):
#Display lines
line_image = np.zeros_like(frame) # Create a blank image with same shape as frame
if lines is not None:
for line in lines:
for x1, y1, x2, y2 in line:
# Draw each line segment onto the blank line image
cv2.line(line_image, (x1, y1), (x2, y2), line_color, line_width)
line_image = cv2.addWeighted(frame, 0.8, line_image, 1, 1)
return line_image # Return the frame with the overlaid lines
def display_heading_line(frame, steering_angle, line_color=(0, 0, 255), line_width=5):
# Display the heading line
heading_image = np.zeros_like(frame)
height, width, _ = frame.shape
# Convert steering angle from degrees to radians
steering_angle_radian = steering_angle / 180.0 * math.pi
# Define the starting point (bottom-center of the frame)
x1 = int(width / 2)
y1 = height
# Calculate end point based on steering direction and angle
x2 = int(x1 - height / 2 / math.tan(steering_angle_radian))
y2 = int(height / 2)
cv2.line(heading_image, (x1, y1), (x2, y2), line_color, line_width) # Draw the heading line from (x1, y1) to (x2, y2)
heading_image = cv2.addWeighted(frame, 0.8, heading_image, 1, 1) # Overlay heading line on top of original frame
return heading_image
def get_steering_angle(frame, lane_lines):
# Get the steering angle
height, width, _ = frame.shape
if len(lane_lines) == 2:
# If both left and right lane lines are detected
_, _, left_x2, _ = lane_lines[0][0]
_, _, right_x2, _ = lane_lines[1][0]
mid = int(width / 2)
x_offset = (left_x2 + right_x2) / 2 - mid # Deviation from center
y_offset = int(height / 2)
elif len(lane_lines) == 1:
# If only one line is detected, use its slope for angle estimation
x1, _, x2, _ = lane_lines[0][0]
x_offset = x2 - x1
y_offset = int(height / 2)
elif len(lane_lines) == 0:
# No lines detected; assume no offset
x_offset = 0
y_offset = int(height / 2)
# Convert offset to angle (radians degrees)
angle_to_mid_radian = math.atan(x_offset / y_offset)
angle_to_mid_deg = int(angle_to_mid_radian * 180.0 / math.pi)
steering_angle = angle_to_mid_deg + 90 # 90 = straight ahead
return steering_angle
def plot_pd(p_vals, d_vals, error, show_img=False):
# Plot the PD
fig, ax1 = plt.subplots()
t_ax = np.arange(len(p_vals)) # Create x-axis (frame count)
ax1.plot(t_ax, p_vals, '-', label="P values")
ax1.plot(t_ax, d_vals, '-', label="D values")
ax2 = ax1.twinx() # Create secondary y-axis for error
ax2.plot(t_ax, error, '--r', label="Error")
ax1.set_xlabel("Frames")
ax1.set_ylabel("PD Value")
ax2.set_ylim(-90, 90) # Limit error axis range
ax2.set_ylabel("Error Value")
plt.title("PD Values over time")
fig.legend() # Display legend
fig.tight_layout()
plt.savefig("./pd_plot.png")
plt.clf() # Clear figure for future plotting
def plot_pwm(speed_pwms, turn_pwms, error, show_img=False):
# Plot the PWM
fig, ax1 = plt.subplots()
t_ax = np.arange(len(speed_pwms)) # X-axis (frame count)
ax1.plot(t_ax, speed_pwms, '-', label="Speed PWM")
ax1.plot(t_ax, turn_pwms, '-', label="Steering PWM")
ax2 = ax1.twinx() # Create second Y-axis for error
ax2.plot(t_ax, error, '--r', label="Error") # Plot error as red dashed line
ax1.set_xlabel("Frames")
ax1.set_ylabel("PWM Values")
ax2.set_ylabel("Error Value")
plt.title("PWM Values over time")
fig.legend() # Display legend
plt.savefig("./pwm_plot.png")
plt.clf() # Clear the figure
def init_motor_servo():
global throttle
global steering
GPIO.setmode(GPIO.BCM) # Use GPIO numbering instead of physical numbering
throttle_enable = 18
steering_enable = 19 # GPIO pin for steering control
GPIO.setup(throttle_enable, GPIO.OUT)
GPIO.setup(steering_enable, GPIO.OUT)
throttle = GPIO.PWM(throttle_enable, 50) # Initialize throttle PWM at 50Hz
steering = GPIO.PWM(steering_enable, 50)
throttle.start(7.5) # Start throttle at neutral duty cycle
steering.start(7.5) # Start steering at straight position
# set up the car throttle and steering PWMs
print("initializing car")
init_motor_servo()
print("Car Initialized!")
# set up video
video = cv2.VideoCapture(0)
video.set(cv2.CAP_PROP_FRAME_WIDTH, 160)
video.set(cv2.CAP_PROP_FRAME_HEIGHT, 120)
# wait for video to load
time.sleep(1)
# PD variables
kp = 0.085
kd = kp * 0.1
lastTime = 0
lastError = 0
# counter for number of ticks
counter = 0
# arrays for making the final graphs
p_vals = []
d_vals = []
err_vals = []
speed_pwm = []
steer_pwm = []
# Booleans for handling stop light
passedFirstStopSign = False
secondStopLightTick = 0
#See if we've check the stop sign
stopSignCheck = 1
# Sight debuging
sightDebug = True
# Check if it is a stop sting
isStopSignBool = False
# main while loop
print("main loop")
while counter < max_ticks:
# Read in the video feed
ret, original_frame = video.read()
# If there is no video
if original_frame is None:
print("No frame")
# resize the frame
frame = cv2.resize(original_frame, (180, 120))
# Show the resized frame
if imwrite_flag:
cv2.imwrite('Resized.jpg',frame)
# check for stop sign every couple ticks
if ((counter + 1) % stopSignCheck) == 0:
# check for the first stop sign
if not at_stop_sign:
if not passedFirstStopSign:
isStopSignBool, floorSight = isRedFloorVisible(frame)
# Trigger when first stop signed is encountered
if isStopSignBool:
print("detected first stop sign, stopping")
at_stop_sign = True
stop_start_time = time.time() # Record the stop start time
stop()
time.sleep(4)
passedFirstStopSign = True
# this is used to not check for the second stop sign until many frames later
secondStopSignTick = counter + 4
# now check for stop sign less frequently
stopSignCheck = 50 # originally 10
# add a delay to calling go faster
go_faster_tick = counter + go_faster_tick_delay
print("first stop finished!")
go()
# check for the second stop sign
elif passedFirstStopSign and counter > secondStopSignTick:
go()
isStop2SignBool, _ = isRedFloorVisible(frame)
if isStop2SignBool:
# last stop sign detected, exits while loop
print("detected second stop sign, stopping")
at_stop_sign = True
stop_start_time = time.time()
time.sleep(.3) # wait for one second
stop()
break
# Reset stop sign flag after cooldown period (8 seconds)
else:
if(time.time() - stop_start_time) > 8:
at_stop_sign = False
# makes car go faster, helps it have enough speed to get to the end of the course
# process the frame to determine the desired steering angle
edges = detect_edges(frame)
roi = region_of_interest(edges)
line_segments = detect_line_segments(roi) # Detect line segments from the edges
lane_lines = average_slope_intercept(frame, line_segments)
lane_lines_image = display_lines(frame, lane_lines) # Draw lane lines on the original frame
steering_angle = get_steering_angle(frame, lane_lines) # Compute steering angle based on lane lines
heading_image = display_heading_line(lane_lines_image,steering_angle)
if imwrite_flag:
pass
cv2.imwrite('headline.jpg', heading_image) # Save annotated output image for debugging
# calculate changes for PD
now = time.time()
dt = now - lastTime # Calculate time elapsed since last frame
print("[INFO] Frame rate: {}".format(str(float(1/float(dt)))))
if imwrite_flag:
cv2.imwrite('cropped.jpg', roi) # Save region of interest image if debug flag is enabled
deviation = steering_angle - 90
# PD Code
error = deviation
error_history.append(error) # Add current error to the history buffer
if len(error_history) > 9:
error_history.pop(0)
if len(error_history) >= 7:
temp = error_history.copy() # Make a copy of the buffer
temp.remove(max(error_history))
temp.remove(min(error_history))
filtered_error = sum(temp) / len(temp) # Calculate average of the trimmed set
else:
filtered_error = error # Use raw error if not enough history
error = filtered_error # Overwrite error with smoothed value
base_turn = 7.5
proportional = kp * error
derivative = kd * (error - lastError) / dt
# take values for graphs
p_vals.append(proportional)
d_vals.append(derivative)
err_vals.append(error)
# determine actual turn to do
turn_amt = base_turn + proportional + derivative
# Handling turning
# caps turns to make PWM values
if 7.4 < turn_amt < 7.6:
turn_straight()
pass
else:
if turn_amt < 6:
# print("raising low value")
turn_amt = 6
elif turn_amt > 9:
# print("lowering high value")
turn_amt = 9
turn(turn_amt)
pass
# Update speed and turning
steer_pwm.append(turn_amt)
plot_speed = current_speed / 200000 # Correct speed to percentage values for plotting
speed_pwm.append(plot_speed)
# Used for graphs
lastError = error
lastTime = time.time()
# update PD values for next loop
lastTime = time.time()
key = cv2.waitKey(1) # Listen for key press
if key == 27: # If ESC key is pressed
print("key 27")
break
go()
counter += 1
# clean up resources
stop()
video.release()
cv2.destroyAllWindows()
plot_pd(p_vals, d_vals, err_vals, True) # Plot proportional, derivative, & error values
plot_pwm(speed_pwm, steer_pwm, err_vals, True)
Comments
Please log in or sign up to comment.