Welcome to Hackster!
Hackster is a community dedicated to learning hardware, from beginner to pro. Join us, it's free!
Tim Hodges
Published © GPL3+

Traffic camera

An outdoor camera that counts passing traffic and logs their speed and direction. See the results @ sites.google.com/view/burnshotroad/home

IntermediateFull instructions provided3 days22,248
Traffic camera

Things used in this project

Hardware components

Raspberry Pi 3 Model B
Raspberry Pi 3 Model B
×1
Raspberry Pi PoE HAT
Raspberry Pi PoE HAT
×1
Pi NoIR Camera V2
Raspberry Pi Pi NoIR Camera V2
×1
Naturebytes Wildlife Cam Case
×1
5v Cooling Fan for Raspberry Pi
Doesn't have to be this model - any fan compatible with the Pi will do. This one also included a heat sink to place on the CPU.
×1

Software apps and online services

OpenCV
OpenCV
Raspbian
Raspberry Pi Raspbian

Story

Read more

Code

carspeed Version 3

Python
Example python3 carspeed.py -ulx 136 -uly 321 -lrx 896 -lry 444
# CarSpeed Version 3.0

# import the necessary packages
from picamera.array import PiRGBArray
from picamera import PiCamera
import time
import math
import datetime
import cv2
import paho.mqtt.client as mqtt
import numpy as np
import argparse


# place a prompt on the displayed image
def prompt_on_image(txt):
    global image
    cv2.putText(image, txt, (10, 35),
    cv2.FONT_HERSHEY_SIMPLEX, 0.35, (0, 0, 255), 1)
     
# calculate speed from pixels and time
def get_speed(pixels, ftperpixel, secs):
    if secs > 0.0:
        return ((pixels * ftperpixel)/ secs) * 0.681818    # Magic number to convert fps to mph
    else:
        return 0.0
 
# calculate elapsed seconds
def secs_diff(endTime, begTime):
    diff = (endTime - begTime).total_seconds()
    return diff

# record speed in .csv format
def record_speed(res):
    global csvfileout
    f = open(csvfileout, 'a')
    f.write(res+"\n")
    f.close

# mapping function equivalent to C map function from Arduino
def my_map(x, in_min, in_max, out_min, out_max):
    return int((x-in_min) * (out_max-out_min) / (in_max-in_min) + out_min)

def measure_light(hsvImg):
    #Determine luminance level of monitored area 
    #returns the median from the histogram which contains 0 - 255 levels
    hist = cv2.calcHist([hsvImg], [2], None, [256],[0,255])
    windowsize = (hsvImg.size)/3   #There are 3 pixels per HSV value 
    count = 0
    sum = 0
    print (windowsize)
    for value in hist:
        sum = sum + value
        count +=1    
        if (sum > windowsize/2):   #test for median
            break
    return count   

#Reciprocal function curve to give a smaller number for bright light and a bigger number for low light
def get_save_buffer(light):
    save_buffer = int((100/(light - 0.5)) + MIN_SAVE_BUFFER)    
    print(" save buffer " + str(save_buffer))
    return save_buffer

def get_min_area(light):
    if (light > 10):
        light = 10;
    area =int((1000 * math.sqrt(light - 1)) + 100)
    print("min area= " + str(area)) 
    return area

def get_threshold(light):
   #Threshold for dark needs to be high so only pick up lights on vehicle
    if (light <= 1):
        threshold = 130
    elif(light <= 2):
        threshold = 100
    elif(light <= 3):
        threshold = 60
    else:
        threshold = THRESHOLD
    print("threshold= " + str(threshold))
    return threshold

def store_image():
    # timestamp the image - 
    global cap_time, image, mean_speed
    cv2.putText(image, cap_time.strftime("%A %d %B %Y %I:%M:%S%p"),
    (10, image.shape[0] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 255, 0), 1)
    # write the speed: first get the size of the text
    size, base = cv2.getTextSize( "%.0f mph" % mean_speed, cv2.FONT_HERSHEY_SIMPLEX, 2, 3)
    # then center it horizontally on the image
    cntr_x = int((IMAGEWIDTH - size[0]) / 2) 
    cv2.putText(image, "%.0f mph" % mean_speed,
    (cntr_x , int(IMAGEHEIGHT * 0.2)), cv2.FONT_HERSHEY_SIMPLEX, 2.00, (0, 255, 0), 3)
    # and save the image to disk
    imageFilename = "car_at_" + cap_time.strftime("%Y%m%d_%H%M%S") + ".jpg"
    cv2.imwrite(imageFilename,image)

def store_traffic_data():
    global cap_time, mean_speed, direction, counter, sd, client

    csvString=(cap_time.strftime("%Y-%m-%d") + ' ' +\
    cap_time.strftime('%H:%M:%S:%f')+','+("%.0f" % mean_speed) + ',' +\
    ("%d" % direction) + ',' + ("%d" % counter) + ','+ ("%d" % sd))

    record_speed(csvString)

    jsonstring = '{"created_at":'+'"'+ cap_time.strftime("%Y-%m-%d")+\
        ' ' + cap_time.strftime('%H:%M:%S:%f')+'"' +\
        ',"field1":'+("%.0f" % mean_speed)+\
        ',"field2":' + ("%d" % direction) +\
        ',"field3":' + ("%d" % counter) +\
        ',"field4":' + ("%.0f" % sd) +\
        '}'
    client.publish('traffic', jsonstring)  #Publish MQTT data
    
    
# define some constants
L2R_DISTANCE = 47  #<---- enter your distance-to-road value for cars going left to right here
R2L_DISTANCE = 37  #<---- enter your distance-to-road value for cars going left to right here
MIN_SPEED_IMAGE = 50  #<---- enter the minimum speed for saving images
SAVE_CSV = True  #<---- record the results in .csv format in carspeed_(date).csv
MIN_SPEED_SAVE = 10  #<---- enter the minimum speed for publishing to MQTT broker and saving to CSV
MAX_SPEED_SAVE = 80  #<---- enter the maximum speed for publishing to MQTT broker and saving to CSV

THRESHOLD = 25
MIN_AREA = 175
BLURSIZE = (15,15)
IMAGEWIDTH = 1024
IMAGEHEIGHT = 592
RESOLUTION = [IMAGEWIDTH,IMAGEHEIGHT]
FOV = 62.2 # Pi Camera v2 is wider
FPS = 30
SHOW_BOUNDS = True
SHOW_IMAGE = True

# the following enumerated values are used to make the program more readable
WAITING = 0
TRACKING = 1
SAVING = 2
UNKNOWN = 0
LEFT_TO_RIGHT = 1
RIGHT_TO_LEFT = 2
TOO_CLOSE = 0.4
MIN_SAVE_BUFFER = 2

# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-ulx", "--upper_left_x", required=True,
	help="upper left x coord")
ap.add_argument("-uly", "--upper_left_y", required=True,
	help="upper left y coord")
ap.add_argument("-lrx", "--lower_right_x", required=True,
	help="lower right x coord")
ap.add_argument("-lry", "--lower_right_y", required=True,
	help="lower right y coord")
args = vars(ap.parse_args())

upper_left_x = int(args["upper_left_x"]);
upper_left_y = int(args["upper_left_y"]);
lower_right_x = int(args["lower_right_x"]);
lower_right_y = int(args["lower_right_y"]);

# calculate the the width of the image at the distance specified
#frame_width_ft = 2*(math.tan(math.radians(FOV*0.5))*DISTANCE)
l2r_frame_width_ft = 2*(math.tan(math.radians(FOV*0.5))*L2R_DISTANCE)
r2l_frame_width_ft = 2*(math.tan(math.radians(FOV*0.5))*R2L_DISTANCE)
l2r_ftperpixel = l2r_frame_width_ft / float(IMAGEWIDTH)
r2l_ftperpixel = r2l_frame_width_ft / float(IMAGEWIDTH)
print("L2R Image width in feet {} at {} from camera".format("%.0f" % l2r_frame_width_ft,"%.0f" % L2R_DISTANCE))
print("R2L Image width in feet {} at {} from camera".format("%.0f" % r2l_frame_width_ft,"%.0f" % R2L_DISTANCE))


# state maintains the state of the speed computation process
# if starts as WAITING
# the first motion detected sets it to TRACKING
 
# if it is tracking and no motion is found or the x value moves
# out of bounds, state is set to SAVING and the speed of the object
# is calculated
# initial_x holds the x value when motion was first detected
# last_x holds the last x value before tracking was was halted
# depending upon the direction of travel, the front of the
# vehicle is either at x, or at x+w 
# (tracking_end_time - tracking_start_time) is the elapsed time
# from these the speed is calculated and displayed 
 

#Initialisation
state = WAITING
direction = UNKNOWN
initial_x = 0
last_x = 0
#initialise.
cap_time = datetime.datetime.now()   
#pixel width at left and right of window to detect end of tracking
savebuffer = MIN_SAVE_BUFFER  
 
#-- other values used in program
base_image = None
abs_chg = 0
mph = 0
secs = 0.0
ix,iy = -1,-1
fx,fy = -1,-1
drawing = False
setup_complete = False
tracking = False
text_on_image = 'No cars'
prompt = ''
broker_address = 'emonpi'
save_image = False
t1 = 0.0  #timer
t2 = 0.0  #timer
lightlevel = 0
adjusted_threshold = THRESHOLD
adjusted_min_area = MIN_AREA



# initialise the camera. 
# Adjust vflip and hflip to reflect your camera's orientation
camera = PiCamera()
camera.resolution = RESOLUTION
camera.framerate = FPS
camera.vflip = False
camera.hflip = False

rawCapture = PiRGBArray(camera, size=camera.resolution)
# allow the camera to warm up
time.sleep(0.9)

# create an image window and place it in the upper left corner of the screen
cv2.namedWindow("Speed Camera")
cv2.moveWindow("Speed Camera", 10, 40)
 
#Create MQTT client and connect
client = mqtt.Client('traffic_cam')
client.username_pw_set('xxxxxx', password='xxxxxxxxxxxxx')
client.connect(broker_address)
client.loop_start()

if SAVE_CSV:
    csvfileout = "carspeed_{}.csv".format(datetime.datetime.now().strftime("%Y%m%d_%H%M"))
    record_speed('DateTime,Speed,Direction, Counter,SD, Image,')
else:
    csvfileout = ''
     
monitored_width = lower_right_x - upper_left_x
monitored_height = lower_right_y - upper_left_y
 
print("Monitored area:")
print(" upper_left_x {}".format(upper_left_x))
print(" upper_left_y {}".format(upper_left_y))
print(" lower_right_x {}".format(lower_right_x))
print(" lower_right_y {}".format(lower_right_y))
print(" monitored_width {}".format(monitored_width))
print(" monitored_height {}".format(monitored_height))
print(" monitored_area {}".format(monitored_width * monitored_height))

 
# capture frames from the camera (using capture_continuous.
#   This keeps the picamera in capture mode - it doesn't need
#   to prep for each frame's capture.
for frame in camera.capture_continuous(rawCapture, format="bgr", use_video_port=True):
    # grab the raw NumPy array representing the image 
    image = frame.array
 
    # crop area defined by [y1:y2,x1:x2]
    gray = image[upper_left_y:lower_right_y,upper_left_x:lower_right_x]
    # capture colour for later when measuring light levels
    hsv = cv2.cvtColor(gray, cv2.COLOR_BGR2HSV)
    # convert the frame to grayscale, and blur it
    gray = cv2.cvtColor(gray, cv2.COLOR_BGR2GRAY)
    gray = cv2.GaussianBlur(gray, BLURSIZE, 0)
 
    # if the base image has not been defined, initialize it
    if base_image is None:
        base_image = gray.copy().astype("float")
        rawCapture.truncate(0)
        cv2.imshow("Speed Camera", image)
  

    if lightlevel == 0:   #First pass through only
        #Set threshold and min area and save_buffer based on light readings
        lightlevel = my_map(measure_light(hsv),0,256,1,10)
        print("light level = " + str(lightlevel))
        adjusted_min_area = get_min_area(lightlevel)
        adjusted_threshold = get_threshold(lightlevel)
        adjusted_save_buffer = get_save_buffer(lightlevel)
        last_lightlevel = lightlevel

    # compute the absolute difference between the current image and
    # base image and then turn eveything lighter gray than THRESHOLD into
    # white
    frameDelta = cv2.absdiff(gray, cv2.convertScaleAbs(base_image))
    thresh = cv2.threshold(frameDelta, adjusted_threshold, 255, cv2.THRESH_BINARY)[1]
    
    # dilate the thresholded image to fill in any holes, then find contours
    # on thresholded image
    thresh = cv2.dilate(thresh, None, iterations=2)
    (_, cnts, _) = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL,cv2.CHAIN_APPROX_SIMPLE)

    # look for motion 
    motion_found = False
    biggest_area = 0
    # examine the contours, looking for the largest one
    for c in cnts:
        (x1, y1, w1, h1) = cv2.boundingRect(c)
        # get an approximate area of the contour
        found_area = w1*h1 
        # find the largest bounding rectangle
        if (found_area > adjusted_min_area) and (found_area > biggest_area):  
            biggest_area = found_area
            motion_found = True
            x = x1
            y = y1
            h = h1
            w = w1
            #record the timestamp at the point in code where motion found
            timestamp = datetime.datetime.now()

    if motion_found:
        if state == WAITING:
            # intialize tracking
            state = TRACKING
            initial_x = x
            last_x = x
            #if initial capture straddles start line then the
            # front of vehicle is at position w when clock started
            initial_w = w
            initial_time = timestamp
            last_mph = 0
        
            #Initialise array for storing speeds
            speeds = np.array([])
            sd=0  #Initialise standard deviation
            
            text_on_image = 'Tracking'
            counter = 0   # use to test later if saving with too few data points    
            car_gap = secs_diff(initial_time, cap_time) 
            print("initial time = "+str(initial_time) + " " + "cap_time =" + str(cap_time) + " gap= " +\
                 str(car_gap) + " initial x= " + str(initial_x) + " initial_w= " + str(initial_w))
            print(text_on_image)
            print("x-chg    Secs      MPH  x-pos width  BA DIR Count time")
            # if gap between cars too low then probably seeing tail lights of current car
            #but I might need to tweek this if find I'm not catching fast cars
            if (car_gap<TOO_CLOSE):   
                state = WAITING
                print("too close")
        else:  #state != WAITING
            # compute the lapsed time
            secs = secs_diff(timestamp,initial_time)
            if secs >= 3: # Object taking too long to move across
                state = WAITING
                direction = UNKNOWN
                text_on_image = 'No Car Detected'
                motion_found = False
                biggest_area = 0
                rawCapture.truncate(0)
                base_image = None
                print('Resetting')
                continue             

            if state == TRACKING:       
                if x >= last_x:
                    direction = LEFT_TO_RIGHT
                    abs_chg = (x + w) - (initial_x + initial_w)
                    mph = get_speed(abs_chg,l2r_ftperpixel,secs)
                else:
                    direction = RIGHT_TO_LEFT
                    abs_chg = initial_x - x     
                    mph = get_speed(abs_chg,r2l_ftperpixel,secs)           

                counter+=1   #Increment counter

                speeds = np.append(speeds, mph)   #Append speed to array

                if mph < 0:
                    print("negative speed - stopping tracking"+ "{0:7.2f}".format(secs))
                    if direction == LEFT_TO_RIGHT:
                        direction = RIGHT_TO_LEFT  #Reset correct direction
                        x=1  #Force save
                    else:
                        direction = LEFT_TO_RIGHT  #Reset correct direction
                        x=monitored_width + MIN_SAVE_BUFFER  #Force save
                else:
                    print("{0:4d}  {1:7.2f}  {2:7.0f}   {3:4d}  {4:4d} {5:4d} {6:1d} {7:1d} {8:%H%M%S%f}". \
                    format(abs_chg,secs,mph,x,w,biggest_area, direction,counter, timestamp))
                
                real_y = upper_left_y + y
                real_x = upper_left_x + x
              

                # is front of object outside the monitired boundary? Then write date, time and speed on image
                # and save it 
                if ((x <= adjusted_save_buffer) and (direction == RIGHT_TO_LEFT)) \
                        or ((x+w >= monitored_width - adjusted_save_buffer) \
                        and (direction == LEFT_TO_RIGHT)):
                    
                    #you need at least 2 data points to calculate a mean and we're deleting one on line below
                    if (counter > 2): 
                        mean_speed = np.mean(speeds[:-1])   #Mean of all items except the last one
                        sd = np.std(speeds[:-1])  #SD of all items except the last one
                    elif (counter > 1):
                        mean_speed = speeds[-1] # use the last element in the array
                        sd = 99 # Set it to a very high value to highlight it's not to be trusted.
                    else:
                        mean_speed = 0 #ignore it 
                        sd = 0
                    
                    print("numpy mean= " + "%.0f" % mean_speed)   
                    print("numpy SD = " + "%.0f" % sd)

                    #Captime used for mqtt, csv, image filename. 
                    cap_time = datetime.datetime.now()   

                    # save the image but only if there is light and above the min speed for images 
                    if (mean_speed > MIN_SPEED_IMAGE) and (lightlevel > 1) :    
                        store_image()
                    
                    # save the data if required and above min speed for data
                    if SAVE_CSV and mean_speed > MIN_SPEED_SAVE and mean_speed < MAX_SPEED_SAVE:
                        store_traffic_data()
                    
                    counter = 0
                    state = SAVING
                    print("saving")  #debug                    
                # if the object hasn't reached the end of the monitored area, just remember the speed 
                # and its last position
                last_mph = mph
                last_x = x
    else:
        # No motion detected
        if state == TRACKING:
            #Last frame has skipped the buffer zone    
            if (counter > 2): 
                mean_speed = np.mean(speeds[:-1])   #Mean of all items except the last one
                sd = np.std(speeds[:-1])  #SD of all items except the last one
                print("missed but saving")
            elif (counter > 1):
                mean_speed = speeds[-1] # use the last element in the array
                sd = 99 # Set it to a very high value to highlight it's not to be trusted.
                print("missed but saving")
            else:
                mean_speed = 0 #ignore it 
                sd = 0
                    
            print("numpy mean= " + "%.0f" % mean_speed)   
            print("numpy SD = " + "%.0f" % sd)

            cap_time = datetime.datetime.now()
            if (mean_speed > MIN_SPEED_IMAGE) and (lightlevel > 1) :    
                        store_image()
            if SAVE_CSV and mean_speed > MIN_SPEED_SAVE:
                store_traffic_data()

        if state != WAITING:
            state = WAITING
            direction = UNKNOWN
            text_on_image = 'No Car Detected'
            counter = 0
            print(text_on_image)
            
    # only update image and wait for a keypress when waiting for a car
    # This is required since waitkey slows processing.
    if (state == WAITING):    
 
        # draw the text and timestamp on the frame
        cv2.putText(image, datetime.datetime.now().strftime("%A %d %B %Y %I:%M:%S%p"),
            (10, image.shape[0] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 255, 0), 1)
        cv2.putText(image, "Road Status: {}".format(text_on_image), (10, 20),
            cv2.FONT_HERSHEY_SIMPLEX,0.35, (0, 0, 255), 1)
     
        if SHOW_BOUNDS:
            #define the monitored area right and left boundary
            cv2.line(image,(upper_left_x,upper_left_y),(upper_left_x,lower_right_y),(0, 255, 0))
            cv2.line(image,(lower_right_x,upper_left_y),(lower_right_x,lower_right_y),(0, 255, 0))
       
        # show the frame and check for a keypress
        if SHOW_IMAGE:
            prompt_on_image(prompt)
            cv2.imshow("Speed Camera", image)
            
        # Adjust the base_image as lighting changes through the day
        if state == WAITING:
            last_x = 0
            cv2.accumulateWeighted(gray, base_image, 0.25)
            t2 = time.process_time()
            if (t2 - t1) > 60:   # We need to measure light level every so often
                t1 = time.process_time()
                lightlevel = my_map(measure_light(hsv),0,256,1,10)
                print("light level = " + str(lightlevel))
                adjusted_min_area = get_min_area(lightlevel)
                adjusted_threshold = get_threshold(lightlevel)
                adjusted_save_buffer = get_save_buffer(lightlevel)
                if lightlevel != last_lightlevel:
                    base_image = None
                last_lightlevel = lightlevel
        state=WAITING
        key = cv2.waitKey(1) & 0xFF
      
        # if the `q` key is pressed, break from the loop and terminate processing
        if key == ord("q"):
            client.loop_stop()
            client.disconnect()   ##disconnect from mqtt broker
            break
         
    # clear the stream in preparation for the next frame
    rawCapture.truncate(0)
# cleanup the camera and close any open windows
cv2.destroyAllWindows()

Carspeed version 3

Code to run Raspberry Pi with camera and count traffic and vehicle speeds. Launch with python3 carspeed.py -ulx 64 -uly 321 -lrx 960 -lry 444

Credits

Tim Hodges
1 project • 1 follower
Contact

Comments

Please log in or sign up to comment.