Jason YangNatalia MendiolaNatalia PereyRaj Anthony
Published

One-Eyed Self-Driving Car

We designed a lane-keeping R/C car using a Raspberry Pi, OpenCV, and a singular eye (camera).

IntermediateProtip224
One-Eyed Self-Driving Car

Things used in this project

Hardware components

Raspberry Pi 4 Model B
Raspberry Pi 4 Model B
×1
Adafruit MCP4728 Quad DAC
×1
Logitech C270 HD Webcam
×1
MJX Technic Hyper Go RC car (16209 1:16)
×1
Charmast 10400mAh Power Bank
×1
Optical Speed Encoder
×1

Software apps and online services

OpenCV
OpenCV

Story

Read more

Code

webcam_test.py

Python
This python code is used to verify the webcam is functioning as intended.
import cv2

video = cv2.VideoCapture(0)
#frame_count = 0

while True:
	ret, frame = video.read()
	#frame = cv2.flip(frame, -1)  #used to flip image vertically
	
	cv2.imshow('image.jpg',frame)


	key = cv2.waitKey(1)
	if key == 27:
		break

video.release()
cv2.destroyAllWindows()

stop_code.py

Python
This code is used to stop the car. This was used to end script testing the functionality of the motors.
# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
# SPDX-License-Identifier: MIT

import board
import adafruit_mcp4728
import time

MCP4728_DEFAULT_ADDRESS = 0x60
MCP4728A4_DEFAULT_ADDRESS = 0x64

#initialize the board
i2c = board.I2C()  # uses board.SCL and board.SDA
mcp4728 = adafruit_mcp4728.MCP4728(i2c, adafruit_mcp4728.MCP4728A4_DEFAULT_ADDRESS)


#VDD = 65535 = 3.3V    |  65535/2 = 1.65V

#set values of each channel to default (no throttle, steering centered)

throttle = 2
mcp4728.channel_a.value = int(65535/throttle)

#steering 
steer = 2.2
mcp4728.channel_d.value = int(65535/steer)


#DAC channels not used
mcp4728.channel_b.value = 0   
mcp4728.channel_c.value = 0


testing = False

while testing:
  #backward
	print("Going backward!")
	mcp4728.channel_a.value = int(65535 / 2.9)
	time.sleep(3)
	
	#stop
	print("Stopping car!")
	mcp4728.channel_a.value = int(65523 /  2.2)
	time.sleep(2)

	#forward
	print("Going forward!")
	mcp4728.channel_a.value = int(65535/1.85)
	time.sleep(4) #waits 4 seconds

	#stop
	mcp4728.channel_a.value = int(65535/2)
	print("Car stopped!")
	time.sleep(2) #waits for seconds

	#turn left
	print("Turning left!")
	mcp4728.channel_d.value = int(65535/7)
	time.sleep(2)

  #turn right
	print("Turning right!")
	mcp4728.channel_d.value = int(65535/1.2)
	time.sleep(2)

	#neutral
	print("Driving straight!")
	mcp4728.channel_d.value = int(65535/2.1)
	time.sleep(2)

	#stop
	mcp4728.channel_a.value = int(65535/2)
	print("Car stopped!")
	#Done
	print("Test done!")

	testing = False

start_code.py

Python
This code was used to test the functionality of the motors of the RC car.
# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
# SPDX-License-Identifier: MIT

import board
import adafruit_mcp4728
import time

MCP4728_DEFAULT_ADDRESS = 0x60
MCP4728A4_DEFAULT_ADDRESS = 0x64

#initialize the board
i2c = board.I2C()  # uses board.SCL and board.SDA
mcp4728 = adafruit_mcp4728.MCP4728(i2c, adafruit_mcp4728.MCP4728A4_DEFAULT_ADDRESS)

#set values of each channel to default (no throttle, steering centered)

#VDD = 65535 = 3.3V    |  65535/2 = 1.65V

mcp4728.channel_a.value = int(65535/1.9)

#steering
mcp4728.channel_d.value = int(65535/2)


#DAC channels not used
mcp4728.channel_b.value = 0   
mcp4728.channel_c.value = 0



testing = False

#manually change values of the denominator to control car
while testing:
  #backward
	print("Going backward!")
	mcp4728.channel_a.value = int(65535 / 2.9)
	time.sleep(3)
	#stop
	print("Stopping car!")
	mcp4728.channel_a.value = int(65523 /  2.2)
	time.sleep(2)

	#forward
	print("Going forward!")
	mcp4728.channel_a.value = int(65535/1.85)
	time.sleep(4) #waits 4 seconds

	#stop
	mcp4728.channel_a.value = int(65535/2)
	print("Car stopped!")
	time.sleep(2) #waits for seconds

	#turn left
	print("Turning left!")
	mcp4728.channel_d.value = int(65535/7)
	time.sleep(2)

  #turn right
	print("Turning right!")
	mcp4728.channel_d.value = int(65535/1.2)
	time.sleep(2)

	#neutral
	print("Driving straight!")
	mcp4728.channel_d.value = int(65535/2.1)
	time.sleep(2)

	#stop
	mcp4728.channel_a.value = int(65535/2)
	print("Car stopped!")
	#Done
	print("Test done!")

	testing = False

final_project_code.py

Python
This code is used to run the RC car on a track, using computer vision to determine steering and throttle.
'''
This script uses code and ideas from the following sources: 
-https://www.hackster.io/team-spy-family/team-spy-family-aa7632
-https://www.instructables.com/Autonomous-Lane-Keeping-Car-Using-Raspberry-Pi-and/
'''

# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
# SPDX-License-Identifier: MIT
import board
import adafruit_mcp4728
import cv2
import numpy as np
import matplotlib.pyplot as plt
import math
import sys
import time


MCP4728_DEFAULT_ADDRESS = 0x60
MCP4728A4_DEFAULT_ADDRESS = 0x64

#initialize the board
i2c = board.I2C()  # uses board.SCL and board.SDA
mcp4728 = adafruit_mcp4728.MCP4728(i2c, adafruit_mcp4728.MCP4728A4_DEFAULT_ADDRESS)


#VDD = 65535 = 3.3V    |  65535/2 = 1.65V

#DAC channels not used
mcp4728.channel_b.value = 0   
mcp4728.channel_c.value = 0

#number of stops expected on the track
expected_number_stops = 2

# arrays for making the final plots
steering_voltage = []
p_vals = []
d_vals = []
err_vals = []
speed_vals = []
voltage_denom = []
saveValues = True  #for plotting

sightDebug = True #visualize heading line

#------------------------------------------------------------------------



#Throttle
forward = 1.85 #controls initial throttle
stopped = int(65535/2)
neutral = 2 #middle speed

#Steering
turn_left = int(65535/5)
turn_right = int(65535/1.2)
straight = int(65535/2.1)

#op encode
speed = 0
prev_speed = 0
go_faster_tick = 0  # Do not change this here. Code will set this value after seeing stop sign

# Encoder related variables
speedCheck = 10 # number of frames in between speed adjustment
TARGET = 30  #controls desired speed. Target value we want to get from our encoder 

SPEED_STEP = 0.005 #acceleration

go_forward = 1.85
prev = 0

SPEED_CEIL = 1.84  #controls max voltage to motors

def initialize_car():
	  #stops motor
    mcp4728.channel_a.value = stopped #throttle
    # wait for car to be ready
    mcp4728.channel_d.value = straight #steering

def stop():
    """
    Stops the car
    :return: none
    """
    mcp4728.channel_a.value = int(65535 / stopped)

def go():
    """
    Sends the car forward at a default speed
    :return: none
    """
    #mcp4728.channel_a.value = forward
    mcp4728.channel_a.value = int( 65535 / forward)

def detect_edges(frame):

    # filter for blue lane lines
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    
    lower_blue = np.array([90, 120, 0], dtype="uint8")
    upper_blue = np.array([150, 255, 255], dtype="uint8")
    mask = cv2.inRange(hsv, lower_blue, upper_blue)
    

    # detect edges
    edges = cv2.Canny(mask, 50, 100)
    # cv2.imshow("edges",edges)

    return edges

def region_of_interest(edges):
    # focus on the lane lines and ignore anything else
    height, width = edges.shape
    mask = np.zeros_like(edges)

    # only focus lower half of the screen
    polygon = np.array([[
        (0, height),
        (0, height / 2),
        (width, height / 2),
        (width, height),
    ]], np.int32)

    cv2.fillPoly(mask, polygon, 255)

    cropped_edges = cv2.bitwise_and(edges, mask)
    

    return cropped_edges
	
def detect_line_segments(cropped_edges):
    #detect lines in each frame
    rho = 1
    theta = np.pi / 180
    min_threshold = 10

    line_segments = cv2.HoughLinesP(cropped_edges, rho, theta, min_threshold,
                                    np.array([]), minLineLength=5, maxLineGap=150)

    return line_segments
	
def average_slope_intercept(frame, line_segments):
    lane_lines = []

    if line_segments is None:
        # print("no line segments detected")
        return lane_lines

    height, width, _ = frame.shape
    left_fit = []
    right_fit = []

    boundary = 1 / 3
    left_region_boundary = width * (1 - boundary)
    right_region_boundary = width * boundary

    for line_segment in line_segments:
        for x1, y1, x2, y2 in line_segment:
            if x1 == x2:
                # print("skipping vertical lines (slope = infinity")
                continue

            fit = np.polyfit((x1, x2), (y1, y2), 1)
            slope = (y2 - y1) / (x2 - x1)
            intercept = y1 - (slope * x1)

            if slope < 0:
                if x1 < left_region_boundary and x2 < left_region_boundary:
                    left_fit.append((slope, intercept))
            else:
                if x1 > right_region_boundary and x2 > right_region_boundary:
                    right_fit.append((slope, intercept))

    left_fit_average = np.average(left_fit, axis=0)
    if len(left_fit) > 0:
        lane_lines.append(make_points(frame, left_fit_average))

    right_fit_average = np.average(right_fit, axis=0)
    if len(right_fit) > 0:
        lane_lines.append(make_points(frame, right_fit_average))

    return lane_lines

def make_points(frame, line):
    #return the bounded coordinates of the lane lines 
    height, width, _ = frame.shape

    slope, intercept = line

    y1 = height  # bottom of the frame
    y2 = int(y1 / 2)  # make points from middle of the frame down

    #if horizontal line, sets slope value near 0 to prevent dividing by 0
    #does not affect performance
    if slope == 0:
        slope = 0.1

    x1 = int((y1 - intercept) / slope)
    x2 = int((y2 - intercept) / slope)

    return [[x1, y1, x2, y2]]


def display_lines(frame, lines, line_color=(0, 255, 0), line_width=6):
    #display lines on the frames
    line_image = np.zeros_like(frame)

    if lines is not None:
        for line in lines:
            for x1, y1, x2, y2 in line:
                cv2.line(line_image, (x1, y1), (x2, y2), line_color, line_width)

    line_image = cv2.addWeighted(frame, 0.8, line_image, 1, 1) #adds two images, each with a defined weight

    return line_image


def get_steering_angle(frame, lane_lines):
    height, width, _ = frame.shape

    if len(lane_lines) == 2: # if two lane lines are detected
        _, _, left_x2, _ = lane_lines[0][0] # extract left x2 from lane_lines array
        _, _, right_x2, _ = lane_lines[1][0] # extract right x2 from lane_lines array
        mid = int(width / 2)
        x_offset = (left_x2 + right_x2) / 2 - mid
        y_offset = int(height / 2)

    elif len(lane_lines) == 1: # if only one line is detected
        x1, _, x2, _ = lane_lines[0][0]
        x_offset = x2 - x1
        y_offset = int(height / 2)

    elif len(lane_lines) == 0: # if no line is detected
        x_offset = 0
        y_offset = int(height / 2)

    angle_to_mid_radian = math.atan(x_offset / y_offset)
    angle_to_mid_deg = int(angle_to_mid_radian * 180.0 / math.pi)  
    steering_angle = angle_to_mid_deg + 90 

    return steering_angle


def display_heading_line(frame, steering_angle, line_color=(0, 0, 255), line_width=5):
    #heading line determines direction and speed
    heading_image = np.zeros_like(frame)
    height, width, _ = frame.shape

    steering_angle_radian = steering_angle / 180.0 * math.pi

    x1 = int(width / 2)
    y1 = height
    x2 = int(x1 - height / 2 / math.tan(steering_angle_radian))
    y2 = int(height / 2)

    cv2.line(heading_image, (x1, y1), (x2, y2), line_color, line_width)
    heading_image = cv2.addWeighted(frame, 0.8, heading_image, 1, 1)

    return heading_image


#howdy from the Multicultural Center



# set up the car throttle and steering
initialize_car()

# set up video
video = cv2.VideoCapture(0)

#sets resolution to 320x240
video.set(cv2.CAP_PROP_FRAME_WIDTH, 320)
video.set(cv2.CAP_PROP_FRAME_HEIGHT, 240)

# wait for video to load
time.sleep(1)

# PD variables
#error increases, P action increases
#slope of the error, D
kp = 0.4
kd = kp * 0.65

lastTime = 0
lastError = 0

counter = 0 # counter for number of ticks
max_ticks = 2000 

#-----------------------------------
# start the engines
go()


frame_frequency = 10 #10 frames
signs_seen = 0
i = 0


print("Starting loop!")
#frame = cv2.flip(frame,-1) - inverts image 
while counter < max_ticks:
    ret, original_frame = video.read()
    frame = cv2.resize(original_frame, (160, 120)) #resizes to 160*120
    i += 1


    #checks for red square every 10 frames
    if counter % frame_frequency == 0:
        hsv_red = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

        #define range of red in HSV
        lower_red = np.array([155,3, 140])
        upper_red = np.array([175, 22, 160])

        #threshold the HSV image to get only red colors and count pixels yay
        mask_red = cv2.inRange(hsv_red, lower_red, upper_red)
        countRed = cv2.countNonZero(mask_red)

        #if pink pixels are significant, stop
        if (countRed > 250 and i > 70):  #250 pixels works too!
            mcp4728.channel_a.value = stopped
            
            print("saw a sign!...stopping car.")
            time.sleep(2) #waits
            
            signs_seen += 1
            #checks number of signs it sees
            if (signs_seen == expected_number_stops):
                initialize_car()
                break
            mcp4728.channel_a.value = int(65535 / forward)
            i = 0 #resets counter!
            print("One more to go!!", signs_seen)

    #process camera input for lane keeping using openCV
    edges = detect_edges(frame)
    roi = region_of_interest(edges)
    line_segments = detect_line_segments(roi)
    lane_lines = average_slope_intercept(frame, line_segments)
    lane_lines_image = display_lines(frame, lane_lines)
    steering_angle = get_steering_angle(frame, lane_lines)
    heading_angle = display_heading_line(lane_lines_image, steering_angle)

    if sightDebug:
        cv2.imshow("heading_line.jpg", heading_angle)

    #Calculate changes for PD
    now = time.time()
    dt = now - lastTime
    deviation = steering_angle - 90
    error = abs(deviation)
    
    # speed adjustment ---------------[From Hackster SpyxFamily]
    if counter % speedCheck == 0:
        with open('/sys/module/Speed_encoder/parameters/op_count', 'r') as fileread:
            read = int(fileread.read())
        speed = read - prev
        prev = read
        # print(speed)
        if speed > TARGET:
            go_forward = go_forward + SPEED_STEP
        elif speed < TARGET:
            go_forward = go_forward - SPEED_STEP
        print("speed:",speed," motor voltage:",go_forward,"from op_count")

    #end speed adjustment--------------------

    #controls car steering
    if deviation < 5 and deviation > -5: #buffer of 10 degrees
        deviation = 0
        error = 0
        mcp4728.channel_d.value = straight #steering
      
    elif deviation > 5:
        mcp4728.channel_d.value = turn_right #steering
    
    elif deviation < -5:
        mcp4728.channel_d.value = turn_left #steering
    
    derivative = kd * (error - lastError) / dt
    proportional = kp * error
    PD = int(speed + derivative + proportional)
    spd = abs(PD)
    
    lastError = error
    lastTime = time.time()
    #print(spd)

    #ensures car doesn't go too fast
    if go_forward < SPEED_CEIL:
       go_forward = SPEED_CEIL

    #determines throttle
    mcp4728.channel_a.value = int( 65525 / go_forward)

    # take values for graphs
    p_vals.append(proportional)
    d_vals.append(derivative)
    err_vals.append(error)
    voltage_denom.append(go_forward)
    speed_vals.append(speed)
    voltage = int(65535 / go_forward)
    steering_voltage.append(voltage)

    counter += 1

    key = cv2.waitKey(1)
    if key == 27:
        break


if saveValues:
    d_vals_str = str(d_vals)
    p_vals_str = str(p_vals)
    err_vals_str = str(err_vals)
    steering_voltage_str = str(steering_voltage)
    speed_vals_str = str(speed_vals)
    voltage_denom_str = str(voltage_denom)
    
    #writing lists to text file preserving list structure for external plotting
    with open('PD_data_new.txt', 'w') as file:
        file.write(f'D={d_vals_str}\n')
        file.write(f'P={p_vals_str}\n')
        file.write(f'error={err_vals_str}\n')
        file.write(f'voltage={steering_voltage_str}\n')
        file.write(f'speed={speed_vals_str}\n')
        file.write(f'voltage_denom={voltage_denom_str}\n')
    print("Values written to 'Car_data.txt' successfully.")

#clean up resources
video.release()
cv2.destroyAllWindows()
print("Finishing... ")

#stops car and straightens wheels
mcp4728.channel_a.value = stopped #throttle
mcp4728.channel_d.value = straight #steering

Speed_encoder.c

C/C++
This code is used to trigger an interrupt based on the output of an optical encoder. The frequency of this interrupt is tracked and used to regulate speed.
//Highly inspired by ELEC 424 Project 2
//This code uses code from team Spy Family (https://www.hackster.io/team-spy-family/team-spy-family-aa7632)

#include <linux/module.h>
#include <linux/of_device.h>
#include <linux/kernel.h>
#include <linux/gpio/consumer.h>
#include <linux/platform_device.h>
/* YOU WILL NEED OTHER HEADER FILES */
#include <linux/interrupt.h>
#include <linux/gpio.h>

/* YOU WILL HAVE TO DECLARE SOME VARIABLES HERE */
unsigned int irq_number;
struct gpio_desc *speed_encode;
static int op_count = 0;//initialize op_count to 0

//op_count: 1 spoke of the wheel going through the optical encoder
module_param(op_count, int, S_IRUGO); //S_IRUGO: anyone can read this 

/* ADD THE INTERRUPT SERVICE ROUTINE HERE */
static irq_handler_t gpio_irq_handler(unsigned int irq, void *dev_id, struct pt_regs *regs) {
	op_count += 1;//increment op_count by 1
	return (irq_handler_t) IRQ_HANDLED; 
}

// probe function
static int led_probe(struct platform_device *pd/*INSERT*/)
{
	/*INSERT*/
	printk("Module Inserted \n");
	//printk("op_encode: Loading module... ");
	// struct gpio_desc *__must_check devm_gpiod_get(struct device *dev, const char *con_id, enum gpiod_flags flags)
	speed_encode = devm_gpiod_get(&pd->dev, "button", GPIOD_IN);
	
	
	gpiod_set_debounce(speed_encode, 10000000); // debouce is set to 10 million microseconds
	irq_number = gpiod_to_irq(speed_encode); // set up interrupt service routine
	
	
	
	request_irq(irq_number, (irq_handler_t) gpio_irq_handler, IRQF_TRIGGER_RISING, "op_encode_driver", NULL);

	return 0;
}

// remove function
static int led_remove(struct platform_device *pd/*INSERT*/)
{
	/* INSERT: Free the irq and print a message */
	free_irq(irq_number, NULL);
	printk("gpiod_driver: irq freed, module removed... ");
	return 0;
}


static struct of_device_id matchy_match[] = {
    // {/*INSERT*/},
	{.compatible = "lament"},
    {/* leave alone - keep this here (end node) */},
};

// platform driver object
static struct platform_driver adam_driver = {
	.probe	 = led_probe/*INSERT*/,
	.remove	 = led_remove/*INSERT*/,
	.driver	 = {
	       .name  = "The Rock: this name doesn't even matter",
	       .owner = THIS_MODULE,
	       .of_match_table = matchy_match/*INSERT*/,
	},
};

module_platform_driver(adam_driver/*INSERT*/);

MODULE_DESCRIPTION("424\'s finest");
MODULE_AUTHOR("GOAT");
MODULE_LICENSE("GPL v2");
MODULE_ALIAS("platform:adam_driver");

Credits

Jason Yang
2 projects • 2 followers
Contact
Natalia Mendiola
1 project • 1 follower
Contact
Natalia Perey
1 project • 1 follower
Contact
Raj Anthony
1 project • 0 followers
Contact

Comments

Please log in or sign up to comment.