SIH-MIN, LIU陳博琳
Published

Forest Fire Prevention

Implementing a lightweight neural network on the KR260 edge device for precise wildfire detection.

37
Forest Fire Prevention

Things used in this project

Hardware components

Kria™ KR260 Robotics Starter Kit
AMD Kria™ KR260 Robotics Starter Kit
×1

Software apps and online services

Snappy Ubuntu Core
Snappy Ubuntu Core

Story

Read more

Code

cnn.py

Python
# -*- coding: utf-8 -*-
"""
Created on Mon May 13 15:47:43 2024

@author: a3311
"""

import numpy as np
import os
from sklearn.metrics import confusion_matrix
import seaborn as sn; sn.set(font_scale=1.4)
from sklearn.utils import shuffle           
import matplotlib.pyplot as plt             
import cv2                                 
import tensorflow as tf                
from tqdm import tqdm
from keras.models import Sequential
from keras.layers import Dense, Activation, Dropout, Flatten
from keras.layers import Conv2D
from keras.layers import MaxPooling2D
from keras.optimizers import SGD, Adam


class_names = ['nofire','fire']
class_names_label = {class_name:i for i, class_name in enumerate(class_names)}
# 'nofire': 0 / 'fire': 1

nb_classes = len(class_names)

IMAGE_SIZE = (64, 64)

def load_data():
    datasets = ['seg_train', 'seg_test']
    output = []
    
    # Iterate through training and test sets
    for dataset in datasets:
        
        images = []
        labels = []
        
        print("Loading {}".format(dataset))
        
        # Iterate through each folder corresponding to a category
        for folder in os.listdir(dataset):
            label = class_names_label[folder]
            
            # Iterate through each image in our folder
            for file in tqdm(os.listdir(os.path.join(dataset, folder))):
                
                # Get the path name of the image
                img_path = os.path.join(os.path.join(dataset, folder), file)
                
            
                # Open and resize the img
                image = cv2.imread(img_path)
                image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                #Read photos with OpenCV, colors are considered as BGR, need to convert to RGB, errors indicate grayscale or already converted.
                image = cv2.resize(image, IMAGE_SIZE) 
                
                # Append the image and its corresponding label to the output
                images.append(image)
                labels.append(label)
                
        
                
        images = np.array(images, dtype = 'float32')
        labels = np.array(labels, dtype = 'int32')   
        
        output.append((images, labels))

    return output

(train_images, train_labels), (test_images, test_labels) = load_data()

#%%
'random'
train_images, train_labels = shuffle(train_images, train_labels, random_state=25)
'standardization'
train_images = train_images / 255.0 
test_images = test_images / 255.0
#%%
'Modeling'
input_shape = (64, 64, 3)

model = Sequential([
    Conv2D(64, (3, 3), input_shape=input_shape, padding='same',
           activation='relu', strides=2),
    MaxPooling2D(pool_size=(2, 2), strides=2),
    Dropout(0.2),
    Conv2D(128, (3, 3), input_shape=input_shape, padding='same',
           activation='relu', strides=2),
    MaxPooling2D(pool_size=(2, 2), strides=2),
    Dropout(0.2),
    Flatten(),
    Dropout(0.5),
    Dense(2, activation='softmax') #output layeractivaiton with softmax
])

model.compile(optimizer = 'adam', #SGD(lr=0.1)
              loss = 'sparse_categorical_crossentropy',
              metrics=['accuracy'])
#%%
history = model.fit(train_images, train_labels, 
                    #validation_data=(test_images, test_labels),
                    #verbose=2,callbacks=[earlyStop],
                    batch_size=128, epochs=50)
#%%
'model report'
plt.title('train_loss')
plt.ylabel('loss')
plt.xlabel('Epoch')
plt.plot(history.history["loss"])
#%%
'prediction'
predictions = model.predict(test_images)     # Vector of probabilities
pred_labels = np.argmax(predictions, axis = 1) # We take the highest probability

#%%
'confusion matrix'
CM = confusion_matrix(test_labels, pred_labels)
def accuracy(confusion_matrix):
    diagonal_sum = confusion_matrix.trace()
    sum_of_all_elements = confusion_matrix.sum()
    return diagonal_sum / sum_of_all_elements 
print(accuracy(CM))

#use function
from sklearn import metrics
#Print the prediction accuracy
print ("Accuracy = ", metrics.accuracy_score(test_labels, pred_labels))
#%%
'visiable confusion matrix'
ax = plt.axes()
sn.heatmap(CM, annot=True, 
           annot_kws={"size": 10}, 
           xticklabels=class_names, 
           yticklabels=class_names, ax = ax)
ax.set_title('Confusion matrix')
plt.show()
#%%
'save model'
from keras.models import load_model
model.save("CNN_model.keras")
model = load_model('CNN_model.keras')

cnn_test.py

Python
import glob
import os
import random

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from torch.utils.data import DataLoader, Dataset, RandomSampler, Subset, random_split
from torchvision import transforms, models, datasets
from tqdm import tqdm

# device
# os.environ["CUDA_VISIBLE_DEVICES"] = "0"
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using {device} device')

# set the seed
def set_all_seed(seed):
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
set_all_seed(123)

# batch size
batch_size = 256

# resize the data image
resize_transform = transforms.Resize((32, 32))
# resize_transform = transforms.Resize((460, 860))

# data preprocessing
train_transform = transforms.Compose([
    transforms.Pad(4, padding_mode='reflect'),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    resize_transform,
    transforms.RandomCrop(32),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.RandomErasing(),
    transforms.ToTensor(),
])
test_transform = transforms.Compose([
    resize_transform,
    transforms.ToTensor(),
])

# import dataset
#dataset_path = r'C:\Users\112033639\Desktop\\AMD_contest\Dataset\Fire'
#train_dataset = datasets.ImageFolder(root=dataset_path, transform=train_transform)
# = datasets.ImageFolder(root=dataset_path, transform=test_transform)

#train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, pin_memory=True)
#test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, pin_memory=True)

# import dataset
dataset_path = r'C:\Users\112033639\Desktop\\AMD_contest\Dataset\Fire\fire_dataset'
dataset = datasets.ImageFolder(root=dataset_path)

# Split the dataset into training and validation sets (80% training, 20% testing)
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# Apply transforms
train_dataset.dataset.transform = train_transform
test_dataset.dataset.transform = test_transform

# Create data loaders
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, pin_memory=True)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, pin_memory=True)

# bulid the network
class NeuralNetwork(nn.Module):
    def __init__(self,input_size, output_size):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(input_size, 2048),  
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(2048, 2048),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(2048, 1024),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, output_size), 
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        # logits = self.hidden_layers(x)
        return logits

# class NeuralNetwork(nn.Module):
    # def __init__(self, input_channels, output_size):
        # super(NeuralNetwork, self).__init__()
        # self.conv1 = nn.Conv2d(input_channels, 64, kernel_size=3, stride=2, padding=1)
        # self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        # self.dropout1 = nn.Dropout(0.5)
        # self.conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1)
        # self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        # self.dropout2 = nn.Dropout(0.5)
        # self.flatten = nn.Flatten()
        # self.fc1 = nn.Linear(128 * 2 * 2, 512)  
        # self.dropout3 = nn.Dropout(0.5)
        # self.fc2 = nn.Linear(512, output_size)
    
    # def forward(self, x):
        # x = self.conv1(x)
        # x = F.relu(x)
        # x = self.pool1(x)
        # x = self.dropout1(x)
        # x = self.conv2(x)
        # x = F.relu(x)
        # x = self.pool2(x)
        # x = self.dropout2(x)
        # x = self.flatten(x)
        # x = self.fc1(x)
        # x = F.relu(x)
        # x = self.dropout3(x)
        # x = self.fc2(x)
        # return x


input_channels = 3
input_size = 32*32*3
output_size = 2


# load the model
# model = NeuralNetwork(input_channels, output_size)
model = NeuralNetwork(input_size, output_size)
model.eval()
print(model)

# num_fc = model.fc.in_features
# model.fc = nn.Linear(num_fc, 2)

# loss function
loss_fn = nn.CrossEntropyLoss()

# optimizer (L2 normalization)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-3)

# learning rate scheduler
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)


# train
def train(dataloader, model, loss_fn, optimizer):
    num_batches = len(dataloader)
    size = len(dataloader.dataset)
    epoch_loss = 0
    correct = 0
    
    all_preds = []
    all_labels = []

    model.train()

    for X, y in tqdm(dataloader):
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        pred = pred.argmax(dim=1, keepdim=True)
        correct += pred.eq(y.view_as(pred)).sum().item()
        
        # Store predictions and labels
        all_preds.extend(pred.view(-1).cpu().numpy())
        all_labels.extend(y.cpu().numpy())

    avg_epoch_loss = epoch_loss / num_batches
    avg_acc = correct / size
    
    # Calculate confusion matrix
    cm = confusion_matrix(all_labels, all_preds)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm)
    disp.plot()
    plt.show()

    return avg_epoch_loss, avg_acc

# test
def test(dataloader, model, loss_fn):
    num_batches = len(dataloader)
    size = len(dataloader.dataset)
    epoch_loss = 0
    correct = 0
    
    all_preds = []
    all_labels = []

    model.eval()

    with torch.no_grad():
        for X, y in tqdm(dataloader):

            X, y = X.to(device), y.to(device)

            pred = model(X)

            epoch_loss += loss_fn(pred, y).item()
            pred = pred.argmax(dim=1, keepdim=True)
            correct += pred.eq(y.view_as(pred)).sum().item()
            
            # Store predictions and labels
            all_preds.extend(pred.view(-1).cpu().numpy())
            all_labels.extend(y.cpu().numpy())

    avg_epoch_loss = epoch_loss / num_batches
    avg_acc = correct / size
    
    # Calculate confusion matrix
    cm = confusion_matrix(all_labels, all_preds)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm)
    disp.plot()
    plt.show()

    return avg_epoch_loss, avg_acc

# epoch
epochs = 50

# Initialize lists to store training and testing metrics
train_losses = []
train_accuracies = []
test_losses = []
test_accuracies = []

# train accuracy initialization
train_acc1 = 0

# test accuracy initialization
test_acc1 = 0

# start_train
for epoch in range(epochs):

    # train loss/accuracy
    train_loss, train_acc = train(train_dataloader, model, loss_fn, optimizer)
    train_losses.append(train_loss)
    train_accuracies.append(train_acc)
    print(f"Epoch {epoch + 1:2d}: Loss = {train_loss:.4f} Acc = {train_acc:.2f}")
    train_acc1+=train_acc

    # test loss/accuracy
    test_loss, test_acc = test(test_dataloader, model, loss_fn)
    test_losses.append(test_loss)
    test_accuracies.append(test_acc)
    print(f"Epoch {epoch + 1:2d}: Loss = {train_loss:.4f} Acc = {train_acc:.2f} Test_Loss = {test_loss:.4f} Test_Acc = {test_acc:.2f}")
    test_acc1 += test_acc
    
    # save the model
    torch.save(model.state_dict(), f'model_epoch_{epoch}.pth')

# Plot accuracy and loss curves
epochs_range = range(1, epochs + 1)

plt.figure(figsize=(12, 5))

# Plot training & validation accuracy
plt.subplot(1, 2, 1)
plt.plot(epochs_range, train_accuracies, label='Training Accuracy')
plt.plot(epochs_range, test_accuracies, label='Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.title('Training and Validation Accuracy')
plt.legend()

# Plot training & validation loss
plt.subplot(1, 2, 2)
plt.plot(epochs_range, train_losses, label='Training Loss')
plt.plot(epochs_range, test_losses, label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()

plt.show()

hier_kmeans.py

Python
import cv2
import numpy as np
from sklearn.cluster import AgglomerativeClustering
from collections import Counter
import pandas as pd
import numpy as np 
import matplotlib.pyplot as plt 
import sklearn
from sklearn.cluster import KMeans
from collections import Counter
from skimage.color import rgb2lab, deltaE_cie76
import random

def get_image(image_path, downsample_factor=0.2):
    image = cv2.imread(image_path)  
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  
    if downsample_factor < 1:  # 
        image = cv2.resize(image, (0, 0), fx=downsample_factor, fy=downsample_factor)
    return image

def RGB2HEX(color):
    return "#{:02x}{:02x}{:02x}".format(int(color[0]), int(color[1]), int(color[2]))  

def top_down_hierarchical_clustering(image, num_colors, sample_size=10000, random_seed=50):
    pixels = image.reshape((-1, 3))
    np.random.seed(random_seed)  # 
    sample_size = min(sample_size, len(pixels))
    random_indices = np.random.choice(len(pixels), sample_size, replace=False)  
    sample_pixels = pixels[random_indices]
    clustering = AgglomerativeClustering(n_clusters=num_colors)
    labels = clustering.fit_predict(sample_pixels)

    cluster_centers = []
    for label in range(num_colors):
        cluster_center = sample_pixels[labels == label].mean(axis=0)
        cluster_centers.append(cluster_center)

    return np.array(cluster_centers), labels

image = get_image('fire.41.png', downsample_factor=0.1)

number_of_colors = 10

cluster_centers, labels = top_down_hierarchical_clustering(image, number_of_colors)
modified_image = image.reshape(image.shape[0]*image.shape[1], 3)  #  RGB 
clf = KMeans(n_clusters=number_of_colors, init=cluster_centers, n_init=1)
labels = clf.fit_predict(modified_image)  # 
counts = Counter(labels)  # 

center_colors = clf.cluster_centers_  # 
# 
ordered_colors = [center_colors[i] for i in counts.keys()]  # 
hex_colors = [RGB2HEX(ordered_colors[i]) for i in counts.keys()]  # 
rgb_colors = [ordered_colors[i] for i in counts.keys()]  #  RGB 

plt.title('Colors Detection ($n=10$)', fontsize=20)  # 
plt.pie(counts.values(), labels = hex_colors, colors = hex_colors)  # 

judgement.py

Python
# -*- coding: utf-8 -*-
"""
Created on Mon May 13 16:54:57 2024

@author: a3311
"""

# necessary libraries
import numpy as np
import cv2
from keras.models import load_model

# load model
model = load_model("CNN_model.keras")

# read one image and preprocess
def preprocess_image(image_path):
    # read image
    image = cv2.imread(image_path)
    # change color space
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    # adjust size
    image = cv2.resize(image, (64, 64))
    # normalization
    image = image / 255.0
    # change image into numpy array and add one dimensionality
    # so that it meets the needs of model input
    image = np.expand_dims(image, axis=0)
    return image

# define image path
image_path = "fire.688.png"  

# preprocess image and predict
image = preprocess_image(image_path)
prediction = model.predict(image)

# the probability of nofire(left) / fire(right)
print(prediction)

# print the results of prediction
if (prediction[0][0] > prediction[0][1]):
    print("non-fire")
else:
    print("fire")

kr260_test.py

Python
import numpy as np
import cv2
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from PIL import Image

# model structure
class NeuralNetwork(nn.Module):
    def __init__(self,input_size, output_size):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(input_size, 1024),  
            nn.ReLU(),
            nn.Linear(1024, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),  
            nn.ReLU(),
            nn.Linear(512, output_size),  
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        # logits = self.hidden_layers(x)
        return logits

# initialize
input_size = 32 * 32 * 3  
output_size = 2
model = NeuralNetwork(input_size, output_size)

# load state dictionary
# enter in the last epoch (.pth)
model.load_state_dict(torch.load("model_epoch_4.pth"))
model.eval()

# data preprocess
def preprocess_image(image_path):
    # read image
    image = Image.open(image_path).convert("RGB")
    # image preprocess
    transform = transforms.Compose([
        transforms.Resize((32, 32)),  
        transforms.ToTensor(),
    ])
    image = transform(image)
    image = image.unsqueeze(0)  
    return image

# image path
image_path = "fire.41.png"  

# predict
image = preprocess_image(image_path)
image = image.to(torch.device('cpu'))  

with torch.no_grad():
    output = model(image)
    prediction = torch.argmax(output, dim=1)
    print(f'Predicted class: {prediction.item()}')

# print
if prediction.item() == 0:
    print("non-fire")
else:
    print("fire")

Credits

SIH-MIN, LIU
1 project • 0 followers
陳博琳
1 project • 0 followers
我一直沒辦法被找到,我想要參加比賽,哭阿

Comments