Andrew HolzbachAlex HolzbachGabrielOlamide Adeshola
Published © GPL3+

Curvy Path Crusaders

Autonomously navigates a vehicle along winding paths using real-time image processing and PID control algorithms.

IntermediateFull instructions provided92
Curvy Path Crusaders

Things used in this project

Hardware components

Webcam, Logitech® HD Pro
Webcam, Logitech® HD Pro
×1
Raspberry Pi 4 Model B
Raspberry Pi 4 Model B
×1
Optical Speed Encoder
×1
Portable Power Bank
×1
Jumper wires (generic)
Jumper wires (generic)
×1

Software apps and online services

OpenCV
OpenCV
Raspbian
Raspberry Pi Raspbian

Story

Read more

Code

LaneKeepingAlgorithm

Python
Main script for our autonomous vehicle.
# Team: Curvy Path Crusaders
# Authors: Alex Holzbach, Andrew Holzbach, Gabe Ong, and Olamide Adeshola
# Date: 12/6/2023

# References:
# User raja_961, “Autonomous Lane-Keeping Car Using Raspberry
# Pi and OpenCV”. Instructables. URL:
# https://www.instructables.com/Autonomous-Lane-Keeping-Car-Using-Raspberry-Pi-and/
# Team 10: Tyler Montague, Tremayne Currie, Jakob Valdez, Kj Dix
# "Autonomous RC Car"
# https://www.hackster.io/team-10/autonomous-rc-car-86714b

import cv2
import numpy as np
import math
import time
import board
import busio
import adafruit_mcp4728
import RPi.GPIO as GPIO
from collections import deque

# Initialize I2C bus and MCP4728 device.
i2c = busio.I2C(board.SCL, board.SDA)
dac = adafruit_mcp4728.MCP4728(i2c, 0x64)

# Function to set throttle using DAC
def set_throttle(value):
    """
    Sets the throttle of the RC car.
    :param value: Throttle value (0 to 65535, corresponding to 0V to Vref)
    """
    dac.channel_b.value = int(value)

# Function to set steering using DAC
def set_steering(value):
    """
    Sets the steering of the RC car.
    :param value: Steering value (0 to 65535, corresponding to 0V to Vref)
    """
    dac.channel_a.value = int(value)

# Encoder setup
encoder_pin = 23
GPIO.setmode(GPIO.BCM)
GPIO.setup(encoder_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)

# Global variables for encoder
encoder_count = 0
last_encoder_check_time = time.time()

# Callback function to increment encoder count
def encoder_callback(channel):
    global encoder_count
    encoder_count += 1

# Attach callback function to encoder pin
GPIO.add_event_detect(encoder_pin, GPIO.RISING, callback=encoder_callback)

# Arrays to store PID values over time
pid_proportional_values = []
pid_integral_values = []
pid_derivative_values = []
pid_errors = []

# Arrays to store steering and throttle values over time
steering_values = []
throttle_values = []

# Color detection settings
color_detection_range = 15  # HSV color tolerance
stop_color_counter = 0
stop_buffer_time = 12  # Seconds to wait before detecting color again
last_color_stop_time = 0

# Function to check for the target color in the specified region of the HSV frame
def check_for_stop_color(frame):
    global stop_color_counter, last_color_stop_time

    # Define the target color in HSV
    target_color_hsv = np.array([176, 59, 160])
    
    # Define the target color range for detection
    lower_bound = np.array([target_color_hsv[0] - color_detection_range,
                            target_color_hsv[1] - color_detection_range,
                            target_color_hsv[2] - color_detection_range])
    upper_bound = np.array([target_color_hsv[0] + color_detection_range,
                            target_color_hsv[1] + color_detection_range,
                            target_color_hsv[2] + color_detection_range])
    
    # Create a mask for color detection
    mask = cv2.inRange(frame, lower_bound, upper_bound)
    
    # Focus on the bottom 1/8th middle part of the screen
    height, width = frame.shape[:2]
    target_region = mask[int(7 * height / 8):, int(width / 3):int(2 * width / 3)]
    
    # Check if color is present in the target region
    if cv2.countNonZero(target_region) > 0:
        current_time = time.time()
        if current_time - last_color_stop_time > stop_buffer_time:
            stop_color_counter += 1
            last_color_stop_time = current_time
            return True
    return False

def calculate_speed():
    global encoder_count, last_encoder_check_time
    current_time = time.time()
    time_elapsed = current_time - last_encoder_check_time

    # Calculate encoder counts per second
    encoder_counts_per_second = encoder_count / time_elapsed

    # Reset the count and timestamp
    encoder_count = 0
    last_encoder_check_time = current_time

    # Coefficients for linear approximation
    # These values should be adjusted based on your experimental data
    slope = (40000 - 37000) / (35 - 16)  # Change in throttle / change in encoder speed
    intercept = 37000 - slope * 16       # Calculate intercept using one data point

    # Estimate speed in DAC encoder values
    estimated_speed_dac = slope * encoder_counts_per_second + intercept
    return estimated_speed_dac

def detect_edges(frame):
    # Color picker output
    h, s, v = 99, 228, 162

    # Set a range for each component
    # You may need to adjust these ranges based on your specific requirements
    hue_range = 10
    sat_range = 40
    val_range = 255

    lower_blue = np.array([h - hue_range, max(s - sat_range, 0), max(v - val_range, 0)])
    upper_blue = np.array([h + hue_range, min(s + sat_range, 255), min(v + val_range, 255)])

    mask = cv2.inRange(frame, lower_blue, upper_blue)
    
    # detect edges
    edges = cv2.Canny(mask, 50, 100)
    #cv2.imshow("edges",edges)
    
    return edges

def region_of_interest(edges):
    height, width = edges.shape
    mask = np.zeros_like(edges)

    # only focus lower half of the screen
    polygon = np.array([[
        (0, height),
        (0,  height/2),
        (width , height/2),
        (width , height),
    ]], np.int32)
    
    cv2.fillPoly(mask, polygon, 255)
    
    cropped_edges = cv2.bitwise_and(edges, mask)
    cv2.imshow("roi",cropped_edges)
    
    return cropped_edges

def detect_line_segments(cropped_edges):
    rho = 1  
    theta = np.pi / 180  
    min_threshold = 10  
    
    # Get the line segments
    line_segments = cv2.HoughLinesP(cropped_edges, rho, theta, min_threshold, 
                                    np.array([]), minLineLength=5, maxLineGap=150)

    return line_segments

def average_slope_intercept(frame, line_segments):
    lane_lines = []
    
    if line_segments is None:
    #     print("no line segments detected")
        return lane_lines

    # Get the height and width of the frame
    height, width,_ = frame.shape
    left_fit = []
    right_fit = []

    # Boundaries
    boundary = 1/3
    left_region_boundary = width * (1 - boundary)
    right_region_boundary = width * boundary
    
    # Computation for each line segment
    for line_segment in line_segments:
        for x1, y1, x2, y2 in line_segment:
            if x1 == x2:
                print("skipping vertical lines (slope = infinity")
                continue
            
            fit = np.polyfit((x1, x2), (y1, y2), 1)
            slope = (y2 - y1) / (x2 - x1)
            intercept = y1 - (slope * x1)
            
            if slope < 0:
                if x1 < left_region_boundary and x2 < left_region_boundary:
                    left_fit.append((slope, intercept))
            else:
                if x1 > right_region_boundary and x2 > right_region_boundary:
                    right_fit.append((slope, intercept))

    # Compute averages
    left_fit_average = np.average(left_fit, axis=0)
    if len(left_fit) > 0:
        lane_lines.append(make_points(frame, left_fit_average))

    right_fit_average = np.average(right_fit, axis=0)
    if len(right_fit) > 0:
        lane_lines.append(make_points(frame, right_fit_average))

    return lane_lines

def make_points(frame, line):
    height, width, _ = frame.shape
    
    slope, intercept = line
    
    y1 = height  # bottom of the frame
    y2 = int(y1 / 2)  # make points from middle of the frame down
    
    if slope == 0:
        slope = 0.1
        
    # Caculate x vals
    x1 = int((y1 - intercept) / slope)
    x2 = int((y2 - intercept) / slope)
    
    return [[x1, y1, x2, y2]]

def display_lines(frame, lines, line_color=(0, 255, 0), line_width=6):
    line_image = np.zeros_like(frame)
    
    if lines is not None:
        for line in lines:
            for x1, y1, x2, y2 in line:
                cv2.line(line_image, (x1, y1), (x2, y2), line_color, line_width)
                
    # Get the line_image
    line_image = cv2.addWeighted(frame, 0.8, line_image, 1, 1)
    
    return line_image

def display_heading_line(frame, steering_angle, line_color=(0, 0, 255), line_width=5 ):
    heading_image = np.zeros_like(frame)
    height, width, _ = frame.shape
    
    # Get the steering angle in radians
    steering_angle_radian = steering_angle / 180.0 * math.pi
    
    # Get x and y vals
    x1 = int(width / 2)
    y1 = height
    x2 = int(x1 - height / 2 / math.tan(steering_angle_radian))
    y2 = int(height / 2)
    
    cv2.line(heading_image, (x1, y1), (x2, y2), line_color, line_width)
    heading_image = cv2.addWeighted(frame, 0.8, heading_image, 1, 1)
    
    return heading_image

def get_steering_angle(frame, lane_lines):
    
    height,width,_ = frame.shape
    
    # Cases depending on the number of lines detected
    if len(lane_lines) == 2:
        _, _, left_x2, _ = lane_lines[0][0]
        _, _, right_x2, _ = lane_lines[1][0]
        mid = int(width / 2)
        x_offset = (left_x2 + right_x2) / 2 - mid
        y_offset = int(height / 2)
        
    elif len(lane_lines) == 1:
        x1, _, x2, _ = lane_lines[0][0]
        x_offset = x2 - x1
        y_offset = int(height / 2)
        
    elif len(lane_lines) == 0:
        x_offset = 0
        y_offset = int(height / 2)
        
    # Adjust angle values    
    angle_to_mid_radian = math.atan(x_offset / y_offset)
    angle_to_mid_deg = int(angle_to_mid_radian * 180.0 / math.pi)  
    steering_angle = angle_to_mid_deg + 90
    
    return steering_angle

def convert_angle_to_dac_value(steering_angle, steering_scale=1.1):
    """
    Convert the steering angle to DAC value with adjustable scaling for sharpness.
    :param steering_angle: Steering angle (0 to 180 degrees, where 90 is straight)
    :param steering_scale: Scaling factor for steering sharpness
    :return: Corresponding DAC value (0 to 65535)
    """
    # Calculate deviation from the center (90 degrees)
    deviation = steering_angle - 90

    # Apply the scaling factor to the deviation
    scaled_deviation = deviation * steering_scale

    # Ensure the scaled angle is within the range of -90 to +90 degrees deviation
    scaled_deviation = max(-90, min(scaled_deviation, 90))

    # Recalculate the steering angle with the scaled deviation
    scaled_angle = 90 + scaled_deviation

    # Assuming 0 degrees maps to 0, 90 degrees to 32768, and 180 degrees to 65535
    dac_value = int((scaled_angle * (65535 / 180)))

    # Ensure that the DAC value is within limits
    dac_value = max(0, min(dac_value, 65535))

    return dac_value

video = cv2.VideoCapture(0)
video.set(cv2.CAP_PROP_FRAME_WIDTH,320)
video.set(cv2.CAP_PROP_FRAME_HEIGHT,240)

time.sleep(1)

# Desired speed in DAC units (0 to 65535)
desired_speed_dac = 37600  # Target DAC value when moving forward

# Maximum allowed forward throttle value (to prevent going over 50000)
max_forward_throttle = 37900

# Neutral throttle value (no movement)
neutral_throttle = 32768

# PID controller parameters
kp = 1.8  # Proportional gain
kd = 0.8  # Derivative gain
ki = 0.1  # Integral gain (not used in this example)
last_error = 0
integral = 0

# time of last PID calculation
last_time = time.time()
last_print_time = time.time()

# Smoothing for steering angle
steering_smoothing_window = 2  # Number of past readings to average
steering_history = deque(maxlen=steering_smoothing_window)

try:
    while True:
        ret, frame = video.read()
        if not ret:
            break

        # HSV frame
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        
        # Calculate steering angle from lane lines
        cv2.imshow("original", frame)
        edges = detect_edges(hsv)
        roi = region_of_interest(edges)
        line_segments = detect_line_segments(roi)
        lane_lines = average_slope_intercept(frame, line_segments)
        lane_lines_image = display_lines(frame, lane_lines)
        steering_angle = get_steering_angle(frame, lane_lines)
        heading_image = display_heading_line(lane_lines_image, steering_angle)
        cv2.imshow("heading line", heading_image)
 
        # Get current speed (scaled appropriately)
        current_speed_dac = calculate_speed()  # Assume this returns a DAC equivalent value

        # PID error terms
        error = desired_speed_dac - current_speed_dac
        integral += error * (time.time() - last_time)
        # Avoid integral windup by limiting integral term
        integral = max(-1000, min(integral, 1000))
        derivative = (error - last_error) / (time.time() - last_time)

        # PID output
        pid_output = kp * error + ki * integral + kd * derivative

        # Correctly apply PID output to throttle value
        throttle_value = neutral_throttle + int(pid_output)
        throttle_value = max(neutral_throttle, min(throttle_value, max_forward_throttle))

        # Check for stop color
        if check_for_stop_color(hsv):
            if stop_color_counter == 1:
                # First stop: Stop for 1.5 seconds and then continue
                print("=========================================")
                print("Stop sign detected")
                print("=========================================")
                set_throttle(32768)  # Stop the car
                time.sleep(5)
                # Resume driving
                set_throttle(throttle_value)
            elif stop_color_counter >= 2:
                # Second stop: Stop permanently
                print("=========================================")
                print("End sign detected")
                print("=========================================")
                set_throttle(32768)  # Stop the car
                break

        # Set throttle using DAC
        set_throttle(throttle_value)

        # Steering control with smoothing
        steering_history.append(convert_angle_to_dac_value(steering_angle))
        smooth_steering_angle = np.mean(steering_history)
        set_steering(smooth_steering_angle)

        last_error = error
        last_time = time.time()

        # Store values for plotting
        pid_proportional_values.append(kp * error)
        pid_integral_values.append(ki * integral)
        pid_derivative_values.append(kd * derivative)
        pid_errors.append(error)
        steering_values.append(steering_angle)
        throttle_values.append(throttle_value)

        # Print values every second
        current_time = time.time()
        if current_time - last_print_time >= 1.0:
            print("=========================================")
            print(f"PID Output: {pid_output}")
            print(f"Current Speed: {current_speed_dac:.2f}")
            print(f"Steering Angle: {steering_angle} degrees")
            print(f"Throttle DAC Value: {throttle_value}")
            print(f"Steering DAC Value: {smooth_steering_angle}")
            last_print_time = current_time

        key = cv2.waitKey(1)
        if key == 27:  # ESC key
            break

except KeyboardInterrupt:
    print("Interrupted by user")

finally:
    video.release()
    cv2.destroyAllWindows()
    GPIO.cleanup()
    print("Script ended")
    set_throttle(32768)  # Neutral throttle
    set_steering(32768)  # Neutral steering

    # Save PID values for plotting
    np.savetxt("pid_proportional_values.csv", pid_proportional_values, delimiter=",")
    np.savetxt("pid_integral_values.csv", pid_integral_values, delimiter=",")
    np.savetxt("pid_derivative_values.csv", pid_derivative_values, delimiter=",")
    np.savetxt("pid_errors.csv", pid_errors, delimiter=",")

    # Save steering and throttle values for plotting
    np.savetxt("steering_values.csv", steering_values, delimiter=",")
    np.savetxt("throttle_values.csv", throttle_values, delimiter=",")

Color Picker

Python
Used to find HSV color values for calibration
import cv2
import numpy as np

def calculate_average_color(roi):
    """Calculate the average color of the region of interest."""
    average_color_per_row = np.average(roi, axis=0)
    average_color = np.average(average_color_per_row, axis=0)
    average_color = np.uint8(average_color)  # Convert to uint8
    return average_color

# Start video capture
video = cv2.VideoCapture(0)

while True:
    ret, frame = video.read()
    if not ret:
        break

    hsv_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

    height, width, _ = frame.shape

    # Define the size and position of the box
    box_size = 50  # Size of the box
    x1 = width // 2 - box_size // 2
    y1 = height // 2 - box_size // 2
    x2 = x1 + box_size
    y2 = y1 + box_size

    # Extract the region of interest (ROI)
    roi = hsv_frame[y1:y2, x1:x2]

    # Calculate average color
    average_color = calculate_average_color(roi)

    # Display the ROI and average color on the frame
    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
    cv2.putText(frame, f"Average HSV Color: {average_color}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

    # Show the frame
    cv2.imshow("Frame", frame)

    # Break the loop if 'q' is pressed
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

video.release()
cv2.destroyAllWindows()

Credits

Andrew Holzbach
1 project • 0 followers
Contact
Alex Holzbach
2 projects • 1 follower
Contact
Gabriel
1 project • 1 follower
Contact
Olamide Adeshola
1 project • 0 followers
Contact
Thanks to raja_961.

Comments

Please log in or sign up to comment.