Liam GarrisonDavid PadiernaConor WilliamsSamuel Xu
Published

Beagle Bone Baja Blast

Creating an autonomous self driving car that uses lanekeeping and stops at "stop boxes".

IntermediateWork in progress143
Beagle Bone Baja Blast

Things used in this project

Hardware components

Logitech Webcam
×1
BBAI64
×1
USB Wifi Adapter
×1
Portable Charger
×1

Software apps and online services

OpenCV
OpenCV

Story

Read more

Code

lanekeepingalgorithm_py.py

Python
Based on code from User raja_961, “Autonomous Lane-Keeping Car Using Raspberry Pi and OpenCV”. Instructables. URL:
https://www.instructables.com/Autonomous-Lane-Keeping-Car-U
sing-Raspberry-Pi-and/
import cv2
import numpy as np
import matplotlib.pyplot as plt
import math
import sys
import time
import os

# based on: https://www.instructables.com/Autonomous-Lane-Keeping-Car-Using-Raspberry-Pi-and/

# and https://www.hackster.io/really-bad-idea/autonomous-path-following-car-6c4992

# Throttle
throttlePin = "P8_13"
go_forward = 7.91
go_faster_addition = 0.022
go_faster_tick_delay = 80
go_faster_tick = 0  # Do not change this here. Code will set this value after seeing stop sign
dont_move = 7.5
pwmspeed = 1550000
speed = 12000000

# Steering
steeringPin = "P9_14"
left = 9
right = 6

# Max number of loops
max_ticks = 2000

# Booleans for handling stop light
passedStopLight = False
atStopLight = False
passedFirstStopSign = False

secondStopLightTick = 0


def getRedFloorBoundaries():
    """
    Gets the hsv boundaries and success boundaries indicating if the floor is red
    :return: [[lower color and success boundaries for red floor], [upper color and success boundaries for red floor]]
    """
    return getBoundaries("redboundaries.txt")


def isRedFloorVisible(frame):
    """
    Detects whether or not the floor is red
    :param frame: Image
    :return: [(True is the camera sees a red on the floor, false otherwise), video output]
    """
    print("Checking for floor stop")
    boundaries = getRedFloorBoundaries()
    return isMostlyColor(frame, boundaries)


def getTrafficRedLightBoundaries():
    """
    Gets the traffic red light hsv boundaries and success boundaries
    :return: [[lower color and success boundaries for red light], [upper color and success boundaries for red light]]
    """
    return getBoundaries("trafficRedBoundaries.txt")


def isTrafficRedLightVisible(frame):
    """
    Detects whether or not we can see a stop sign
    :param frame:
    :return: [(True is the camera sees a stop light, false otherwise), video output]
    """
    print("Checking for traffic stop")
    boundaries = getTrafficRedLightBoundaries()
    return isMostlyColor(frame, boundaries)


def getTrafficGreenLightBoundaries():
    """
    Gets the traffic green light hsv boundaries and success boundaries
    :return: [[lower color and success boundaries for green light], [upper color and success boundaries for green light]]
    """
    return getBoundaries("trafficGreenboundaries.txt")


def isTrafficGreenLightVisible(frame):
    """
    Detects whether or not we can see a green traffic light
    :param frame:
    :return: [(True is the camera sees a green light, false otherwise), video output]
    """
    print("Checking For Green Light")
    boundaries = getTrafficGreenLightBoundaries()
    return isMostlyColor(frame, boundaries)


def isMostlyColor(image, boundaries):
    """
    Detects whether or not the majority of a color on the screen is a particular color
    :param image:
    :param boundaries: [[color boundaries], [success boundaries]]
    :return: boolean if image satisfies provided boundaries, and an image used for debugging
    """
    #Convert to HSV color space
    hsv_img = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

    #parse out the color boundaries and the success boundaries
    color_boundaries = boundaries[0]
    percentage = boundaries[1]

    lower = np.array(color_boundaries[0])
    upper = np.array(color_boundaries[1])
    mask = cv2.inRange(hsv_img, lower, upper)
    output = cv2.bitwise_and(hsv_img, hsv_img, mask=mask)

    #Calculate what percentage of image falls between color boundaries
    percentage_detected = np.count_nonzero(mask) * 100 / np.size(mask)
    print("percentage_detected " + str(percentage_detected) + " lower " + str(lower) + " upper " + str(upper))
    # If the percentage percentage_detected is betweeen the success boundaries, we return true, otherwise false for result
    result = percentage[0] < percentage_detected <= percentage[1]
    if result:
        print(percentage_detected)
    return result, output


def getBoundaries(filename):
    """
    Reads the boundaries from the file filename
    Format:
        [0] lower: [H, S, V, lower percentage for classification of success]
        [1] upper: [H, S, V, upper percentage for classification of success]
    :param filename: file containing boundary information as above
    :return: [[lower color and success boundaries], [upper color and success boundaries]]
    """
    default_lower_percent = 50
    default_upper_percent = 100
    with open(filename, "r") as f:
        boundaries = f.readlines()
        lower_data = [val for val in boundaries[0].split(",")]
        upper_data = [val for val in boundaries[1].split(",")]

        if len(lower_data) >= 4:
            lower_percent = float(lower_data[3])
        else:
            lower_percent = default_lower_percent

        if len(upper_data) >= 4:
            upper_percent = float(upper_data[3])
        else:
            upper_percent = default_upper_percent

        lower = [int(x) for x in lower_data[:3]]
        upper = [int(x) for x in upper_data[:3]]
        boundaries = [lower, upper]
        percentages = [lower_percent, upper_percent]
    return boundaries, percentages


def initialize_car():
    # give 7.5% duty at 50Hz to throttle
    with open('/dev/bone/pwm/1/a/duty_cycle', 'w') as filetowrite:
      filetowrite.write('1550000')

    # wait for car to be ready
    input()
    with open('/dev/bone/pwm/1/b/duty_cycle', 'w') as filetowrite:
      filetowrite.write('1500000')


def stop():
    """
    Stops the car
    :return: none
    """
    with open('/dev/bone/pwm/1/a/duty_cycle', 'w') as filetowrite:
      filetowrite.write('1550000')


def go():
    """
    Sends the car forward at a default PWM
    :return: none
    """
    
    with open('/dev/bone/pwm/1/a/duty_cycle', 'w') as filetowrite:
      filetowrite.write('1660000')


def go_faster():
    """
    Sends the car forward at a faster default PWM
    :return: none
    """
    
    with open('/dev/bone/pwm/1/a/duty_cycle', 'w') as filetowrite:
      filetowrite.write(str(pwmspeed))



def go_backwards():
    """
    (Attempts to) send the car backwards
    :return: none
    """
    with open('/dev/bone/pwm/1/a/duty_cycle', 'w') as filetowrite:
      filetowrite.write('1500000')



def detect_edges(frame):
    # filter for blue lane lines
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    # cv2.imshow("HSV",hsv)
    lower_blue = np.array([90, 120, 0], dtype="uint8")
    upper_blue = np.array([150, 255, 255], dtype="uint8")
    mask = cv2.inRange(hsv, lower_blue, upper_blue)
    # cv2.imshow("mask",mask)

    # detect edges
    edges = cv2.Canny(mask, 50, 100)
    cv2.imshow("edges",edges)

    return edges


def region_of_interest(edges):
    height, width = edges.shape
    mask = np.zeros_like(edges)

    # only focus lower half of the screen
    polygon = np.array([[
        (0, height),
        (0, height / 2),
        (width, height / 2),
        (width, height),
    ]], np.int32)

    cv2.fillPoly(mask, polygon, 255)

    cropped_edges = cv2.bitwise_and(edges, mask)
    # cv2.imshow("roi",cropped_edges)

    return cropped_edges


def detect_line_segments(cropped_edges):
    rho = 1
    theta = np.pi / 180
    min_threshold = 10

    line_segments = cv2.HoughLinesP(cropped_edges, rho, theta, min_threshold,
                                    np.array([]), minLineLength=5, maxLineGap=150)

    return line_segments


def average_slope_intercept(frame, line_segments):
    lane_lines = []

    if line_segments is None:
        print("no line segments detected")
        return lane_lines

    height, width, _ = frame.shape
    left_fit = []
    right_fit = []

    boundary = 1 / 3
    left_region_boundary = width * (1 - boundary)
    right_region_boundary = width * boundary

    for line_segment in line_segments:
        for x1, y1, x2, y2 in line_segment:
            if x1 == x2:
                print("skipping vertical lines (slope = infinity")
                continue

            fit = np.polyfit((x1, x2), (y1, y2), 1)
            slope = (y2 - y1) / (x2 - x1)
            intercept = y1 - (slope * x1)

            if slope < 0:
                if x1 < left_region_boundary and x2 < left_region_boundary:
                    left_fit.append((slope, intercept))
            else:
                if x1 > right_region_boundary and x2 > right_region_boundary:
                    right_fit.append((slope, intercept))

    left_fit_average = np.average(left_fit, axis=0)
    if len(left_fit) > 0:
        lane_lines.append(make_points(frame, left_fit_average))

    right_fit_average = np.average(right_fit, axis=0)
    if len(right_fit) > 0:
        lane_lines.append(make_points(frame, right_fit_average))

    return lane_lines


def make_points(frame, line):
    height, width, _ = frame.shape

    slope, intercept = line

    y1 = height  # bottom of the frame
    y2 = int(y1 / 2)  # make points from middle of the frame down

    if slope == 0:
        slope = 0.1

    x1 = int((y1 - intercept) / slope)
    x2 = int((y2 - intercept) / slope)

    return [[x1, y1, x2, y2]]


def display_lines(frame, lines, line_color=(0, 255, 0), line_width=6):
    line_image = np.zeros_like(frame)

    if lines is not None:
        for line in lines:
            for x1, y1, x2, y2 in line:
                cv2.line(line_image, (x1, y1), (x2, y2), line_color, line_width)

    line_image = cv2.addWeighted(frame, 0.8, line_image, 1, 1)

    return line_image


def display_heading_line(frame, steering_angle, line_color=(0, 0, 255), line_width=5):
    heading_image = np.zeros_like(frame)
    height, width, _ = frame.shape

    steering_angle_radian = steering_angle / 180.0 * math.pi

    x1 = int(width / 2)
    y1 = height
    x2 = int(x1 - height / 2 / math.tan(steering_angle_radian))
    y2 = int(height / 2)

    cv2.line(heading_image, (x1, y1), (x2, y2), line_color, line_width)
    heading_image = cv2.addWeighted(frame, 0.8, heading_image, 1, 1)

    return heading_image


def get_steering_angle(frame, lane_lines):
    height, width, _ = frame.shape

    if len(lane_lines) == 2:
        _, _, left_x2, _ = lane_lines[0][0]
        _, _, right_x2, _ = lane_lines[1][0]
        mid = int(width / 2)
        x_offset = (left_x2 + right_x2) / 2 - mid
        y_offset = int(height / 2)

    elif len(lane_lines) == 1:
        x1, _, x2, _ = lane_lines[0][0]
        x_offset = x2 - x1
        y_offset = int(height / 2)

    elif len(lane_lines) == 0:
        x_offset = 0
        y_offset = int(height / 2)

    angle_to_mid_radian = math.atan(x_offset / y_offset)
    angle_to_mid_deg = int(angle_to_mid_radian * 180.0 / math.pi)
    steering_angle = angle_to_mid_deg + 90

    return steering_angle


def plot_pd(p_vals, d_vals, error, show_img=False):
    fig, ax1 = plt.subplots()
    t_ax = np.arange(len(p_vals))
    ax1.plot(t_ax, p_vals, '-', label="P values")
    ax1.plot(t_ax, d_vals, '-', label="D values")
    ax2 = ax1.twinx()
    ax2.plot(t_ax, error, '--r', label="Error")

    ax1.set_xlabel("Frames")
    ax1.set_ylabel("PD Value")
    ax2.set_ylim(-90, 90)
    ax2.set_ylabel("Error Value")

    plt.title("PD Values over time")
    fig.legend()
    fig.tight_layout()
    plt.savefig("pd_plot.png")

    if show_img:
        plt.show()
    plt.clf()


def plot_pwm(speed_pwms, turn_pwms, error, show_img=False):
    fig, ax1 = plt.subplots()
    t_ax = np.arange(len(speed_pwms))
    ax1.plot(t_ax, speed_pwms, '-', label="Speed PWM")
    ax1.plot(t_ax, turn_pwms, '-', label="Steering PWM")
    ax2 = ax1.twinx()
    ax2.plot(t_ax, error, '--r', label="Error")

    ax1.set_xlabel("Frames")
    ax1.set_ylabel("PWM Values")
    ax2.set_ylabel("Error Value")

    plt.title("PWM Values over time")
    fig.legend()
    plt.savefig("pwm_plot.png")

    if show_img:
        plt.show()
    plt.clf()


# set up the car throttle and steering PWMs
initialize_car()

# set up video
video = cv2.VideoCapture(2)
cv2.waitKey(3)
video.set(cv2.CAP_PROP_FRAME_WIDTH, 176)
video.set(cv2.CAP_PROP_FRAME_HEIGHT, 144)

# wait for video to load
time.sleep(1)

# PD variables
kp = 0.085
kd = kp * 0.04
lastTime = 0
lastError = 0

# counter for number of ticks
counter = 0

# start the engines
go()

# arrays for making the final graphs
p_vals = []
d_vals = []
err_vals = []
speed_pwm = []
steer_pwm = []
current_speed = go_forward

stopSignCheck = 1
sightDebug = False
isStopSignBool = False
while counter < max_ticks:
    ret, original_frame = video.read()
    frame = cv2.resize(original_frame, (176, 144))
    print("while")
    if sightDebug:
        cv2.imshow("Resized Frame", frame)
        
    # Read from file to determine speed
    if (0 == 0):
      # with open('/var/log/messages', 'rb') as file: #/var/log/kern.log
      with open('/var/log/kern.log', 'rb') as file:
        try:
            file.seek(-2, os.SEEK_END)
            while file.read(1) != b'\n':
                file.seek(-2, os.SEEK_CUR)
        except OSError:
            file.seek(0)
        last_line = file.readline()
        print(last_line)
        
        if (last_line.split()[9] != b'reset'):
          speed = int(last_line.split()[9])
        print("speed:", speed)
        
        if speed > 12000000:
          pwmspeed += 1000
        
        else:
          pwmspeed -= 1000
          
        if pwmspeed > 1660000:
          pwmspeed = 1660000
        
        if pwmspeed < 1450000:
          pwmspeed = 1450000
        
        print("pwmspeed:", pwmspeed)
      #end fileread

    # check for stop sign/traffic light every couple ticks
    if ((counter + 1) % stopSignCheck) == 0:
        # check for stop light
        if not passedStopLight and not atStopLight:
            trafficStopBool, _ = isTrafficRedLightVisible(frame)
            print(trafficStopBool)
            if trafficStopBool:
                print("detected red light, stopping")
                stop()
                atStopLight = True
                continue
        # check for the first stop sign
        elif passedStopLight and not passedFirstStopSign:
            isStopSignBool, floorSight = isRedFloorVisible(frame)
            if sightDebug:
                cv2.imshow("floorSight", floorSight)
            if isStopSignBool:
                print("detected first stop sign, stopping")
                stop()
                time.sleep(2)
                passedFirstStopSign = True
                # this is used to not check for the second stop sign until many frames later
                secondStopSignTick = counter + 200
                # now check for stop sign less frequently
                stopSignCheck = 3
                # add a delay to calling go faster
                go_faster_tick = counter + go_faster_tick_delay
                print("first stop finished!")
        # check for the second stop sign
        elif passedStopLight and passedFirstStopSign and counter > secondStopSignTick:
            isStop2SignBool, _ = isRedFloorVisible(frame)
            print("is a floor stop: ", isStopSignBool)
            if isStop2SignBool:
                # last stop sign detected, exits while loop
                print("detected second stop sign, stopping")
                stop()
                break

    # makes car go faster, helps it have enough speed to get to the end of the course
    if isStopSignBool and counter == go_faster_tick:
        print("Going FASTER")
        go_faster()
        current_speed += go_faster_addition

    # look for green stop light while waiting after red stop light
    if not passedStopLight and atStopLight:
        print("waiting at red light")
        trafficGoBool, _ = isTrafficGreenLightVisible(frame)
        if trafficGoBool:
            passedStopLight = True
            atStopLight = False
            print("green light!")
            go()
        else:
            continue

    # process the frame to determine the desired steering angle
    # cv2.imshow("original",frame)
    edges = detect_edges(frame)
    roi = region_of_interest(edges)
    line_segments = detect_line_segments(roi)
    lane_lines = average_slope_intercept(frame, line_segments)
    lane_lines_image = display_lines(frame, lane_lines)
    steering_angle = get_steering_angle(frame, lane_lines)
    # heading_image = display_heading_line(lane_lines_image,steering_angle)
    # cv2.imshow("heading line",heading_image)

    # calculate changes for PD
    now = time.time()
    dt = now - lastTime
    if sightDebug:
        cv2.imshow("Cropped sight", roi)
    deviation = steering_angle - 90

    # PD Code
    error = -deviation
    base_turn = 7.5
    proportional = kp * error
    derivative = kd * (error - lastError) / dt

    # take values for graphs
    p_vals.append(proportional)
    d_vals.append(derivative)
    err_vals.append(error)

    # determine actual turn to do
    turn_amt = base_turn + proportional + derivative
    
    steering = 1500000 + (proportional + derivative) * 75000
    steering = (int)(steering)
    print(steering)
    
    if steering > 1000000 and steering < 2000000:
      steering = str(steering)
      # caps turns to make PWM values
      if 7.2 < turn_amt < 7.8:
          with open('/dev/bone/pwm/1/b/duty_cycle', 'w') as filetowrite:
            filetowrite.write('1500000')
      elif turn_amt > left:
          with open('/dev/bone/pwm/1/b/duty_cycle', 'w') as filetowrite:
            filetowrite.write(steering)
      elif turn_amt < right:
          with open('/dev/bone/pwm/1/b/duty_cycle', 'w') as filetowrite:
            filetowrite.write(steering)

    # turn!
    #PWM.set_duty_cycle(steeringPin, turn_amt)

    # take values for graphs
    steer_pwm.append(turn_amt)
    speed_pwm.append(current_speed)

    # update PD values for next loop
    lastError = error
    lastTime = time.time()

    key = cv2.waitKey(1)
    if key == 27:
        break

    counter += 1

# clean up resources
with open('/dev/bone/pwm/1/a/duty_cycle', 'w') as filetowrite:
    filetowrite.write('1550000')

video.release()
cv2.destroyAllWindows()
#PWM.set_duty_cycle(throttlePin, 7.5)
#PWM.set_duty_cycle(steeringPin, 7.5)
#PWM.stop(throttlePin)
#PWM.stop(steeringPin)
#PWM.cleanup()

plot_pd(p_vals, d_vals, err_vals, True)
plot_pwm(speed_pwm, steer_pwm, err_vals, True)

gpiod_driver.c

C/C++
#include <linux/module.h>
#include <linux/of_device.h>
#include <linux/kernel.h>
#include <linux/gpio/consumer.h>
#include <linux/platform_device.h>
#include <linux/interrupt.h>
#include <linux/init.h>
#include <linux/unistd.h>
#include <linux/ktime.h>
#include </usr/include/stdio.h>

// BeagleBone Baja Blast

// Initialize interrupt and device point
unsigned int irq_number;
struct gpio_desc *encoder_gpiod;
int begin = 0;
// struct gpio_desc *led_gpiod;
// struct gpio_desc *button_gpiod;

ktime_t start, end, elapsed_time;

/**
 * @brief Interrupt service routine is called, when interrupt is triggered, slight changed for gpiod
 */
static irq_handler_t irq_handler(unsigned int irq, void *dev_id, struct pt_regs *regs) {
	   printk("Encoder activated!\n");
     // First time ISR is triggered
     /*
     if (begin == 0) {
         begin = 1;
         start = ktime_get();
     }
     */
     // Second time (counting a tick)
     //if (begin == 1) {
         // begin = 0 // The logic is that start time simply needs to be initialized
         end = ktime_get();
         elapsed_time = ktime_sub(end, start);
         //start = ktime_get();
         start = end;
         
         fp = fopen("/home/debian/finalproj/speed.txt", "w");
        // Write string to file
        fputs(str, fp);
        // Close the file pointer
        fclose(fp);
         
         printk("Time elapsed: %llu \n", elapsed_time);
	       // gpiod_set_value(led_gpiod, !gpiod_get_value(led_gpiod));
     //}
	   return (irq_handler_t) IRQ_HANDLED; 
}


static int led_probe(struct platform_device *pdev) {
    // printk(KERN_INFO "Toggle function has been called\n");
    start = ktime_get();
    encoder_gpiod = devm_gpiod_get(&pdev->dev, "userbutton", GPIOD_IN);
    gpiod_set_debounce(encoder_gpiod, 1000000);
    irq_number = gpiod_to_irq(encoder_gpiod);
    
    // We want to set everything up: Get gpio pointers for the button and LED, set up debounce
    
    //button_gpiod = devm_gpiod_get(&pdev->dev, "button", GPIOD_IN);
    //led_gpiod = devm_gpiod_get(&pdev->dev, "toggle", GPIOD_OUT_LOW);
    //irq_number = gpiod_to_irq(button_gpiod);
    
    if (request_irq(irq_number, (irq_handler_t) irq_handler, IRQF_TRIGGER_RISING, "button", pdev) != 0) {
        printk("Failed to request interrupt.\n");
        return -1;
    };
    // gpiod_set_debounce(button_gpiod, 150);
    return 0;
}
// remove the LED to deload the ISR and ensure smooth functioning afterward
static int led_remove(struct platform_device *pdev)
    {
    // Want to free the irq here
    // Exit message to indicate when function terminates
    printk(KERN_INFO "The LED device module has been removed\n");
    // Free the ISR
    free_irq(irq_number, NULL);
    return 0;
    }

// Matching table
static struct of_device_id matchy_match[] = {
    {.compatible = "hello"},
    {/* leave alone - keep this here (end node) */},
};

// platform driver object
static struct platform_driver adam_driver = {
    .probe = led_probe,
    .remove = led_remove,
    .driver = {
    .name = "The Rock: this name doesn't even matter",
    .owner = THIS_MODULE,
    .of_match_table = matchy_match,
    },
};

module_platform_driver(adam_driver);
MODULE_DESCRIPTION("424\'s finest");
MODULE_AUTHOR("GOAT");
MODULE_LICENSE("GPL v2");
MODULE_ALIAS("platform:adam_driver");

setup.py

Python
# P9_14 - Speed/ESC
with open('/dev/bone/pwm/1/a/period', 'w') as filetowrite:
  filetowrite.write('20000000')
with open('/dev/bone/pwm/1/a/duty_cycle', 'w') as filetowrite:
  filetowrite.write('1550000')
with open('/dev/bone/pwm/1/a/enable', 'w') as filetowrite:
  filetowrite.write('1')

# P9_16 - Steering
with open('/dev/bone/pwm/1/b/period', 'w') as filetowrite:
  filetowrite.write('20000000')
with open('/dev/bone/pwm/1/b/duty_cycle', 'w') as filetowrite:
  filetowrite.write('1500000')
with open('/dev/bone/pwm/1/b/enable', 'w') as filetowrite:
  filetowrite.write('1')

BONE-PWM0.dts

C/C++
// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (C) 2022 BeagleBoard.org - https://beagleboard.org/
 *
 * https://elinux.org/Beagleboard:BeagleBone_cape_interface_spec#PWM
 */

/dts-v1/;
/plugin/;
#include <dt-bindings/gpio/gpio.h>
#include <dt-bindings/board/k3-j721e-bone-pins.h>
/*
 * Helper to show loaded overlays under: /proc/device-tree/chosen/overlays/
 */
&{/chosen} {
	overlays {
		BONE-PWM0.kernel = __TIMESTAMP__;
	};
};

&bone_pwm_0 {
	status = "okay";
};

&{/} {
  gpio-leds {
    compatible = "hello";
    userbutton-gpios = <gpio_P8_03 GPIO_ACTIVE_HIGH>;
  };
};

Credits

Liam Garrison

Liam Garrison

1 project • 0 followers
David Padierna

David Padierna

2 projects • 0 followers
Conor Williams

Conor Williams

2 projects • 0 followers
Samuel Xu

Samuel Xu

2 projects • 5 followers

Comments