Hackster is hosting Hackster Holidays, Finale: Livestream & Giveaway Drawing. Watch previous episodes or stream live on Tuesday!Stream Hackster Holidays, Finale on Tuesday!
Alan JacobAshwini kumar sinha
Created July 31, 2024 © MIT

ROS AI Adas Robot

ROS and AI-based computer vision power ADAS for autonomous robots, enhancing real-time obstacle detection, and autonomous drive.

BeginnerFull instructions provided8 hours26

Things used in this project

Story

Read more

Code

adas.py

Python
import subprocess
subprocess.run("python3 lidarplot.py & python3 sample.py & python3 lane.py", shell=True)

lane.py

Python
import cv2
import numpy as np

cap = cv2.VideoCapture(0)  # initialize the camera

while True:
    _, frame = cap.read()  # read a frame from the camera
    frame = cv2.resize(frame, (640, 480))  # resize the frame

    # apply color thresholding to detect white and yellow lanes
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    lower_white = np.array([0, 0, 200])
    upper_white = np.array([180, 30, 255])
    mask_white = cv2.inRange(hsv, lower_white, upper_white)
    lower_yellow = np.array([15, 50, 50])
    upper_yellow = np.array([30, 255, 255])
    mask_yellow = cv2.inRange(hsv, lower_yellow, upper_yellow)
    mask = cv2.bitwise_or(mask_white, mask_yellow)

    # apply Gaussian blur to reduce noise
    blur = cv2.GaussianBlur(mask, (5, 5), 0)

    # apply Canny edge detection to detect edges
    edges = cv2.Canny(blur, 50, 150)

    # apply a region of interest mask to focus on the road lanes
    mask = np.zeros_like(edges)
    mask_color = (255, 255, 255)
    vertices = np.array([[(0, 480), (320, 240), (320, 240), (640, 480)]], dtype=np.int32)
    cv2.fillPoly(mask, vertices, mask_color)
    masked_edges = cv2.bitwise_and(edges, mask)

    # apply Hough line detection to detect lane lines
    rho = 1
    theta = np.pi / 180
    threshold = 15
    min_line_length = 30
    max_line_gap = 10
    lines = cv2.HoughLinesP(masked_edges, rho, theta, threshold, np.array([]), min_line_length, max_line_gap)

    # draw the lane lines on the original frame
    line_color = (0, 0, 255)
    line_thickness = 3
    if lines is not None:
        for line in lines:
            x1, y1, x2, y2 = line[0]
            cv2.line(frame, (x1, y1), (x2, y2), line_color, line_thickness)

    # display the resulting frame
    cv2.imshow('frame', frame)

    # exit the loop when 'q' is pressed
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# release the camera and close all windows
cap.release()
cv2.destroyAllWindows()

Robot Road lane detection car detetion Car driver robot code verison 1.py

Python
import cv2
import numpy as np

cap = cv2.VideoCapture(0)  # initialize the camera

while True:
    _, frame = cap.read()  # read a frame from the camera
    frame = cv2.resize(frame, (640, 480))  # resize the frame

    # apply color thresholding to detect white and yellow lanes
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    lower_white = np.array([0, 0, 200])
    upper_white = np.array([180, 30, 255])
    mask_white = cv2.inRange(hsv, lower_white, upper_white)
    lower_yellow = np.array([15, 50, 50])
    upper_yellow = np.array([30, 255, 255])
    mask_yellow = cv2.inRange(hsv, lower_yellow, upper_yellow)
    mask = cv2.bitwise_or(mask_white, mask_yellow)

    # apply Gaussian blur to reduce noise
    blur = cv2.GaussianBlur(mask, (5, 5), 0)

    # apply Canny edge detection to detect edges
    edges = cv2.Canny(blur, 50, 150)

    # apply a region of interest mask to focus on the road lanes
    mask = np.zeros_like(edges)
    mask_color = (255, 255, 255)
    vertices = np.array([[(0, 480), (320, 240), (320, 240), (640, 480)]], dtype=np.int32)
    cv2.fillPoly(mask, vertices, mask_color)
    masked_edges = cv2.bitwise_and(edges, mask)

    # apply Hough line detection to detect lane lines
    rho = 1
    theta = np.pi / 180
    threshold = 15
    min_line_length = 30
    max_line_gap = 10
    lines = cv2.HoughLinesP(masked_edges, rho, theta, threshold, np.array([]), min_line_length, max_line_gap)

    # draw the lane lines on the original frame
    line_color = (0, 0, 255)
    line_thickness = 3
    if lines is not None:
        for line in lines:
            x1, y1, x2, y2 = line[0]
            cv2.line(frame, (x1, y1), (x2, y2), line_color, line_thickness)

    # display the resulting frame
    cv2.imshow('frame', frame)

    # exit the loop when 'q' is pressed
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# release the camera and close all windows
cap.release()
cv2.destroyAllWindows()

LidarX2.py

Python
from serial import Serial #pip install pySerial
from time import sleep
from math import atan, pi
from threading import Thread
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
fig, ax = plt.subplots(subplot_kw={'projection': 'polar'})
line, = ax.plot([], [])

# set the axis limits and labels
ax.set_ylim(0, 1.2)
ax.set_yticklabels([])
ax.set_xticklabels(['0', '', '45', '', '90', '', '135', '', '180', '', '225', '', '270', '', '360'])


class LidarMeasure:

    def __init__(self, angle, distance):
        self.angle = angle
        self.distance = distance
        #print("angle =" + str(angle))
        #print("distance="+str (distance))
        


    def __repr__(self):
        if int(self.angle) <= 220:
            print("angle=" + str(self.angle))
            print("distance =" + str(self.distance))
            if int(self.distance) <=  400:
                if int(self.distance) != 0.0:
                    print("veryclose please slow speed")
        #print("grater")
        return  "" #"grater" #""+str(self.angle)+": "+str(self.distance)+"mm"
   

        
    

class LidarX2:

    def __init__(self, port):
        self.port = port
        self.baudrate = 115200
        self.connected = False
        self.measureThread = None
        self.stopThread = False
        self.measureList = []
        self.serial = None

    def open(self):
        try:
            if not self.connected:
                # Open serial
                self.serial = Serial(self.port, self.baudrate)
                timeout = 4000  # ms
                while not self.serial.isOpen() and timeout > 0:
                    timeout -= 10  # ms
                    sleep(0.01)  # 10ms
                if self.serial.isOpen():
                    self.connected = True
                    self.serial.flushInput()
                else:
                    return False
                # Start measure thread
                self.stopThread = False
                self.measureThread = Thread(target=LidarX2.__measureThread, args=(self,))
                self.measureThread.setDaemon(True)
                self.measureThread.start()
                return True
        except Exception as e:
            print(e)
        return False

    def close(self):
        self.stopThread = True
        if self.measureThread:
            self.measureThread.join()
        if self.connected:
            self.serial.close()
            self.connected = False

    def getMeasures(self):
        return list(self.measureList)

    
   

    def __measureThread(self):
        startAngle = 0
        while not self.stopThread:
            measures = self.__readMeasures()
            if len(measures) == 0:
                continue
            # Get Start an End angles
            endAngle = measures[len(measures)-1].angle
            # Clear measures in the angle range
            i = 0
            while i < len(self.measureList):
                m = self.measureList[i]
                inRange = False
                if endAngle > startAngle:
                    inRange = startAngle <= m.angle and m.angle <= endAngle
                else:
                    inRange = (startAngle <= m.angle and m.angle <= 360) or (0 <= m.angle and m.angle <= endAngle)
                if inRange:
                    self.measureList.pop(i)
                    i -= 1
                i += 1
            # Add measures
            for m in measures:
                self.__insort_measure(self.measureList, m)
            startAngle = endAngle

    def __insort_measure(self, l, m, lo=0, hi=None):
        if lo < 0:
            raise ValueError('lo must be non-negative')
        if hi is None:
            hi = len(l)
        while lo < hi:
            mid = (lo + hi) // 2
            if m.angle < l[mid].angle:
                hi = mid
            else:
                lo = mid + 1
        l.insert(lo, m)

    def __readByte(self):
        # serial.read can return byte or str depending on python version...
        return self.__strOrByteToInt(self.serial.read(1))
    
    def __strOrByteToInt(self, value):
        if isinstance(value, str):
            return int(value.encode('hex'), 16)
        if isinstance(value, int):
            return value
        return int.from_bytes(value, byteorder='big')

    def __readMeasures(self):
        result = []
        # Check and flush serial
        if not self.connected:
            return result
        # Wait for data start bytes
        found = False
        checksum = 0x55AA
        while not found and not self.stopThread:
            while self.serial.read(1) != b"\xaa":
                pass
            if self.serial.read(1) == b"\x55":
                found = True
        if self.stopThread:
            return []
        # Check packet type
        ct = self.__readByte()
        if ct != 0:
            return result
        # Get sample count in packet
        ls = self.__readByte()
        sampleCount = ls#int(ls.encode('hex'), 16)
        if sampleCount == 0:
            return result
        # Get start angle
        fsaL = self.__readByte()
        fsaM = self.__readByte()
        fsa = fsaL + fsaM * 256
        checksum ^= fsa
        startAngle = (fsa>>1)/64
        # Get end angle
        lsaL = self.__readByte()
        lsaM = self.__readByte()
        lsa = lsaL + lsaM * 256
        endAngle = (lsa>>1)/64
        # Compute angle diff
        aDiff = float(endAngle - startAngle)
        if (aDiff < 0):
            aDiff = aDiff + 360
        # Get checksum
        csL = self.__readByte()
        csM = self.__readByte()
        cs = csL + csM * 256
        # Read and parse data
        dataRaw = self.serial.read(sampleCount*2)
        data = []
        for i in range(0, sampleCount*2):
            data.append(self.__strOrByteToInt(dataRaw[i]))
        for i in range(0, sampleCount*2, 2):
            # Get distance
            siL = data[i]
            siM = data[i+1]
            checksum ^= (siL + siM * 256)
            distance = float(siL + siM * 256)/4
            # Get angle and correct value from distance
            angle = startAngle+(aDiff/float(sampleCount))*i/2
            angleCorrection = 0
            if distance > 0:
                angleCorrection = (atan(21.8 * ((155.3 - distance) / (155.3 * distance))) * (180 / pi))
            angle = angle + angleCorrection
            if angle>360:
                angle = angle-360
            # Append to result
            result.append(LidarMeasure(angle,distance))
        checksum ^= (ct + ls * 256)
        checksum ^= lsa
        # Validate checksum
        if checksum == cs:
            return result
        return []

plotlodar.py

Python
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import numpy as np
import serial

# Open the serial port for communication with the lidar
ser = serial.Serial('/dev/ttyUSB0', 115200, timeout=1)

# Create a figure and axes for the plot
fig, ax = plt.subplots()
x = np.linspace(0, 360, 360)
line, = ax.plot(x, np.zeros(360))

# Function to read data from the lidar and update the plot
def update(frame):
    # Read data from the lidar
    data = ser.readline().decode().strip().split(',')
    data = list(map(int, data))

    # Update the plot with the new data
    line.set_ydata(data)
    ax.relim()
    ax.autoscale_view()

# Create an animation object and start the animation
ani = FuncAnimation(fig, update, interval=50)
plt.show()

ydlidardistancetime.py

Python
import serial
import time

# Connect to YDX2 lidar via serial port
ser = serial.Serial('/dev/ttyUSB0', 115200, timeout=1)  # Replace 'COM3' with the appropriate port name
time.sleep(2)  # Wait for serial port to initialize

# Continuously read angle and distance data from lidar
while True:
    # Read one line of data from lidar
    line = ser.readline().decode().strip()
    
    # Parse angle and distance from line of data
    try:
        angle, distance = map(float, line.split(','))
    except ValueError:
        continue  # Ignore any malformed data
    
    # Print angle and distance data
    print(f"Angle: {angle:.2f} degrees, Distance: {distance:.2f} mm")

Credits

Alan Jacob
1 project • 0 followers
Ashwini kumar sinha
31 projects • 76 followers
Ashwini kumar sinha a Robotic lover and electronics hobbyist. Works at EFY-I, Founder, CTO Buttonboard LLC

Comments