import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from typing import Dict, List, Tuple, Optional
import json
from datetime import datetime, timedelta
import threading
from queue import Queue
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
import matplotlib.dates as mdates
class AquaponicsController(nn.Module):
"""Neural Network for Aquaponics System Control"""
def __init__(self, input_features: int = 13, hidden_layers: List[int] = [64, 32, 16],
output_controls: int = 8, activation: str = 'relu', dropout: float = 0.1):
super(AquaponicsController, self).__init__()
# Input features: pH, temp, DO, ammonia, nitrite, nitrate, water_level,
# light_intensity, humidity, air_temp, EC, time_of_day, day_of_year
# Output controls: water_pump, air_pump, heater, cooler, pH_up, pH_down,
# grow_lights, feeding_system
layers = []
prev_size = input_features
for hidden_size in hidden_layers:
layers.append(nn.Linear(prev_size, hidden_size))
if activation == 'relu':
layers.append(nn.ReLU())
elif activation == 'tanh':
layers.append(nn.Tanh())
elif activation == 'leaky_relu':
layers.append(nn.LeakyReLU())
if dropout > 0:
layers.append(nn.Dropout(dropout))
prev_size = hidden_size
# Output layer with sigmoid for control values (0-1)
layers.append(nn.Linear(prev_size, output_controls))
layers.append(nn.Sigmoid())
self.model = nn.Sequential(*layers)
# Define sensor and control names
self.sensor_names = ['pH', 'Water Temp (°C)', 'Dissolved O2 (mg/L)',
'Ammonia (mg/L)', 'Nitrite (mg/L)', 'Nitrate (mg/L)',
'Water Level (cm)', 'Light (lux)', 'Humidity (%)',
'Air Temp (°C)', 'EC (mS/cm)', 'Time (hour)', 'Day of Year']
self.control_names = ['Water Pump', 'Air Pump', 'Heater', 'Cooler',
'pH Up Pump', 'pH Down Pump', 'Grow Lights', 'Fish Feeder']
def forward(self, x):
return self.model(x)
def get_control_decisions(self, sensor_data: torch.Tensor) -> Dict[str, float]:
"""Convert sensor inputs to control decisions"""
with torch.no_grad():
outputs = self.forward(sensor_data)
decisions = {}
for i, name in enumerate(self.control_names):
decisions[name] = outputs[0][i].item()
return decisions
class AquaponicsDataSimulator:
"""Simulates realistic aquaponics sensor data for training"""
def __init__(self):
self.optimal_ranges = {
'pH': (6.8, 7.2),
'water_temp': (22, 26),
'dissolved_o2': (5, 8),
'ammonia': (0, 0.5),
'nitrite': (0, 0.5),
'nitrate': (5, 150),
'water_level': (45, 55),
'light_intensity': (3000, 5000),
'humidity': (60, 80),
'air_temp': (20, 28),
'ec': (1.0, 2.5)
}
def generate_training_data(self, n_samples: int = 1000) -> Tuple[torch.Tensor, torch.Tensor]:
"""Generate synthetic training data with realistic patterns"""
X = []
y = []
for _ in range(n_samples):
# Generate sensor readings
hour = np.random.uniform(0, 24)
day_of_year = np.random.randint(1, 366)
# Seasonal patterns
seasonal_temp_var = 5 * np.sin(2 * np.pi * (day_of_year - 80) / 365) # Peak summer ~day 171
seasonal_light_var = 2 * np.sin(2 * np.pi * (day_of_year - 80) / 365) # Longer days in summer
# Daily patterns
daily_temp_var = 3 * np.sin(2 * np.pi * hour / 24)
# Daylight hours vary by season
daylight_hours = 12 + seasonal_light_var * 2 # 8-16 hours depending on season
sunrise = 12 - daylight_hours / 2
sunset = 12 + daylight_hours / 2
light_pattern = 0
if sunrise <= hour <= sunset:
light_pattern = 5000 * np.sin(np.pi * (hour - sunrise) / daylight_hours)
sensors = [
np.random.normal(7.0, 0.3), # pH
np.random.normal(24 + seasonal_temp_var + daily_temp_var, 1), # water temp
np.random.normal(6.5 - 0.5 * seasonal_temp_var / 5, 0.5), # DO (inversely related to temp)
np.random.exponential(0.2), # ammonia
np.random.exponential(0.1), # nitrite
np.random.normal(20 + 10 * np.sin(2 * np.pi * day_of_year / 365), 10), # nitrate (seasonal)
np.random.normal(50, 2), # water level
light_pattern + np.random.normal(0, 500), # light
np.random.normal(70 - seasonal_temp_var, 5), # humidity (inverse of temp)
np.random.normal(23 + seasonal_temp_var + daily_temp_var, 2), # air temp
np.random.normal(1.5 + 0.2 * np.sin(2 * np.pi * day_of_year / 365), 0.3), # EC
hour, # time of day
day_of_year # day of year
]
# Generate control outputs based on sensor readings
controls = self._calculate_optimal_controls(sensors, seasonal_temp_var)
X.append(sensors)
y.append(controls)
return torch.FloatTensor(X), torch.FloatTensor(y)
def _calculate_optimal_controls(self, sensors: List[float], seasonal_var: float) -> List[float]:
"""Calculate optimal control values based on sensor readings"""
controls = [0.0] * 8
# Water pump (based on time - cyclic operation)
hour = sensors[11]
controls[0] = 1.0 if (hour % 2) < 0.5 else 0.3 # Flood and drain cycle
# Air pump (based on DO and temperature)
if sensors[2] < 5: # Low DO
controls[1] = 1.0
else:
# Increase aeration in warmer months
controls[1] = 0.5 + 0.2 * max(0, seasonal_var / 5)
# Heater (based on water temp and season)
target_temp = 24 + seasonal_var * 0.5 # Adjust target by season
if sensors[1] < target_temp - 2:
controls[2] = min(1.0, (target_temp - 2 - sensors[1]) / 5)
# Cooler (based on water temp and season)
if sensors[1] > target_temp + 2:
controls[3] = min(1.0, (sensors[1] - target_temp - 2) / 5)
# pH control
if sensors[0] < 6.8:
controls[4] = min(1.0, (6.8 - sensors[0]) / 0.5) # pH up
elif sensors[0] > 7.2:
controls[5] = min(1.0, (sensors[0] - 7.2) / 0.5) # pH down
# Grow lights (based on time, current light, and season)
day_of_year = sensors[12]
# Supplement more in winter
winter_supplement = max(0, 1 - abs(day_of_year - 355) / 180) if day_of_year > 265 else max(0, 1 - day_of_year / 80)
if 6 <= hour <= 18 and sensors[7] < 3000:
controls[6] = min(1.0, 0.7 + 0.3 * winter_supplement)
else:
controls[6] = 0.0
# Fish feeder (scheduled feeding times, adjusted by season)
# Feed more in warmer months
feeding_multiplier = 1.0 + 0.3 * max(0, seasonal_var / 5)
if hour in [8, 14, 20]:
controls[7] = min(1.0, 0.8 * feeding_multiplier)
else:
controls[7] = 0.0
return controls
class AquaponicsTrainerGUI:
"""GUI Application for Training Aquaponics Control System"""
def __init__(self, root):
self.root = root
self.root.title("Aquaponics Neural Network Controller")
self.root.geometry("1400x900")
# Initialize variables
self.model = None
self.trainer = None
self.train_thread = None
self.simulator = AquaponicsDataSimulator()
self.sensor_history = []
self.control_history = []
# Setup GUI
self.setup_ui()
self.setup_styles()
# Start update loops
self.update_training_status()
self.update_sensor_display()
def setup_styles(self):
style = ttk.Style()
style.theme_use('clam')
# Configure styles
style.configure('Title.TLabel', font=('Arial', 16, 'bold'))
style.configure('Header.TLabel', font=('Arial', 12, 'bold'))
style.configure('Sensor.TLabel', font=('Arial', 10))
style.configure('Control.TLabel', font=('Arial', 10, 'bold'))
style.configure('Warning.TLabel', foreground='red')
style.configure('Good.TLabel', foreground='green')
def setup_ui(self):
# Create menu
self.create_menu()
# Main container with tabs
notebook = ttk.Notebook(self.root)
notebook.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
# System Monitor Tab
monitor_frame = ttk.Frame(notebook)
notebook.add(monitor_frame, text="System Monitor")
self.setup_monitor_tab(monitor_frame)
# Training Tab
training_frame = ttk.Frame(notebook)
notebook.add(training_frame, text="Neural Network Training")
self.setup_training_tab(training_frame)
# Control Settings Tab
control_frame = ttk.Frame(notebook)
notebook.add(control_frame, text="Control Settings")
self.setup_control_tab(control_frame)
# Data Analysis Tab
analysis_frame = ttk.Frame(notebook)
notebook.add(analysis_frame, text="Data Analysis")
self.setup_analysis_tab(analysis_frame)
# Status bar
self.status_var = tk.StringVar(value="System Ready")
status_bar = ttk.Label(self.root, textvariable=self.status_var, relief=tk.SUNKEN)
status_bar.pack(side=tk.BOTTOM, fill=tk.X, padx=5, pady=2)
def create_menu(self):
menubar = tk.Menu(self.root)
self.root.config(menu=menubar)
# File menu
file_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="File", menu=file_menu)
file_menu.add_command(label="Save Model...", command=self.save_model)
file_menu.add_command(label="Load Model...", command=self.load_model)
file_menu.add_command(label="Export Data...", command=self.export_data)
file_menu.add_separator()
file_menu.add_command(label="Exit", command=self.root.quit)
# System menu
system_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="System", menu=system_menu)
system_menu.add_command(label="Start Monitoring", command=self.start_monitoring)
system_menu.add_command(label="Stop Monitoring", command=self.stop_monitoring)
system_menu.add_separator()
system_menu.add_command(label="Emergency Stop", command=self.emergency_stop)
# Help menu
help_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Help", menu=help_menu)
help_menu.add_command(label="About", command=self.show_about)
def setup_monitor_tab(self, parent):
# Main container
main_frame = ttk.Frame(parent)
main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# Left panel - Sensor readings
left_frame = ttk.LabelFrame(main_frame, text="Sensor Readings", padding=10)
left_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 5))
# Sensor display grid
self.sensor_vars = {}
self.sensor_labels = {}
sensors = [
('pH', '7.0', (6.8, 7.2)),
('Water Temp', '24.0°C', (22, 26)),
('Dissolved O2', '6.5 mg/L', (5, 8)),
('Ammonia', '0.2 mg/L', (0, 0.5)),
('Nitrite', '0.1 mg/L', (0, 0.5)),
('Nitrate', '20 mg/L', (5, 150)),
('Water Level', '50 cm', (45, 55)),
('Light', '0 lux', (0, 5000)),
('Humidity', '70%', (60, 80)),
('Air Temp', '23°C', (20, 28)),
('EC', '1.5 mS/cm', (1.0, 2.5)),
('Day of Year', '1', (1, 365))
]
for i, (name, default, (min_val, max_val)) in enumerate(sensors):
# Name label
ttk.Label(left_frame, text=f"{name}:", font=('Arial', 11)).grid(
row=i, column=0, sticky=tk.W, padx=5, pady=3
)
# Value variable and label
var = tk.StringVar(value=default)
self.sensor_vars[name] = var
label = ttk.Label(left_frame, textvariable=var, font=('Arial', 11, 'bold'))
label.grid(row=i, column=1, sticky=tk.W, padx=5, pady=3)
self.sensor_labels[name] = label
# Range label
ttk.Label(left_frame, text=f"({min_val}-{max_val})",
font=('Arial', 9), foreground='gray').grid(
row=i, column=2, sticky=tk.W, padx=5, pady=3
)
# Right panel - Control outputs
right_frame = ttk.LabelFrame(main_frame, text="Control Outputs", padding=10)
right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=(5, 0))
# Control display
self.control_vars = {}
self.control_indicators = {}
controls = [
'Water Pump', 'Air Pump', 'Heater', 'Cooler',
'pH Up Pump', 'pH Down Pump', 'Grow Lights', 'Fish Feeder'
]
for i, name in enumerate(controls):
frame = ttk.Frame(right_frame)
frame.grid(row=i, column=0, sticky=tk.W, padx=5, pady=5)
# Control name
ttk.Label(frame, text=f"{name}:", font=('Arial', 11)).pack(side=tk.LEFT)
# Status indicator (canvas for colored circle)
canvas = tk.Canvas(frame, width=20, height=20, highlightthickness=0)
canvas.pack(side=tk.LEFT, padx=10)
indicator = canvas.create_oval(2, 2, 18, 18, fill='gray', outline='black')
self.control_indicators[name] = (canvas, indicator)
# Value label
var = tk.StringVar(value="0%")
self.control_vars[name] = var
ttk.Label(frame, textvariable=var, font=('Arial', 10, 'bold')).pack(side=tk.LEFT)
# Bottom panel - System status
bottom_frame = ttk.LabelFrame(parent, text="System Status", padding=10)
bottom_frame.pack(fill=tk.X, padx=10, pady=(10, 0))
# Status grid
self.system_status = {
'Fish Tank': tk.StringVar(value="Normal"),
'Grow Bed': tk.StringVar(value="Normal"),
'Biofilter': tk.StringVar(value="Active"),
'Neural Network': tk.StringVar(value="Not Loaded")
}
for i, (system, var) in enumerate(self.system_status.items()):
ttk.Label(bottom_frame, text=f"{system}:", font=('Arial', 10)).grid(
row=0, column=i*2, sticky=tk.W, padx=5
)
ttk.Label(bottom_frame, textvariable=var, font=('Arial', 10, 'bold')).grid(
row=0, column=i*2+1, sticky=tk.W, padx=5
)
def setup_training_tab(self, parent):
# Left panel - Configuration
left_frame = ttk.Frame(parent)
left_frame.pack(side=tk.LEFT, fill=tk.BOTH, padx=10, pady=10)
# Network architecture
arch_frame = ttk.LabelFrame(left_frame, text="Network Architecture", padding=10)
arch_frame.pack(fill=tk.X, pady=(0, 10))
ttk.Label(arch_frame, text="Hidden Layers:").grid(row=0, column=0, sticky=tk.W, pady=5)
self.hidden_layers_var = tk.StringVar(value="64,32,16")
ttk.Entry(arch_frame, textvariable=self.hidden_layers_var, width=20).grid(
row=0, column=1, pady=5
)
ttk.Label(arch_frame, text="Activation:").grid(row=1, column=0, sticky=tk.W, pady=5)
self.activation_var = tk.StringVar(value="relu")
ttk.Combobox(arch_frame, textvariable=self.activation_var,
values=["relu", "tanh", "leaky_relu"],
width=18, state="readonly").grid(row=1, column=1, pady=5)
ttk.Label(arch_frame, text="Dropout:").grid(row=2, column=0, sticky=tk.W, pady=5)
self.dropout_var = tk.DoubleVar(value=0.1)
ttk.Scale(arch_frame, from_=0, to=0.5, variable=self.dropout_var,
orient=tk.HORIZONTAL, length=150).grid(row=2, column=1, pady=5)
# Training parameters
train_frame = ttk.LabelFrame(left_frame, text="Training Parameters", padding=10)
train_frame.pack(fill=tk.X, pady=(0, 10))
params = [
("Epochs:", "epochs", 1000),
("Batch Size:", "batch_size", 32),
("Learning Rate:", "lr", 0.001),
("Validation Split:", "val_split", 0.2)
]
self.train_params = {}
for i, (label, key, default) in enumerate(params):
ttk.Label(train_frame, text=label).grid(row=i, column=0, sticky=tk.W, pady=5)
if key in ["lr", "val_split"]:
var = tk.DoubleVar(value=default)
else:
var = tk.IntVar(value=default)
self.train_params[key] = var
ttk.Entry(train_frame, textvariable=var, width=10).grid(row=i, column=1, pady=5)
# Data generation
data_frame = ttk.LabelFrame(left_frame, text="Training Data", padding=10)
data_frame.pack(fill=tk.X, pady=(0, 10))
ttk.Label(data_frame, text="Samples:").grid(row=0, column=0, sticky=tk.W, pady=5)
self.n_samples_var = tk.IntVar(value=5000)
ttk.Entry(data_frame, textvariable=self.n_samples_var, width=10).grid(
row=0, column=1, pady=5
)
ttk.Button(data_frame, text="Generate Data",
command=self.generate_training_data).grid(row=1, column=0, columnspan=2, pady=10)
# Control buttons
button_frame = ttk.Frame(left_frame)
button_frame.pack(fill=tk.X)
self.build_btn = ttk.Button(button_frame, text="Build Network", command=self.build_network)
self.build_btn.pack(side=tk.LEFT, padx=5)
self.train_btn = ttk.Button(button_frame, text="Start Training", command=self.start_training)
self.train_btn.pack(side=tk.LEFT, padx=5)
self.stop_btn = ttk.Button(button_frame, text="Stop Training",
command=self.stop_training, state=tk.DISABLED)
self.stop_btn.pack(side=tk.LEFT, padx=5)
# Right panel - Visualization
right_frame = ttk.Frame(parent)
right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=10, pady=10)
# Progress
progress_frame = ttk.LabelFrame(right_frame, text="Training Progress", padding=10)
progress_frame.pack(fill=tk.X, pady=(0, 10))
self.epoch_label = ttk.Label(progress_frame, text="Epoch: 0/0")
self.epoch_label.pack()
self.progress_var = tk.DoubleVar()
ttk.Progressbar(progress_frame, variable=self.progress_var,
length=400).pack(pady=5)
self.loss_label = ttk.Label(progress_frame, text="Loss: -")
self.loss_label.pack()
# Loss plot
plot_frame = ttk.LabelFrame(right_frame, text="Training Loss", padding=10)
plot_frame.pack(fill=tk.BOTH, expand=True)
self.fig = Figure(figsize=(6, 4), dpi=100)
self.ax = self.fig.add_subplot(111)
self.ax.set_xlabel('Epoch')
self.ax.set_ylabel('Loss')
self.ax.grid(True, alpha=0.3)
self.canvas = FigureCanvasTkAgg(self.fig, master=plot_frame)
self.canvas.draw()
self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
def setup_control_tab(self, parent):
# Control strategy settings
strategy_frame = ttk.LabelFrame(parent, text="Control Strategy", padding=15)
strategy_frame.pack(fill=tk.X, padx=10, pady=10)
# Operating mode
ttk.Label(strategy_frame, text="Operating Mode:", font=('Arial', 12)).grid(
row=0, column=0, sticky=tk.W, pady=5
)
self.mode_var = tk.StringVar(value="neural")
modes = [
("Neural Network Control", "neural"),
("Rule-Based Control", "rules"),
("Manual Override", "manual"),
("Hybrid (NN + Rules)", "hybrid")
]
for i, (text, value) in enumerate(modes):
ttk.Radiobutton(strategy_frame, text=text, variable=self.mode_var,
value=value).grid(row=i+1, column=0, sticky=tk.W, padx=20)
# Setpoints
setpoint_frame = ttk.LabelFrame(parent, text="System Setpoints", padding=15)
setpoint_frame.pack(fill=tk.X, padx=10, pady=10)
self.setpoints = {}
setpoint_config = [
("Target pH:", "ph", 7.0, 6.0, 8.0),
("Target Temperature:", "temp", 24.0, 18.0, 30.0),
("Target DO:", "do", 6.5, 4.0, 10.0),
("Target EC:", "ec", 1.5, 0.5, 3.0)
]
for i, (label, key, default, min_val, max_val) in enumerate(setpoint_config):
ttk.Label(setpoint_frame, text=label).grid(row=i, column=0, sticky=tk.W, pady=5)
var = tk.DoubleVar(value=default)
self.setpoints[key] = var
scale = ttk.Scale(setpoint_frame, from_=min_val, to=max_val,
variable=var, orient=tk.HORIZONTAL, length=200)
scale.grid(row=i, column=1, pady=5, padx=10)
value_label = ttk.Label(setpoint_frame, text=f"{default:.1f}")
value_label.grid(row=i, column=2, pady=5)
# Update label when scale moves
def update_label(var=var, label=value_label):
label.config(text=f"{var.get():.1f}")
scale.config(command=lambda x, func=update_label: func())
# Safety limits
safety_frame = ttk.LabelFrame(parent, text="Safety Limits", padding=15)
safety_frame.pack(fill=tk.X, padx=10, pady=10)
safety_text = """Critical Limits:
• pH: < 6.0 or > 8.5 triggers alarm
• Temperature: < 15°C or > 32°C triggers alarm
• Ammonia: > 1.0 mg/L triggers water change
• DO: < 4.0 mg/L increases aeration
• Water Level: < 40cm or > 60cm triggers pump adjustment
"""
ttk.Label(safety_frame, text=safety_text, font=('Arial', 10)).pack()
def setup_analysis_tab(self, parent):
# Time range selection
control_frame = ttk.Frame(parent)
control_frame.pack(fill=tk.X, padx=10, pady=10)
ttk.Label(control_frame, text="Time Range:", font=('Arial', 12)).pack(side=tk.LEFT, padx=5)
self.time_range_var = tk.StringVar(value="24h")
time_combo = ttk.Combobox(control_frame, textvariable=self.time_range_var,
values=["1h", "6h", "24h", "7d", "30d"],
width=10, state="readonly")
time_combo.pack(side=tk.LEFT, padx=5)
ttk.Button(control_frame, text="Refresh", command=self.refresh_analysis).pack(side=tk.LEFT, padx=20)
ttk.Button(control_frame, text="Export Report", command=self.export_report).pack(side=tk.LEFT)
# Analysis notebook
analysis_notebook = ttk.Notebook(parent)
analysis_notebook.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)
# Sensor trends
trends_frame = ttk.Frame(analysis_notebook)
analysis_notebook.add(trends_frame, text="Sensor Trends")
# Create matplotlib figure for trends
self.trends_fig = Figure(figsize=(10, 6), dpi=100)
self.trends_canvas = FigureCanvasTkAgg(self.trends_fig, master=trends_frame)
self.trends_canvas.draw()
self.trends_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
# System performance
performance_frame = ttk.Frame(analysis_notebook)
analysis_notebook.add(performance_frame, text="System Performance")
# Performance metrics
metrics_frame = ttk.LabelFrame(performance_frame, text="Key Metrics", padding=15)
metrics_frame.pack(fill=tk.X, padx=10, pady=10)
self.metrics = {
'System Uptime': tk.StringVar(value="0 hours"),
'Water Usage': tk.StringVar(value="0 L"),
'Energy Usage': tk.StringVar(value="0 kWh"),
'Fish Growth Rate': tk.StringVar(value="0 g/day"),
'Plant Growth Rate': tk.StringVar(value="0 cm/week"),
'Feed Conversion': tk.StringVar(value="0"),
'System Efficiency': tk.StringVar(value="0%")
}
for i, (metric, var) in enumerate(self.metrics.items()):
row = i // 2
col = (i % 2) * 2
ttk.Label(metrics_frame, text=f"{metric}:", font=('Arial', 10)).grid(
row=row, column=col, sticky=tk.W, padx=10, pady=5
)
ttk.Label(metrics_frame, textvariable=var, font=('Arial', 10, 'bold')).grid(
row=row, column=col+1, sticky=tk.W, padx=10, pady=5
)
def build_network(self):
try:
# Parse hidden layers
hidden_layers = [int(x.strip()) for x in self.hidden_layers_var.get().split(',')]
# Create model
self.model = AquaponicsController(
input_features=13,
hidden_layers=hidden_layers,
output_controls=8,
activation=self.activation_var.get(),
dropout=self.dropout_var.get()
)
# Update status
total_params = sum(p.numel() for p in self.model.parameters())
self.status_var.set(f"Neural network built: {total_params:,} parameters")
self.system_status['Neural Network'].set("Ready")
messagebox.showinfo("Success",
f"Aquaponics controller network created\n"
f"Parameters: {total_params:,}\n"
f"Architecture: 13 → {' → '.join(map(str, hidden_layers))} → 8")
except Exception as e:
messagebox.showerror("Error", f"Failed to build network: {str(e)}")
def generate_training_data(self):
try:
n_samples = self.n_samples_var.get()
# Generate data
self.train_data, self.train_labels = self.simulator.generate_training_data(n_samples)
# Create datasets
dataset = TensorDataset(self.train_data, self.train_labels)
# Split data
val_split = self.train_params['val_split'].get()
val_size = int(n_samples * val_split)
train_size = n_samples - val_size
train_dataset, val_dataset = torch.utils.data.random_split(
dataset, [train_size, val_size]
)
# Create dataloaders
batch_size = self.train_params['batch_size'].get()
self.train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
self.val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
self.status_var.set(f"Generated {n_samples} training samples")
messagebox.showinfo("Success",
f"Training data generated:\n"
f"Total samples: {n_samples}\n"
f"Training: {train_size}\n"
f"Validation: {val_size}")
except Exception as e:
messagebox.showerror("Error", f"Failed to generate data: {str(e)}")
def start_training(self):
if self.model is None:
messagebox.showerror("Error", "Please build network first")
return
if not hasattr(self, 'train_loader'):
messagebox.showerror("Error", "Please generate training data first")
return
# Disable/enable buttons
self.train_btn.config(state=tk.DISABLED)
self.stop_btn.config(state=tk.NORMAL)
# Setup training
self.training_active = True
self.training_history = {'train_loss': [], 'val_loss': [], 'epochs': []}
# Start training thread
self.train_thread = threading.Thread(target=self._training_loop)
self.train_thread.start()
self.status_var.set("Training started...")
def _training_loop(self):
"""Training loop running in separate thread"""
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
self.model.to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(self.model.parameters(), lr=self.train_params['lr'].get())
epochs = self.train_params['epochs'].get()
for epoch in range(epochs):
if not self.training_active:
break
# Training phase
self.model.train()
train_loss = 0
for batch_idx, (data, target) in enumerate(self.train_loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = self.model(data)
loss = criterion(output, target)
loss.backward()
optimizer.step()
train_loss += loss.item()
# Update progress
progress = ((epoch * len(self.train_loader) + batch_idx + 1) /
(epochs * len(self.train_loader))) * 100
self.progress_var.set(progress)
# Validation phase
self.model.eval()
val_loss = 0
with torch.no_grad():
for data, target in self.val_loader:
data, target = data.to(device), target.to(device)
output = self.model(data)
val_loss += criterion(output, target).item()
# Record history
avg_train_loss = train_loss / len(self.train_loader)
avg_val_loss = val_loss / len(self.val_loader)
self.training_history['train_loss'].append(avg_train_loss)
self.training_history['val_loss'].append(avg_val_loss)
self.training_history['epochs'].append(epoch + 1)
# Update display
self.epoch_label.config(text=f"Epoch: {epoch + 1}/{epochs}")
self.loss_label.config(text=f"Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")
# Update plot
self.update_training_plot()
# Training finished
self.training_active = False
self.train_btn.config(state=tk.NORMAL)
self.stop_btn.config(state=tk.DISABLED)
self.status_var.set("Training completed")
self.system_status['Neural Network'].set("Trained")
messagebox.showinfo("Training Complete", "Neural network training finished!")
def stop_training(self):
self.training_active = False
self.status_var.set("Stopping training...")
def update_training_plot(self):
self.ax.clear()
if self.training_history['epochs']:
self.ax.plot(self.training_history['epochs'],
self.training_history['train_loss'],
'b-', label='Train Loss', linewidth=2)
self.ax.plot(self.training_history['epochs'],
self.training_history['val_loss'],
'r-', label='Val Loss', linewidth=2)
self.ax.set_xlabel('Epoch')
self.ax.set_ylabel('Loss')
self.ax.set_title('Training Progress')
self.ax.legend()
self.ax.grid(True, alpha=0.3)
self.canvas.draw()
def update_sensor_display(self):
"""Update sensor readings and control outputs"""
if hasattr(self, 'sensor_vars'):
# Simulate sensor readings
current_hour = datetime.now().hour + datetime.now().minute / 60
current_day = datetime.now().timetuple().tm_yday
# Seasonal variations
seasonal_temp_var = 5 * np.sin(2 * np.pi * (current_day - 80) / 365)
seasonal_light_var = 2 * np.sin(2 * np.pi * (current_day - 80) / 365)
# Daily variations
daily_temp_var = 3 * np.sin(2 * np.pi * current_hour / 24)
# Calculate daylight hours based on season
daylight_hours = 12 + seasonal_light_var * 2
sunrise = 12 - daylight_hours / 2
sunset = 12 + daylight_hours / 2
# Generate realistic sensor values
light_value = 0
if sunrise <= current_hour <= sunset:
light_value = int(5000 * np.sin(np.pi * (current_hour - sunrise) / daylight_hours))
sensors = {
'pH': f"{np.random.normal(7.0, 0.1):.1f}",
'Water Temp': f"{np.random.normal(24 + seasonal_temp_var + daily_temp_var, 0.5):.1f}°C",
'Dissolved O2': f"{np.random.normal(6.5 - 0.5 * seasonal_temp_var / 5, 0.2):.1f} mg/L",
'Ammonia': f"{np.random.exponential(0.1):.2f} mg/L",
'Nitrite': f"{np.random.exponential(0.05):.2f} mg/L",
'Nitrate': f"{np.random.normal(20 + 10 * np.sin(2 * np.pi * current_day / 365), 5):.1f} mg/L",
'Water Level': f"{np.random.normal(50, 1):.1f} cm",
'Light': f"{max(0, light_value)} lux",
'Humidity': f"{np.random.normal(70 - seasonal_temp_var, 2):.0f}%",
'Air Temp': f"{np.random.normal(23 + seasonal_temp_var + daily_temp_var, 1):.1f}°C",
'EC': f"{np.random.normal(1.5 + 0.2 * np.sin(2 * np.pi * current_day / 365), 0.1):.2f} mS/cm",
'Day of Year': f"{current_day}"
}
# Update sensor displays
for name, value in sensors.items():
if name in self.sensor_vars:
self.sensor_vars[name].set(value)
# If model is loaded, get control decisions
if self.model is not None and self.mode_var.get() == "neural":
# Prepare input tensor
sensor_values = [
float(sensors['pH']),
float(sensors['Water Temp'].replace('°C', '')),
float(sensors['Dissolved O2'].replace(' mg/L', '')),
float(sensors['Ammonia'].replace(' mg/L', '')),
float(sensors['Nitrite'].replace(' mg/L', '')),
float(sensors['Nitrate'].replace(' mg/L', '')),
float(sensors['Water Level'].replace(' cm', '')),
float(sensors['Light'].replace(' lux', '')),
float(sensors['Humidity'].replace('%', '')),
float(sensors['Air Temp'].replace('°C', '')),
float(sensors['EC'].replace(' mS/cm', '')),
current_hour,
current_day
]
input_tensor = torch.FloatTensor([sensor_values])
# Get control decisions
self.model.eval()
with torch.no_grad():
controls = self.model.get_control_decisions(input_tensor)
# Update control displays
for name, value in controls.items():
if name in self.control_vars:
self.control_vars[name].set(f"{value*100:.0f}%")
# Update indicator color
canvas, indicator = self.control_indicators[name]
if value > 0.8:
color = 'red'
elif value > 0.5:
color = 'yellow'
elif value > 0.1:
color = 'green'
else:
color = 'gray'
canvas.itemconfig(indicator, fill=color)
# Store history
self.sensor_history.append({
'timestamp': datetime.now(),
'sensors': sensors,
'controls': controls if self.model else {}
})
# Limit history size
if len(self.sensor_history) > 10000:
self.sensor_history = self.sensor_history[-5000:]
# Schedule next update
self.root.after(1000, self.update_sensor_display)
def update_training_status(self):
# Placeholder for training updates
self.root.after(100, self.update_training_status)
def start_monitoring(self):
if self.model is None:
messagebox.showwarning("Warning", "No neural network loaded. Using rule-based control.")
self.system_status['Fish Tank'].set("Active")
self.system_status['Grow Bed'].set("Active")
self.status_var.set("System monitoring active")
def stop_monitoring(self):
self.system_status['Fish Tank'].set("Standby")
self.system_status['Grow Bed'].set("Standby")
self.status_var.set("System monitoring stopped")
def emergency_stop(self):
"""Emergency stop all systems"""
response = messagebox.askyesno("Emergency Stop",
"This will stop all pumps and controls!\n"
"Are you sure?")
if response:
# Set all controls to 0
for var in self.control_vars.values():
var.set("0%")
for name, (canvas, indicator) in self.control_indicators.items():
canvas.itemconfig(indicator, fill='red')
self.system_status['Fish Tank'].set("EMERGENCY STOP")
self.system_status['Grow Bed'].set("EMERGENCY STOP")
self.status_var.set("EMERGENCY STOP ACTIVATED")
messagebox.showwarning("Emergency Stop",
"All systems stopped!\n"
"Manual intervention required to restart.")
def refresh_analysis(self):
"""Refresh analysis plots"""
if not self.sensor_history:
messagebox.showinfo("No Data", "No sensor data available for analysis")
return
# Clear figure
self.trends_fig.clear()
# Create subplots
axes = self.trends_fig.subplots(2, 2)
axes = axes.flatten()
# Get time range
time_range = self.time_range_var.get()
hours = {'1h': 1, '6h': 6, '24h': 24, '7d': 168, '30d': 720}[time_range]
# Filter data by time
cutoff_time = datetime.now() - timedelta(hours=hours)
filtered_data = [d for d in self.sensor_history if d['timestamp'] > cutoff_time]
if not filtered_data:
messagebox.showinfo("No Data", f"No data available for the last {time_range}")
return
# Extract data for plotting
timestamps = [d['timestamp'] for d in filtered_data]
# Plot key parameters
plots = [
('pH', 'pH Level'),
('Water Temp', 'Water Temperature (°C)'),
('Dissolved O2', 'Dissolved Oxygen (mg/L)'),
('Nitrate', 'Nitrate Level (mg/L)')
]
for ax, (key, title) in zip(axes, plots):
values = []
for d in filtered_data:
try:
val = d['sensors'][key]
# Extract numeric value
val = float(val.split()[0].replace('°C', ''))
values.append(val)
except:
values.append(0)
ax.plot(timestamps, values, 'b-', linewidth=1.5)
ax.set_title(title)
ax.grid(True, alpha=0.3)
ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
self.trends_fig.tight_layout()
self.trends_canvas.draw()
# Update metrics
self.update_metrics()
def update_metrics(self):
"""Update system performance metrics"""
if self.sensor_history:
# Calculate uptime
uptime = len(self.sensor_history) / 3600 # Assuming 1 reading per second
self.metrics['System Uptime'].set(f"{uptime:.1f} hours")
# Simulate other metrics
self.metrics['Water Usage'].set(f"{np.random.uniform(50, 100):.1f} L")
self.metrics['Energy Usage'].set(f"{np.random.uniform(2, 5):.2f} kWh")
self.metrics['Fish Growth Rate'].set(f"{np.random.uniform(5, 15):.1f} g/day")
self.metrics['Plant Growth Rate'].set(f"{np.random.uniform(1, 3):.1f} cm/week")
self.metrics['Feed Conversion'].set(f"{np.random.uniform(1.2, 1.8):.2f}")
...
This file has been truncated, please download it to see its full contents.
Comments
Please log in or sign up to comment.