David Winn
Published © GPL3+

Solar Powered AI Managed Aquaponics Farm Bot

Intent: Use Rasberry Pi, Arduino Nanos, off the shelf parts to make solar powered aquaponics system managed by Artificial Intelligence.

ExpertWork in progressOver 416 days7,270
Solar Powered AI Managed Aquaponics Farm Bot

Things used in this project

Hardware components

Raspberry Pi 3 Model B+
Raspberry Pi 3 Model B+
The rasberry pi will simultaneously be used to control the Arduinos, and serve as the main AI platform. Pi 5s are available now.
×4
Arduino Nano R3
Arduino Nano R3
Each Arduino is capable of many sensor measurements for parts of the aquaponics system.
×20
USB-A to Mini-USB Cable
USB-A to Mini-USB Cable
×1
Soldering Iron
×1
Hand Tools
×1
PVC Pipe
×5
IBC Tote
×2
Raspberry Pi Pico W
Raspberry Pi Pico W
might not need 24 of these little beasts, I just figured a large number of them, is all... :)
×24

Software apps and online services

Raspbian
Raspberry Pi Raspbian
Arduino IDE
Arduino IDE

Story

Read more

Schematics

Some Aquaponic Components

Here are two grow beds, sump and a fish tank

Greenhouse Footprint

A geometric consideration of the greenhouse footprint. The greenhouse would be a hybrid geodesic dome / shed. The semicircle would be translucent or transparent for the plants. The back of the structure, on the north side would have some kind of conventional roof.

diagram showing water flow within the system

Each section of the system can be treated as a subsystem that can be measured and modified in various ways. Temperature, and chemistry for example. In the garden side, would be Temperature, and humidity as another example.

Code

Code generated by Claude AI

Python
Python
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple, Optional
import pickle
import logging
import time
from datetime import datetime
import RPi.GPIO as GPIO
import cv2
from picamera2 import Picamera2
import torch
import torchvision.transforms as transforms
from PIL import Image
import os
import threading
from queue import Queue

# [Previous imports and sensor code remain the same]

class AquaponicsCamera:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        
        # Initialize cameras
        self.fish_camera = Picamera2(0)  # First camera for fish monitoring
        self.plant_camera = Picamera2(1)  # Second camera for plant monitoring
        
        # Configure cameras
        self.configure_cameras()
        
        # Initialize ML models
        self.fish_health_model = self.load_model('models/fish_health_model.pth')
        self.plant_health_model = self.load_model('models/plant_health_model.pth')
        
        # Image processing transforms
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                               std=[0.229, 0.224, 0.225])
        ])
        
        # Initialize image queues for processing
        self.fish_image_queue = Queue(maxsize=10)
        self.plant_image_queue = Queue(maxsize=10)
        
        # Create storage directories
        os.makedirs('images/fish', exist_ok=True)
        os.makedirs('images/plants', exist_ok=True)
        
        # Start processing threads
        self.start_processing_threads()
        
    def configure_cameras(self):
        """Configure both cameras for optimal aquaponics monitoring"""
        # Fish camera configuration (optimized for underwater viewing)
        fish_config = self.fish_camera.create_still_configuration()
        fish_config["main"]["size"] = (1920, 1080)
        fish_config["transform"] = libcamera.Transform(hflip=1, vflip=1)
        self.fish_camera.configure(fish_config)
        
        # Plant camera configuration (optimized for plant monitoring)
        plant_config = self.plant_camera.create_still_configuration()
        plant_config["main"]["size"] = (1920, 1080)
        self.plant_camera.configure(plant_config)
        
    def start_processing_threads(self):
        """Start separate threads for processing fish and plant images"""
        self.processing_active = True
        
        self.fish_thread = threading.Thread(
            target=self.process_fish_images, 
            daemon=True
        )
        self.plant_thread = threading.Thread(
            target=self.process_plant_images,
            daemon=True
        )
        
        self.fish_thread.start()
        self.plant_thread.start()
        
    def capture_images(self) -> Tuple[np.ndarray, np.ndarray]:
        """Capture images from both cameras"""
        try:
            fish_image = self.fish_camera.capture_array()
            plant_image = self.plant_camera.capture_array()
            
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            
            # Save images
            cv2.imwrite(f'images/fish/fish_{timestamp}.jpg', fish_image)
            cv2.imwrite(f'images/plants/plant_{timestamp}.jpg', plant_image)
            
            # Add to processing queues
            self.fish_image_queue.put((fish_image, timestamp))
            self.plant_image_queue.put((plant_image, timestamp))
            
            return fish_image, plant_image
            
        except Exception as e:
            self.logger.error(f"Image capture error: {str(e)}")
            return None, None
            
    def load_model(self, model_path: str) -> Optional[torch.nn.Module]:
        """Load a PyTorch model for health detection"""
        try:
            model = torch.load(model_path)
            model.eval()
            return model
        except Exception as e:
            self.logger.error(f"Error loading model {model_path}: {str(e)}")
            return None
            
    def process_fish_images(self):
        """Process fish images for health monitoring"""
        while self.processing_active:
            try:
                if not self.fish_image_queue.empty():
                    image, timestamp = self.fish_image_queue.get()
                    
                    # Convert to PIL Image
                    pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
                    
                    # Prepare image for model
                    input_tensor = self.transform(pil_image).unsqueeze(0)
                    
                    # Get predictions
                    with torch.no_grad():
                        predictions = self.fish_health_model(input_tensor)
                    
                    # Analyze fish health indicators
                    health_metrics = self.analyze_fish_health(predictions, image)
                    
                    self.logger.info(f"Fish health metrics: {health_metrics}")
                    
                time.sleep(1)  # Prevent busy waiting
                
            except Exception as e:
                self.logger.error(f"Fish image processing error: {str(e)}")
                
    def process_plant_images(self):
        """Process plant images for health monitoring"""
        while self.processing_active:
            try:
                if not self.plant_image_queue.empty():
                    image, timestamp = self.plant_image_queue.get()
                    
                    # Convert to PIL Image
                    pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
                    
                    # Prepare image for model
                    input_tensor = self.transform(pil_image).unsqueeze(0)
                    
                    # Get predictions
                    with torch.no_grad():
                        predictions = self.plant_health_model(input_tensor)
                    
                    # Analyze plant health indicators
                    health_metrics = self.analyze_plant_health(predictions, image)
                    
                    self.logger.info(f"Plant health metrics: {health_metrics}")
                    
                time.sleep(1)  # Prevent busy waiting
                
            except Exception as e:
                self.logger.error(f"Plant image processing error: {str(e)}")
                
    def analyze_fish_health(self, predictions: torch.Tensor, image: np.ndarray) -> Dict[str, float]:
        """Analyze fish health based on ML predictions and image analysis"""
        # Convert predictions to health metrics
        health_metrics = {
            'activity_level': float(predictions[0]),
            'coloration': float(predictions[1]),
            'swimming_pattern': float(predictions[2]),
            'fish_count': self.count_fish(image),
            'unusual_behavior': float(predictions[3]),
            'overall_health': float(predictions[4])
        }
        
        return health_metrics
        
    def analyze_plant_health(self, predictions: torch.Tensor, image: np.ndarray) -> Dict[str, float]:
        """Analyze plant health based on ML predictions and image analysis"""
        # Convert predictions to health metrics
        health_metrics = {
            'leaf_color': float(predictions[0]),
            'growth_rate': float(predictions[1]),
            'pest_presence': float(predictions[2]),
            'leaf_damage': float(predictions[3]),
            'nutrient_deficiency': float(predictions[4]),
            'overall_health': float(predictions[5])
        }
        
        return health_metrics
        
    def count_fish(self, image: np.ndarray) -> int:
        """Count number of fish in image using computer vision"""
        try:
            # Convert to grayscale
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            
            # Apply background subtraction
            blurred = cv2.GaussianBlur(gray, (5, 5), 0)
            thresh = cv2.threshold(blurred, 60, 255, cv2.THRESH_BINARY)[1]
            
            # Find contours
            contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            
            # Filter contours by size to count only fish-sized objects
            fish_contours = [c for c in contours if 100 < cv2.contourArea(c) < 1000]
            
            return len(fish_contours)
            
        except Exception as e:
            self.logger.error(f"Fish counting error: {str(e)}")
            return 0
            
    def detect_plant_disease(self, image: np.ndarray) -> Dict[str, float]:
        """Detect plant diseases using computer vision"""
        try:
            # Convert to HSV color space for better disease detection
            hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
            
            # Define disease indicators (yellow/brown spots, wilting, etc.)
            disease_indicators = {
                'leaf_spots': self.detect_leaf_spots(hsv),
                'wilting': self.detect_wilting(hsv),
                'discoloration': self.detect_discoloration(hsv)
            }
            
            return disease_indicators
            
        except Exception as e:
            self.logger.error(f"Disease detection error: {str(e)}")
            return {}
            
    def cleanup(self):
        """Clean up camera resources"""
        self.processing_active = False
        self.fish_camera.close()
        self.plant_camera.close()

class AquaponicsController:
    def __init__(self, model_path: str = None):
        self.sensor = AquaponicsSensor()
        self.camera = AquaponicsCamera()
        self.model = self.load_model(model_path) if model_path else None
        self.history = []
        self.logger = logging.getLogger(__name__)
        
        # Enhanced optimal ranges including visual metrics
        self.optimal_ranges = {
            # [Previous sensor ranges remain the same]
            'fish_activity_level': (0.7, 1.0),
            'fish_coloration': (0.8, 1.0),
            'plant_health_index': (0.8, 1.0),
            'leaf_color_index': (0.7, 1.0)
        }
    
    def run_cycle(self, interval: int = 300):
        """Run main control cycle with visual monitoring"""
        try:
            while True:
                # Read sensor data
                sensor_data = self.sensor.read_sensors()
                
                # Capture and analyze images
                fish_image, plant_image = self.camera.capture_images()
                
                if sensor_data and fish_image is not None and plant_image is not None:
                    # Combine sensor and visual data
                    combined_data = {
                        **sensor_data,
                        **self.camera.analyze_fish_health(self.model(fish_image), fish_image),
                        **self.camera.analyze_plant_health(self.model(plant_image), plant_image)
                    }
                    
                    # Store in history
                    self.history.append(combined_data)
                    
                    # Get control adjustments
                    adjustments = self.predict_adjustments(combined_data)
                    
                    # Apply controls
                    self.apply_controls(adjustments)
                    
                    # Log state
                    self.logger.info(f"System state: {combined_data}")
                    
                time.sleep(interval)
                
        except KeyboardInterrupt:
            self.cleanup()
            
    def cleanup(self):
        """Cleanup all resources"""
        self.sensor.cleanup()
        self.camera.cleanup()
        GPIO.cleanup()
        self.save_history()
        self.logger.info("System shutdown completed")

Code by Claude AI

Python
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple
import pickle
import logging
import time
from datetime import datetime
import RPi.GPIO as GPIO
from w1thermsensor import W1ThermSensor
import board
import busio
import adafruit_ads1x15.ads1115 as ADS
from adafruit_ads1x15.analog_in import AnalogIn
import adafruit_ds3231
import adafruit_dht
import adafruit_mcp3xxx.mcp3008 as MCP
from adafruit_mcp3xxx.analog_in import AnalogIn as MCP_AnalogIn
import adafruit_bmp280

class AquaponicsSensor:
    def __init__(self):
        # Initialize I2C bus
        self.i2c = busio.I2C(board.SCL, board.SDA)
        
        # Initialize SPI bus for MCP3008
        self.spi = busio.SPI(board.SCK, MOSI=board.MOSI, MISO=board.MISO)
        self.cs = digitalio.DigitalInOut(board.D5)
        
        # Initialize ADCs
        self.ads1 = ADS.ADS1115(self.i2c)  # First ADS1115 for original sensors
        self.ads2 = ADS.ADS1115(self.i2c, address=0x49)  # Second ADS1115 for additional sensors
        self.mcp = MCP.MCP3008(self.spi, self.cs)  # MCP3008 for more analog sensors
        
        # Initialize RTC
        self.rtc = adafruit_ds3231.DS3231(self.i2c)
        
        # Initialize DHT22 for air temp/humidity
        self.dht = adafruit_dht.DHT22(board.D4)
        
        # Initialize BMP280 for barometric pressure
        self.bmp280 = adafruit_bmp280.Adafruit_BMP280_I2C(self.i2c)
        
        # Setup GPIO
        GPIO.setmode(GPIO.BCM)
        self.setup_pins()
        
        # Initialize Original Sensors
        self.water_temp_sensor = W1ThermSensor()
        self.ph_channel = AnalogIn(self.ads1, ADS.P0)
        self.ec_channel = AnalogIn(self.ads1, ADS.P1)
        self.water_level_channel = AnalogIn(self.ads1, ADS.P2)
        self.light_sensor_channel = AnalogIn(self.ads1, ADS.P3)
        
        # Initialize Additional Sensors
        self.do_channel = AnalogIn(self.ads2, ADS.P0)        # Dissolved Oxygen
        self.ammonia_channel = AnalogIn(self.ads2, ADS.P1)   # Ammonia
        self.nitrate_channel = AnalogIn(self.ads2, ADS.P2)   # Nitrate
        self.nitrite_channel = AnalogIn(self.ads2, ADS.P3)   # Nitrite
        
        # Initialize MCP3008 channels
        self.flow_rate_channel = MCP_AnalogIn(self.mcp, MCP.P0)  # Flow Rate
        self.turbidity_channel = MCP_AnalogIn(self.mcp, MCP.P1)  # Turbidity
        self.co2_channel = MCP_AnalogIn(self.mcp, MCP.P2)        # CO2
        
        # Sensor calibration values (should be loaded from configuration)
        self.calibration = {
            'do_slope': 1.0,
            'do_offset': 0.0,
            'ph_slope': 1.0,
            'ph_offset': 0.0,
            'ec_slope': 1.0,
            'ec_offset': 0.0,
            'nh3_slope': 1.0,
            'nh3_offset': 0.0,
            'no3_slope': 1.0,
            'no3_offset': 0.0,
            'no2_slope': 1.0,
            'no2_offset': 0.0
        }
        
        # Setup logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            filename='aquaponics.log'
        )
        self.logger = logging.getLogger(__name__)
    
    def read_sensors(self) -> Dict[str, float]:
        """Read all sensor data and return as dictionary"""
        try:
            # Get timestamp from RTC
            current_time = self.rtc.datetime
            
            # Read DHT22 sensor
            try:
                air_temp = self.dht.temperature
                humidity = self.dht.humidity
            except RuntimeError:
                air_temp = humidity = None
                self.logger.warning("DHT22 reading failed, retrying next cycle")
            
            data = {
                # Time data
                'timestamp': datetime.now().timestamp(),
                'rtc_time': datetime(
                    current_time.tm_year, current_time.tm_mon, 
                    current_time.tm_mday, current_time.tm_hour,
                    current_time.tm_min, current_time.tm_sec
                ).timestamp(),
                
                # Original sensors
                'water_temperature': self.water_temp_sensor.get_temperature(),
                'ph': self.convert_to_ph(self.ph_channel.value),
                'ec': self.convert_to_ec(self.ec_channel.value),
                'water_level': self.water_level_channel.value,
                'light_level': self.light_sensor_channel.value,
                
                # Additional water quality sensors
                'dissolved_oxygen': self.convert_to_do(self.do_channel.value),
                'ammonia': self.convert_to_ammonia(self.ammonia_channel.value),
                'nitrate': self.convert_to_nitrate(self.nitrate_channel.value),
                'nitrite': self.convert_to_nitrite(self.nitrite_channel.value),
                
                # Environmental sensors
                'air_temperature': air_temp,
                'humidity': humidity,
                'co2': self.convert_to_co2(self.co2_channel.value),
                'pressure': self.bmp280.pressure,
                
                # System performance sensors
                'flow_rate': self.convert_to_flow_rate(self.flow_rate_channel.value),
                'turbidity': self.convert_to_turbidity(self.turbidity_channel.value)
            }
            
            # Log any null values
            null_sensors = [k for k, v in data.items() if v is None]
            if null_sensors:
                self.logger.warning(f"Null readings from sensors: {null_sensors}")
            
            return data
            
        except Exception as e:
            self.logger.error(f"Sensor reading error: {str(e)}")
            return None
    
    # Conversion methods with temperature compensation where applicable
    def convert_to_do(self, raw_value: int) -> float:
        """Convert raw ADC value to dissolved oxygen in mg/L"""
        temp_compensation = 1 + 0.02 * (self.water_temp_sensor.get_temperature() - 25)
        return (raw_value * self.calibration['do_slope'] + self.calibration['do_offset']) / temp_compensation
    
    def convert_to_ph(self, raw_value: int) -> float:
        """Convert raw ADC value to pH"""
        return raw_value * self.calibration['ph_slope'] + self.calibration['ph_offset']
    
    def convert_to_ec(self, raw_value: int) -> float:
        """Convert raw ADC value to EC (mS/cm) with temperature compensation"""
        temp_compensation = 1 + 0.019 * (self.water_temp_sensor.get_temperature() - 25)
        return (raw_value * self.calibration['ec_slope'] + self.calibration['ec_offset']) * temp_compensation
    
    def convert_to_ammonia(self, raw_value: int) -> float:
        """Convert raw ADC value to ammonia concentration (mg/L)"""
        return raw_value * self.calibration['nh3_slope'] + self.calibration['nh3_offset']
    
    def convert_to_nitrate(self, raw_value: int) -> float:
        """Convert raw ADC value to nitrate concentration (mg/L)"""
        return raw_value * self.calibration['no3_slope'] + self.calibration['no3_offset']
    
    def convert_to_nitrite(self, raw_value: int) -> float:
        """Convert raw ADC value to nitrite concentration (mg/L)"""
        return raw_value * self.calibration['no2_slope'] + self.calibration['no2_offset']
    
    def convert_to_co2(self, raw_value: int) -> float:
        """Convert raw ADC value to CO2 concentration (ppm)"""
        return raw_value * 2000.0 / 65535.0  # Assuming 2000 ppm full scale
    
    def convert_to_flow_rate(self, raw_value: int) -> float:
        """Convert raw ADC value to flow rate (L/min)"""
        frequency = raw_value * 1000.0 / 65535.0  # Convert to frequency
        return frequency * 0.1  # Typical conversion factor for flow sensors
    
    def convert_to_turbidity(self, raw_value: int) -> float:
        """Convert raw ADC value to turbidity (NTU)"""
        voltage = raw_value * 5.0 / 65535.0
        return -1120.4 * voltage * voltage + 5742.3 * voltage - 4352.9  # Typical conversion curve

    def update_rtc(self):
        """Update RTC if drift detected"""
        try:
            ntp_time = time.time()
            rtc_time = self.rtc.datetime.timestamp()
            
            # If drift is more than 1 second, update RTC
            if abs(ntp_time - rtc_time) > 1:
                self.rtc.datetime = time.localtime(ntp_time)
                self.logger.info("RTC updated due to time drift")
        except Exception as e:
            self.logger.error(f"RTC update error: {str(e)}")

    def calibrate_sensors(self):
        """Calibrate sensors using known reference solutions"""
        # Implementation would include calibration procedures for each sensor
        pass

Credits

David Winn
2 projects • 15 followers
I'm an old school hacker from the late eighties... I am coming back to my home. Interests include music, agricultural automation, and AI.
Contact
Thanks to FarmBot and GPT 4o.

Comments

Please log in or sign up to comment.