Samyakkumar Jain
Published © MIT

Smart Traffic Camera – Pothole detector using Kria KV260

The pothole is an important and serious issue in India, as per statistics 30% of people die due to potholes or a sequence of events by this/

IntermediateFull instructions providedOver 2 days1,963
Smart Traffic Camera – Pothole detector using Kria KV260

Things used in this project

Story

Read more

Code

Powershell Script for Data Upload

Powershell
Invoked by real-time detection process and send the results with location to the server. The REST API at the server end will manipulate json/xml to generate Dashboards and Notifications to authority.
param
(
  [string] $Url = http://10.1.2.172/WSVistaWebClient/RESTService.svc/member/,
	[int] $MaxRetries = 3,
	[int] $OneRetryTimeout = 5,
	[int] $OverallTimeout = 60,
	[int] $WaitBetweenRetries = 1
)

$body = @{
 $Location=$args[0]
 $Potholes=$args[1]
 $Time = Get-Date
} | ConvertTo-Json

$header = @{
 "Accept"="application/json"
 "connectapitoken"="97fe6ab5b1a640909551e36a071ce9ed"
 "Content-Type"="application/json"
} 

$Success = $false
$RetriesLeft = $MaxRetries
$TimeLeft = [timespan]::fromseconds($OverallTimeout)

do
{
    Write-Host "Pinging url - $Url"

    $StopWatch = [Diagnostics.Stopwatch]::StartNew()
    try
    {
        $Response = Invoke-WebRequest $Url -UseBasicParsing -TimeoutSec $OneRetryTimeout -Method 'Post' -Body $body -Headers $header | ConvertTo-HTML
		

        $StatusCode = $Response.StatusDescription
        $StatusCodeInt = $Response.StatusCode
    
        $StatusText = "Ping returned status $StatusCode ($StatusCodeInt)"
        
        $ExceptionOccurred = $false
    } 
    catch 
    {
        if ($_.Exception.Response.StatusCode)
        {
            $StatusCode = $_.Exception.Response.StatusCode
            $StatusCodeInt = $StatusCode.value__
    
            $StatusText = "Ping returned status $StatusCode ($StatusCodeInt)"
        }
        else
        {
            $StatusText = $_.Exception.Message
        }
        $ExceptionOccurred = $true
    }
    $StopWatch.Stop()


    Write-Host $StatusText
    Write-Host 'Time elapsed' $StopWatch.Elapsed
	$TimeLeft-=$StopWatch.Elapsed

    $Success = (($StatusCodeInt -eq 200) -and -not($ExceptionOccurred))
    $RetriesLeft--
	
	Start-Sleep $WaitBetweenRetries
	$TimeLeft-=[timespan]::fromseconds($WaitBetweenRetries)
}
while (($Success -eq $false) -and ($RetriesLeft -ge 0) -and ($TimeLeft -ge 0))

if ($Success) 
{ 
    Write-Host "Ping OK!" -ForegroundColor Green
}
else 
{
    throw "Ping failed: '$Url'"
}

Object Detection in Image

Python
# Import packages
import os
import cv2
import numpy as np
import tensorflow as tf
import sys
import subprocess

# This is needed since the notebook is stored in the object_detection folder.
sys.path.append("..")

# Import utilites
from utils import label_map_util
from utils import visualization_utils as vis_util

location = "39.6167, 2.9833"
# Name of the directory containing the object detection module we're using
MODEL_NAME = 'inference_graph'
cap = cv2.VideoCapture(0)
# Check if the webcam is opened correctly
if not cap.isOpened():
    raise IOError("Cannot open webcam")
# While loop
while True:
    # Capture frame-by-frame
    ret, frame = cap.read()

    # Show the captured image
    cv2.imshow('WebCam', frame)
    break
IMAGE_NAME = cv2.imread('image.png')

# Grab path to current working directory
CWD_PATH = os.getcwd()

# Path to frozen detection graph .pb file, which contains the model that is used
# for object detection.
PATH_TO_CKPT = os.path.join(CWD_PATH,MODEL_NAME,'frozen_inference_graph.pb')

# Path to label map file
PATH_TO_LABELS = os.path.join(CWD_PATH,'training','labelmap.pbtxt')

# Path to image
PATH_TO_IMAGE = os.path.join(CWD_PATH,'pothole_testImages',IMAGE_NAME)

# Number of classes the object detector can identify
NUM_CLASSES = 4

# Load the label map.
# Label maps map indices to category names, so that when our convolution
# network predicts `5`, we know that this corresponds to `king`.
# Here we use internal utility functions, but anything that returns a
# dictionary mapping integers to appropriate string labels would be fine
label_map = label_map_util.load_labelmap(PATH_TO_LABELS)
categories = label_map_util.convert_label_map_to_categories(label_map, max_num_classes=NUM_CLASSES, use_display_name=True)
category_index = label_map_util.create_category_index(categories)

# Load the Tensorflow model into memory.
detection_graph = tf.Graph()
with detection_graph.as_default():
    od_graph_def = tf.GraphDef()
    with tf.gfile.GFile(PATH_TO_CKPT, 'rb') as fid:
        serialized_graph = fid.read()
        od_graph_def.ParseFromString(serialized_graph)
        tf.import_graph_def(od_graph_def, name='')

    sess = tf.Session(graph=detection_graph)

# Define input and output tensors (i.e. data) for the object detection classifier

# Input tensor is the image
image_tensor = detection_graph.get_tensor_by_name('image_tensor:0')

# Output tensors are the detection boxes, scores, and classes
# Each box represents a part of the image where a particular object was detected
detection_boxes = detection_graph.get_tensor_by_name('detection_boxes:0')

# Each score represents level of confidence for each of the objects.
# The score is shown on the result image, together with the class label.
detection_scores = detection_graph.get_tensor_by_name('detection_scores:0')
detection_classes = detection_graph.get_tensor_by_name('detection_classes:0')

# Number of objects detected
num_detections = detection_graph.get_tensor_by_name('num_detections:0')

# Load image using OpenCV and
# expand image dimensions to have shape: [1, None, None, 3]
# i.e. a single-column array, where each item in the column has the pixel RGB value
image = cv2.imread(PATH_TO_IMAGE)
image_expanded = np.expand_dims(image, axis=0)

# Perform the actual detection by running the model with the image as input
(boxes, scores, classes, num) = sess.run(
    [detection_boxes, detection_scores, detection_classes, num_detections],
    feed_dict={image_tensor: image_expanded})

# Draw the results of the detection (aka 'visulaize the results')

vis_util.visualize_boxes_and_labels_on_image_array(
    image,
    np.squeeze(boxes),
    np.squeeze(classes).astype(np.int32),
    np.squeeze(scores),
    category_index,
    use_normalized_coordinates=True,
    line_thickness=8,
    min_score_thresh=0.60)

# All the results have been drawn on image. Now display the image.
cv2.imshow('Object detector', image)
p = subprocess.Popen(['powershell.exe', './Alert.ps1' + location + num_detections], stdout=sys.stdout)
print("Alert response is" + p)
# Press any key to close the image
cv2.waitKey(0)

# Clean up
cv2.destroyAllWindows()

Generate TF Records

Python
import os
import io
import pandas as pd
import tensorflow as tf

from PIL import Image
from object_detection.utils import dataset_util
from collections import namedtuple, OrderedDict

flags = tf.app.flags
flags.DEFINE_string('csv_input', '', 'Path to the CSV input')
flags.DEFINE_string('image_dir', '', 'Path to the image directory')
flags.DEFINE_string('output_path', '', 'Path to output TFRecord')
FLAGS = flags.FLAGS


# TO-DO replace this with label map
def class_text_to_int(row_label):
    if row_label == 'pothole':
        return 1
    elif row_label == 'car':
        return 2
    elif row_label == 'potholeGroup':
        return 3
    elif row_label == 'auto':
        return 4
    else:
        None


def split(df, group):
    data = namedtuple('data', ['filename', 'object'])
    gb = df.groupby(group)
    return [data(filename, gb.get_group(x)) for filename, x in zip(gb.groups.keys(), gb.groups)]


def create_tf_example(group, path):
    with tf.gfile.GFile(os.path.join(path, '{}'.format(group.filename)), 'rb') as fid:
        encoded_jpg = fid.read()
    encoded_jpg_io = io.BytesIO(encoded_jpg)
    image = Image.open(encoded_jpg_io)
    width, height = image.size

    filename = group.filename.encode('utf8')
    image_format = b'jpg'
    xmins = []
    xmaxs = []
    ymins = []
    ymaxs = []
    classes_text = []
    classes = []

    for index, row in group.object.iterrows():
        xmins.append(row['xmin'] / width)
        xmaxs.append(row['xmax'] / width)
        ymins.append(row['ymin'] / height)
        ymaxs.append(row['ymax'] / height)
        classes_text.append(row['class'].encode('utf8'))
        classes.append(class_text_to_int(row['class']))

    tf_example = tf.train.Example(features=tf.train.Features(feature={
        'image/height': dataset_util.int64_feature(height),
        'image/width': dataset_util.int64_feature(width),
        'image/filename': dataset_util.bytes_feature(filename),
        'image/source_id': dataset_util.bytes_feature(filename),
        'image/encoded': dataset_util.bytes_feature(encoded_jpg),
        'image/format': dataset_util.bytes_feature(image_format),
        'image/object/bbox/xmin': dataset_util.float_list_feature(xmins),
        'image/object/bbox/xmax': dataset_util.float_list_feature(xmaxs),
        'image/object/bbox/ymin': dataset_util.float_list_feature(ymins),
        'image/object/bbox/ymax': dataset_util.float_list_feature(ymaxs),
        'image/object/class/text': dataset_util.bytes_list_feature(classes_text),
        'image/object/class/label': dataset_util.int64_list_feature(classes),
    }))
    return tf_example


def main(_):
    writer = tf.python_io.TFRecordWriter(FLAGS.output_path)
    path = os.path.join(os.getcwd(), FLAGS.image_dir)
    examples = pd.read_csv(FLAGS.csv_input)
    grouped = split(examples, 'filename')
    for group in grouped:
        tf_example = create_tf_example(group, path)
        writer.write(tf_example.SerializeToString())

    writer.close()
    output_path = os.path.join(os.getcwd(), FLAGS.output_path)
    print('Successfully created the TFRecords: {}'.format(output_path))


if __name__ == '__main__':
    tf.app.run()

XML to CSV conversion

Python
import os
import glob
import pandas as pd
import xml.etree.ElementTree as ET


def xml_to_csv(path):
    xml_list = []
    for xml_file in glob.glob(path + '/*.xml'):
        tree = ET.parse(xml_file)
        root = tree.getroot()
        for member in root.findall('object'):
            value = (root.find('filename').text,
                     int(root.find('size')[0].text),
                     int(root.find('size')[1].text),
                     member[0].text,
                     int(member[4][0].text),
                     int(member[4][1].text),
                     int(member[4][2].text),
                     int(member[4][3].text)
                     )
            xml_list.append(value)
    column_name = ['filename', 'width', 'height', 'class', 'xmin', 'ymin', 'xmax', 'ymax']
    xml_df = pd.DataFrame(xml_list, columns=column_name)
    return xml_df


def main():
    for folder in ['train','test']:
        image_path = os.path.join(os.getcwd(), ('Pothole_Images/' + folder))
        xml_df = xml_to_csv(image_path)
        xml_df.to_csv(('images/' + folder + '_labels.csv'), index=None)
        print('Successfully converted xml to csv.')


main()

Label mapping

Properties
item {
  id: 1
  name: 'pothole'
}

item {
  id: 2
  name: 'car'
}

item {
  id: 3
  name: 'potholeGroup'
}

item {
  id: 4
  name: 'auto'
}

Credits

Samyakkumar Jain

Samyakkumar Jain

2 projects • 4 followers
I am Samyak, a final year student of master's with a Specialization in Embedded Systems from Nirma University, India.

Comments