Brian Kincaid

AI-Assisted Video Adjustment for Photosensitive Viewers

Use AI to detect video effects that are irritating to photosensitive viewers, so that the frames can be modified in real time for display.

Advanced · Full instructions provided · 10 hours · 354 views

Things used in this project

Hardware components

AMD VCK5000 Versal Development Card
ES1 version of card (contest hardware)
×1

Software apps and online services

AMD Vitis Unified Software Platform
The main Xilinx development tool suite
Vitis-AI
Vitis-AI repository containing the Xilinx AI tools used to build and deploy the model to the VCK5000
ffmpeg
A really solid tool for manipulating videos and images for use with AI modelling
ImageMagick
A very helpful tool for resizing images; both tools appear in the sketch below
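
To show how these two utilities might fit into dataset preparation, here is a minimal Python sketch. The file names are placeholders, the 224x224 target matches the ResNet50 input size used later in this project, and it assumes ffmpeg and ImageMagick's convert are on the PATH:

# Hypothetical dataset-prep helper: extract frames with ffmpeg, then resize
# them with ImageMagick to ResNet50's 224x224 input. File names are placeholders.
import subprocess
from pathlib import Path

frames = Path('frames')
frames.mkdir(exist_ok=True)

# One PNG per frame: frames/frame_0001.png, frames/frame_0002.png, ...
subprocess.run(['ffmpeg', '-i', 'clip.mp4', str(frames / 'frame_%04d.png')],
               check=True)

# Force each extracted frame to exactly 224x224 ('!' ignores aspect ratio).
for png in sorted(frames.glob('frame_*.png')):
    subprocess.run(['convert', str(png), '-resize', '224x224!', str(png)],
                   check=True)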

Story

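At a high level, the flow classifies each video frame with a ResNet50 retrained to two classes ("event" for a photosensitive trigger, "none" otherwise) and adjusts flagged frames before display. The following is a minimal sketch of that idea, not the deployed VCK5000 pipeline: the file names, the 0.5 threshold, and dimming as the adjustment are all illustrative assumptions, and class index 0 is "event", matching the labels list in gen_validation_set.py below.

# Minimal sketch of the idea, NOT the deployed VCK5000/DPU pipeline.
# File names, the 0.5 threshold, and the dimming factor are assumptions;
# class index 0 is "event" per the labels list in gen_validation_set.py.
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras.applications.resnet50 import preprocess_input

model = tf.keras.models.load_model('resnet50_model_bk.h5')  # retrained 2-class model
cap = cv2.VideoCapture('input.mp4')
while True:
    ok, frame = cap.read()
    if not ok:
        break
    # ResNet50 expects 224x224 RGB; OpenCV delivers BGR.
    rgb = cv2.cvtColor(cv2.resize(frame, (224, 224)), cv2.COLOR_BGR2RGB)
    x = preprocess_input(rgb.astype(np.float32)[np.newaxis, ...])
    p_event = model.predict(x, verbose=0)[0][0]
    if p_event > 0.5:                           # assumed decision threshold
        frame = (frame * 0.2).astype(np.uint8)  # dim the triggering frame
    cv2.imshow('adjusted', frame)
    if cv2.waitKey(1) == 27:                    # Esc to quit
        break
cap.release()
cv2.destroyAllWindows()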

Schematics

Flow_Chart

tf2_resnet50 xmodel, ready to run on the VCK5000 ES1

Unzip and move it to /usr/share/vitis_ai_library/models/tf2_resnet50
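
For reference, a small Python equivalent of that step. The archive name tf2_resnet50.zip is an assumption (the actual download may be named differently), and writing under /usr/share typically requires root:

# Hypothetical deployment helper; the archive name is assumed.
import shutil
import zipfile

with zipfile.ZipFile('tf2_resnet50.zip') as zf:  # assumed archive name
    zf.extractall('.')
# Typically requires root privileges for /usr/share.
shutil.move('tf2_resnet50', '/usr/share/vitis_ai_library/models/tf2_resnet50')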

Code

train_eval_h5.py

Python
The main Python source file for the project, used in the Design 12 example; it drives training, evaluation, quantization, and dump of the ResNet50 model via command-line flags. Please note the Xilinx license; the original file is included in the GitHub repository referenced by this project.
# Copyright 2021 Xilinx Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os, time
import numpy as np
import tensorflow as tf
#import pdb

from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras.layers import Input
from tensorflow.keras.preprocessing import image
from tensorflow.keras.applications.resnet50 import preprocess_input, decode_predictions
from tensorflow.compat.v1 import flags
from tensorflow.keras.optimizers import RMSprop
from dataset import synth_input_fn
from dataset import input_fn, NUM_IMAGES
from dataset import get_images_infor_from_file, ImagenetSequence

keras = tf.keras

flags.DEFINE_string(
    'model', '/workspace/Videos/resnet50_model_bk.h5',
    'Keras .h5 model file to load.')
flags.DEFINE_bool(
    'eval_tfrecords', True,
    'If True, use TFRecord data for evaluation.')
flags.DEFINE_string(
    'data_dir', '/workspace/Videos/tf_records',
    'The directory containing the TFRecord data.')
flags.DEFINE_bool(
    'eval_images', False,
    'If True, evaluate on image files instead of TFRecords.')
flags.DEFINE_string(
    'eval_image_path', '/workspace/Videos/val/val',
    'The directory containing the evaluation images.')
flags.DEFINE_string(
    'eval_image_list', '/workspace/Videos/val/val_labels.txt',
    'File listing the validation images.')
flags.DEFINE_string(
    'save_path', "train_dir",
    'The directory where the model is saved.')
flags.DEFINE_string(
    'filename', "resnet50_model_{epoch}.h5",
    'The file name of the saved model.')
flags.DEFINE_integer(
    'label_offset', 1, 'Label offset.')
flags.DEFINE_string(
    'gpus', '0',
    'The GPUs used for running evaluation.')
flags.DEFINE_bool(
    'eval_only', False,
    'If True, do not train the model; only evaluate it.')
flags.DEFINE_bool(
    'save_whole_model', False,
    'The applications .h5 file includes only weights; if True, save the '
    'whole model (architecture plus weights) to an .h5 file.')
flags.DEFINE_bool(
    'use_synth_data', False,
    'If True, use synthetic data instead of ImageNet.')
flags.DEFINE_bool(
    'save_best_only', False,
    'If True, only save a model when `val_loss` has improved.')
flags.DEFINE_integer('train_step', None, 'Train step number')
flags.DEFINE_integer('batch_size', 32, 'Train batch size')
flags.DEFINE_integer('epochs', 200, 'Train epochs')
flags.DEFINE_integer('eval_batch_size', 50, 'Evaluate batch size')
flags.DEFINE_integer('save_every_epoch', 1, 'Save every N epochs')
flags.DEFINE_integer('eval_every_epoch', 1, 'Evaluate every N epochs')
flags.DEFINE_integer('steps_per_epoch', None, 'steps_per_epoch')
flags.DEFINE_integer('decay_steps', 10000, 'decay_steps')
flags.DEFINE_float('learning_rate', 1e-6, 'learning rate')
flags.DEFINE_bool('createnewmodel', False, 'Create a new model from the base Resnet50 model')
# Quantization Config
flags.DEFINE_bool('quantize', False, 'Whether to do quantization.')
flags.DEFINE_string('quantize_output_dir', './quantized/', 'Directory for quantize output results.')
flags.DEFINE_bool('quantize_eval', False, 'Whether to do quantize evaluation.')
flags.DEFINE_bool('dump', False, 'Whether to do dump.')
flags.DEFINE_string('dump_output_dir', './quantized/', 'Directory for dump output results.')

FLAGS = flags.FLAGS

TRAIN_NUM = NUM_IMAGES['train']
EVAL_NUM = NUM_IMAGES['validation']

def get_input_data(num_epochs=1):
  print("getting train_data and eval_data from dirs")
  train_data = input_fn(
      is_training=True, data_dir=FLAGS.data_dir,
      batch_size=FLAGS.batch_size,
      num_epochs=num_epochs,
      num_gpus=0,
      dtype=tf.float32)

  eval_data = input_fn(
      is_training=False, data_dir='/workspace/Videos/tf_records',
      batch_size=FLAGS.eval_batch_size,
      num_epochs=1,
      num_gpus=0,
      dtype=tf.float32)
  # data_dir=FLAGS.data_dir,
  print("train num : ",TRAIN_NUM)
  print("eval num  : ",EVAL_NUM)
  return train_data, eval_data


def main():
  #breakpoint()
  print("********",tf.__version__)
  ## run once to save h5 file (add model info)
  if FLAGS.save_whole_model:
    print("********set to save whole model")
    model = ResNet50(weights='imagenet')
    model.save(FLAGS.model)
    exit()

  if not FLAGS.eval_images:
    print("********getting input data (no image evaluation)")
    train_data, eval_data = get_input_data(FLAGS.epochs)

  if FLAGS.dump or FLAGS.quantize_eval:
      print("********loading model for quantization")
      from tensorflow_model_optimization.quantization.keras import vitis_quantize
      with vitis_quantize.quantize_scope():
          model = keras.models.load_model(FLAGS.model)

  elif FLAGS.createnewmodel:
      print("********creating new model")
      #for training the model from scratch use the following:
      basemodel = ResNet50(weights='imagenet', include_top=True,input_tensor=Input(shape=(224, 224, 3)))
      base_output = basemodel.layers[175].output 
      new_output = tf.keras.layers.Dense(activation="softmax", units=2)(base_output)
      model = tf.keras.models.Model(inputs=basemodel.inputs, outputs=new_output)
      print(model.summary())

  else:
      print("********loading model")
      model = keras.models.load_model(FLAGS.model)
      print(model.summary())

  print("********loading image information from files here")
  print("eval image path : ",FLAGS.eval_image_path)
  print("eval image list : ",FLAGS.eval_image_list)
  print("label offset : ",FLAGS.label_offset)
  img_paths, labels = get_images_infor_from_file(FLAGS.eval_image_path,
          FLAGS.eval_image_list, FLAGS.label_offset)
  imagenet_seq = ImagenetSequence(img_paths[0:400], labels[0:400], FLAGS.eval_batch_size)

  if FLAGS.quantize:
      print("********running quantization")
      # do quantization
      from tensorflow_model_optimization.quantization.keras import vitis_quantize
      model = vitis_quantize.VitisQuantizer(model).quantize_model(calib_dataset=imagenet_seq)
      #print(eval_data)
      #model = vitis_quantize.VitisQuantizer(model).quantize_model(calib_dataset=eval_data)
      print("********model call completed, now save quantized model")

      # save quantized model
      model.save(os.path.join(FLAGS.quantize_output_dir, 'quantized.h5'))
      print('Quantize finished, results in: {}'.format(FLAGS.quantize_output_dir))
      return

  print("********loading information from files")
  img_paths, labels = get_images_infor_from_file(FLAGS.eval_image_path,
          FLAGS.eval_image_list, FLAGS.label_offset)
  imagenet_seq = ImagenetSequence(img_paths[0:1], labels[0:1], FLAGS.eval_batch_size)

  if FLAGS.dump:
      print("********dumping quantization results")
      # do quantize dump
      quantizer = vitis_quantize.VitisQuantizer.dump_model(model, imagenet_seq, FLAGS.dump_output_dir)

      print('Dump finished, results in: {}'.format(FLAGS.dump_output_dir))
      return

  print("********setting learning parameters")
  initial_learning_rate = FLAGS.learning_rate
  lr_schedule = keras.optimizers.schedules.ExponentialDecay(
      initial_learning_rate, decay_steps=FLAGS.decay_steps,
      decay_rate=0.96, staircase=True)
  opt = RMSprop(learning_rate=lr_schedule)
  
  loss = keras.losses.SparseCategoricalCrossentropy()
  metric_top_5 = keras.metrics.SparseTopKCategoricalAccuracy()
  accuracy = keras.metrics.SparseCategoricalAccuracy()
  model.compile(optimizer=opt, loss=loss,
          metrics=[accuracy, metric_top_5])
  if not FLAGS.eval_only:
    if not os.path.exists(FLAGS.save_path):
      os.makedirs(FLAGS.save_path)
    callbacks = [
      keras.callbacks.ModelCheckpoint(
          filepath=os.path.join(FLAGS.save_path,FLAGS.filename),
          save_best_only=True,
          monitor="sparse_categorical_accuracy",
          verbose=1,
      )]
    steps_per_epoch = FLAGS.steps_per_epoch if FLAGS.steps_per_epoch else np.ceil(TRAIN_NUM/FLAGS.batch_size)
    model.fit(train_data,
            epochs=FLAGS.epochs,
            callbacks=callbacks,
            steps_per_epoch=steps_per_epoch,
            validation_freq=FLAGS.eval_every_epoch,
            validation_steps = EVAL_NUM/FLAGS.eval_batch_size,
            validation_data=train_data) #eval_data)
  if not FLAGS.eval_images:
    print("evaluate model using tf_records data format")
    model.evaluate(eval_data, steps=EVAL_NUM/FLAGS.eval_batch_size)
  if FLAGS.eval_images and FLAGS.eval_only:
    print("we are running this path")
    img_paths, labels = get_images_infor_from_file(FLAGS.eval_image_path,FLAGS.eval_image_list, FLAGS.label_offset)
    imagenet_seq = ImagenetSequence(img_paths, labels, FLAGS.eval_batch_size)
    res = model.evaluate(imagenet_seq, steps=EVAL_NUM/FLAGS.eval_batch_size, verbose=1)


if __name__ == "__main__":
  os.environ['CUDA_VISIBLE_DEVICES'] = FLAGS.gpus
  main()
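
The script is driven entirely by the flags defined at the top. Plausible invocations, inferred from those flag definitions rather than documented in the original write-up: run with --save_whole_model=true to export the stock ImageNet ResNet50 to an .h5 file; with --createnewmodel=true to build the two-class variant and train it; with --quantize=true to calibrate and write quantized/quantized.h5 for compilation to the VCK5000; and with --eval_only=true --eval_images=true to evaluate against the image list.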

gen_validation_set.py

Python
Python source to generate the validation set directory. The original is from Xilinx; note the included license.
# Copyright 2021 Xilinx Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy as np
import cv2
import os
labels= ["event","none1"]

validation_images="/workspace/Videos/AS_to_TFR/Test"
validation_output="/workspace/Videos/val/val"
validation_labels="/workspace/Videos/val/val_labels.txt"


validation_labels_file = open(validation_labels, "w")

# Walk the Test directory; each subfolder name is the class label.
for (dirpath, dirnames, filenames) in os.walk(validation_images):
    for filename in filenames:
        folder = dirpath.split(os.sep)[-1]
        output_label_name = folder.replace(" ", "")
        label_idx = labels.index(folder)
        # Re-encode the image into the flat validation directory,
        # prefixing the file name with its class label.
        image = cv2.imread(dirpath + '/' + filename)
        output_filepath = validation_output + '/' + output_label_name + filename
        cv2.imwrite(output_filepath, image)
        validation_labels_file.write(output_label_name + filename + " " + str(label_idx) + "\n")
        print("wrote: ", output_filepath)
validation_labels_file.close()
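
As a concrete trace of the output format (the file name is hypothetical): for a source image Test/event/frame001.png, the script writes /workspace/Videos/val/val/eventframe001.png and appends the line "eventframe001.png 0" to val_labels.txt, since "event" is index 0 in the labels list.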

process_result.hpp

C/C++
File that overlays classification text onto a processed image. Please note the Xilinx license; the original file is included with the Vitis-AI GitHub clone.
/*
 * Copyright 2019 Xilinx Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

cv::Mat process_result(cv::Mat& image, vitis::ai::ClassificationResult& result,
                       bool is_jpeg) {
  auto x = 10;
  auto y = 20;
  auto i = 0;
  // Hardcoded category names for the retrained two-class model
  // ("event" = photosensitive trigger detected).
  char cats[5][10] = {"event", "none", "none", "none", "none"};
  for (auto& r : result.scores) {
    i++;
    LOG_IF(INFO, is_jpeg) << "r.index " << r.index << " "  //
                          << cats[r.index] << " "
                          << "r.score " << r.score << " "  //
                          << std::endl;
    auto cls = std::string("") + cats[r.index] + " prob. " +
               std::to_string(r.score);
    cv::putText(image, cls, cv::Point(x, y + 20 * i), cv::FONT_HERSHEY_SIMPLEX,
                0.5, cv::Scalar(20, 20, 180), 1, 1);
    if (i >= 2) break;  // only overlay the top two scores
  }
  return image;
}
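
Relative to the stock Vitis-AI example (kept below for reference), the lookup table result.lookup(r.index) is replaced with a hardcoded two-entry category array to match the retrained model's classes, and the loop breaks after the top two scores so only those are overlaid.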


/* original - BK
cv::Mat process_result(cv::Mat& image, vitis::ai::ClassificationResult& result,
                       bool is_jpeg) {
  auto x = 10;
  auto y = 20;
  auto i = 0;
  for (auto& r : result.scores) {
    i++;
    LOG_IF(INFO, is_jpeg) << "r.index " << r.index << " "  //
                          << result.lookup(r.index) << " "
                          << "r.score " << r.score << " "  //
                          << std::endl;
    auto cls = std::string("") + result.lookup(r.index) + " prob. " +
               std::to_string(r.score);
    cv::putText(image, cls, cv::Point(x, y + 20 * i), cv::FONT_HERSHEY_SIMPLEX,
                0.5, cv::Scalar(20, 20, 180), 1, 1);
  }
  return image;
}
*/

Credits

Brian Kincaid


Career electrical engineer, semiconductor
Thanks to Gorodenkoff, flashmovie, and gonin.
