Updated for 2019 China-US Young Maker Competition
Intro: Robotic Arm and AI to Sort Out Recyclable Garbage
Robotic arms have already changed the course of human industry, but the knowledge of how to use them is usually kept at the industrial level. While building this project, it was still difficult to find open-source guides and libraries that both utilize and control a robotic arm, so I've decided to write a guide on how to use a robotic arm with a Raspberry Pi 3 to sort out recyclable garbage.
If we want the robotic arm to move on its own, we need AI on the edge. In this specific example we are going to use an object-detection approach, since the arm shouldn't do anything when nothing is detected.
According to the Google Research publication, the Single Shot MultiBox Detector (SSD) discretizes the output space of bounding boxes into a set of default boxes over different aspect ratios and scales per feature map location. At prediction time, the network generates scores for the presence of each object category in each default box and produces adjustments to the box to better match the object shape. The network also combines predictions from multiple feature maps with different resolutions to naturally handle objects of various sizes.
For the past couple of years we've been using YOLO, but with recent advancements SSD is much faster than the YOLO method. SSD is also more efficient, since it uses multiple activation maps for bounding-box prediction, whereas YOLO uses a single activation map to predict both classes and bounding boxes.
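To make the SSD description above concrete, here is a minimal NumPy sketch of the post-processing idea: every default box gets a score per class, and only boxes whose best non-background score clears a confidence threshold are kept. The box and score arrays below are made up for illustration; a real network produces thousands of default boxes across several feature maps.
import numpy as np

# Toy SSD-style output: 4 default boxes, 3 classes (background, garbage, recyclable).
scores = np.array([[0.90, 0.05, 0.05],   # mostly background
                   [0.10, 0.80, 0.10],   # confident "garbage"
                   [0.20, 0.15, 0.65],   # confident "recyclable"
                   [0.70, 0.20, 0.10]])  # mostly background
boxes = np.array([[0.1, 0.1, 0.3, 0.3],
                  [0.4, 0.4, 0.7, 0.8],
                  [0.2, 0.5, 0.5, 0.9],
                  [0.6, 0.1, 0.9, 0.4]])  # [ymin, xmin, ymax, xmax], normalized

CONFIDENCE = 0.6
best_class = scores.argmax(axis=1)   # most likely class for each default box
best_score = scores.max(axis=1)      # its confidence
keep = (best_class != 0) & (best_score >= CONFIDENCE)  # drop background & weak boxes

for cls, score, box in zip(best_class[keep], best_score[keep], boxes[keep]):
    print("class", cls, "score", round(float(score), 2), "box", box)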
We use a couple of libraries, such as Qt 5, which have to be installed before the project will work. And even though we didn't find much support on Dobot's website, we still managed to get the Dobot arm working with the Raspberry Pi 3.
The equipment needed is a Raspberry Pi 3 B+, a Movidius NCS, the robotic arm, and the suction cup add-on. Make sure you have a 5V/2.5A power supply; between the camera, the Neural Compute Stick, and the Pi itself, we need a little more power than usual. I personally tried a 2.1A supply and it was not enough.
Step 1: Setting Up the Raspberry Pi
With a clean SD card, we first need to download the Raspbian image and flash it onto the microSD card so we can use it as the Pi's operating system.
https://www.raspberrypi.org/downloads/
With the OS installed, we can now see the Pi running.
Alternatively, Ubuntu MATE would also work for this project.
Step 2: Setting Up QT5 Along with Dobot
This part is a bit tricky, as Dobot does not officially support the Pi; however, I did find that https://github.com/nanusefue/dobotMagician is able to use the device. We need to install the following:
git clone https://github.com/nanusefue/dobotMagician.git
sudo apt-get install qt5-default
sudo apt-get install qtcreator
sudo apt-get install libqt5serialport5-dev
After that, we should have basic control over the Dobot arm from the Raspberry Pi.
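Before mounting anything on the arm, it's worth a quick connectivity check. Below is a minimal sketch of my own (not part of the dobotMagician repo) that uses the same DobotDllType calls as the replay script later in this guide.
# Minimal Dobot connectivity check, using the DobotDllType bindings from the
# dobotMagician repo (same calls as the full replay script below).
import DobotDllType as dType

api = dType.load()
state = dType.ConnectDobot(api, "", 115200)[0]

if state == dType.DobotConnect.DobotConnect_NoError:
    print("Dobot connected:", dType.GetDeviceName(api))
    dType.DisconnectDobot(api)
else:
    print("Could not connect to the Dobot arm, state:", state)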
Step 3: Programming the Robotic Arm Movements
Technically we can train the arm to pick up and do just about anything, but for this guide let's use it to pick up pre-trained objects. To do that, we first have to install the suction cup that is part of Dobot's kit. GP3 is connected to the arm's connector #1.
Afterwards, we need either a PC or a Mac to program each state of the arm to be replayed when triggered. In this case we will record just 7 steps; we can do this by holding the lock button on the Dobot Magician arm, moving it into position, and changing the value of the suction cup at each step.
Save the file as an XML file and transfer it to the Raspberry Pi, then run the following command to convert it into a JSON file:
parserv2.py -Xml robot.playback -json demo.json
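To sanity-check the conversion, you can load the resulting JSON on the Pi and print each recorded move. This snippet is purely illustrative: demo.json is the file produced by the command above, and the keys it prints (MotionStyle, X, Y, Z, R, PauseTime) are the ones the replay script below reads.
# Quick sanity check of the converted movement file.
import json
from collections import OrderedDict

with open('demo.json') as f:
    data = json.load(f, object_pairs_hook=OrderedDict)

for name, move in data.items():
    print(name, move['MotionStyle'], move['X'], move['Y'], move['Z'],
          move['R'], "pause:", move['PauseTime'])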
Next, we can write a Python script on the Raspberry Pi that replays the same movements:
import threading
import DobotDllType as dType
import time
import argparse
import json
from pprint import pprint
from collections import OrderedDict


class ScriptRobot():
    # Replays a recorded Dobot movement sequence from a JSON file.
    global CON_STR
    CON_STR = {
        dType.DobotConnect.DobotConnect_NoError: "DobotConnect_NoError",
        dType.DobotConnect.DobotConnect_NotFound: "DobotConnect_NotFound",
        dType.DobotConnect.DobotConnect_Occupied: "DobotConnect_Occupied"}

    def __init__(self, Json):
        self.Json = Json
        self.api = dType.load()
        self.state = ""

    def Connect(self):
        # Connect Dobot
        self.state = dType.ConnectDobot(self.api, "", 115200)[0]
        dType.GetDeviceSN(self.api)
        dType.GetDeviceName(self.api)
        dType.GetDeviceVersion(self.api)
        dType.GetDeviceWithL(self.api)
        dType.GetPoseL(self.api)
        dType.GetKinematics(self.api)
        # dType.GetHOMEParams(self.api)
        print("Connect status:", CON_STR[self.state])
        if self.state == dType.DobotConnect.DobotConnect_NoError:
            dType.SetQueuedCmdClear(self.api)
            return True
        else:
            dType.DisconnectDobot(self.api)
            return False

    """
    def _MOVJ(self, data):
        dType.SetPTPCmd(self.api, dType.PTPMode.PTPMOVJXYZMode, float(data['X']), float(data['Y']), float(data['Z']), float(data['R']), isQueued=1)

    def _MOVL(self, data):
        dType.SetPTPCmd(self.api, dType.PTPMode.PTPMOVLXYZMode, float(data['X']), float(data['Y']), float(data['Z']), float(data['R']), isQueued=1)

    def _ARC(self, value, data):
        dType.SetARCCmd(self.api, [float(data['X']), float(data['Y']), float(data['Z']), 0], [float(data['_X']), float(data['_Y']), float(data['_Z']), 0], isQueued=1)

    def moveTypes(self, value, data):
        if value == "MOVJ":
            return self._MOVJ(data)
        elif value == "MOVL":
            return self._MOVL(data)
        elif value == "ARC":
            return self._ARC(value, data)
    """

    def ParserMove(self):
        # Queue every move from the JSON file, then execute the whole sequence.
        dType.SetQueuedCmdClear(self.api)
        json_data = open(self.Json)
        data = json.load(json_data, object_pairs_hook=OrderedDict)
        # def SetPTPCoordinateParams(api, xyzVelocity, xyzAcceleration, rVelocity, rAcceleration, isQueued=0):
        for move in data:
            # print "TEST_:"+data[move]['MotionStyle'],data[move]['Row']
            if data[move]['PauseTime'] != 0:
                lastIndex = dType.SetWAITCmd(self.api, float(data[move]['PauseTime']), isQueued=1)[0]
            if data[move]['MotionStyle'] == "MOVJ":
                lastIndex = dType.SetPTPCmd(self.api, dType.PTPMode.PTPMOVJXYZMode, float(data[move]['X']), float(data[move]['Y']), float(data[move]['Z']), float(data[move]['R']), isQueued=1)[0]
                # Update the suction cup state as part of the queued sequence
                dType.SetEndEffectorSuctionCup(self.api, 1, 0, isQueued=1)
            if data[move]['MotionStyle'] == "MOVL":
                lastIndex = dType.SetPTPCmd(self.api, dType.PTPMode.PTPMOVLXYZMode, float(data[move]['X']), float(data[move]['Y']), float(data[move]['Z']), float(data[move]['R']), isQueued=1)[0]
            if data[move]['MotionStyle'] == "MOVJANGLE":
                lastIndex = dType.SetARCCmd(self.api, [float(data[move]['X']), float(data[move]['Y']), float(data[move]['Z']), 0], [float(data[move]['_X']), float(data[move]['_Y']), float(data[move]['_Z']), 0], isQueued=1)[0]
        dType.SetQueuedCmdStartExec(self.api)
        while lastIndex > dType.GetQueuedCmdCurrentIndex(self.api)[0]:
            # dType.GetPose(self.api)  # get the robot's current pose
            dType.dSleep(100)
        dType.SetQueuedCmdStopExec(self.api)
        dType.GetKinematics(self.api)
        dType.DisconnectDobot(self.api)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Script Robot')
    parser.add_argument('--Json', required=True, help='File name export json')
    args = parser.parse_args()
    R = ScriptRobot(args.Json)
    R.Connect()
    R.ParserMove()
When it succeeds, we can see something like this straight from the Raspberry Pi by running the following command:
python3 Script.py --Json garbage.json
We then repeat the process and create the second movement sequence, recycle.json:
python3 Script.py --Json recycle.json
Step 4: AI on the Edge
There are a lot of ways we can implement the trigger; since this is an AI-based application, I will make an AI-based trigger. In this guide we will use a pre-trained SSD neural network running on Caffe, and the detection target will be garbage. Along the way we'll also see how to utilize other neural networks with a little bit of work. We first install the Movidius NCSDK:
cd ~/workspace
git clone https://github.com/movidius/ncsdk.git
cd ~/workspace/ncsdk
make install
Full instructions can be seen in the video below.
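Once the SDK is installed, a quick way to confirm the Pi actually sees the Neural Compute Stick is to enumerate devices with the mvnc API. This is a small check of my own, using the same call the detection script makes later in this guide.
# Confirm the NCSDK can see the Neural Compute Stick.
import mvnc.mvncapi as mvnc

devices = mvnc.EnumerateDevices()
if len(devices) == 0:
    print("No NCS devices found - check the USB connection and power supply")
else:
    print("Found", len(devices), "NCS device(s)")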
We also need to install OpenCV. WARNING: this process takes a long time, around 5 hours for me on the Raspberry Pi 3 B+.
cd ~/workspace/ncsdk
./install-opencv.sh
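After the (long) build finishes, a one-minute check that OpenCV can import and grab a frame saves debugging later. This is a minimal sketch of my own, assuming the camera shows up as /dev/video0.
# Quick check that OpenCV built correctly and can read from the camera.
import cv2

print("OpenCV version:", cv2.__version__)
camera = cv2.VideoCapture(0)
ret, frame = camera.read()
if ret:
    print("Grabbed a frame of size", frame.shape)
else:
    print("Could not read from the camera")
camera.release()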
Next we need to download the sample apps from ncappzoo, which is also maintained by Movidius; the specific app we need is the live stream inference example.
cd ~/workspace
git clone https://github.com/movidius/ncappzoo.git
cd ncappzoo
make
cd caffe
make
This should get the entire software stack for the NCS and OpenCV in place. Next, we can create our own folder and run our code based on the live-object-detector example:
cd ~/workspace
git
cd robotic-arm
python3 test.py
From here, let's use two different objects for our demo: one to represent garbage and one to represent recyclables.
Once everything is running, we can see detected objects like the ones below.
The other classifications can be used as recyclable goods.
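In the full detection script at the end of this guide, the pre-trained SSD_MobileNet classes 'bottle' and 'tvmonitor' stand in for garbage and recyclables. One way to keep that mapping in a single place is a small dictionary; this is only a sketch, not part of the original script.
# Label -> movement-file mapping for the demo. 'bottle' and 'tvmonitor' are
# classes from the pre-trained SSD_MobileNet labels file.
import os

LABEL_TO_SEQUENCE = {
    'bottle': 'garbage.json',      # treated as garbage
    'tvmonitor': 'recycle.json',   # treated as recyclable
}

def trigger_arm(label):
    sequence = LABEL_TO_SEQUENCE.get(label)
    if sequence:
        os.system('python3 Script.py --Json ' + sequence)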
Now that we have both the AI on the edge and the robotic arm running, the integration is straightforward. When a piece of garbage is detected, we can run the garbage movement sequence:
import os
os.system('python3 Script.py --Json garbage.json')
and when a recyclable object is detected, the recycling sequence:
import os
os.system('python3 Script.py --Json recycle.json')
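Calling the arm on every single frame would queue the same movement over and over, so the full script below only fires after the same class has been seen for several frames in a row. Here is a minimal sketch of that debounce idea; the 8-frame threshold mirrors the framecount logic in the script, and on_detection is a hypothetical helper.
# Minimal debounce sketch: only trigger the arm after the same label has been
# seen for several consecutive frames (FRAMES_REQUIRED is illustrative).
import os

FRAMES_REQUIRED = 8
_seen = {'bottle': 0, 'tvmonitor': 0}

def on_detection(label):
    if label not in _seen:
        return
    _seen[label] += 1
    if _seen[label] >= FRAMES_REQUIRED:
        _seen[label] = 0
        sequence = 'garbage.json' if label == 'bottle' else 'recycle.json'
        os.system('python3 Script.py --Json ' + sequence)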
With the movement files from the earlier steps in place, we can use the following code to run the AI on the edge through the Movidius NCS and trigger the robotic arm movements.
#!/usr/bin/python3
# ****************************************************************************
# Copyright(c) 2017 Intel Corporation.
# License: MIT See LICENSE file in root directory.
# ****************************************************************************
# Detect objects on a LIVE camera feed using
# Intel® Movidius™ Neural Compute Stick (NCS)

import os
import cv2
import sys
import numpy
import ntpath
import argparse
import subprocess

import mvnc.mvncapi as mvnc

from utils import visualize_output
from utils import deserialize_output

# Detection threshold: minimum confidence to tag as valid detection
CONFIDANCE_THRESHOLD = 0.60  # 60% confident

# Variable to store commandline arguments
ARGS = None

# OpenCV object for video capture
camera = None

# Frame counter used to debounce detections before triggering the arm
framecount = 0


# ---- Step 1: Open the enumerated device and get a handle to it -------------
def open_ncs_device():
    # Look for enumerated NCS device(s); quit program if none found.
    devices = mvnc.EnumerateDevices()
    if len( devices ) == 0:
        print( "No devices found" )
        quit()

    # Get a handle to the first enumerated device and open it
    device = mvnc.Device( devices[0] )
    device.OpenDevice()

    return device


# ---- Step 2: Load a graph file onto the NCS device -------------------------
def load_graph( device ):
    # Read the graph file into a buffer
    with open( ARGS.graph, mode='rb' ) as f:
        blob = f.read()

    # Load the graph buffer into the NCS
    graph = device.AllocateGraph( blob )

    return graph


# ---- Step 3: Pre-process the images ----------------------------------------
def pre_process_image( frame ):
    # Resize image [Image size is defined by chosen network, during training]
    img = cv2.resize( frame, tuple( ARGS.dim ) )

    # Convert RGB to BGR [OpenCV reads image in BGR, some networks may need RGB]
    if( ARGS.colormode == "rgb" ):
        img = img[:, :, ::-1]

    # Mean subtraction & scaling [A common technique used to center the data]
    img = img.astype( numpy.float16 )
    img = ( img - numpy.float16( ARGS.mean ) ) * ARGS.scale

    return img


# ---- Step 4: Read & print inference results from the NCS -------------------
def infer_image( graph, img, frame ):
    # Load the image as a half-precision floating point array
    graph.LoadTensor( img, 'user object' )

    # Get the results from NCS
    output, userobj = graph.GetResult()

    # Get execution time
    inference_time = graph.GetGraphOption( mvnc.GraphOption.TIME_TAKEN )

    # Deserialize the output into a python dictionary
    output_dict = deserialize_output.ssd(
                      output,
                      CONFIDANCE_THRESHOLD,
                      frame.shape )

    # Print the results (each image/frame may have multiple objects)
    # print( "I found these objects in "
    #        + " ( %.2f ms ):" % ( numpy.sum( inference_time ) ) )

    global framecount
    for i in range( 0, output_dict['num_detections'] ):
        # print( "%3.1f%%\t" % output_dict['detection_scores_' + str(i)]
        #        + labels[ int(output_dict['detection_classes_' + str(i)]) ]
        #        + ": Top Left: " + str( output_dict['detection_boxes_' + str(i)][0] )
        #        + " Bottom Right: " + str( output_dict['detection_boxes_' + str(i)][1] ) )

        # 'bottle' stands in for garbage: draw the box for a few frames,
        # then trigger the garbage movement sequence on the arm
        if 'bottle' in labels[ int(output_dict['detection_classes_' + str(i)]) ] and framecount < 8:
            framecount = framecount + 1
            # Draw bounding boxes around valid detections
            (y1, x1) = output_dict.get('detection_boxes_' + str(i))[0]
            (y2, x2) = output_dict.get('detection_boxes_' + str(i))[1]

            # Prep string to overlay on the image
            display_str = (
                labels[output_dict.get('detection_classes_' + str(i))]
                + ": "
                + str( output_dict.get('detection_scores_' + str(i) ) )
                + "%" )

            frame = visualize_output.draw_bounding_box(
                        y1, x1, y2, x2,
                        frame,
                        thickness=4,
                        color=(255, 255, 0),
                        display_str=display_str )
        elif 'bottle' in labels[ int(output_dict['detection_classes_' + str(i)]) ] and framecount > 5:
            framecount = 0
            os.system('python3 Script.py --Json garbage.json')
            print('called robot')

        # 'tvmonitor' stands in for a recyclable object: same pattern,
        # but trigger the recycling movement sequence
        if 'tvmonitor' in labels[ int(output_dict['detection_classes_' + str(i)]) ] and framecount < 8:
            framecount = framecount + 1
            # Draw bounding boxes around valid detections
            (y1, x1) = output_dict.get('detection_boxes_' + str(i))[0]
            (y2, x2) = output_dict.get('detection_boxes_' + str(i))[1]

            # Prep string to overlay on the image
            display_str = (
                labels[output_dict.get('detection_classes_' + str(i))]
                + ": "
                + str( output_dict.get('detection_scores_' + str(i) ) )
                + "%" )

            frame = visualize_output.draw_bounding_box(
                        y1, x1, y2, x2,
                        frame,
                        thickness=4,
                        color=(255, 255, 0),
                        display_str=display_str )
        elif 'tvmonitor' in labels[ int(output_dict['detection_classes_' + str(i)]) ] and framecount > 5:
            framecount = 0
            os.system('python3 Script.py --Json recycle.json')
            print('called robot')

    print( '\n' )

    # If a display is available, show the image on which inference was performed
    if 'DISPLAY' in os.environ:
        cv2.imshow( 'NCS live inference', frame )


# ---- Step 5: Unload the graph and close the device -------------------------
def close_ncs_device( device, graph ):
    graph.DeallocateGraph()
    device.CloseDevice()
    camera.release()
    cv2.destroyAllWindows()


# ---- Main function (entry point for this script ) --------------------------
def main():
    device = open_ncs_device()
    graph = load_graph( device )

    # Main loop: Capture live stream & send frames to NCS
    while( True ):
        ret, frame = camera.read()
        img = pre_process_image( frame )
        infer_image( graph, img, frame )

        # Display the frame for 5ms, and close the window so that the next
        # frame can be displayed. Close the window if 'q' or 'Q' is pressed.
        if( cv2.waitKey( 5 ) & 0xFF == ord( 'q' ) ):
            break

    close_ncs_device( device, graph )


# ---- Define 'main' function as the entry point for this script -------------
if __name__ == '__main__':

    parser = argparse.ArgumentParser(
                         description="Detect objects on a LIVE camera feed using \
                         Intel® Movidius™ Neural Compute Stick." )

    parser.add_argument( '-g', '--graph', type=str,
                         default='../../caffe/SSD_MobileNet/graph',
                         help="Absolute path to the neural network graph file." )

    parser.add_argument( '-v', '--video', type=int,
                         default=0,
                         help="Index of your computer's V4L2 video device. \
                         ex. 0 for /dev/video0" )

    parser.add_argument( '-l', '--labels', type=str,
                         default='../../caffe/SSD_MobileNet/labels.txt',
                         help="Absolute path to labels file." )

    parser.add_argument( '-M', '--mean', type=float,
                         nargs='+',
                         default=[127.5, 127.5, 127.5],
                         help="',' delimited floating point values for image mean." )

    parser.add_argument( '-S', '--scale', type=float,
                         default=0.00789,
                         help="Scale factor applied after mean subtraction." )

    parser.add_argument( '-D', '--dim', type=int,
                         nargs='+',
                         default=[300, 300],
                         help="Image dimensions. ex. -D 224 224" )

    parser.add_argument( '-c', '--colormode', type=str,
                         default="bgr",
                         help="RGB vs BGR color sequence. This is network dependent." )

    ARGS = parser.parse_args()

    # Create a VideoCapture object
    camera = cv2.VideoCapture( ARGS.video )

    # Set camera resolution
    camera.set( cv2.CAP_PROP_FRAME_WIDTH, 620 )
    camera.set( cv2.CAP_PROP_FRAME_HEIGHT, 480 )

    # Load the labels file
    labels = [ line.rstrip('\n') for line in
               open( ARGS.labels ) if line != 'classes\n' ]

    main()

# ==== End of file ===========================================================
And now we have the entire system running.
With everything said and done, enjoy the robotic arm that's powered by AI. Now that you've learned how to build robots with AI, use your power for good.