Aren Marukyan
Published © LGPL

Grain cleaning and sorting with machine vision

This device is a conveyor, at the input of which raw grain enters, and at the output we get grain clean from dirt.

IntermediateFull instructions provided2 hours569
Grain cleaning and sorting with machine vision

Things used in this project

Hardware components

webcam
×1
LED (generic)
LED (generic)
×1
DC Motor, 12 V
DC Motor, 12 V
×1
power suply 5v, 12v
×1
Raspberry Pi 3 Model B
Raspberry Pi 3 Model B
×1

Software apps and online services

Edge Impulse Studio
Edge Impulse Studio

Story

Read more

Code

classify.py

Python
#!/usr/bin/env python
import threading
import device_patches       # Device specific patches for Jetson Nano (needs to be before importing cv2)
import RPi.GPIO as GPIO
from time import sleep     #time
from array import *
GPIO. setwarnings ( False ) #      
GPIO. setmode ( GPIO.BOARD ) #      
GPIO. setup ( 11 , GPIO.OUT, initial=GPIO.LOW ) #  
leds= array('B',[11,13,15,19,21])# GPIO Pin numbers for actuator control
aa=0
yy=0

for i in range(len(leds)):
               
    GPIO. setup ( leds[i] , GPIO.OUT, initial=GPIO.LOW ) 



for i in range(0,5):
    GPIO.output(leds[i] , GPIO.HIGH) # Turn on actuator
    sleep(0.3)                  # Sleep for 0,3 second
    GPIO.output(leds[i] , GPIO.LOW)  # Turn off actuator
    sleep(0.3) 
               
    

#Non blocking Delay function
def LED_Off_Wait( wait_time,IO_port ):
    threading.Timer(wait_time, lambda: GPIO.output(IO_port, False)).start()


import cv2
import os
import sys, getopt
import signal
import time
from edge_impulse_linux.image import ImageImpulseRunner
GPIO.setwarnings(False) # Ignore warning for now
GPIO.setmode(GPIO.BOARD) # Use physical pin numbering




runner = None
# if you don't want to see a camera preview, set this to False
show_camera = True
if (sys.platform == 'linux' and not os.environ.get('DISPLAY')):
    show_camera = False

def now():
    return round(time.time() * 1000)

def get_webcams():
    port_ids = []
    for port in range(5):
        print("Looking for a camera in port %s:" %port)
        camera = cv2.VideoCapture(port)
        if camera.isOpened():
            ret = camera.read()[0]
            if ret:
                backendName =camera.getBackendName()
                w = camera.get(30)
                h = camera.get(40)
                print("Camera %s (%s x %s) found in port %s " %(backendName,h,w, port))
                port_ids.append(port)
            camera.release()
    return port_ids

def sigint_handler(sig, frame):
    print('Interrupted')
    if (runner):
        runner.stop()
    sys.exit(0)

signal.signal(signal.SIGINT, sigint_handler)

def help():
    print('python classify.py <path_to_model.eim> <Camera port ID, only required when more than 1 camera is present>')

def main(argv):
    try:
        opts, args = getopt.getopt(argv, "h", ["--help"])
    except getopt.GetoptError:
        help()
        sys.exit(2)

    for opt, arg in opts:
        if opt in ('-h', '--help'):
            help()
            sys.exit()

    if len(args) == 0:
        help()
        sys.exit(2)

    model = args[0]

    dir_path = os.path.dirname(os.path.realpath(__file__))
    modelfile = os.path.join(dir_path, model)

    print('MODEL: ' + modelfile)

    with ImageImpulseRunner(modelfile) as runner:
        try:
            model_info = runner.init()
            print('Loaded runner for "' + model_info['project']['owner'] + ' / ' + model_info['project']['name'] + '"')
            labels = model_info['model_parameters']['labels']
            if len(args)>= 2:
                videoCaptureDeviceId = int(args[1])
            else:
                port_ids = get_webcams()
                if len(port_ids) == 0:
                    raise Exception('Cannot find any webcams')
                if len(args)<= 1 and len(port_ids)> 1:
                    raise Exception("Multiple cameras found. Add the camera port ID as a second argument to use to this script")
                videoCaptureDeviceId = int(port_ids[0])

            camera = cv2.VideoCapture(videoCaptureDeviceId)
            ret = camera.read()[0]
            if ret:
                backendName = camera.getBackendName()
                w = camera.get(3)
                h = camera.get(4)
                print("Camera %s (%s x %s) in port %s selected." %(backendName,h,w, videoCaptureDeviceId))
                camera.release()
            else:
                raise Exception("Couldn't initialize selected camera.")

            next_frame = 0 # limit to ~10 fps here

            for res, img in runner.classifier(videoCaptureDeviceId):
                if (next_frame > now()):
                    time.sleep((next_frame - now()) / 1000)

                # print('classification runner response', res)

                if "classification" in res["result"].keys():
                    print('Result (%d ms.) ' % (res['timing']['dsp'] + res['timing']['classification']), end='')
                    for label in labels:
                        score = res['result']['classification'][label]
                        print('%s: %.2f\t' % (label, score), end='')
                    print('', flush=True)

                elif "bounding_boxes" in res["result"].keys():
                    global yy
                    global aa
                    print('Found %d bounding boxes (%d ms.)' % (len(res["result"]["bounding_boxes"]), res['timing']['dsp'] + res['timing']['classification']))
                    for bb in res["result"]["bounding_boxes"]:
                        print('\t%s (%.2f): x=%d y=%d w=%d h=%d' % (bb['label'], bb['value'], bb['x'], bb['y'], bb['width'], bb['height']))

   #number of actuator -- if bb['x']=42   then     number=42//19=2              
                        aa=bb['x']//19 
                        yy=bb['y']
                        if yy<10: #threshold value for actuating the actuator
                            GPIO.output(leds[aa], GPIO.HIGH) # Turn on actuator
                            LED_Off_Wait(0.2,leds[aa])
                        yy=20 
                    
               
                        img = cv2.rectangle(img, (bb['x'], bb['y']), (bb['x'] + bb['width'], bb['y'] + bb['height']), (255, 0, 0), 1)
                        
                        

                        
                        

                if (show_camera):
                    cv2.imshow('edgeimpulse', cv2.cvtColor(img, cv2.COLOR_RGB2BGR))
                    if cv2.waitKey(1) == ord('q'):
                        break

                next_frame = now() + 100
        finally:
            if (runner):
                runner.stop()

if __name__ == "__main__":
   main(sys.argv[1:])

Credits

Aren Marukyan
2 projects • 1 follower
Contact

Comments

Please log in or sign up to comment.