Tasos
Published © GPL3+

Ambient mood orchestrator

Detect the mood of an AI PC user through AI-based facial emotion recognition and play matching ambient sounds.

Advanced • Full instructions provided • 3 days

Things used in this project

Hardware components

Minisforum Venus UM790 Pro with AMD Ryzen™ 9
×1
Webcam, Logitech® HD Pro
×1

Story


Code

faces.py

Python
A Google Image Search script that automatically collects facial expression images (or any other images, for that matter).
# Code adapted from redandgreen.co.uk


from google_images_search import GoogleImagesSearch
from dotenv import load_dotenv
import os


print ("STARTING.....")
# load the API key and CX code from .env file
if os.path.exists(".env"):
  res=load_dotenv()
  print("KEYS LOADED.....  ", res,"      ")
else:
  print(".env file missing, please create one with your API and CX")

# create an 'images' subdirectory if it doesn't already exist
if not os.path.exists('images/'):
  os.mkdir('images/')
spath = 'images/'

# Get env variables
DK = os.environ.get('DEVELOPER_KEY')
CX = os.environ.get('CX')

# custom progress bar callback (progress is a number, so format it rather than concatenate strings)
def my_progressbar(url, progress):
    print(f"{url} {progress}%")

# create a google images search object with the custom progress bar
# (fetch_images below opens its own session, so this object is kept only for reference)
gis = GoogleImagesSearch(DK, CX, progressbar_fn=my_progressbar)

def fetch_images(searchfor):
  # using contextual mode (Curses)
  with GoogleImagesSearch(DK, CX) as gis:
    # define search params and in particular number of photos: 
    _search_params = {"q": searchfor, 
        "num": 200, 
        #"safe": "high", 
        #"fileType": "jpg",
        "imgType": "photo",
        #"rights": "cc_publicdomain" 
        #  free for use by anyone for any purpose without restriction under copyright law
        }
    gis.search(search_params=_search_params, path_to_dir=spath)
    # this will search and download:
    # gis.search(search_params=_search_params, path_to_dir='/path/')

    # this will search, download and resize:
    # gis.search(search_params=_search_params, path_to_dir='/path/', width=500, height=500)

  print("Finished!")

fetch_images("sad face")  # set the search keywords here
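
For reference, faces.py expects a .env file in the same directory holding the Google Custom Search credentials it reads via os.environ. A minimal sketch (both values are placeholders for your own API key and Custom Search Engine ID):

```
DEVELOPER_KEY=your_google_api_key
CX=your_custom_search_engine_id
```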

README_RYZEN_AI

Markdown
Instructions for setting up and using the Ryzen AI Software repository.
# Ryzen AI Software 

## Introduction

AMD Ryzen AI Software includes the tools and runtime libraries for optimizing and deploying AI inference on your [AMD Ryzen AI](https://www.amd.com/en/products/processors/consumer/ryzen-ai.html) based PC. It enables developers to quickly build and run a variety of AI applications for Ryzen AI. It is designed with high efficiency and ease-of-use in mind, unleashing the full potential of AI acceleration on Ryzen AI.

This repository contains demos, examples, and tutorials demonstrating the usage and capabilities of the Ryzen AI Software. It is a subset of the Ryzen AI Software release.

## Git LFS and Cloning Instructions

 Due to the presence of large files in some examples/tutorials, Git Large File Storage (LFS) has been configured in this repository. Follow the instructions below to ensure Git LFS is properly set up: 
 - Install Git LFS by downloading it from the [official website](https://git-lfs.com/)
 - After installation, run the following command in your terminal to set up Git LFS on your machine:
```
 git lfs install
```
 - Clone the repository (or a fork of it): 
```
git clone https://github.com/amd/RyzenAI-SW.git
```
- Pull the actual LFS files: 
```
git lfs pull
```

## Examples

- [Run Vitis AI ONNX Quantizer example](example/onnx_quantizer)
- [Real-time object detection with Yolov8](example/yolov8)
- [Run multiple concurrent AI applications with ONNXRuntime](example/multi-model)
- [Run Ryzen AI Library example](example/Ryzen-AI-Library)
- [Run ONNX end-to-end examples with custom pre/post-processing nodes running on IPU](https://github.com/amd/RyzenAI-SW/tree/main/example/onnx-e2e)
- Generative AI Examples
   - [Run LLM OPT-1.3B model with ONNXRuntime](example/transformers/)
   - [Run LLM OPT-1.3B model with PyTorch](example/transformers/)
   - [Run LLM Llama 2 model with PyTorch](example/transformers/)

## Demos

- [Cloud-to-Client demo on Ryzen AI](demo/cloud-to-client)
- [Multiple model concurrency demo on Ryzen AI](demo/multi-model-exec)

## Tutorials

- [A Getting Started Tutorial with a fine-tuned ResNet model](tutorial/getting_started_resnet)
- [Hello World Jupyter Notebook Tutorial](tutorial/hello_world)
- [End-to-end Object Detection](tutorial/yolov8_e2e)
- [Quantization for Ryzen AI](tutorial/RyzenAI_quant_tutorial)

## Benchmarking 

- [ONNX Benchmarking utilities](onnx-benchmark)



## Getting Started
    
To run the demos and examples in this repository, please follow the instructions in the README.md of each directory.

## Reference

- [Ryzen AI Developer Guide](https://ryzenai.docs.amd.com/en/latest)
- [ONNXRuntime Vitis-AI EP](https://onnxruntime.ai/docs/execution-providers/Vitis-AI-ExecutionProvider.html)
- [AMD AI Developer Forum](https://community.amd.com/t5/ai/ct-p/amd_ai)

train_model_faces

Python
Transfer learning with the ResNet50 network.
import os, time
from tempfile import TemporaryDirectory

import numpy as np
import pandas as pd
from PIL import Image
import matplotlib.pyplot as plt

import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader, random_split


#print(dir(models))
resnet = models.resnet50(weights='ResNet50_Weights.DEFAULT')
num_classes = 4 # 4 for faces
# Freeze model parameters
for param in resnet.parameters():
    param.requires_grad = False

# Change the final layer of ResNet50 Model for Transfer Learning
fc_inputs = resnet.fc.in_features
resnet.fc = nn.Sequential(
    nn.Linear(fc_inputs, 256),
    nn.ReLU(),
    nn.Dropout(0.4),
    nn.Linear(256, num_classes),  # Normal, Happy, Sad, Angry
    nn.LogSoftmax(dim=1) # For using NLLLoss()
)

# Define Optimizer and Loss Function
loss_func = nn.NLLLoss()
optimizer = optim.Adam(resnet.parameters())

transform = transforms.Compose([            #[1]
 transforms.Resize(232),                    #[2]
 transforms.CenterCrop(224),                #[3]
 transforms.ToTensor(),                     #[4]
 transforms.Normalize(                      #[5]
 mean=[0.485, 0.456, 0.406],                #[6]
 std=[0.229, 0.224, 0.225]                  #[7]
 )])

image_transforms = { 
    'train': transforms.Compose([
        transforms.RandomResizedCrop(size=256, scale=(0.8, 1.0)),
        transforms.RandomRotation(degrees=15),
        transforms.RandomHorizontalFlip(),
        transforms.CenterCrop(size=224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406],
                             [0.229, 0.224, 0.225])
    ]),
    'valid': transforms.Compose([
        transforms.Resize(size=256),
        transforms.CenterCrop(size=224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406],
                             [0.229, 0.224, 0.225])
    ]),
    'test': transforms.Compose([
        transforms.Resize(size=256),
        transforms.CenterCrop(size=224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406],
                             [0.229, 0.224, 0.225])
    ])
}


# Load the Data
# Set train and valid directory paths
dataset = 'gsearch'
train_directory = 'train'
test_directory = 'test'
valid_directory = 'valid'
# Batch size
bs = 32
# Number of classes (num_classes) is defined above


# Load Data from folders
data = {
    'train': datasets.ImageFolder(root=train_directory, transform=image_transforms['train']),
    'valid': datasets.ImageFolder(root=valid_directory, transform=image_transforms['valid']),
    'test': datasets.ImageFolder(root=test_directory, transform=image_transforms['test'])
}

# Get a mapping of the indices to the class names, in order to see the output classes of the test images.
idx_to_class = {v: k for k, v in data['train'].class_to_idx.items()}
print(idx_to_class)

train_data_size = len(data['train'])
valid_data_size = len(data['valid'])
test_data_size = len(data['test'])
# Create iterators for the Data loaded using DataLoader module
train_data_loader = DataLoader(data['train'], batch_size=bs, shuffle=True)
valid_data_loader = DataLoader(data['valid'], batch_size=bs, shuffle=True)
test_data_loader = DataLoader(data['test'], batch_size=bs, shuffle=True)
# Print the train, validation and test set data sizes
print(train_data_size, valid_data_size, test_data_size)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")  # training runs on the GPU if available, otherwise the CPU (the author could not get training to run on the IPU)
resnet = resnet.to(device)

def train_and_validate(model, loss_criterion, optimizer, epochs=25):
    '''
    Function to train and validate the model.

    Parameters:
        :param model: Model to train and validate
        :param loss_criterion: Loss criterion to minimize
        :param optimizer: Optimizer for computing gradients
        :param epochs: Number of epochs (default=25)

    Returns:
        model: Model after the final epoch (a checkpoint is saved for every epoch)
        history: list of [train loss, valid loss, train acc, valid acc] per epoch
        best_epoch: epoch with the lowest validation loss
    '''
    
    start = time.time()
    history = []
    best_loss = 100000.0
    best_epoch = None

    for epoch in range(epochs):
        epoch_start = time.time()
        print("Epoch: {}/{}".format(epoch+1, epochs))
        
        # Set to training mode
        model.train()
        
        # Loss and Accuracy within the epoch
        train_loss = 0.0
        train_acc = 0.0
        
        valid_loss = 0.0
        valid_acc = 0.0
        
        for i, (inputs, labels) in enumerate(train_data_loader):

            inputs = inputs.to(device)
            labels = labels.to(device)
            
            # Clean existing gradients
            optimizer.zero_grad()
            
            # Forward pass - compute outputs on input data using the model
            outputs = model(inputs)
            
            # Compute loss
            loss = loss_criterion(outputs, labels)
            
            # Backpropagate the gradients
            loss.backward()
            
            # Update the parameters
            optimizer.step()
            
            # Compute the total loss for the batch and add it to train_loss
            train_loss += loss.item() * inputs.size(0)
            
            # Compute the accuracy
            ret, predictions = torch.max(outputs.data, 1)
            correct_counts = predictions.eq(labels.data.view_as(predictions))
            
            # Convert correct_counts to float and then compute the mean
            acc = torch.mean(correct_counts.type(torch.FloatTensor))
            
            # Compute total accuracy in the whole batch and add to train_acc
            train_acc += acc.item() * inputs.size(0)
            
            #print("Batch number: {:03d}, Training: Loss: {:.4f}, Accuracy: {:.4f}".format(i, loss.item(), acc.item()))

        
        # Validation - No gradient tracking needed
        with torch.no_grad():

            # Set to evaluation mode
            model.eval()

            # Validation loop
            for j, (inputs, labels) in enumerate(valid_data_loader):
                inputs = inputs.to(device)
                labels = labels.to(device)

                # Forward pass - compute outputs on input data using the model
                outputs = model(inputs)

                # Compute loss
                loss = loss_criterion(outputs, labels)

                # Compute the total loss for the batch and add it to valid_loss
                valid_loss += loss.item() * inputs.size(0)

                # Calculate validation accuracy
                ret, predictions = torch.max(outputs.data, 1)
                correct_counts = predictions.eq(labels.data.view_as(predictions))

                # Convert correct_counts to float and then compute the mean
                acc = torch.mean(correct_counts.type(torch.FloatTensor))

                # Compute total accuracy in the whole batch and add to valid_acc
                valid_acc += acc.item() * inputs.size(0)

                #print("Validation Batch number: {:03d}, Validation: Loss: {:.4f}, Accuracy: {:.4f}".format(j, loss.item(), acc.item()))
        if valid_loss < best_loss:
            best_loss = valid_loss
            best_epoch = epoch

        # Find average training loss and training accuracy
        avg_train_loss = train_loss/train_data_size 
        avg_train_acc = train_acc/train_data_size

        # Find average validation loss and validation accuracy
        avg_valid_loss = valid_loss/valid_data_size 
        avg_valid_acc = valid_acc/valid_data_size

        history.append([avg_train_loss, avg_valid_loss, avg_train_acc, avg_valid_acc])
                
        epoch_end = time.time()
    
        print("Epoch : {:03d}, Training: Loss - {:.4f}, Accuracy - {:.4f}%, \n\t\tValidation : Loss - {:.4f}, Accuracy - {:.4f}%, Time: {:.4f}s".format(epoch, avg_train_loss, avg_train_acc*100, avg_valid_loss, avg_valid_acc*100, epoch_end-epoch_start))
        
        # Save a checkpoint of the model after every epoch (the best epoch is selected later)
        torch.save(model, dataset+'_model_'+str(epoch)+'.pt')
            
    return model, history, best_epoch
    
# Train the model for 25 epochs
num_epochs = 25

trained_model, history, best_epoch = train_and_validate(resnet, loss_func, optimizer, num_epochs)

torch.save(history, dataset+'_history.pt')

history = np.array(history)
plt.plot(history[:,0:2])
plt.legend(['Tr Loss', 'Val Loss'])
plt.xlabel('Epoch Number')
plt.ylabel('Loss')
plt.ylim(0,1)
plt.savefig(dataset+'_loss_curve.png')
plt.show()

plt.plot(history[:,2:4])
plt.legend(['Tr Accuracy', 'Val Accuracy'])
plt.xlabel('Epoch Number')
plt.ylabel('Accuracy')
plt.ylim(0,1)
plt.savefig(dataset+'_accuracy_curve.png')
plt.show()






# Quick visual check: apply the training transforms to a sample image
img=Image.open("gfg_logo.jpeg")
mg_t = image_transforms['train'](img)
trans = torchvision.transforms.ToPILImage()
out = trans(mg_t)
out.show()
print("Original: ", img.size)

def predict(model, test_image_name):
    '''
    Function to predict the class of a single test image.

    Parameters:
        :param model: Model to test
        :param test_image_name: Path to the test image
    '''
    
    transform = image_transforms['test']


    test_image = Image.open(test_image_name)
    plt.imshow(test_image)
    
    test_image_tensor = transform(test_image)
    if torch.cuda.is_available():
        test_image_tensor = test_image_tensor.view(1, 3, 224, 224).cuda()
    else:
        test_image_tensor = test_image_tensor.view(1, 3, 224, 224)
    
    with torch.no_grad():
        model.eval()
        # Model outputs log probabilities
        out = model(test_image_tensor)
        ps = torch.exp(out)

        topk, topclass = ps.topk(3, dim=1)
        cls = idx_to_class[topclass.cpu().numpy()[0][0]]
        score = topk.cpu().numpy()[0][0]

        for i in range(3):
            print("Predcition", i+1, ":", idx_to_class[topclass.cpu().numpy()[0][i]], ", Score: ", topk.cpu().numpy()[0][i])

best_epoch = 23  # override with the epoch whose checkpoint you want to load (the training run above reports the best epoch)
print("BEST EPOCH IS: ", best_epoch)
model = torch.load("{}_model_{}.pt".format(dataset, best_epoch))
predict(model, 'images.jpg')

print("DONE")

torchonnx

Python
ONNX model conversion
import torch


model = torch.load('gsearch_model_23.pt')  # load the checkpoint from the best epoch to produce the ONNX model
model.eval()
batch_size=1

# Input to the model
x = torch.randn(batch_size, 3, 224, 224, requires_grad=True)
#print("x is: ", x)

torch_out = model(x)

# Export the model
torch.onnx.export(model,               # model being run
                  x,                         # model input (or a tuple for multiple inputs)
                  "grapes.onnx",   # where to save the model (can be a file or file-like object)
                  export_params=True,        # store the trained parameter weights inside the model file
                  opset_version=10,          # the ONNX version to export the model to
                  do_constant_folding=True,  # whether to execute constant folding for optimization
                  input_names = ['input'],   # the model's input names
                  output_names = ['output'], # the model's output names
                  dynamic_axes={'input' : {0 : 'batch_size'},    # variable length axes
                                'output' : {0 : 'batch_size'}})
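
Before quantizing, it can help to confirm that the exported graph is valid and that ONNX Runtime reproduces the PyTorch output for the same input. A minimal sketch, continuing from the variables above (x and torch_out) and assuming grapes.onnx is the file just written:

```
import numpy as np
import onnx
import onnxruntime as ort

# Structural check of the exported graph
onnx.checker.check_model(onnx.load("grapes.onnx"))

# Run the same random input through ONNX Runtime on the CPU and compare with torch_out
sess = ort.InferenceSession("grapes.onnx", providers=["CPUExecutionProvider"])
ort_out = sess.run(None, {"input": x.detach().numpy()})[0]
np.testing.assert_allclose(torch_out.detach().numpy(), ort_out, rtol=1e-3, atol=1e-5)
print("ONNX Runtime output matches the PyTorch output")
```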

quantize_tv_model_ipu

Python
ONNX Model quantization with IPU acceleration.
#
# Copyright (C) 2023, Advanced Micro Devices, Inc. All rights reserved.
# SPDX-License-Identifier: MIT
#
import torch
import torchvision
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from torchvision.datasets import ImageFolder

import os
import sys
import numpy as np

import onnx
import onnxruntime
from onnxruntime.quantization.calibrate import CalibrationDataReader
import vai_q_onnx


class ImageNetDataset:

    def __init__(
        self,
        data_dir,
        weights,
        **kwargs,
    ):
        super().__init__()
        self.vld_path = data_dir
        self.weights = weights
        self.setup("fit")

    def setup(self, stage: str):
        weights = torchvision.models.get_weight(self.weights)
        preprocessing = weights.transforms()
        self.val_dataset = ImageFolder(self.vld_path, preprocessing)


class GenModelDataset(Dataset):

    def __init__(self, dataset):
        self.dataset = dataset

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, index):
        sample = self.dataset[index]
        input_data = sample[0]
        label = sample[1]
        return input_data, label


def create_dataloader(data_dir, weights, batch_size):
    image_dataset = ImageNetDataset(data_dir, weights)
    benchmark_dataloader = DataLoader(GenModelDataset(
        image_dataset.val_dataset),
                                      batch_size=batch_size,
                                      drop_last=True)
    return benchmark_dataloader


class CalibrationDataReader(CalibrationDataReader):

    def __init__(self, data_dir: str, weights: str, batch_size: int = 1):
        super().__init__()
        self.iterator = iter(create_dataloader(data_dir, weights, batch_size))

    def get_next(self) -> dict:
        try:
            return {"input": next(self.iterator)[0].numpy()}
        except Exception:
            return None


def calibration_reader(data_dir, weights, batch_size=1):
    return CalibrationDataReader(data_dir, weights, batch_size=batch_size)


def main():
    # `weights_name` is the name of the weights in the original floating-point Torch model.
    weights_name = sys.argv[1]

    # `input_model_path` is the path to the original, unquantized ONNX model.
    input_model_path = sys.argv[2]

    # `output_model_path` is the path where the quantized model will be saved.
    output_model_path = sys.argv[3]

    # `calibration_dataset_path` is the path to the dataset used for calibration during quantization.
    calibration_dataset_path = sys.argv[4]

    # `dr` (Data Reader) is an instance of the CalibrationDataReader defined above, which
    # reads the calibration dataset and prepares it for the quantization process.
    dr = calibration_reader(calibration_dataset_path, weights_name, 1)
    vai_q_onnx.quantize_static(
        input_model_path,
        output_model_path,
        dr,
        activation_type=vai_q_onnx.QuantType.QUInt8,
        calibrate_method=vai_q_onnx.PowerOfTwoMethod.MinMSE,
        enable_dpu=True,
        extra_options={
            'ActivationSymmetric': True,
        })

    print('Calibrated and quantized model saved at:', output_model_path)


if __name__ == '__main__':
    main()
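
The script reads its four arguments from the command line: the torchvision weights name (used only to pick the calibration preprocessing), the float ONNX model, the output path for the quantized model, and an ImageFolder-style directory of calibration images. A hypothetical invocation (file and folder names are placeholders):

```
python quantize_tv_model_ipu.py ResNet50_Weights.DEFAULT grapes.onnx grapes_quantized.onnx calib_images/
```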

sing

Python
Main application file: captures frames, classifies the mood, and plays the corresponding music file.
import multiprocessing
import numpy as np
import cv2
from multiprocessing import Process, Queue
import os
from playsound import playsound
import keyboard
import time

import onnxruntime as ort
from pathlib import Path

model = "xxxxxxxx.onnx"  #The name of the quantized MODEL

path = r'voe-4.0-win_amd64' # Usually in C:\Users\YourUserName\ryzen-ai-sw-1.1
providers = ['VitisAIExecutionProvider']
cache_dir = Path(__file__).parent.resolve()
print("Config Directory: ",os.path.join('..', path, 'vaip_config.json' ))
provider_options = [{
            'config_file': os.path.join('..', path, 'vaip_config.json'),
            'cacheDir': str(cache_dir),
            'cacheKey': 'modelcachekey_quick'
        }]


try:
    session = ort.InferenceSession(model, providers=providers, provider_options=provider_options)
except Exception as e:
    print("SESSION Failed!!!!!!", e)
    session = None  # fall back to the keyboard emulation in camera() below




def clear(q):
    while not q.empty():
        q.get() 
       


def sing(f):
    
    print("Path is:   ",os.path.realpath(f))
    playsound(os.path.realpath(f))
    

    

def camera(st):
    cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
    while(True):
    # Capture frame-by-frame
      ret, frame = cap.read()

    # Our operations on the frame come here
      gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # ANALYZE FRAME
    # Check for Normal, Happy, Sad, Angry
    
      cv2.imshow('frame',gray)

      # Inference on the captured frame (skipped if the ONNX session failed to load;
      # in that case the keyboard emulation below can be used instead).
      mood = None
      if session is not None:
        # The model expects a 224x224 RGB image normalized with the ImageNet
        # statistics, in NCHW float32 layout with a batch dimension of 1.
        rgb = cv2.cvtColor(cv2.resize(frame, (224, 224)), cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0
        rgb = (rgb - np.array([0.485, 0.456, 0.406], dtype=np.float32)) / np.array([0.229, 0.224, 0.225], dtype=np.float32)
        input_data = np.transpose(rgb, (2, 0, 1))[np.newaxis, :]
        try:
          outputs = session.run(None, {'input': input_data})
          # The model outputs log-probabilities; take the most likely class and map it to its label
          mood = CLASS_LABELS[int(np.argmax(outputs[0], axis=1)[0])]
        except Exception as e:
          print("Image Analysis Failed:", e)
        else:
          print("Image Analysis Done:", mood)

      # Forward the detected mood to the dj process
      if mood in ("NORMAL", "SAD", "HAPPY", "ANGRY"):
          clear(st)
          st.put(mood)
            
# Display the resulting frame; the keys below emulate the facial expression detection if your model is not ready yet.

      key=cv2.waitKey(5)
      if (key == ord('z')):
          print("Pressed z")
          clear(st)
          st.put("DONE")
          print("PLACED DONE")
          break
      if (key == ord('q')): 
           print("Pressed q")
           clear(st)
           st.put("NORMAL")
           print("PLACED NORMAL") 
      if (key == ord('a')): 
           print("Pressed a")
           clear(st)
           st.put("ANGRY")
           print("PLACED ANGRY") 
      if (key == ord('b')): 
           print("Pressed b")
           clear(st)
           st.put("SAD")
           print("PLACED SAD")
      if (key == ord('c')): 
           print("Pressed c")
           clear(st)
           st.put("HAPPY")
           print("PLACED HAPPY")  
        
      
           

# When everything is done, release the capture
    cap.release()
    cv2.destroyAllWindows()

def dj():
   nq=Queue()
   
   status=Queue()
   status.put("EMPTY")
   # Point these to mp3 files corresponding to the different moods (placeholder names below).
   normal_1="C:\\Users\\Tasos\\Music\\XXXXXXX.mp3"
   angry_1="C:\\Users\\Tasos\\Music\\YYYYY.mp3"
   happy_1="C:\\Users\\Tasos\\Music\\ZZZZZZ.mp3"
   sad_1="C:\\Users\\Tasos\\Music\\AAAAAAAAAAA.mp3"
   
   proc2 = Process(target=camera, args=(status,))
   
   print("Starting PROC2--> CAMERA")
   proc2.start()
  #  print("STARTED PRC CAMERA")
   proc1 = None  # will hold the currently playing sound process
   while (True):
        msg = status.get()  
        if msg == "NORMAL":
            nq.put(normal_1)
            proc1 = Process(target=sing, args=(nq.get(),))  
            proc1.start()
        if msg == "ANGRY":
            nq.put(angry_1)
            proc1 = Process(target=sing, args=(nq.get(),))  
            proc1.start()
        if msg == "HAPPY":
            nq.put(happy_1)
            proc1 = Process(target=sing, args=(nq.get(),))  
            proc1.start()
        if msg == "SAD":
            nq.put(sad_1)
            proc1 = Process(target=sing, args=(nq.get(),))  
            proc1.start()
        if msg == "DONE":
            print("Message is: ", msg)
            print("Received DONE.....")
            if proc1 is not None:
                proc1.terminate()
                proc1.join()
            proc2.terminate()
            proc2.join()
            time.sleep(5)
            # procx = Process(target=camera, args=(status,))
            # procx.start()
            # time.sleep(15)
            # procx.terminate()
            
            # procx.join()

            break
   
            

if __name__ == "__main__":  # run only when this file is executed directly

    catgs = ['Happy', 'Sad', 'Angry', 'Normal']

    proc3 = Process(target=dj)  # instantiating without any argument
    proc3.start()
    proc3.join()

    print("BYE BYE!!!!")

Credits

Tasos