Sambhu BalakrishnanGabriel ZookBill RuiNik Gautam
Published

Techno Turbo Tetracyclists

A little self-driving car that can stay between lanes and know when to stop is like 90% of a Tesla Model S.

IntermediateFull instructions provided81
Techno Turbo Tetracyclists

Things used in this project

Hardware components

Logitech C270
×1
Raspberry Pi 4 Model B
Raspberry Pi 4 Model B
×1
Optical Speed Encoder
×1
Battery Pack for RC car
×1
Charmast Battery Bank (for RPi)
×1

Software apps and online services

OpenCV
OpenCV

Story

Read more

Code

main.py

Python
# Drawn from 
# User raja_961, Autonomous Lane-Keeping Car Using Raspberry Pi and OpenCV. Instructables. URL:
# https://www.instructables.com/Autonomous-Lane-Keeping-Car-Using-Raspberry-Pi-and/
import pigpio
import time
from pwm_control import pwm_dev
import cv_code
import cv2
import numpy as np
import math
from speed import speed_encoder
from pd import PD_Controller
import matplotlib.pyplot as plt

# init pi for pigpio (used for PWM)
pi = pigpio.pi()

# pin controlling steering
STEER_PIN = 2
# pin controlling main drive motor
MOTOR_PIN = 3
# duty cycle for stopped motor
MOTOR_STOP = PD_Controller.MOTOR_STOP
STEERING_STOP = PD_Controller.STEERING_STOP

# Set motor and steering to neutral
motor_pwm_duty = MOTOR_STOP
steer_pwm_duty = STEERING_STOP
motor_dev = pwm_dev(pin=MOTOR_PIN, pwmFreq=50, pwmDuty=motor_pwm_duty)
steer_dev = pwm_dev(pin=STEER_PIN, pwmFreq=50, pwmDuty=steer_pwm_duty)

# Start camera and set resolution
video = cv2.VideoCapture(0)
video.set(cv2.CAP_PROP_FRAME_WIDTH,320) # set the width to 320 p
video.set(cv2.CAP_PROP_FRAME_HEIGHT,240) # set the height to 240 p

# Speed encoder init
s_encoder = speed_encoder()
# Wait to give file time to open
time.sleep(0.5)

# For plotting data later
figure, axis = plt.subplots(2,2)
mdc = []
sdc = []
speeds = []
angles = []
error_s_a = []
pterm_s_a = []
dterm_s_a = []
error_m_a = []
pterm_m_a = []
dterm_m_a = []

# PD Controllers
steer_pd = PD_Controller(True, steer_dev)
speed_pd = PD_Controller(False, motor_dev)
# Give time for duty cycle setpoint for ESC
input("Press enter when ESC double beep")
# Run motors for 0.5s to confirm working
motor_dev.set_duty_percent(8)
time.sleep(.5)
motor_dev.set_duty_percent(7.5)
# Wait to start following
input("Press enter when wheels stop - starts drive")
# Main loop
ct = 0  # loop count for cv
steering_angle = 90  # desired angle
contains_red = False  # tracks red detection
stopping = False  # tracks stop status
last_red = -1  # tracks if red detected
times_stopped = 0  # tracks number of times stopped
try:
    while True:
        # Only call CV code every 10 frames to avoid slowing down too much
        if ct == 10:
            # Read in current video frame
            ret,frame = video.read()
            #Convert to HSV
            hsv = cv_code.convert_to_HSV(frame)
            # Detect frame edges
            edges = cv_code.detect_edges(hsv)
            # Get ROI from frame edges
            roi = cv_code.region_of_interest(edges)
            # Display for debugging
            cv2.imshow('roi', roi)
            # Get line segments in roi (from lanes)
            line_segments = cv_code.detect_line_segments(roi)
            #Get average slope intercept from line segments
            lane_lines = cv_code.average_slope_intercept(frame,line_segments)
            # Display line image
            lane_lines_image = cv_code.display_lines(frame,lane_lines)
            # Infer steering angle from lane lines
            steering_angle = cv_code.get_steering_angle(frame, lane_lines)
            # Display for debugging
            heading_image = cv_code.display_heading_line(lane_lines_image,steering_angle)
            print('steering angle: ' + str(steering_angle))
            ct = 0
            # Check if frame contains "red" (stop box)
            contains_red = cv_code.contains_red(frame, 30+times_stopped*30)
            if contains_red:
                print(f"Contains red: {contains_red}")
                # Mark current time
                cur_time = time.time()
                # Compare current time to last time we saw red
                # to avoid stopping twice at same box
                if cur_time - last_red > 10000:
                    # if set, stop motors
                    stopping = True
        # Compute and set steering duty cycle
        error_s, pterm_s, dterm_s = steer_pd.go_to_target(steering_angle)
        
        # Get speed
        speed = s_encoder.get_speed()

        # Increment frame count
        ct += 1

        # Compute and set main motor duty cycle
        error_m, pterm_m, dterm_m = speed_pd.go_to_target(150, curr=speed)

        # Append all values (for plotting later)
        angles.append(steering_angle)
        speeds.append(speed)
        mdc.append(motor_dev.duty)
        sdc.append(steer_dev.duty)
        error_s_a.append(error_s)
        pterm_s_a.append(pterm_s)
        dterm_s_a.append(dterm_s)
        error_m_a.append(error_m)
        pterm_m_a.append(pterm_m)
        dterm_m_a.append(dterm_m)

        # Orange box detected
        if stopping:
            # Stop main motor
            motor_dev.set_duty_percent(MOTOR_STOP)
            # First stop
            if times_stopped == 0:
                # wait for 5 seconds
                time.sleep(5)
                stopping = False
                # count this stop
                times_stopped = 1
            else:
                time.sleep(10000) # "Permanent" stop

except KeyboardInterrupt:
    # Cleanup
    video.release()
    cv2.destroyAllWindows()
    s_encoder.close()
    # Stop motor
    motor_dev.set_duty_percent(MOTOR_STOP)
    # Convert collected data to np arrays
    m_duty_cycle = np.array(mdc)
    s_duty_cycle = np.array(sdc)
    speeds_arr = np.array(speeds)
    angles_arr = np.array(angles)
    X = np.array(range(len(speeds)))
    # Plot data
    axis[0,0].plot(X, speeds_arr)
    axis[0,0].set_title("Speed encoder reading")
    axis[1,0].plot(X, m_duty_cycle)
    axis[1,0].set_title("Drive motor duty cycle")
    axis[0,1].plot(X, angles_arr)
    axis[0,1].set_title("CV target steering angle")
    axis[1,1].plot(X, s_duty_cycle)
    axis[1,1].set_title("Steering motor duty cycle")
    plt.show()
    
    # Save all data
    np.save('data/motor_duty_cycle', np.array(m_duty_cycle))
    np.save('data/steer_duty_cycle', np.array(s_duty_cycle))
    np.save('data/speed', np.array(speeds_arr))
    np.save('data/angle', np.array(angles_arr))
    np.save('data/steer_err', np.array(error_s_a))
    np.save('data/steer_p', np.array(pterm_s_a))
    np.save('data/steer_d', np.array(dterm_s_a))
    np.save('data/motor_err', np.array(error_m_a))
    np.save('data/motor_p', np.array(pterm_m_a))
    np.save('data/motor_d', np.array(dterm_m_a))

cv_code.py

Python
import cv2
import numpy as np
import math

def contains_red(frame, threshold=30):
    # convert to HSV
    frame_HSV = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

    # Allow lower red hues to pass through
    lower_mask = cv2.inRange(frame_HSV,np.array([0, 80, 50]),np.array([20, 255, 255]))

    # Allow lower red hues to pass through
    upper_mask = cv2.inRange(frame_HSV,np.array([145, 80, 50]),np.array([179, 255, 255]))

    # Combine masks
    mask = cv2.bitwise_or(lower_mask, upper_mask)

    # Calculate percent of the screen that is red
    percent = 100 * (np.sum(mask/255) / mask.size)

    # Return True if a large enough number of pixels is red
    return percent > threshold

def convert_to_HSV(frame):
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)  
  return hsv

def detect_edges(frame):
    lower_blue = np.array([80, 100, 0], dtype = "uint8") # lower limit of blue color
    upper_blue = np.array([150, 255, 255], dtype="uint8") # upper limit of blue color
    mask = cv2.inRange(frame,lower_blue,upper_blue) # this mask will filter out everything but blue

    # detect edges
    edges = cv2.Canny(mask, 50, 100) 
    return edges

def region_of_interest(edges):
    height, width = edges.shape # extract the height and width of the edges frame
    mask = np.zeros_like(edges) # make an empty matrix with same dimensions of the edges frame

    # only focus lower 3/4 of the screen
    # specify the coordinates of 4 points (lower left, upper left, upper right, lower right)
    polygon = np.array([[
        (0, height),
        (0,  height / 4),
        (width , height / 4),
        (width , height),
    ]], np.int32)

    cv2.fillPoly(mask, polygon, 255) # fill the polygon with blue color
    cropped_edges = cv2.bitwise_and(edges, mask)
    return cropped_edges

def detect_line_segments(cropped_edges):
    rho = 1             # distance from the origin in the polar coordinate system
    theta = np.pi / 180 # angle from the positive x-axis in the polar coordinate system
    
    min_threshold = 10
    # Use open CV to detect line segments in ROI
    line_segments = cv2.HoughLinesP(cropped_edges, rho, theta, min_threshold,
                                    np.array([]), minLineLength=5, maxLineGap=0)
    return line_segments

def average_slope_intercept(frame, line_segments):
    lane_lines = []

    # no segments given
    if line_segments is None:
        return lane_lines

    # get shape of given frame
    height, width,_ = frame.shape
    left_fit = []
    right_fit = []
    boundary = 1/3

    # identify the regions to look for the left and right lanes
    left_region_boundary = width * (1 - boundary)
    right_region_boundary = width * boundary

    # calculate the slope of all given line segments
    for line_segment in line_segments:
        for x1, y1, x2, y2 in line_segment:
            if x1 == x2:
                print("skipping vertical lines (slope = infinity)")
                continue

            # calculate slope and intercept for non-vertical lines
            fit = np.polyfit((x1, x2), (y1, y2), 1)
            slope = (y2 - y1) / (x2 - x1)
            intercept = y1 - (slope * x1)

            # aggregate all the slopes and intercepts of the line segments 
            # in their respective lane regions
            if slope < 0:
                if x1 < left_region_boundary and x2 < left_region_boundary:
                    left_fit.append((slope, intercept))
            else:
                if x1 > right_region_boundary and x2 > right_region_boundary:
                    right_fit.append((slope, intercept))

    # find average slope and intercept for the two lane regions
    left_fit_average = np.average(left_fit, axis=0)
    if len(left_fit) > 0:
        lane_lines.append(make_points(frame, left_fit_average))

    right_fit_average = np.average(right_fit, axis=0)
    if len(right_fit) > 0:
        lane_lines.append(make_points(frame, right_fit_average))

    # lane_lines is a 2-D array consisting the coordinates of the right and left lane lines
    # for example: lane_lines = [[x1,y1,x2,y2],[x1,y1,x2,y2]]
    # where the left array is for left lane and the right array is for right lane
    # all coordinate points are in pixels
    return lane_lines

def make_points(frame, line):
    height, width, _ = frame.shape
    slope, intercept = line
    y1 = height  # bottom of the frame
    y2 = int(y1 / 2)  # make points from middle of the frame down

    # get rid of completely horizontal lines
    if slope == 0:
        slope = 0.1

    x1 = int((y1 - intercept) / slope)
    x2 = int((y2 - intercept) / slope)

    # return the coordinates describing two points on the lane line
    return [[x1, y1, x2, y2]]

def display_lines(frame, lines, line_color=(0, 255, 0), line_width=6): # line color (B,G,R)
    line_image = np.zeros_like(frame)

    if lines is not None:
        for line in lines:
            for x1, y1, x2, y2 in line:
                # draw a line in an empty plot the same size as the frame
                cv2.line(line_image, (x1, y1), (x2, y2), line_color, line_width)

    # overlay the frame with the drawn lines so that the lines appear on the frame
    line_image = cv2.addWeighted(frame, 0.8, line_image, 1, 1)
    return line_image

def get_steering_angle(frame, lane_lines):
    height, width, _ = frame.shape

    # Handle different cases depending on how many lane lines have been identified
    if len(lane_lines) == 2: # if two lane lines are detected
        _, _, left_x2, _ = lane_lines[0][0] # extract left x2 from lane_lines array
        _, _, right_x2, _ = lane_lines[1][0] # extract right x2 from lane_lines array
        mid = int(width / 2) # x-axis midpoint of the frame
        x_offset = (left_x2 + right_x2) / 2 - mid
        y_offset = int(height / 2)  

    elif len(lane_lines) == 1: # if only one line is detected
        x1, _, x2, _ = lane_lines[0][0]
        x_offset = x2 - x1
        y_offset = int(height / 2)

    elif len(lane_lines) == 0: # if no line is detected
        x_offset = 0
        y_offset = int(height / 2)

    # get the steering angle
    angle_to_mid_radian = math.atan(x_offset / y_offset)
    angle_to_mid_deg = int(angle_to_mid_radian * 180.0 / math.pi)  
    steering_angle = angle_to_mid_deg + 90 

    return steering_angle

def display_heading_line(frame, steering_angle, line_color=(0, 0, 255), line_width=5):

    # create an empty image the same size as the frame
    heading_image = np.zeros_like(frame)
    height, width, _ = frame.shape

    # get the steering angle and generate two points to plot a line
    steering_angle_radian = steering_angle / 180.0 * math.pi
    x1 = int(width / 2)
    y1 = height
    x2 = int(x1 - height / 2 / math.tan(steering_angle_radian))
    y2 = int(height / 2)

    # draw the heading line based on the steering angle
    cv2.line(heading_image, (x1, y1), (x2, y2), line_color, line_width)

    # overlay the image with the heading line with the frame
    heading_image = cv2.addWeighted(frame, 0.8, heading_image, 1, 1)

    return heading_image

# testing the functionality of the above functions
if __name__ == '__main__':
    video = cv2.VideoCapture(0)
    video.set(cv2.CAP_PROP_FRAME_WIDTH,320) # set the width to 320 p
    video.set(cv2.CAP_PROP_FRAME_HEIGHT,240) # set the height to 240 p

    while True:
        ret,frame = video.read()

        #Calling the functions
        hsv = convert_to_HSV(frame)
        edges = detect_edges(hsv)
        roi = region_of_interest(edges)
        cv2.imshow('image', frame)
        line_segments = detect_line_segments(roi)
        lane_lines = average_slope_intercept(frame,line_segments)
        lane_lines_image = display_lines(frame,lane_lines)
        steering_angle = get_steering_angle(frame, lane_lines)
        print(steering_angle)
        heading_image = display_heading_line(lane_lines_image,steering_angle)

        print(contains_red(frame))

        key = cv2.waitKey(1)
        if key == 27:
            break

    video.release()
    cv2.destroyAllWindows()

gpiod_driver.c

C/C++
#include <linux/module.h>
#include <linux/of_device.h>
#include <linux/kernel.h>
#include <linux/gpio/consumer.h>
#include <linux/platform_device.h>
#include <linux/interrupt.h>
#include <linux/timekeeping.h>

#define WINDOW_SIZE 1

// Declare globals
unsigned int irq_number;
static struct gpio_desc *reset, *speed_encoder;

// Initialize first delta to be longer than anything realistic
static long delta = 1000000000;

// Setup delta writing to file
module_param(delta, ulong, S_IRUGO);

static long numDeltas = 0;
// Setup delta count writing to file
module_param(numDeltas, ulong, S_IRUGO);

// Array for filtering time values
static long deltas[WINDOW_SIZE];
static int avgIndex = 0;

static ktime_t last, curr;

// Interrupt service routine
static irq_handler_t gpio_irq_handler(unsigned int irq, void *dev_id, struct pt_regs *regs) {
	// Current trigger
	curr = ktime_get_ns();
	// Change between last trigger
	long temp_delta = curr - last;
	// 1 ms between last (debounce)
	if (temp_delta < 1000000) {
		return (irq_handler_t) IRQ_HANDLED;
	}
	// Message to kernel
	printk("gpio_irq: Encoder was triggered and ISR was called! numDeltas: %d\n", numDeltas);
	// Update window list
	deltas[avgIndex] = temp_delta;
	avgIndex = (avgIndex + 1) % WINDOW_SIZE;
	// Compute average
	long total = 0;
	for (int i = 0; i < WINDOW_SIZE; i++) {
		total += deltas[i];
	}
	delta = total / WINDOW_SIZE;

	// Update previous
	last = curr;

	// Increase count
	numDeltas++;
	return (irq_handler_t) IRQ_HANDLED; 
}

// probe function
static int led_probe(struct platform_device* pdev)
{
	printk("Loading module...\n");  // indicate module loading
	speed_encoder = devm_gpiod_get(&pdev->dev, "button", GPIOD_IN); // get encoder gpio
	int db;
	if((db = gpiod_set_debounce(speed_encoder, 5000)) != 0){  // set debounce for encoder
		printk("Error! Cannot set debounce: error %d\n", db);
	}
	irq_number = gpiod_to_irq(speed_encoder);  // get irq number
	if(request_irq(irq_number, (irq_handler_t) gpio_irq_handler, IRQF_TRIGGER_FALLING, "my_gpio_irq", NULL) != 0){  // if request fails
		printk("Error!\nCan not request interrupt for speed encoder nr.: %d\n", irq_number);  // error handling
		gpiod_put(reset);
		gpiod_put(speed_encoder);
		return -1;
	}

	for (int i = 0; i < WINDOW_SIZE; i++) {  // fill buffer
		deltas[i] = 0;
	}
	last = ktime_get_ns();  // get last time
	delta = last;
	return 0;
}

// remove function
static int led_remove(struct platform_device* pdev)
{
	// Free irq and print
	printk("Freeing irq\n");
	free_irq(irq_number, NULL);
	gpiod_put(speed_encoder);
	return 0;
}

static struct of_device_id matchy_match[] = {
    {.compatible="flashing-lights"},
    {/* leave alone - keep this here (end node) */},
};

// platform driver object
static struct platform_driver adam_driver = {
	.probe	 = led_probe,
	.remove	 = led_remove,
	.driver	 = {
	       .name  = "The Rock: this name doesn't even matter",
	       .owner = THIS_MODULE,
	       .of_match_table = matchy_match,
	},
};

module_platform_driver(adam_driver);

MODULE_DESCRIPTION("424\'s finest");
MODULE_AUTHOR("GOAT");
MODULE_LICENSE("GPL v2");
MODULE_ALIAS("platform:adam_driver");

pd.py

Python
import time
from numpy import clip

class PD_Controller:
    # PD Constants
    STEERING_P = 0.05
    STEERING_D = 0.1
    MOTOR_P = 0.0045
    MOTOR_D = 0.5

    # Duty cycle limits
    STEERING_MAX = 9.0
    STEERING_MIN = 6.0
    
    # Stop duty cycle for steering
    STEERING_STOP = 7.5
    
    # Stop duty cycle for drive motor
    MOTOR_STOP = 7.5
    
    # Duty cycle limits for motor
    MOTOR_MIN = 7.5
    MOTOR_MAX = 8

    def __init__(self, isSteer, dev):
        # Initialize instance variables
        self.isSteer = isSteer
        self.p = 0
        self.d = 0
        self.min = 0
        self.max = 0
        self.stop = 0
        self.last_control = 7.5
        if isSteer: # Set for steering motor
            self.p = PD_Controller.STEERING_P
            self.d = PD_Controller.STEERING_D
            self.min = PD_Controller.STEERING_MIN
            self.stop = PD_Controller.STEERING_STOP
            self.max = PD_Controller.STEERING_MAX
        else: # Set for drive motor
            self.p = PD_Controller.MOTOR_P
            self.d = PD_Controller.MOTOR_D
            self.min = PD_Controller.MOTOR_MIN
            self.stop = PD_Controller.MOTOR_STOP
            self.max = PD_Controller.MOTOR_MAX
            dev.set_duty_percent(PD_Controller.MOTOR_STOP)
        # Set pigpio device to write to
        self.dev = dev
        # Keep track of error for D control
        self.last_error = 0
        self.last_time = time.time()
    
    def go_to_target(self, target, curr=90):
        # Computer error (negative for steering)
        if self.isSteer:
            error = - (target - curr)
        else:
            error = target - curr
        # Computer change in error between this and last call
        curr = time.time()
        delta_error = (self.last_error - error)/(curr - self.last_time)
        
        # Calculate PD terms
        p_term = self.p*error
        d_term = self.d*delta_error
        
        # Calculate raw control
        control = self.stop + ((self.p*error) + (self.d*delta_error))
        
        # Clip control to acceptable bounds
        control = clip(control, self.min, self.max)
        
        # Write control out to motor
        self.dev.set_duty_percent(control)
        
        # Update error tracking
        self.last_error = error
        self.last_time = curr
        self.last_control = control
        
        # Return data for plotting later
        return error, p_term, d_term

pwm_control.py

Python
import pigpio
import time

pi = pigpio.pi()

STEER_PIN = 2
MOTOR_PIN = 3

class pwm_dev():
    """
    Class for interfacing with pins using software PWM using pigpio library
    """
    pwm_range = 10000 # "max" pwm to write out
    def __init__(self, pin: int, pwmFreq: int = 10000, pwmDuty: int = 50):
        # Remember pin
        self.pin = pin
        # Set PWM frequency
        pi.set_PWM_frequency(self.pin,pwmFreq)
        # Set range of acceptable duty cycle values
        pi.set_PWM_range(pin,pwm_dev.pwm_range)
        self.set_duty_percent(pwmDuty) # start on
        self.duty = pwmDuty # Keep track of duty percent
        
    def set_duty_percent(self, duty_percent):
        # Write pwm out to pin
        pi.set_PWM_dutycycle(self.pin, pwm_dev._percent_duty_convert(duty_percent))
        # Keep track of duty percent
        self.duty = pwm_dev._percent_duty_convert(duty_percent)
    
    def set_duty(self, duty):
        # Set duty without converting from percentage
        pi.set_PWM_dutycycle(self.pin, duty)
    
    def off(self):
        # Stop pwm output
        pi.set_PWM_dutycycle(self.pin, 0)
    
    def _percent_duty_convert(val: int):
        # Convert raw pwm value from percentage to value
        return int((val/100.0) * pwm_dev.pwm_range)

speed.py

Python
import time
# Interface between python code and kernel driver for speed encoder
class speed_encoder():
    def __init__(self):
        # open delta and count files
        self.deltaFile = open("/sys/module/gpiod_driver/parameters/delta", "r")
        self.countFile = open("/sys/module/gpiod_driver/parameters/numDeltas", "r")
        self.last_read = int(self.countFile.read())  # read count file to last read
        self.countFile.seek(0,0)  # find beginning of file
        print("Speed_enc: init: encoder count: " + str(self.last_read))
        self.countLastRead = 0
        read = int(self.deltaFile.read())  # read delta file
        if read == 0:  # if delta is zero set to infinity
            read = float(inf)
        self.lastReadSpeed = 1000000000 / read
        self.deltaFile.seek(0,0)  # seek start of file
    
    def get_speed(self):  # returns speed of car
        d = int(self.countFile.read())  # read count file
        self.countFile.seek(0, 0)
        print("Speed_enc: get_speed: encoder count: " + str(d))
        if self.last_read == d:  # if reading the same tick
            print("speed_enc: reading same tick")
            self.countLastRead += 1
            if self.countLastRead < 1:  # return last read speed if last read is smaller than 1
                return self.lastReadSpeed
            return 0
        read = int(self.deltaFile.read())  # read delta file
        print("speed_enc: read: " + str(read))
        if read == 0:  # if delta reads zero prevent zero
            read = float(inf)
        s = 1000000000 / read
        self.deltaFile.seek(0,0)  # seek start of file
        self.countLastRead = 0  # set last read and speed
        self.lastReadSpeed = s
        self.last_read = d
        return s  # return speed
    
    def close(self):  # close files
        self.deltaFile.close()
        self.countFile.close()

nkg3.dts

C/C++
/dts-v1/;
/plugin/;


/ {
	fragment@0 {
		target-path="/";
		__overlay__ {
			nkgdev {
				compatible = "flashing-lights";
				led-gpios = <&gpio 5 0>;
				button-gpios = <&gpio 6 0>;
			};
		};
	};
};

Credits

Sambhu Balakrishnan
1 project • 1 follower
Gabriel Zook
0 projects • 1 follower
Bill Rui
1 project • 0 followers
Nik Gautam
0 projects • 0 followers
Thanks to raja_961.

Comments