Chad
Published

Solar Powered Well Water Level Monitor

After some dry summers our well level has dropped significantly, so this allows us to monitor the well water level & send alerts.

Beginner · Work in progress · 13 hours · 2,989
Solar Powered Well Water Level Monitor

Things used in this project

Hardware components

Espressif ESP32-WROOM-32U
×1
0.5-4.5V DC Contact Level Transmitter 10M Depth Range
×1
DFRobot Solar Power Manager 5V V1.1
×1
DC 0-36V INA226 Current Voltage Monitoring Sensor Module IIC I2C Interface Current Shunt Power Monitor Board
×2
9g Digital Servos Metal Gear Servo Waterproof
Optional: leave out if you don't want to track the sunlight
×2
Rotary Rotatable Digital JX Servo Platform 2DOF servo robot arm
Optional: leave out if you don't want to track the sunlight
×1
ESP32 Expansion Board Compatible with ESP32 WiFi Bluetooth Development Board
×1
3.7V/4.2V 18650 Li-ion battery 12000mAh with PCB protection XH 2.54 2P Plug
×1
KCD1 224N 23mm Round 4 Pin 250V 6A Boat Switch Snap-in SPST ON OFF Rocker Position Switch KCD1-2 KCD1-224
×1
4W 5V Solar Cells Charger Micro USB+Type-C 2in1 Charging Portable Solar Panels for Security Camera Home Light System
×1
2.4G Antenna
×1
Keyestudio Super-bright Emitting Color 5MM Green LED
×1
200X120X75mm Junction Box Waterproof Dustproof IP65 ABS Plastic Universal Electric Project Enclosure Black with Fixed Ear
×1

Software apps and online services

Micropython
Influxdb
Grafana
Espressif

Hand tools and fabrication machines

Soldering iron (generic)
Soldering iron (generic)
Solder Wire, Lead Free
Solder Wire, Lead Free

Story

Read more

Schematics

Wiring Diagram Tracking Solar Panel Option

Wiring Diagram

Wiring Diagram Fixed Solar Panel Option

Code

Main Code for tracking solar panel option

MicroPython
Must have supporting files (ina226, bh1750fvi, servo)
import network
import urequests
import utime
import time
from time import sleep
import machine
import esp32
from machine import Pin, ADC, lightsleep, deepsleep, Timer, PWM, SoftI2C
import ina226
from bh1750fvi import BH1750
from servo import Servo

# Configure GPIO pins
# Pin numbers that the loop below treats as unused.
gpio_pins = [4, 14, 16, 17, 18, 19, 23, 25, 26, 27, 32, 34, 35, 36, 37, 38, 39]

# Set unused GPIO pins as inputs with pull-down resistors
# (pins that reject this configuration raise ValueError and are only reported)
for pin in gpio_pins:
    try:
        machine.Pin(pin, machine.Pin.IN, machine.Pin.PULL_DOWN)
    except ValueError:
        print("Pin", pin, "cannot be configured as GPIO.")

# NOTE(review): this was labelled "Disable Bluetooth", but the statement only
# configures GPIO0 as an output and drives it low; nothing here touches the
# Bluetooth radio. Confirm the intent before relying on it.
machine.Pin(0, machine.Pin.OUT).value(0)

# Initialize I2C
# Software I2C bus shared by all sensors: SCL on GPIO22, SDA on GPIO21.
i2c = machine.SoftI2C(scl=machine.Pin(22), sda=machine.Pin(21))

# Initialize INA226 devices
# Sensor handles and their "connected" flags; connect_devices() fills these in.
ina1 = None
ina2 = None
ina3 = None  # never assigned again in this script; the light sensor handle is stored in `bh`
I2C1_connected = False
I2C2_connected = False
I2C3_connected = False

def move_servos(x_angle, y_angle):
    """Move servo_x to x_angle, then servo_y to y_angle."""
    for servo, target in ((servo_x, x_angle), (servo_y, y_angle)):
        servo.move(target)

# Set servo angle limits
# Scan range (in servo degrees) used by find_best_position() for each axis.
x_min, x_max = 0, 80
y_min, y_max = 0, 180

# Define center points for servos
x_center = (x_min + x_max) // 2  # x position held while the y axis is scanned
y_center = (y_min + y_max) // 2  # not referenced again in this script

# Define variables for best position
# Last position chosen by find_best_position(); also reported with the sensor data.
best_x = 0
best_y = 0

# Initialize PWM for servo motor control
servo_x = Servo(pin=13)
servo_y = Servo(pin=12)
# Park both servos at the initial "best" position (0, 0) at start-up.
move_servos(best_x, best_y)

# Initialize LED pin
led = Pin(2, Pin.OUT)

# Define the timer for blinking LED
# Hardware timer 0; started and stopped in the main loop after a successful upload.
timer1 = Timer(0)

def blink(timer):
    """Timer callback: invert the status LED (the timer argument is unused)."""
    is_on = led.value()
    led.value(not is_on)  # Toggle LED state

# Blink LED
# Single start-up pulse: one second on, one second off.
led.value(1)
utime.sleep(1)
led.value(0)
utime.sleep(1)

# Define the ADC pins
adc_pins = [33]  # not referenced again in this script; adc1 below names pin 33 directly

# Create an ADC object for each pin
# adc1 reads the level transmitter's analog output in send_sensor_data().
adc1 = machine.ADC(machine.Pin(33, machine.Pin.IN))
adc1.atten(ADC.ATTN_11DB)

# InfluxDB Cloud settings
# NOTE(review): the double quotes around the <Your ...> placeholders are part of
# the URL string; presumably they should be dropped when real values are filled
# in -- confirm against the InfluxDB write API.
INFLUXDB_URL = 'https://eastus-1.azure.cloud2.influxdata.com/api/v2/write?org="<Your Organization>"&bucket="<Your Bucket>"&precision=s'
INFLUXDB_TOKEN = 'Your Token'

# Define the names and passwords of your WiFi networks
# connect_to_wifi() tries these (ssid, password) pairs in order.
WIFI_NETWORKS = [
    ('SSID1', 'PW1'),
    ('SSID2', 'PW2'),
    ('SSID3', 'PW3')
]

def connect_to_wifi():
    """Try each network in WIFI_NETWORKS in order until one connects.

    Each attempt is given roughly 10 seconds before moving on.

    Side effects:
        Sets the module-level ``wlan`` (station interface) and ``ssid``.
        ``ssid`` is the connected network's name, or None when no network
        could be joined (it no longer keeps the last *attempted* name).

    Returns:
        tuple: ``(True, ssid)`` on success, ``(False, None)`` otherwise.
        The tuple itself is always truthy, so callers must unpack it.
    """
    global ssid, wlan
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)

    for candidate, password in WIFI_NETWORKS:
        try:
            wlan.disconnect()
            wlan.connect(candidate, password)
            start_time = utime.time()
            while not wlan.isconnected():
                if (utime.time() - start_time) > 10:  # Wait for 10 seconds
                    print("Timeout exceeded while connecting to {}".format(candidate))
                    break
                utime.sleep(0.1)
            else:
                # Loop ended without a timeout break: the link is up.
                ssid = candidate
                print("Connected to WiFi network:", ssid)
                return True, ssid
        except Exception as e:
            print("Failed to connect to WiFi network:", candidate, e)

    ssid = None
    return False, None

def connect_devices():
    """Create the three I2C sensor objects and record which ones responded.

    Updates the module-level handles (``ina1``, ``ina2``, ``bh``) and the
    matching ``I2C1_connected`` / ``I2C2_connected`` / ``I2C3_connected``
    flags. A sensor whose constructor raises is reported and flagged False.
    """
    global I2C1_connected, I2C2_connected, I2C3_connected, ina1, ina2, bh

    # Battery-side INA226 at address 0x40.
    # (Optional custom calibration: ina1.set_calibration_custom(calValue=5120, config=0x4427))
    try:
        ina1 = ina226.INA226(i2c, 0x40)
    except Exception:
        print("Failed to connect to I2C Battery Device 1")
        I2C1_connected = False
    else:
        I2C1_connected = True

    # Solar-side INA226 at address 0x44.
    # (Optional custom calibration: ina2.set_calibration_custom(calValue=5120, config=0x4427))
    try:
        ina2 = ina226.INA226(i2c, 0x44)
    except Exception:
        print("Failed to connect to I2C Solar Device 2")
        I2C2_connected = False
    else:
        I2C2_connected = True

    # BH1750 light sensor on the same bus.
    try:
        bh = BH1750(i2c)
    except Exception:
        print("Failed to connect to I2C Light Device 3")
        I2C3_connected = False
    else:
        I2C3_connected = True


# Constants
lux_sp = 20 # minimum light reading (luminance * 0.0079, labelled W/m2 in this script) required to run the sun-position scan
scan_time = 30 # minutes between sun-position scans
light_sleep_time = 1 # minutes spent in light sleep at the end of each loop iteration
watchdog_timeout = 5 # minutes without a feed before the watchdog resets the device

# Interval between sun-position scans, in milliseconds
BEST_POSITION_INTERVAL = (scan_time * 60) * 1000 # scan_time minutes in ms (30 minutes with the value above)

# Variables
lux = 0 # latest light reading; written by send_sensor_data() through `global lux`
start_time = utime.time() # start of the current scan interval; reset by find_best_position()

def get_remaining_time():
    """Return the minutes left until the next sun-position scan is due.

    The value is zero or negative once BEST_POSITION_INTERVAL has elapsed
    since ``start_time``.
    """
    interval_seconds = BEST_POSITION_INTERVAL / 1000
    seconds_since_reset = utime.time() - start_time
    return (interval_seconds - seconds_since_reset) / 60

def _sample_power_and_lux(samples=3, delay=0.5):
    """Return (power, lux) averaged over `samples` readings `delay` seconds apart."""
    power_total = 0
    lux_total = 0
    for _ in range(samples):
        power_total += ina2.bus_voltage * ina2.current
        lux_total += bh.luminance() * 0.0079
        time.sleep(delay)
    return power_total / samples, lux_total / samples


def find_best_position():
    """Scan both servo axes and park the panel where power + light is highest.

    The scan is skipped when the light sensor or the solar INA226 is not
    connected, or when the current light reading is at or below ``lux_sp``.
    Stage 1 sweeps y (x held at ``x_center``); stage 2 sweeps x at the best y.
    Each stage only accepts positions whose score is greater than zero, so
    ``best_x`` / ``best_y`` keep their previous value if nothing qualifies.

    Side effects: moves the servos, updates the globals ``best_x``,
    ``best_y`` and restarts the scan timer (``start_time``) whenever the
    light level was checked.
    """
    global best_x, best_y, start_time

    # Check if I2C3 is connected before touching the light sensor
    if not I2C3_connected:
        print("lux sensor not ready")
        return
    # The scan reads ina2 at every step; without it the loop would raise.
    if not I2C2_connected:
        print("solar power sensor not ready")
        return

    lux = bh.luminance() * 0.0079  # convert lux to W/m2

    # Check if the lux value is greater than lux setpoint
    if lux <= lux_sp:
        print("Not enough sunlight holding position")
        # Reset timer
        start_time = time.time()
        return

    print("Scanning for best sun position...")

    # Readings taken at the best position found so far (reported at the end).
    max_power = 0
    max_lux = 0

    # Stage 1: Scan for best vertical position
    max_score_y = 0
    for y in range(y_min, y_max + 1, 10):
        print("Scanning vertically at position y={}".format(y))
        move_servos(x_center, y)
        time.sleep(1)  # Add delay to allow INA226 time to settle

        power, lux = _sample_power_and_lux()
        print("Power & Light at vertical position y={}: {:.2f} W, Lux: {:.2f}".format(y, power, lux))

        score_y = power + lux
        if score_y > max_score_y:
            max_score_y = score_y
            max_power, max_lux = power, lux
            best_y = y

    # Stage 2: Scan horizontally around best vertical position
    max_score_x = 0
    for x in range(x_min, x_max + 1, 10):
        print("Scanning horizontally around best vertical position y={} at position x={}".format(best_y, x))
        move_servos(x, best_y)
        time.sleep(1)  # Add delay to allow INA226 time to settle

        power, lux = _sample_power_and_lux()
        print("Power & Light at position x={}, y={}: {:.2f} W, Lux: {:.2f}".format(x, best_y, power, lux))

        score_x = power + lux
        if score_x > max_score_x:
            max_score_x = score_x
            # Stage 2 readings are taken at the final (x, best_y) position,
            # so they supersede the stage 1 values.
            max_power, max_lux = power, lux
            best_x = x

    print("Best sun position found: x={}, y={}".format(best_x, best_y))
    print("Best solar power: {:.2f} W".format(max_power))
    print("Lux at best position: {:.2f}".format(max_lux))

    move_servos(best_x, best_y)
    # Reset timer
    start_time = time.time()

def _averaged(read_sample, count=3, delay=0.5):
    """Return the per-field mean of `count` tuple readings taken `delay` seconds apart."""
    totals = None
    for _ in range(count):
        sample = read_sample()
        if totals is None:
            totals = list(sample)
        else:
            for index, value in enumerate(sample):
                totals[index] += value
        time.sleep(delay)
    return [total / count for total in totals]


def send_sensor_data():
    """Read every sensor, then post one InfluxDB point per measurement.

    Each reading is the mean of three samples. A group that cannot be read
    (sensor not connected or an exception while sampling) is reported with
    the sentinel value 123 instead of aborting the upload.

    Side effects: updates the global ``lux`` and performs one HTTP POST per
    measurement to ``INFLUXDB_URL``.

    Returns:
        bool: True only if every POST was answered with HTTP 204.
    """
    global lux

    def read_pressure():
        # ADC counts -> volts (3.9 V span over 4095 counts), then the
        # transmitter's output above 0.5 V scaled by (2.4/7) * 100.
        voltage = adc1.read() * (3.9 / 4095)
        return (((voltage - 0.5)) * (2.4 / 7) * 100.0,)

    try:
        pressure = _averaged(read_pressure)[0]
        print("Pressure: " + str(pressure) + " %")
    except Exception:
        print("Failed to collect pressure data from Device")
        pressure = 123

    try:
        if not I2C1_connected:
            raise RuntimeError("Failed to connect to INA1")
        battery_current, battery_voltage, battery_power = _averaged(
            lambda: (ina1.current, ina1.bus_voltage, ina1.power))
        print("Battery Current: " + str(battery_current) + " mA")
        print("Battery Voltage:" + str(battery_voltage) + " VDC")
        print("Battery Power: " + str(battery_power) + " W")
    except Exception:
        print("Failed to collect battery data from Device")
        battery_current = 123
        battery_voltage = 123
        battery_power = 123

    try:
        if not I2C2_connected:
            raise RuntimeError("Failed to connect to INA2")
        solar_current, solar_voltage, solar_power = _averaged(
            lambda: (ina2.current, ina2.bus_voltage, ina2.power))
        print("Solar Current: " + str(solar_current) + " mA")
        print("Solar Voltage: " + str(solar_voltage) + " VDC")
        print("Solar Power: " + str(solar_power) + " W")
    except Exception:
        print("Failed to collect solar data from Device")
        solar_current = 123
        solar_voltage = 123
        solar_power = 123

    try:
        if not I2C3_connected:
            raise RuntimeError("Failed to connect to light sensor")
        # Previously the sample list was never created, so this branch always
        # failed and the light value was always reported as 123.
        lux = _averaged(lambda: (bh.luminance() * 0.0079,))[0]
        print("Light intensity: {:.2f} w/m2".format(lux))
    except Exception:
        print("Failed to collect Light data from Device")
        lux = 123

    try:
        temperature_samples = [esp32.raw_temperature() for _ in range(3)]
        # The raw reading is treated as Fahrenheit and converted to Celsius.
        temperature = (sum(temperature_samples) / len(temperature_samples) - 32) * 5 / 9
        print("CPU Temperature: " + str(temperature) + " C")
    except Exception:
        print("Failed to collect temperature data from Device")
        temperature = 123

    # (measurement name, value) pairs, posted in this order
    readings = [
        ('PRESSURE_TEST', pressure),
        ('BATT_CURRENT_TEST', battery_current),
        ('SOLAR_CURRENT_TEST', solar_current),
        ('SOLAR_VOLT_TEST', solar_voltage),
        ('BATT_VOLT_TEST', battery_voltage),
        ('BATT_POWER_TEST', battery_power),
        ('SOLAR_POWER_TEST', solar_power),
        ('CPUTEMPERATURE_TEST', temperature),
        ('LIGHT_TEST', lux),
        ('BEST_X_TEST', best_x),
        ('BEST_Y_TEST', best_y),
        ('TIMER_TEST', get_remaining_time()),
    ]

    headers = {'Authorization': 'Token {}'.format(INFLUXDB_TOKEN), 'Content-Type': 'application/octet-stream'}

    # Send data for each sensor and track whether each one was successful
    all_sent = True
    for name, value in readings:
        # InfluxDB line protocol: <measurement>,<tag>=<v> <field>=<v>
        line_protocol = '{},location=Test value={}'.format(name, value)

        response = None
        try:
            response = urequests.post(INFLUXDB_URL, headers=headers, data=line_protocol.encode('utf-8'))
            if response.status_code == 204:
                print('Data sent for {}'.format(name))
            else:
                print('Error sending data to InfluxDB Cloud for {}: {}'.format(name, response.content))
                all_sent = False
        except Exception as e:
            print('Error sending data to InfluxDB Cloud for {}: {}'.format(name, e))
            all_sent = False
        finally:
            # Always release the socket, including on error replies.
            if response is not None:
                response.close()

    return all_sent

ssid = None
print("Connecting to WiFi network...")
# connect_to_wifi() returns a (success, ssid) tuple. A non-empty tuple is
# always truthy, so it has to be unpacked instead of being tested directly.
connected, ssid = connect_to_wifi()
connect_devices()

# Set the watchdog timer to trigger a reset if the device becomes unresponsive longer than setpoint
wdt = machine.WDT(timeout=watchdog_timeout * 60 * 1000)

# Initialize counters to keep track of consecutive failures
failed_transmissions = 0
wifi_attempts = 0

if connected:
    find_best_position()

while True:
    # Reset the watchdog timer at the start of each loop iteration
    wdt.feed()

    if not (I2C1_connected and I2C2_connected and I2C3_connected):
        # Attempt to reconnect devices
        connect_devices()

    # Ask the interface for the current link state; the link may have dropped
    # since the previous iteration.
    connected = wlan.isconnected()

    if connected:
        print("Connected to WiFi network:", ssid)
        wifi_attempts = 0
    else:
        print("Connecting to WiFi network...")
        connected, ssid = connect_to_wifi()
        if not connected:
            print('Failed to connect to WiFi network')
            wifi_attempts += 1
            print('WiFi Connection attempts: ' + str(wifi_attempts))
            # Check if the number of consecutive failed WiFi attempts has reached a certain threshold
            if wifi_attempts >= 5:
                print("Reached the maximum number of consecutive failed WiFi attempts, rebooting the device...")
                machine.reset()
            sleep(10)
            # Nothing can be uploaded without a link; try again next iteration.
            continue
        wifi_attempts = 0

    sleep(5)

    # Collect and send sensor data
    try:
        data_sent = send_sensor_data()
    except Exception as e:
        print("Failed to collect data from I2C Device", str(e))
        data_sent = None  # distinguishes "raised" from "sent but rejected"

    if not data_sent:
        if data_sent is False:
            print('Failed to send data to InfluxDB Cloud')
        failed_transmissions += 1
        print("Failed transmissions = " + str(failed_transmissions))
        sleep(30)
        # Both failure paths count towards the reboot threshold.
        if failed_transmissions >= 5:
            print("Reached the maximum number of consecutive failed transmissions, rebooting the device...")
            machine.reset()
        continue

    # Data sent successfully, blink LED
    try:
        timer1.init(period=500, mode=Timer.PERIODIC, callback=blink)
        sleep(10)
    finally:
        timer1.deinit()

    # Reset the failed transmissions counter
    failed_transmissions = 0

    remaining_time = get_remaining_time()
    print("Remaining time:", remaining_time, " Minutes")

    if remaining_time <= 0:
        find_best_position()

    sleep(10)

    # Light sleep for setpoint duration
    machine.lightsleep(light_sleep_time * 60 * 1000)

INA226 Class

MicroPython
Support File for both options
# The MIT License (MIT)
#
# Copyright (c) 2017 Dean Miller for Adafruit Industries
# Copyright (c) 2020 Christian Becker
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""
`ina226`
====================================================

micropython driver for the INA226 current sensor.

* Author(s): Christian Becker

"""
# taken from https://github.com/robert-hh/INA219 , modified for the INA226 devices by
# Christian Becker
# June 2020

from micropython import const
# from adafruit_bus_device.i2c_device import I2CDevice

__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/elschopi/TI_INA226_micropython.git"

# Register addresses and configuration-register bit fields for the INA226.
# The driver below composes the config value by OR-ing one constant from each
# group (averaging, bus conversion time, shunt conversion time, mode).

# Bits
# pylint: disable=bad-whitespace
_READ = const(0x01)

# Config Register (R/W)
_REG_CONFIG = const(0x00)
_CONFIG_RESET = const(0x8000)  # Reset Bit
# Constant bits - don't change
_CONFIG_CONST_BITS = const(0x4000)
# Averaging mode
_CONFIG_AVGMODE_MASK = const(0x0e00)
_CONFIG_AVGMODE_1SAMPLES = const(0x0000)
_CONFIG_AVGMODE_4SAMPLES = const(0x0200)
_CONFIG_AVGMODE_16SAMPLES = const(0x0400)
_CONFIG_AVGMODE_64SAMPLES = const(0x0600)
_CONFIG_AVGMODE_128SAMPLES = const(0x0800)
_CONFIG_AVGMODE_256SAMPLES = const(0x0a00)
_CONFIG_AVGMODE_512SAMPLES = const(0x0c00)
_CONFIG_AVGMODE_1024SAMPLES = const(0x0e00)

# Bus voltage conversion time
_CONFIG_VBUSCT_MASK = const(0x01c0)
_CONFIG_VBUSCT_140us = const(0x0000)
_CONFIG_VBUSCT_204us = const(0x0040)
_CONFIG_VBUSCT_332us = const(0x0080)
_CONFIG_VBUSCT_588us = const(0x00c0)
_CONFIG_VBUSCT_1100us = const(0x0100)
_CONFIG_VBUSCT_2116us = const(0x0140)
_CONFIG_VBUSCT_4156us = const(0x0180)
_CONFIG_VBUSCT_8244us = const(0x01c0)
# Misspelled legacy names, kept so existing references keep working.
_CONFIG_VBUSCT_21116us = const(0x0140)  # same value as _CONFIG_VBUSCT_2116us
_CONFIG_AVGMODE_8244us = const(0x01c0)  # same value as _CONFIG_VBUSCT_8244us

# Shunt voltage conversion time
_CONFIG_VSHUNTCT_MASK = const(0x0038)
_CONFIG_VSHUNTCT_140us = const(0x0000)
_CONFIG_VSHUNTCT_204us = const(0x0008)
_CONFIG_VSHUNTCT_332us = const(0x0010)
_CONFIG_VSHUNTCT_588us = const(0x0018)
_CONFIG_VSHUNTCT_1100us = const(0x0020)
_CONFIG_VSHUNTCT_2116us = const(0x0028)
_CONFIG_VSHUNTCT_4156us = const(0x0030)
_CONFIG_VSHUNTCT_8244us = const(0x0038)
# Misspelled legacy name, kept so existing references keep working.
_CONFIG_VSHUNTCT_21116us = const(0x0028)  # same value as _CONFIG_VSHUNTCT_2116us

# Operating mode
_CONFIG_MODE_MASK = const(0x0007)  # Operating Mode Mask
_CONFIG_MODE_POWERDOWN = const(0x0000)
_CONFIG_MODE_SVOLT_TRIGGERED = const(0x0001)
_CONFIG_MODE_BVOLT_TRIGGERED = const(0x0002)
_CONFIG_MODE_SANDBVOLT_TRIGGERED = const(0x0003)
_CONFIG_MODE_ADCOFF = const(0x0004)
_CONFIG_MODE_SVOLT_CONTINUOUS = const(0x0005)
_CONFIG_MODE_BVOLT_CONTINUOUS = const(0x0006)
_CONFIG_MODE_SANDBVOLT_CONTINUOUS = const(0x0007)

# SHUNT VOLTAGE REGISTER (R)
_REG_SHUNTVOLTAGE = const(0x01)

# BUS VOLTAGE REGISTER (R)
_REG_BUSVOLTAGE = const(0x02)

# POWER REGISTER (R)
_REG_POWER = const(0x03)

# CURRENT REGISTER (R)
_REG_CURRENT = const(0x04)

# CALIBRATION REGISTER (R/W)
_REG_CALIBRATION = const(0x05)
# pylint: enable=bad-whitespace


def _to_signed(num):
    """Reinterpret an unsigned 16-bit register value as two's-complement."""
    return num - 0x10000 if num > 0x7FFF else num


class INA226:
    """Driver for the INA226 current sensor.

    Readings are scaled with ``_current_lsb`` / ``_power_lsb``, which must
    match the value written to the calibration register.
    """
    def __init__(self, i2c_device, addr=0x40):
        """Bind to the device at ``addr`` on ``i2c_device`` and apply the default calibration.

        Writes the calibration and configuration registers, so an I2C error
        raised by the bus object propagates to the caller.
        """
        self.i2c_device = i2c_device

        self.i2c_addr = addr
        self.buf = bytearray(2)  # shared scratch buffer for 16-bit register I/O
        # Multiplier used to determine current from raw reading
        self._current_lsb = 0
        # Multiplier in W used to determine power from raw reading
        self._power_lsb = 0

        # Set chip to known config values to start
        # (this initial value is replaced by set_calibration() right away)
        self._cal_value = 4096
        self.set_calibration()

    def _write_register(self, reg, value):
        """Write the 16-bit ``value`` to register ``reg``, high byte first."""
        self.buf[0] = (value >> 8) & 0xFF
        self.buf[1] = value & 0xFF
        self.i2c_device.writeto_mem(self.i2c_addr, reg, self.buf)

    def _read_register(self, reg):
        """Return the unsigned 16-bit content of register ``reg``."""
        self.i2c_device.readfrom_mem_into(self.i2c_addr, reg & 0xff, self.buf)
        value = (self.buf[0] << 8) | (self.buf[1])
        return value

    @property
    def shunt_voltage(self):
        """The shunt voltage (between V+ and V-) in Volts (so +-.327V)"""
        value = _to_signed(self._read_register(_REG_SHUNTVOLTAGE))
        # The least signficant bit is 10uV which is 0.00001 volts
        return value * 0.00001

    @property
    def bus_voltage(self):
        """The bus voltage (between V- and GND) in Volts"""
        raw_voltage = self._read_register(_REG_BUSVOLTAGE)
        # voltage in millVolt is register content multiplied with 1.25mV/bit
        voltage_mv = raw_voltage * 1.25
        # Return Volts instead of milliVolts
        return voltage_mv * 0.001

    @property
    def current(self):
        """The current through the shunt resistor: raw reading * ``_current_lsb``.

        With the default calibration ``_current_lsb`` is 0.0001, derived from
        a full-scale current expressed in amperes (see the worked example
        before set_calibration), so the result is in amperes.
        """
        # Sometimes a sharp load will reset the chip, which will
        # reset the cal register, meaning CURRENT and POWER will
        # not be available ... avoid this by always setting a cal
        # value even if it's an unfortunate extra step
        self._write_register(_REG_CALIBRATION, self._cal_value)

        # Now we can safely read the CURRENT register!
        raw_current = _to_signed(self._read_register(_REG_CURRENT))
        return raw_current * self._current_lsb

    @property
    def power(self):
        """The power computed by the chip, in Watts (raw reading * ``_power_lsb``)."""
        # INA226 stores the calculated power in this register. It is an
        # unsigned 16-bit value, so it must not be sign-converted: doing so
        # turned readings above 0x7FFF into negative power.
        raw_power = self._read_register(_REG_POWER)
        # Calculated power is derived by multiplying raw power value with the power LSB
        return raw_power * self._power_lsb

    # Example calculations for calibration register value, current LSB and power LSB
    # 1. Assuming a 100milliOhm resistor as shunt
    # RSHUNT = 0.1
    #
    # 2. Determine current_lsb
    # Assuming a maximum expected current of 3.6A
    # current_lsb = MaxExpected_I / (2^15)
    # current_lsb = 3.6A / (2^15)
    # current_lsb = 0.0001098632813
    # -> Rounding to "nicer" numbers:
    # current_lsb = 0.0001
    #
    # 3. Setting the power LSB
    # power_lsb = 25 * current_lsb
    # power_lsb = 25 * 0.0001
    # power_lsb = 0.0025
    #
    # 4. Determine calibration register value
    # cal_value = 0.00512 / (RSHUNT * current_lsb)
    # cal_value = 0.00512 / (0.1 * 0.0001)
    # cal_value = 512
    #
    def set_calibration(self):  # pylint: disable=invalid-name
        """Apply the default calibration and configuration.

        Uses the values from the worked example above (0.1 ohm shunt,
        current LSB 0.0001, calibration value 512) and selects 512-sample
        averaging, 588 us conversion times and continuous shunt+bus mode.

        ..note :: These calculations assume a 0.1 shunt ohm resistor
        """
        self._current_lsb = .0001
        self._cal_value = 512
        self._power_lsb = .0025
        self._write_register(_REG_CALIBRATION, self._cal_value)

        config = (_CONFIG_CONST_BITS |
                  _CONFIG_AVGMODE_512SAMPLES |
                  _CONFIG_VBUSCT_588us |
                  _CONFIG_VSHUNTCT_588us |
                  _CONFIG_MODE_SANDBVOLT_CONTINUOUS)

        self._write_register(_REG_CONFIG, config)

    def set_calibration_custom(self, calValue=512, config=0x4127, current_lsb=None):
        """Write a caller-supplied calibration value and raw config register value.

        Args:
            calValue: value for the calibration register (computed separately).
            config: raw 16-bit value for the configuration register.
            current_lsb: optional current LSB that ``calValue`` was derived
                from. When given, the current and power scale factors are
                updated to match (power LSB = 25 * current LSB); when omitted
                the previous scale factors are kept, as before.
        """
        self._cal_value = calValue
        if current_lsb is not None:
            self._current_lsb = current_lsb
            self._power_lsb = 25 * current_lsb
        self._write_register(_REG_CALIBRATION, self._cal_value)
        self._write_register(_REG_CONFIG, config)

INA226 Calibration

MicroPython
Supporting File for both options
# INA226 Config register calculator

# Lists of possible register settings
# Each entry is [label the user types, bit pattern as a hex string].
# Number of averaged samples
liste_AVG = [['1', '0x0000'],['4', '0x0200'],['16', '0x0400'],['64', '0x0600'],['128', '0x0800'],['256', '0x0A00'],['512', '0x0C00'],['1024', '0x0E00'] ]

# Bus voltage conversion time
liste_VBUSCT = [['140','0x0000'],['204','0x0040'],['332','0x0080'],['588','0x00C0'],['1100','0x0100'],['2116','0x0140'],['4156','0x0180'],['8244','0x01C0']]

# Shunt voltage conversion time
liste_VSHCT = [['140','0x0000'],['204','0x0008'],['332','0x0010'],['588','0x0018'],['1100','0x0020'],['2116','0x0028'],['4156','0x0030'],['8244','0x0038']]

# Operating mode
liste_MODE = [['OFF1','0x0000'],['SHV_TRIG','0x0001'],['BUSV_TRIG','0x0002'],['SHBUSV_TRIG','0x0003'],['OFF2','0x0004'],['SHV_CONT','0x0005'],['BUSV_CONT','0x0006'],['SHBUS_CONT','0x0007']]

# Bits that are always added to the computed register value
constant_bits = 0x4000

# Structure of the config register
# constant_bits + AVG + VBUSCT + VSHCT + MODE

def sucher(wert, liste):
    """Look up `wert` among the entry labels of `liste`.

    Returns the second item of the first entry whose first item equals
    `wert`, or None when no entry matches.
    """
    for entry in liste:
        if entry[0] != wert:
            continue
        return entry[1]
    return None

def listeguck(liste):
    """Print every entry of `liste` as "<label>: <value>", one per line."""
    for entry in liste:
        label = entry[0]
        value = entry[1]
        print('{}: {}'.format(label, value))

def registern():
    """Interactively build an INA226 configuration register value.

    For each of the four settings the available options are listed and the
    user is prompted for one; the selected bit patterns are added to
    ``constant_bits``. The result is printed and returned as a hex string.
    A selection that is not in its table makes ``int()`` fail on the None
    returned by ``sucher``.
    """
    steps = (
        (liste_AVG, 'Please input sample number: '),
        (liste_VBUSCT, 'Please input VBUS conversion time: '),
        (liste_VSHCT, 'Please input VSHUNT conversion time: '),
        (liste_MODE, 'Please input operating mode: '),
    )

    chosen = []
    for table, prompt in steps:
        listeguck(table)
        answer = input(prompt)
        chosen.append(int(sucher(answer, table), 16))

    bits_avg, bits_vbusct, bits_vshct, bits_mode = chosen

    print('{} {} {} {} {}'.format(constant_bits, bits_avg, bits_vbusct, bits_vshct, bits_mode))
    register = hex(constant_bits + bits_avg + bits_vbusct + bits_vshct + bits_mode)
    print('Value for the configuration register: ')
    print(register)
    return register

# print(type(sucher(input('wert: '), liste_MODE)))

# Keep computing register values until the user answers 'n' or input ends.
status = True

while status:
    try:
        registern()
        weiter = input('Nochmal? (j/n): ')
        if weiter == 'n':
            status = False
    except TypeError:
        # registern() fails with TypeError when a selection is not in its
        # table; let the user try again instead of ending the program.
        print('uh-oh! Unknown selection, please try again.')
    except (KeyboardInterrupt, EOFError):
        # Ctrl-C or end of input: stop asking.
        status = False
        print('uh-oh!')
    except Exception as err:
        # Anything else is unexpected: stop, but say what went wrong.
        status = False
        print('uh-oh!', err)

Servo Class

MicroPython
Support File for tracking solar panel option
from machine import Pin, PWM

class Servo:
    """Hobby-servo driver that maps an angle to a PWM duty value.

    The duty is interpolated linearly between the minimum and maximum duty
    values over the ``min_angle`` .. ``max_angle`` range.
    """
    # these defaults work for the standard TowerPro SG90
    __servo_pwm_freq = 50
    __min_u10_duty = 26 - 0 # offset for correction
    __max_u10_duty = 123- 0  # offset for correction
    min_angle = 0
    max_angle = 180
    current_angle = 0.001


    def __init__(self, pin):
        """Attach the servo to GPIO number ``pin`` using the default settings."""
        self.__initialise(pin)


    def update_settings(self, servo_pwm_freq, min_u10_duty, max_u10_duty, min_angle, max_angle, pin):
        """Replace the PWM frequency, duty range and angle range, then re-attach to ``pin``."""
        self.__servo_pwm_freq = servo_pwm_freq
        self.__min_u10_duty = min_u10_duty
        self.__max_u10_duty = max_u10_duty
        self.min_angle = min_angle
        self.max_angle = max_angle
        self.__initialise(pin)


    def move(self, angle):
        """Move to ``angle``; requests outside the configured range are clamped.

        Does nothing when the (clamped, rounded) angle equals the current one.
        """
        # keep the request inside the calibrated range so the duty value never
        # leaves the min/max duty window
        angle = min(max(angle, self.min_angle), self.max_angle)
        # round to 2 decimal places, so we have a chance of reducing unwanted servo adjustments
        angle = round(angle, 2)
        # do we need to move?
        if angle == self.current_angle:
            return
        self.current_angle = angle
        # calculate the new duty cycle and move the motor
        duty_u10 = self.__angle_to_u10_duty(angle)
        self.__motor.duty(duty_u10)

    def __angle_to_u10_duty(self, angle):
        """Convert ``angle`` to the integer duty value passed to ``PWM.duty``."""
        return int((angle - self.min_angle) * self.__angle_conversion_factor) + self.__min_u10_duty


    def __initialise(self, pin):
        """Create the PWM output on ``pin`` and derive the angle-to-duty factor."""
        # a value no rounded request can equal, so the first move() always acts
        self.current_angle = -0.001
        self.__angle_conversion_factor = (self.__max_u10_duty - self.__min_u10_duty) / (self.max_angle - self.min_angle)
        self.__motor = PWM(Pin(pin))
        self.__motor.freq(self.__servo_pwm_freq)

BH1750FVI Digital Light Intensity

MicroPython
Support File for tracking solar panel option
Missing at the moment

Main Code for fixed position solar panel option

MicroPython
Must have supporting file ina226
import network
import urequests
import utime
import time
from time import sleep
import machine
import esp32
from machine import Pin, ADC, lightsleep, deepsleep, Timer, SoftI2C
import ina226

# Configure GPIO pins
# Pin numbers that the loop below treats as unused.
gpio_pins = [4, 14, 16, 17, 18, 19, 23, 25, 26, 27, 32, 34, 35, 36, 37, 38, 39]

# Set unused GPIO pins as inputs with pull-down resistors
# (pins that reject this configuration raise ValueError and are only reported)
for pin in gpio_pins:
    try:
        machine.Pin(pin, machine.Pin.IN, machine.Pin.PULL_DOWN)
    except ValueError:
        print("Pin", pin, "cannot be configured as GPIO.")

# NOTE(review): this was labelled "Disable Bluetooth", but the statement only
# configures GPIO0 as an output and drives it low; nothing here touches the
# Bluetooth radio. Confirm the intent before relying on it.
machine.Pin(0, machine.Pin.OUT).value(0)

# Initialize I2C
# Software I2C bus shared by both INA226 sensors: SCL on GPIO22, SDA on GPIO21.
i2c = machine.SoftI2C(scl=machine.Pin(22), sda=machine.Pin(21))

# Initialize INA226 devices
# Sensor handles and their "connected" flags; connect_devices() fills these in.
ina1 = None
ina2 = None
I2C1_connected = False
I2C2_connected = False

# Initialize LED pin
led = Pin(2, Pin.OUT)

# Define the timer for blinking LED
# Hardware timer 0, with blink() below as its intended callback.
timer1 = Timer(0)

def blink(timer):
    """Timer callback: invert the status LED (the timer argument is unused)."""
    is_on = led.value()
    led.value(not is_on)  # Toggle LED state

# Switch the LED on at start-up (this variant does not pulse it off again here)
led.value(1)

# Define the ADC pins
adc_pins = [33]  # not referenced again in the visible part of this script; adc1 below names pin 33 directly

# Create an ADC object for each pin
# adc1 reads the level transmitter's analog output in send_sensor_data().
adc1 = machine.ADC(machine.Pin(33, machine.Pin.IN))
adc1.atten(ADC.ATTN_11DB)

# InfluxDB Cloud settings
# NOTE(review): the double quotes around the <Your ...> placeholders are part of
# the URL string; presumably they should be dropped when real values are filled
# in -- confirm against the InfluxDB write API.
INFLUXDB_URL = 'https://eastus-1.azure.cloud2.influxdata.com/api/v2/write?org="<Your Organization>"&bucket="<Your Bucket>"&precision=s'
INFLUXDB_TOKEN = 'Your Token'

# Define the names and passwords of your WiFi networks
# connect_to_wifi() tries these (ssid, password) pairs in order.
WIFI_NETWORKS = [
    ('SSID1', 'PW1'),
    ('SSID2', 'PW2'),
    ('SSID3', 'PW3')
]

def connect_to_wifi():
    """Try each network in WIFI_NETWORKS in order until one connects.

    Each attempt is given roughly 10 seconds before moving on.

    Side effects:
        Sets the module-level ``wlan`` (station interface) and ``ssid``.
        ``ssid`` is the connected network's name, or None when no network
        could be joined (it no longer keeps the last *attempted* name).

    Returns:
        tuple: ``(True, ssid)`` on success, ``(False, None)`` otherwise.
        The tuple itself is always truthy, so callers must unpack it.
    """
    global ssid, wlan
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)

    for candidate, password in WIFI_NETWORKS:
        try:
            wlan.disconnect()
            wlan.connect(candidate, password)
            start_time = utime.time()
            while not wlan.isconnected():
                if (utime.time() - start_time) > 10:  # Wait for 10 seconds
                    print("Timeout exceeded while connecting to {}".format(candidate))
                    break
                utime.sleep(0.1)
            else:
                # Loop ended without a timeout break: the link is up.
                ssid = candidate
                print("Connected to WiFi network:", ssid)
                return True, ssid
        except Exception as e:
            print("Failed to connect to WiFi network:", candidate, e)

    ssid = None
    return False, None

def connect_devices():
    """(Re)create the two INA226 monitor objects on the shared I2C bus.

    Address 0x40 is the battery monitor (``ina1``) and address 0x44 the solar
    monitor (``ina2``). Each device is probed independently so that one
    missing sensor does not prevent use of the other. The outcome is stored
    in the globals ``I2C1_connected`` / ``I2C2_connected``; nothing is
    returned and no exception escapes.
    """
    # Note: a `global` statement must not end with a trailing comma.
    global I2C1_connected, I2C2_connected, ina1, ina2

    try:
        ina1 = ina226.INA226(i2c, 0x40)
#        ina1.set_calibration_custom(calValue=5120, config=0x4427)
        I2C1_connected = True
    except Exception as e:
        print("Failed to connect to I2C Battery Device 1", e)
        I2C1_connected = False

    try:
        ina2 = ina226.INA226(i2c, 0x44)
#        ina2.set_calibration_custom(calValue=5120, config=0x4427)
        I2C2_connected = True
    except Exception as e:
        print("Failed to connect to I2C Solar Device 2", e)
        I2C2_connected = False

# Timing constants, both expressed in minutes; the main script multiplies
# them by 60 * 1000 to obtain the millisecond values the machine API takes.
light_sleep_time = 1 # minutes spent in machine.lightsleep() between measurement cycles
watchdog_timeout = 5 # minutes without wdt.feed() before the watchdog resets the board

def _mean(samples):
    """Return the arithmetic mean of a non-empty sequence of numbers."""
    return sum(samples) / len(samples)


def _read_ina226(device, count=3, delay=0.5):
    """Average ``count`` current, bus-voltage and power readings of ``device``.

    Args:
        device: INA226 driver object exposing ``current``, ``bus_voltage``
            and ``power`` attributes.
        count: Number of samples to take.
        delay: Pause in seconds after each sample.

    Returns:
        tuple: ``(current, voltage, power)`` mean values.
    """
    currents, voltages, powers = [], [], []
    for _ in range(count):
        currents.append(device.current)
        voltages.append(device.bus_voltage)
        powers.append(device.power)
        time.sleep(delay)
    return _mean(currents), _mean(voltages), _mean(powers)


def send_sensor_data():
    """Sample every sensor and upload one point per measurement to InfluxDB.

    Each sensor group (level ADC, battery INA226, solar INA226, CPU
    temperature) is read independently, averaging three samples. If a group
    cannot be read, its values are replaced by the sentinel ``999`` and the
    upload still proceeds, so a single faulty sensor does not hide the others.

    Every measurement is then POSTed on its own as an InfluxDB line-protocol
    record; HTTP status 204 is treated as success.

    Returns:
        bool: ``True`` only if every measurement was accepted by the server.
    """
    # --- Level transmitter on the ADC -------------------------------------
    try:
        pressure_samples = []
        for _ in range(3):
            voltage1 = adc1.read() * (3.9 / 4095)
            pressure_samples.append((voltage1 - 0.5) * (2.4 / 7) * 100.0)
            time.sleep(0.5)  # Delay for 0.5 seconds
        pressure = _mean(pressure_samples)
        print("Pressure: " + str(pressure) + " %")
    except Exception as e:
        print("Failed to collect pressure data from Device", e)
        pressure = 999

    # --- Battery monitor (INA226 #1) ---------------------------------------
    battery_current = battery_voltage = battery_power = 999
    try:
        if I2C1_connected:
            battery_current, battery_voltage, battery_power = _read_ina226(ina1)
            print("Battery Current: " + str(battery_current) + " mA")
            print("Battery Voltage:" + str(battery_voltage) + " VDC")
            print("Battery Power: " + str(battery_power) + " W")
        else:
            print("Failed to collect battery data from Device")
    except Exception as e:
        print("Failed to collect battery data from Device", e)
        battery_current = battery_voltage = battery_power = 999

    # --- Solar monitor (INA226 #2) -----------------------------------------
    solar_current = solar_voltage = solar_power = 999
    try:
        if I2C2_connected:
            solar_current, solar_voltage, solar_power = _read_ina226(ina2)
            print("Solar Current: " + str(solar_current) + " mA")
            print("Solar Voltage: " + str(solar_voltage) + " VDC")
            print("Solar Power: " + str(solar_power) + " W")
        else:
            print("Failed to collect solar data from Device")
    except Exception as e:
        print("Failed to collect solar data from Device", e)
        solar_current = solar_voltage = solar_power = 999

    # --- CPU temperature (raw value is converted from deg F to deg C) ------
    try:
        temperature_samples = [esp32.raw_temperature() for _ in range(3)]
        temperature = (_mean(temperature_samples) - 32) * 5 / 9
        print("CPU Temperature: " + str(temperature) + " C")
    except Exception as e:
        print("Failed to collect temperature data from Device", e)
        temperature = 999

    # (measurement name, value) pairs, in upload order.
    readings = (
        ('PRESSURE_TEST', pressure),
        ('BATT_CURRENT_TEST', battery_current),
        ('SOLAR_CURRENT_TEST', solar_current),
        ('SOLAR_VOLT_TEST', solar_voltage),
        ('BATT_VOLT_TEST', battery_voltage),
        ('BATT_POWER_TEST', battery_power),
        ('SOLAR_POWER_TEST', solar_power),
        ('CPUTEMPERATURE_TEST', temperature),
    )

    # The headers are identical for every request, so build them once.
    headers = {'Authorization': 'Token {}'.format(INFLUXDB_TOKEN), 'Content-Type': 'application/octet-stream'}

    all_sent = True
    for name, value in readings:
        # InfluxDB line protocol: <measurement>,<tag set> <field set>
        line_protocol = '{},location=Test value={}'.format(name, str(value))

        try:
            response = urequests.post(INFLUXDB_URL, headers=headers, data=line_protocol.encode('utf-8'))
            try:
                accepted = response.status_code == 204
                if not accepted:
                    print('Error sending data to InfluxDB Cloud for {}: {}'.format(name, response.content))
            finally:
                # Always release the socket, even if reading the reply failed.
                response.close()
        except Exception as e:
            # `Exception` (not a bare except) so Ctrl-C still interrupts.
            print('Error sending data to InfluxDB Cloud for {}'.format(name), e)
            accepted = False

        if accepted:
            # Only claim success when the server actually accepted the point.
            print('Data sent for {}'.format(name))
        else:
            all_sent = False

    return all_sent

# ---------------------------------------------------------------------------
# Main script: connect, then measure / upload / sleep forever.
# ---------------------------------------------------------------------------
ssid = None
print("Connecting to WiFi network...")
# connect_to_wifi() returns a (success, ssid) tuple. It must be unpacked:
# a non-empty tuple is always truthy, so storing it whole in `connected`
# would make the "not connected" branch below unreachable.
connected, ssid = connect_to_wifi()
connect_devices()

# Set the watchdog timer to trigger a reset if the device becomes unresponsive longer than setpoint
wdt = machine.WDT(timeout=watchdog_timeout * 60 * 1000)

# Consecutive-failure limits after which the board is rebooted.
MAX_WIFI_ATTEMPTS = 5
MAX_FAILED_TRANSMISSIONS = 5

# Counters of consecutive failures; each is cleared on the next success.
failed_transmissions = 0
wifi_attempts = 0

while True:
    # Reset the watchdog timer at the start of each loop iteration
    wdt.feed()

    if not I2C1_connected or not I2C2_connected:
        # Attempt to reconnect devices
        connect_devices()

    # Refresh the link state: the connection may have dropped since the last
    # cycle (e.g. across light sleep), and a stale flag would skip reconnecting.
    connected = connected and wlan.isconnected()

    if not connected:
        print("Connecting to WiFi network...")
        connected, ssid = connect_to_wifi()

    if connected:
        print("Connected to WiFi network:", ssid)
        wifi_attempts = 0
    else:
        print('Failed to connect to WiFi network')
        wifi_attempts += 1
        sleep(10)
        print('WiFi Connection attempts: ' + str(wifi_attempts))
        # Check if the number of consecutive failed WiFi attempts has reached a certain threshold
        if wifi_attempts >= MAX_WIFI_ATTEMPTS:
            print("Reached the maximum number of consecutive failed WiFi attempts, rebooting the device...")
            machine.reset()
        # Without a link every upload would fail; retry the connection instead.
        continue

    sleep(5)

    # Collect and send sensor data
    try:
        data_sent = send_sensor_data()
        if not data_sent:
            print('Failed to send data to InfluxDB Cloud')
    except Exception as e:
        print("Failed to collect data from I2C Device", str(e))
        data_sent = False

    if not data_sent:
        # Both failure modes count toward the reboot threshold.
        failed_transmissions += 1
        print("Failed transmissions = " + str(failed_transmissions))
        sleep(30)
        if failed_transmissions >= MAX_FAILED_TRANSMISSIONS:
            print("Reached the maximum number of consecutive failed transmissions, rebooting the device...")
            machine.reset()
        continue

    # Data sent successfully, blink LED for 10 seconds (toggling every 500 ms)
    try:
        timer1.init(period=500, mode=Timer.PERIODIC, callback=blink)
        sleep(10)
    finally:
        # Always stop the timer so the callback cannot fire during sleep.
        timer1.deinit()

    # Reset the failed transmissions counter
    failed_transmissions = 0

#    print("Light sleep...")
    sleep(3)

    # Light sleep for setpoint duration
    machine.lightsleep(light_sleep_time * 60 * 1000)

Credits

Chad

Chad

5 projects • 12 followers

Comments