import asyncio
import time
import cv2
import serial
import numpy as np
from mavsdk import System
from mavsdk.offboard import (OffboardError, VelocityBodyYawspeed)
def empty(a):
pass
frameWidth = 640
frameHeight = 480
cap = cv2.VideoCapture(-1)
cap.set(3, frameWidth)
cap.set(4, frameHeight)
cv2.resizeWindow("img", 1280, 940)
cv2.resizeWindow("imgCanny", 1280, 940)
cv2.resizeWindow("imgContour", 1280, 940)
cv2.resizeWindow("Tracking", 1500, 100)
#cv2.createTrackbar("Area", "Parameters", 5000, 10000, empty)
#------------------------------------------------------------------------------------------------------------------------------
# this function places images side by side each other
def stackImages(scale , imgArray):
rows = len(imgArray)
cols = len(imgArray[0])
rowsAvailable = isinstance(imgArray[0], list)
width = imgArray[0][0].shape[1]
height = imgArray[0][0].shape[0]
if rowsAvailable:
for x in range(0, rows):
for y in range(0, cols):
if imgArray[x][y].shape[:2] == imgArray[0][0].shape[:2]:
imgArray[x][y] = cv2.resize(imgArray[x][y], (0, 0), None, scale, scale)
else:
imgArray[x][y] = cv2.resize(imgArray[x][y], (imgArray[0][0].shape[1], imgArray[0][0]), None, scale, scale)
if len(imgArray[x][y].shape) == 2: imgArray[x][y] = cv2.Color(imgArray[x][y], cv2.COLOR_GRAY2BGR)
imageBlank = np.zeros((height, width, 3), np.uint8)
hor = [imageBlank]*rows
hor_con = [imageBlank]*rows
for x in range(0, rows):
hor[x] = np.hstack(imgArray[x])
ver = np.vstach(hor)
else:
for x in range(0, rows):
if imgArray[x].shape[:2] == imgArray[0].shape[:2]:
imgArray[x] = cv2.resize(imgArray[x], (0, 0), None, scale, scale)
else:
imgArray[x] = cv2.resize(imgArray[x], (imgArray[0].shape[1], imgArray[0].shape[0]), None, scale, scale)
if len(imgArray[x].shape) == 2: imgArray[x] = cv2.cvtColor(imgArray[x], cv2.COLOR_GRAY2BGR)
hor = np.hstack(imgArray)
ver = hor
return ver
#------------------------------------------------------------------------------------------------------------------------------
def drawBox(img, bbox):
x, y, w, h = int(bbox[0]),int(bbox[1]),int(bbox[2]),int(bbox[3])
cv2.rectangle(img,(x, y), ((x + w), (y + h)), (255, 0, 255), 3, 1)
cv2.putText(img,"Tracking",(20,40),cv2.FONT_HERSHEY_SIMPLEX,0.7,(0,255,255),2)
#------------------------------------------------------------------------------------------------------------------------------
# this function prints the colorful outline around the object
# and prints how many points there are
# and prints the area of the object
def getContours(img, imgContour):
global x
global y
global w
global h
global bbox
contours, hierarchy = cv2.findContours(img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
#areaNotPrinted = True
for cnt in contours:
area = cv2.contourArea(cnt)
"""
if(areaNotPrinted):
print(area)
areaNotPrinted = False
"""
#areaMin = cv2.getTrackbarPos("Area", "Parameters")
areaMin = 5000
areaMax = 13000
if (area >= areaMin and area <= areaMax):
cv2.drawContours(imgContour, cnt, -1, (0, 0, 255), 7)
peri = cv2.arcLength(cnt, True)
approx = cv2.approxPolyDP(cnt, 0.02 * peri, True)
#print(len(approx))
# show bounding box
x, y, w, h = cv2.boundingRect(approx)
cv2.rectangle(imgContour, (x, y), (x + w, y + h), (0, 255, 0), 5)
cv2.putText(imgContour, "Points: " + str(len(approx)), (x + w + 20, y + 20), cv2.FONT_HERSHEY_COMPLEX, 0.7,
(0, 255, 0), 2)
cv2.putText(imgContour, "Area: " + str(int(area)), (x + w + 20, y + 45), cv2.FONT_HERSHEY_COMPLEX, 0.7,
(0, 255, 0), 2)
#print("x =",x, "y =",y, "w =",w, "h =",h, " Points =",str(len(approx)), "Area =", str(int(area)))
bbox = (x, y, w, h)
return True
else:
return False
#------------------------------------------------------------------------------------------------------------------------------
# Start loop running drone commands
async def run():
global bbox
contourFinished = False
xDirection = False
yDirection = True
centered = False
tableHeight = 0.75
rows = 0
tableRows = 3
columns = 0
tableColumns = 2
sim = False
if sim:
drone = System()
await drone.connect()
if not sim:
drone = System(mavsdk_server_address='localhost', port=50051)
await drone.connect(system_address="serial:///dev/ttymxc2:921600")
print("Waiting for drone to connect...")
async for state in drone.core.connection_state():
if state.is_connected:
print(f"Drone discovered with UUID: {state.uuid}")
break
print("-- Arming")
await drone.action.arm()
print("-- Taking off")
await drone.action.set_takeoff_altitude(tableHeight + 0.2)
await drone.action.takeoff()
await asyncio.sleep(20)
print("-- Setting initial setpoint")
await drone.offboard.set_velocity_body(VelocityBodyYawspeed(0.0, 0.0, 0.0, 0.0))
print("-- Starting offboard")
try:
await drone.offboard.start()
except OffboardError as error:
print(f"Starting offboard mode failed with error code: \
{error._result.result}")
print("-- Disarming")
await drone.action.disarm()
return
while (not contourFinished):
success, img = cap.read()
imgContour = img.copy()
frame = img.copy()
imgBlur = cv2.GaussianBlur(img, (7, 7), 1)
imgGray = cv2.cvtColor(imgBlur, cv2.COLOR_BGR2GRAY)
imgCanny = cv2.Canny(imgGray, 150, 225)
kernel = np.ones((5, 5))
imgDil = cv2.dilate(imgCanny, kernel, iterations = 1)
#imgStack = stackImages(0.8,([img, imgCanny, imgContour, frame]))
# If the drone doesn't see the square, go forward at 0.5m/s until found
if (not getContours(imgDil, imgContour)):
getContours(imgDil, imgContour)
print("-- Go forward at 1 m/s: Looking...")
await drone.offboard.set_velocity_body(VelocityBodyYawspeed(0.5, 0.0, 0.0, 0.0))
# Once the drone has found the square, stop drone, prepare for tracking algorithm
else:
print("Found")
await drone.offboard.set_velocity_body(VelocityBodyYawspeed(0.0, 0.0, 0.0, 0.0))
await asyncio.sleep(2)
tracker = cv2.TrackerKCF_create()
time.sleep(0.25)
success, frame = cap.read()
print("getContours=", bbox)
drawBox(frame, bbox)
tracker.init(frame,bbox)
contourFinished = True
#Prepare tracking algorithm
while contourFinished:
#cv2.imshow("Tracking", frame)
#timer = cv2.getTickCount()
success, frame = cap.read()
success, bbox = tracker.update(frame)
bbox2 = (bbox[0]-5, bbox[1]-5, bbox[2]+10, bbox[3]+10)
centerPoint = (bbox[0]+(bbox[0]/2), bbox[1]+(bbox[1]/2))
#Start moving the drone in the givin directions
if success:
#STARTING X DIRECTION LOOP
while (not xDirection):
success, frame = cap.read()
success, bbox = tracker.update(frame)
bbox2 = (bbox[0]-5, bbox[1]-5, bbox[2]+10, bbox[3]+10)
centerPoint = (bbox[0]+(bbox[0]/2), bbox[1]+(bbox[1]/2))
if (centerPoint[0] > (frameWidth/4)+5):
print(" Move left", centerPoint[0] - frameWidth/4, "pixels", end='')
print("")
await drone.offboard.set_velocity_body(VelocityBodyYawspeed(0.0, -0.5, 0.0, 0.0))
if (centerPoint[0] < (frameWidth/4)-5):
print(" Move right", frameWidth/4 - centerPoint[0], "pixels", end='')
print("")
await drone.offboard.set_velocity_body(VelocityBodyYawspeed(0.0, 0.5, 0.0, 0.0))
if (centerPoint[0] < (frameWidth/4)+5 and centerPoint[0] > (frameWidth/4)-5):
xDirection = True
yDirection = False
print("xDirection = True")
#Y DIRECTION LOOP IF X DIRECTION FINISHED
while (not yDirection):
success, frame = cap.read()
success, bbox = tracker.update(frame)
bbox2 = (bbox[0]-5, bbox[1]-5, bbox[2]+10, bbox[3]+10)
centerPoint = (bbox[0]+(bbox[0]/2), bbox[1]+(bbox[1]/2))
if (centerPoint[1] > (frameHeight/4)+5):
print(" Move forward", centerPoint[1] - frameHeight/4, "pixels", end='')
print("")
await drone.offboard.set_velocity_body(VelocityBodyYawspeed(0.5, 0.0, 0.0, 0.0))
if (centerPoint[1] < (frameHeight/4)-5):
print(" Move backward", frameHeight/4 - centerPoint[1], "pixels", end='')
print("")
await drone.offboard.set_velocity_body(VelocityBodyYawspeed(-0.5, 0.0, 0.0, 0.0))
if (centerPoint[1] < (frameHeight/4)+5 and centerPoint[1] > (frameHeight/4)-5):
yDirection = False
centered = True
break
# While the drone is centered, spray sanatizer on desk, and move forward out of the view of the square
while (centered):
rows = rows + 1
print("rows = ", rows)
await drone.offboard.set_velocity_body(VelocityBodyYawspeed(0.0, 0.0, 0.0, 0.0))
print("Centered")
if (not sim):
port = serial.Serial("/dev/ttyACM0", baudrate=19200, timeout=1.0)
sercmd_on = ("relay on 0 \n\r")
sercmd_off = ("relay off 0 \n\r")
port.write(sercmd_on.encode())
time.sleep(2)
port.write(sercmd_off.encode())
print("Getting square out of frame...")
await drone.offboard.set_velocity_body(VelocityBodyYawspeed(0.5, 0.0, 0.0, 0.0))
await asyncio.sleep(1.5)
if(rows == tableRows and columns == tableColumns):
print("Centered. Landing...")
await drone.action.land()
await asyncio.sleep(5)
break
# If reached the end turn right
if (rows == tableRows and columns % 2 == 0):
print("At the end of the row, enter a new column")
await drone.offboard.set_velocity_body(VelocityBodyYawspeed(0.0, 0.0, 0.0, 90.0))
await asyncio.sleep(1)
columns = columns + 1
rows = 0
print("Columns = ", columns)
# If reached the end turn left
if(rows == tableRows and columns % 2 != 0):
print("At the end of the row, enter a new column")
await drone.offboard.set_velocity_body(VelocityBodyYawspeed(0.0, 0.0, 0.0, -90.0))
await asyncio.sleep(1)
columns = columns + 1
rows = 0
print("Columns = ", columns)
else:
print("Do not turn right/left, continue to next desk")
contourFinished = False
xDirection = False
yDirection = True
centered = False
else:
#cv2.putText(frame,"Lost",(20,40),cv2.FONT_HERSHEY_SIMPLEX,0.7,(0,0,255),2)
print("Lost")
contourFinished = False
await drone.offboard.set_velocity_body(VelocityBodyYawspeed(0.0, 0.0, 0.0, 0.0))
print("-- Stopping offboard")
try:
await drone.offboard.stop()
except OffboardError as error:
print(f"Stopping offboard mode failed with error code: \
{error._result.result}")
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
#fps = cv2.getTickFrequency()/(cv2.getTickCount()-timer)
#cv2.putText(frame,str(int(fps)),(75,50),cv2.FONT_HERSHEY_SIMPLEX,0.7,(0,0,255),2)
#if cv2.waitKey(1) & 0xFF == ord('q'):
# break
Comments