import cv2
import numpy as np
import math
import sys
import time
import os
import subprocess
import board
import adafruit_mcp4728
import matplotlib.pyplot as plt
'''
The following code is based on:
User raja_961, “Autonomous Lane-Keeping Car Using Raspberry
Pi and OpenCV”. Instructables. URL:
https://www.instructables.com/Autonomous-Lane-Keeping-Car-U
sing-Raspberry-Pi-and/
As well as the following hackster project:
https://www.hackster.io/beagle-bone-baja-blast/beagle-bone-baja-blast-eae48e
Which in turn based its code on this hackster project:
https://www.hackster.io/really-bad-idea/autonomous-path-following-car-6c4992
'''
# Need this
passedFirstStopSign = False
# wait function for stop logic
def wait(wait_time):
start_time = time.perf_counter()
end_time = start_time + wait_time
print('waiting... ' + str(end_time) + 'seconds')
while (time.perf_counter() < end_time):
pass
return
# initializes drive control system
MCP4728A4_DEFAULT_ADDRESS = 0x64 # weird address that only works on our board
i2c = board.I2C() # uses board.SCL and board.SDA
mcp4728 = adafruit_mcp4728.MCP4728(i2c, adafruit_mcp4728.MCP4728A4_DEFAULT_ADDRESS)
'''
These are our default speed and steering variables. Initialized to their midpoint
'''
# speed is on a 0-2 scale where 1-2 is positive and 0-1 is negative.
# steering is on 0 - 2, where 0-1 is left, and 1-2 is right
speed = 1.009
base_turn = 1.175
mcp4728.channel_a.value = int(65535 / 2) # neutral throttle
mcp4728.channel_b.value = int(65535 / 2) # steering
# we might putting camera stuff in this was well
def initialize_car():
mcp4728.channel_a.value = int(65535 / 2) # Voltage = VDD
mcp4728.channel_b.value = int(base_turn * (65535 / 2)) # car by default doesn't steer straight, constant is applied
# to make it as straight as possible
def move(speed):
mcp4728.channel_a.value = int(speed * (65535 / 2)) # Voltage = VDD
# speed = speed - not sure why we have this line, commented out but will uncomment worst case
# stop function sets the throttle 0
def stop():
mcp4728.channel_a.value = int(65535 / 2)
# steep < 1 = left, steep > 1 -- right
def turn(steepness):
mcp4728.channel_b.value = int(base_turn * steepness * (65535 / 2))
# re-centers car (car's turn was initially off-center)
def re_center():
mcp4728.channel_b.value = int(base_turn * 65535 / 2)
# based on: https://www.instructables.com/Autonomous-Lane-Keeping-Car-Using-Raspberry-Pi-and/
# and https://www.hackster.io/really-bad-idea/autonomous-path-following-car-6c4992
# adapted from hackster, changed to adhere to red HSV color segments
def isRedFloorVisible(image):
"""
Detects whether or not the majority of a color on the screen is a particular color
:param image:
:param boundaries: [[color boundaries], [success boundaries]]
:return: boolean if image satisfies provided boundaries, and an image used for debugging
"""
# Convert to HSV color space
hsv_img = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
cv2.imwrite("redfloor.jpg", hsv_img)
# parse out the color boundaries and the success boundaries
# percentage was adjusted
percentage = 25
# lower and upper range for the lower part of red
lower_red1 = np.array([0, 40, 60], dtype="uint8")
upper_red1 = np.array([10, 255, 255], dtype="uint8")
# lower and upper range for the upper part of red
lower_red2 = np.array([170, 40, 60], dtype="uint8")
upper_red2 = np.array([180, 255, 255], dtype="uint8")
# create two masks to capture both ranges of red
mask1 = cv2.inRange(hsv_img, lower_red1, upper_red1)
mask2 = cv2.inRange(hsv_img, lower_red2, upper_red2)
# combining the masks
mask = cv2.bitwise_or(mask1, mask2)
# applying the mask
output = cv2.bitwise_and(hsv_img, hsv_img, mask=mask)
# save the output image
cv2.imwrite("redfloormask.jpg", output)
# calculating what percentage of image falls between color boundaries
percentage_detected = np.count_nonzero(mask) * 100 / np.size(mask)
# if the percentage percentage_detected is betweeen the success boundaries, we return true,
# otherwise false for result
result = percentage < percentage_detected
if result:
print(percentage_detected)
return result, output
# finds edges on track and writes to images, adapted from raja_961
def detect_edges(frame):
# filter for blue lane lines
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
cv2.imwrite("hsv.jpg", hsv)
lower_blue = np.array([90, 50, 50], dtype="uint8")
upper_blue = np.array([130, 255, 255], dtype="uint8")
mask = cv2.inRange(hsv, lower_blue, upper_blue)
cv2.imwrite("mask.jpg", mask)
# detect edges
blur = cv2.GaussianBlur(mask, (5, 5), 0)
edges = cv2.Canny(blur, 50, 100)
cv2.imwrite("edges.jpg", edges)
return edges
# determines regions of interest for lane-keeping, adapted from raja_961
def region_of_interest(edges):
height, width = edges.shape
mask = np.zeros_like(edges)
# only focus lower half of the screen
polygon = np.array([[
(0, height),
(0, 3 * height / 4),
(width, 3 * height / 4),
(width, height),
]], np.int32)
cv2.fillPoly(mask, polygon, 255)
cropped_edges = cv2.bitwise_and(edges, mask)
cv2.imwrite("roi.jpg", cropped_edges)
return cropped_edges
# adapted from raja_961
def detect_line_segments(cropped_edges):
rho = 1
theta = np.pi / 180
min_threshold = 20
line_segments = cv2.HoughLinesP(cropped_edges, rho, theta, min_threshold,
np.array([]), minLineLength=10, maxLineGap=150)
return line_segments
# takes the frame under processing and lane segments detected using Hough transform
# and returns the average slope and intercept of two lane lines, adapted from raja_961
def average_slope_intercept(frame, line_segments):
lane_lines = []
print("line_segments", line_segments)
if line_segments is None:
print("no line segments detected")
return lane_lines
height, width, _ = frame.shape
left_fit = []
right_fit = []
boundary = 0
left_region_boundary = width * (1 - boundary)
right_region_boundary = width * boundary
for line_segment in line_segments:
for x1, y1, x2, y2 in line_segment:
if x1 == x2:
print("skipping vertical lines (slope = infinity")
continue
fit = np.polyfit((x1, x2), (y1, y2), 1)
slope = (y2 - y1) / (x2 - x1)
intercept = y1 - (slope * x1)
if slope < 0:
if x1 < left_region_boundary and x2 < left_region_boundary:
left_fit.append((slope, intercept))
else:
if x1 > right_region_boundary and x2 > right_region_boundary:
right_fit.append((slope, intercept))
left_fit_average = np.average(left_fit, axis=0)
if len(left_fit) > 0:
lane_lines.append(make_points(frame, left_fit_average))
right_fit_average = np.average(right_fit, axis=0)
if len(right_fit) > 0:
lane_lines.append(make_points(frame, right_fit_average))
return lane_lines
# helper function for average_slope_intercept() function which returns
# the bounded coordinates of the lane lines (from the bottom to the middle of the frame)
# adapted from raja_961
def make_points(frame, line):
height, width, _ = frame.shape
slope, intercept = line
y1 = height # bottom of the frame
y2 = int(y1 / 2) # make points from middle of the frame down
if slope == 0:
slope = 0.1
x1 = int((y1 - intercept) / slope)
x2 = int((y2 - intercept) / slope)
return [[x1, y1, x2, y2]]
# adapted from raja_961
def display_lines(frame, lines, line_color=(0, 255, 0), line_width=6):
line_image = np.zeros_like(frame)
if lines is not None:
for line in lines:
for x1, y1, x2, y2 in line:
cv2.line(line_image, (x1, y1), (x2, y2), line_color, line_width)
line_image = cv2.addWeighted(frame, 0.8, line_image, 1, 1)
if line_image is not None:
cv2.imwrite("line_image.jpg", line_image)
else:
print("no line image")
return line_image
# adapted from raja_961
def display_heading_line(frame, steering_angle, line_color=(0, 0, 255), line_width=5):
heading_image = np.zeros_like(frame)
height, width, _ = frame.shape
steering_angle_radian = steering_angle / 180.0 * math.pi
x1 = int(width / 2)
y1 = height
x2 = int(x1 - height / 2 / math.tan(steering_angle_radian))
y2 = int(height / 2)
cv2.line(heading_image, (x1, y1), (x2, y2), line_color, line_width)
heading_image = cv2.addWeighted(frame, 0.8, heading_image, 1, 1)
return heading_image
# adapted from raja_961
def get_steering_angle(frame, lane_lines):
height, width, _ = frame.shape
if len(lane_lines) == 2:
_, _, left_x2, _ = lane_lines[0][0]
_, _, right_x2, _ = lane_lines[1][0]
mid = int(width / 2)
x_offset = (left_x2 + right_x2) / 2 - mid
y_offset = int(height / 2)
elif len(lane_lines) == 1:
x1, _, x2, _ = lane_lines[0][0]
x_offset = x2 - x1
y_offset = int(height / 2)
elif len(lane_lines) == 0:
x_offset = 0
y_offset = int(height / 2)
angle_to_mid_radian = math.atan(x_offset / y_offset)
angle_to_mid_deg = int(angle_to_mid_radian * 180.0 / math.pi)
steering_angle = angle_to_mid_deg + 90
print("steer angle", steering_angle)
return steering_angle
# function reads from kernel log to adjust speed based on speed encoder
# time readings
def get_kernel_last_line():
try:
result = subprocess.run(['tail', '-n', '1', '/var/log/kern.log'], check=True, text=True, capture_output=True)
return result.stdout.strip() # Remove leading/trailing whitespaces and newlines
except subprocess.CalledProcessError as e:
print(f"Error executing 'tail': {e}")
return None
# Main Code
# set up the car throttle and steering PWMs
initialize_car()
# set up video
video = cv2.VideoCapture(0)
cv2.waitKey(3)
video.set(cv2.CAP_PROP_FRAME_WIDTH, 320)
video.set(cv2.CAP_PROP_FRAME_HEIGHT, 240)
# wait for video to load
wait(1)
# PD variables
kp = 0.1 / 2.5
kd = 0.3 * kp
lastTime = 0
lastError = 0
# counter for number of ticks
counter = 0
# start the engines
move(speed)
# Max number of loops
max_ticks = 2000
# Booleans for handling stop light
passed_floor_stop = False
atStopLight = False
at_end = False
# check every 3 frames
stopSignCheck = 3
sightDebug = True
isStopSignBool = False
# initialized to take values for plots
prop_arr = []
deriv_arr = []
error_arr = []
steer_v_arr = []
speed_v_arr = []
frames = []
while counter < max_ticks:
ret, original_frame = video.read()
frame = cv2.resize(original_frame, (176, 144))
speed_msg = get_kernel_last_line()
# string will be of similar form to "Dec 5 19:55:26 raspberrypi kernel: [ 1562.508644] Time elapsed: 14444"
# adjusting speed based on speed encoder time delta
try:
encoder_time_delta = int(speed_msg.split("Time elapsed: ")[1])
except Exception as e:
encoder_time_delta = 2000000
if encoder_time_delta > 2000000:
speed += 0.001
else:
speed -= 0.001
# bounding speed
if speed > 1.011:
speed = 1.009
if speed < 1.007:
speed = 1.009
# adjusting speed based on encoder
move(speed)
print("Speed is", speed)
# check for floor stop every couple ticks
if ((counter + 1) % stopSignCheck) == 0:
# check for the first stop sign
if not passedFirstStopSign:
isStopSignBool, floorSight = isRedFloorVisible(frame)
if sightDebug:
cv2.imwrite("floorSight.jpg", floorSight)
if isStopSignBool:
print("detected a stop sign, stopping")
stop()
wait(3)
move(speed)
passedFirstStopSign = True
# this is used to not check for the second stop sign until many frames later
secondStopSignTick = counter + 200
# add a delay to calling go faster
go_faster_tick = counter + go_faster_tick_delay
print("first stop finished!")
# check for the second stop sign
elif passedFirstStopSign and counter > secondStopSignTick:
isStop2SignBool, _ = isRedFloorVisible(frame)
print("is a floor stop: ", isStopSignBool)
if isStop2SignBool:
# last stop sign detected, exits while loop
print("detected second stop sign, stopping")
stop()
break
# process the frame to determine the desired steering angle
edges = detect_edges(frame)
roi = region_of_interest(edges)
line_segments = detect_line_segments(roi)
lane_lines = average_slope_intercept(frame, line_segments)
lane_lines_image = display_lines(frame, lane_lines)
steering_angle = get_steering_angle(frame, lane_lines)
heading_image = display_heading_line(lane_lines_image, steering_angle)
cv2.imwrite("heading line.jpg", heading_image)
# calculate changes for PD
now = time.time()
dt = now - lastTime
if sightDebug:
cv2.imwrite("Cropped_sight.jpg", roi)
deviation = steering_angle - 90
# PD Code
error = -deviation
proportional = kp * error
derivative = kd * (error - lastError) / dt
# determine actual turn to take
turn_amt = proportional + derivative
turn_amt = 1 / (1 + np.exp(turn_amt)) * 2 / base_turn # bounding turn values
# taking values for plots
prop_arr.append(proportional)
deriv_arr.append(derivative)
error_arr.append(error)
steer_v_arr.append(turn_amt * (3.3 / 2))
speed_v_arr.append(speed * (3.3 / 2))
frames.append(counter)
# turning based on PD calculations
turn(turn_amt)
# update PD values for next loop
lastError = error
lastTime = time.time()
key = cv2.waitKey(1)
if key == 27:
break
counter += 1
# creating plots
plt.figure()
plt.plot(frames, prop_arr, '--g', label="P values", linewidth=1)
plt.plot(frames, deriv_arr, '--b', label="D values", linewidth=1)
plt.plot(frames, error_arr, '--r', label="Error values", linewidth=1)
plt.title("PD & error values over time")
plt.xlabel("Frames")
plt.legend()
plt.savefig('plotPDE.png')
plt.close()
plt.figure()
plt.title("P values over time")
plt.plot(frames, prop_arr, 'g', linewidth=0.8)
plt.xlabel("Frames")
plt.ylabel("P values")
plt.savefig('plotP.png')
plt.close()
plt.figure()
plt.title("D values over time")
plt.plot(frames, deriv_arr, 'b', linewidth=0.8)
plt.xlabel("Frames")
plt.ylabel("D values")
plt.savefig('plotD.png')
plt.close()
plt.figure()
plt.title("Error values over time")
plt.plot(frames, error_arr, 'r', linewidth=0.8)
plt.xlabel("Frames")
plt.ylabel("Error values")
plt.savefig('plotE.png')
plt.close()
plt.figure()
plt.title("Speed voltage over time")
plt.plot(frames, speed_v_arr, 'y', linewidth=0.8)
plt.xlabel("Frames")
plt.ylabel("Speed Voltage (V)")
plt.savefig('plotSpeed.png')
plt.close()
plt.figure()
plt.title("Steering voltage over time")
plt.plot(frames, steer_v_arr, 'tab:orange', linewidth=0.8)
plt.xlabel("Frames")
plt.ylabel("Steer Voltage (V)")
plt.savefig('plotSteer.png')
plt.close()
plt.figure()
plt.plot(frames, speed_v_arr, 'y', label="Speed voltage values", linewidth=0.8)
plt.plot(frames, steer_v_arr, 'tab:orange', label="Steer voltage values", linewidth=0.8)
plt.plot(frames, error_arr, 'r', label="Error values", linewidth=0.8)
plt.title("Speed, steer, & error values over time")
plt.xlabel("Frames")
plt.legend()
plt.savefig('plotSSE.png')
plt.close()
video.release()
cv2.destroyAllWindows()
Comments
Please log in or sign up to comment.