The purpose of this project was to monitor the water level of a well 100' away from the house, in the back yard, with no available power supply. The project was originally done with a Raspberry Pi Pico W, but I decided to go with an ESP32-WROOM-32U for the external antenna.
This project has two options:
Option 1: Fixed Position Solar Panel
Option 2: Tracking Solar Panel (Coming Soon)
The ESP32 collects the sensor data and sends it over WiFi to InfluxDB, from where it is imported into Grafana for trending and visualization. The InfluxDB Cloud and Grafana plans used in this project are free and allow for 30 days of historical data. You will need to set up accounts, obtain your InfluxDB token, and create your dashboard. I will not go over this process in this build, as a simple Google search will give you some examples of how to set up the accounts. Then just replace "<Your Token here>", "<Your bucket here>" & "<Your Organization>" in the provided code to publish the data to InfluxDB.
Power(+) from both the battery and the solar panel is run through a 4-pin switch to eliminate all sources of power while working. I found that both the solar input and the battery input needed to be switched, or the ESP32 would not shut down due to the voltage from the solar panel. Ensure your switch does not connect these two sources to each other, as they have different voltage potentials.
Each power source is run through its own INA226 to measure the voltage, current and power then connected to the Solar Power Management Board by DFRobot. More information on the board can be found here: https://www.dfrobot.com/product-1712.html
3.3V and ground are supplied to the INA226s from the ESP32, along with SDA/SCL for communication.
The ESP32 is connected to the Solar Power Management Board with a USB cable to provide the power supply to the ESP32.
The ESP32 supplies 5V and ground to the level sensor, and pin 33 reads the analogue signal from the level sensor. It is important to note that the level sensor selected has a higher range than the well can reach, so the output voltage from the sensor does not exceed the limit of the ADC on the ESP32. A 10 m range level sensor was used for a well that is 7 m deep. The output voltage of the sensor at max range is 4.5V, so at 7 m the voltage would be around 3.3V:
(7 - 0) / (10 - 0) = (V - 0.5) / (4.5 - 0.5). V ≈ 3.3V
If this is not possible in your situation, a voltage divider could be used to drop the voltage instead.
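As a rough check of the numbers above, here is a minimal Python sketch of the sensor scaling and of a two-resistor divider. It assumes the sensor output is linear from 0.5V at 0 m to 4.5V at full range. The function names and the 10k/20k resistor values are illustrative assumptions and are not part of the project code.

```python
def sensor_voltage(depth_m, range_m=10.0, v_min=0.5, v_max=4.5):
    """Expected sensor output (V) for a given depth, assuming a linear response."""
    return v_min + (depth_m / range_m) * (v_max - v_min)


def divider_output(v_in, r_top, r_bottom):
    """Output of a two-resistor voltage divider, measured across r_bottom."""
    return v_in * r_bottom / (r_top + r_bottom)


# 7 m of water on a 10 m sensor
print(round(sensor_voltage(7), 2))
# Full-scale 4.5V output through a hypothetical 10k / 20k divider
print(round(divider_output(4.5, 10000, 20000), 2))
```

If a divider is used, the conversion from ADC voltage back to level has to be scaled by the same ratio.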
ESP32 pins 21/22 (SDA/SCL) are used for communication with the INA226s and the light intensity module.
Code Breakdown (Fixed Solar Panel):
This is a MicroPython script designed for an ESP32 microcontroller. Here's a breakdown of its functionality:
Importing Libraries: The code starts by importing the libraries required for networking, timekeeping, hardware control, and sensor readings.
GPIO Configuration: The code configures the GPIO pins defined in the gpio_pins list as inputs with pull-down resistors. It also sets pin 0 as an output and drives it low, a step the code comments label as disabling Bluetooth.
I2C Initialization: The code initializes the I2C interface using the SoftI2C class with the specified SCL (clock) and SDA (data) pins.
INA226 Device Initialization: The code initializes two INA226 devices connected via I2C (addresses 0x40 and 0x44). It sets the variables ina1 and ina2 accordingly and sets the flags I2C1_connected and I2C2_connected to indicate successful connections.
LED Initialization: The code initializes an LED pin (pin 2) as an output.
Blinking LED: The code defines a timer (timer1) and a blink function that toggles the state of the LED. It then turns the LED on by setting its value to 1.
ADC Initialization: The code initializes an Analog-to-Digital Converter (ADC) object for the level sensor pin (pin 33).
WiFi Configuration: The code defines the SSIDs and passwords of the WiFi networks in the WIFI_NETWORKS list. It also includes a function connect_to_wifi() that attempts to connect to each WiFi network sequentially until a successful connection is established.
Device Connection: The code includes a function connect_devices() that attempts to connect to the INA226 devices via I2C.
Sensor Data Collection: The code defines a function send_sensor_data() that collects sensor data, including pressure, battery current, battery voltage, battery power, solar current, solar voltage, solar power, and CPU temperature. The data is then formatted into InfluxDB line protocol strings and sent to an InfluxDB Cloud server using HTTP POST requests.
Main Execution: The code initializes variables, sets constants for light sleep time and watchdog timeout, and starts an infinite loop.
The code resets the watchdog timer, reconnects to devices if necessary, connects to WiFi if not connected, and tracks consecutive failed WiFi connection attempts.
Sensor data is collected and sent to InfluxDB Cloud using the send_sensor_data() function. If the data transmission fails, it increments a failed transmissions counter and continues the loop.
If data is sent successfully, the LED blinks and the failed transmissions counter is reset.
The code then enters a light sleep mode for the specified duration and repeats the loop.
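To make the "InfluxDB line protocol" step more concrete, here is a minimal sketch of how one record can be built from a measurement name, tags, and fields. The helper name to_line_protocol is an assumption for illustration. This sketch does not escape spaces or commas, so it only suits simple names and values like the ones used in this project.

```python
def to_line_protocol(measurement, tags, fields):
    """Build one record in the form: measurement,tag=value field=value"""
    tag_part = ",".join("{}={}".format(k, v) for k, v in tags.items())
    field_part = ",".join("{}={}".format(k, v) for k, v in fields.items())
    return "{},{} {}".format(measurement, tag_part, field_part)


line = to_line_protocol("PRESSURE_TEST", {"location": "Test"}, {"value": 42.5})
print(line)  # PRESSURE_TEST,location=Test value=42.5
```

Each such string is what gets sent as the body of the HTTP POST to the write endpoint.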
Main Code for tracking solar panel option
import network
import urequests
import utime
import time
from time import sleep
import machine
import esp32
from machine import Pin, ADC, lightsleep, deepsleep, Timer, PWM, SoftI2C
import ina226
from bh1750fvi import BH1750
from servo import Servo
# Configure GPIO pins
gpio_pins = [4, 14, 16, 17, 18, 19, 23, 25, 26, 27, 32, 34, 35, 36, 37, 38, 39]
# Set unused GPIO pins as inputs with pull-down resistors
for pin in gpio_pins:
try:
machine.Pin(pin, machine.Pin.IN, machine.Pin.PULL_DOWN)
except ValueError:
print("Pin", pin, "cannot be configured as GPIO.")
# Disable Bluetooth
machine.Pin(0, machine.Pin.OUT).value(0)
# Initialize I2C
i2c = machine.SoftI2C(scl=machine.Pin(22), sda=machine.Pin(21))
# Initialize INA226 devices
ina1 = None
ina2 = None
ina3 = None
I2C1_connected = False
I2C2_connected = False
I2C3_connected = False
def move_servos(x_angle, y_angle):
servo_x.move(x_angle)
servo_y.move(y_angle)
# Set servo angle limits
x_min, x_max = 0, 80
y_min, y_max = 0, 180
# Define center points for servos
x_center = (x_min + x_max) // 2
y_center = (y_min + y_max) // 2
# Define variables for best position
best_x = 0
best_y = 0
# Initialize PWM for servo motor control
servo_x = Servo(pin=13)
servo_y = Servo(pin=12)
move_servos(best_x, best_y)
# Initialize LED pin
led = Pin(2, Pin.OUT)
# Define the timer for blinking LED
timer1 = Timer(0)
def blink(timer):
led.value(not led.value()) # Toggle LED state
# Blink LED
led.value(1)
utime.sleep(1)
led.value(0)
utime.sleep(1)
# Define the ADC pins
adc_pins = [33]
# Create an ADC object for each pin
adc1 = machine.ADC(machine.Pin(33, machine.Pin.IN))
adc1.atten(ADC.ATTN_11DB)
# InfluxDB Cloud settings
INFLUXDB_URL = 'https://eastus-1.azure.cloud2.influxdata.com/api/v2/write?org="<Your Organization>"&bucket="<Your Bucket>"&precision=s'
INFLUXDB_TOKEN = 'Your Token'
# Define the names and passwords of your WiFi networks
WIFI_NETWORKS = [
('SSID1', 'PW1'),
('SSID2', 'PW2'),
('SSID3', 'PW3')
]
def connect_to_wifi():
    # Try each configured network in turn.
    # Returns (True, ssid) on success, otherwise (False, None).
    global ssid, wlan
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    for ssid, password in WIFI_NETWORKS:
        try:
            wlan.disconnect()
            wlan.connect(ssid, password)
            start_time = utime.time()
            while not wlan.isconnected():
                if (utime.time() - start_time) > 10:  # Wait for 10 seconds
                    print("Timeout exceeded while connecting to {}".format(ssid))
                    break
                utime.sleep(0.1)
            else:
                # The while loop ended without a break, so we are connected
                print("Connected to WiFi network:", ssid)
                return True, ssid
        except Exception as e:
            print("Failed to connect to WiFi network:", ssid, e)
            continue
    return False, None
def connect_devices():
global I2C1_connected, I2C2_connected, I2C3_connected, ina1, ina2, bh
try:
ina1 = ina226.INA226(i2c, 0x40)
# ina1.set_calibration_custom(calValue=5120, config=0x4427)
I2C1_connected = True
except Exception as e:
print("Failed to connect to I2C Battery Device 1")
I2C1_connected = False
try:
ina2 = ina226.INA226(i2c, 0x44)
# ina2.set_calibration_custom(calValue=5120, config=0x4427)
I2C2_connected = True
except Exception as e:
print("Failed to connect to I2C Solar Device 2")
I2C2_connected = False
try:
bh = BH1750(i2c)
I2C3_connected = True
except Exception as e:
print("Failed to connect to I2C Light Device 3")
I2C3_connected = False
# Constants
lux_sp = 20 # amount of sunlight (W/m2) needed to enable the best sun position function
scan_time = 30 # minutes for find best sun position timer
light_sleep_time = 1 # minutes for lightsleep time
watchdog_timeout = 5 # minutes for timeout watchdog timer
# Define timer interval for scanning the best sun position
BEST_POSITION_INTERVAL = (scan_time * 60) * 1000 # scan_time minutes, in milliseconds
# Variables
lux = 0 # global variable
start_time = utime.time()
def get_remaining_time():
global start_time
current_time = utime.time()
elapsed_time = current_time - start_time
remaining_time = ((BEST_POSITION_INTERVAL/1000) - elapsed_time) / 60
return remaining_time
def find_best_position():
    global best_x, best_y, start_time
    max_power = 0
    max_lux = 0
    max_score_y = 0
    max_score_x = 0
    # The scan reads the solar INA226 (ina2), so it must be connected
    if not I2C2_connected:
        print("solar INA226 not ready")
        return
    # Check if I2C3 is connected and get lux value
    if I2C3_connected:
        lux = bh.luminance() * 0.0079  # convert lux to W/m2
    else:
        print("lux sensor not ready")
        return  # I2C3 not connected, exit the function
    # Check if the lux value is greater than lux setpoint
    if lux <= lux_sp:
        print("Not enough sunlight holding position")
        # Reset timer
        start_time = time.time()
        return
    print("Scanning for best sun position...")
    # Stage 1: Scan for best vertical position
    for y in range(y_min, y_max+1, 10):
        print("Scanning vertically at position y={}".format(y))
        move_servos(x_center, y)
        time.sleep(1)  # Add delay to allow INA226 time to settle
        power_samples = []
        lux_samples = []
        for _ in range(3):
            power_samples.append(ina2.bus_voltage * ina2.current)
            lux_samples.append(bh.luminance() * 0.0079)
            time.sleep(0.5)  # Delay for 0.5 seconds
        power = sum(power_samples) / len(power_samples)
        lux = sum(lux_samples) / len(lux_samples)
        score_y = power + lux
        print("Power & Light at vertical position y={}: {:.2f} W, Lux: {:.2f}".format(y, power, lux))
        if score_y > max_score_y:
            max_power = power
            max_lux = lux
            max_score_y = score_y
            best_y = y
    # Stage 2: Scan horizontally around best vertical position
    for x in range(x_min, x_max+1, 10):
        print("Scanning horizontally around best vertical position y={} at position x={}".format(best_y, x))
        move_servos(x, best_y)
        time.sleep(1)  # Add delay to allow INA226 time to settle
        power_samples = []
        lux_samples = []
        for _ in range(3):
            power_samples.append(ina2.bus_voltage * ina2.current)
            lux_samples.append(bh.luminance() * 0.0079)
            time.sleep(0.5)  # Delay for 0.5 seconds
        power = sum(power_samples) / len(power_samples)
        lux = sum(lux_samples) / len(lux_samples)
        score_x = power + lux
        print("Power & Light at position x={}, y={}: {:.2f} W, Lux: {:.2f}".format(x, best_y, power, lux))
        if score_x > max_score_x:
            # Keep the readings from the best position so the summary below is meaningful
            max_power = power
            max_lux = lux
            max_score_x = score_x
            best_x = x
    print("Best sun position found: x={}, y={}".format(best_x, best_y))
    print("Best solar power: {:.2f} W".format(max_power))
    print("Lux at best position: {:.2f}".format(max_lux))
    move_servos(best_x, best_y)
    # Reset timer
    start_time = time.time()
def send_sensor_data():
global lux
# Define variables for pressure, current, and voltage
pressure_samples = []
battery_current_samples = []
battery_voltage_samples = []
battery_power_samples = []
solar_voltage_samples = []
solar_current_samples = []
solar_power_samples = []
temperature_samples = []
lux_samples = []
try:
for _ in range(3):
adc1_value = adc1.read()
voltage1 = adc1_value * (3.9 / 4095)
pressure = ((voltage1 - 0.5)) * (2.4/7) * 100.0
pressure_samples.append(pressure)
time.sleep(0.5) # Delay for 0.5 seconds
pressure = sum(pressure_samples) / len(pressure_samples)
print("Pressure: " + str(pressure) + " %")
except Exception as e:
print("Failed to collect pressure data from Device")
pressure = 123
try:
if I2C1_connected:
for _ in range(3):
battery_current_samples.append(ina1.current)
battery_voltage_samples.append(ina1.bus_voltage)
battery_power_samples.append(ina1.power)
time.sleep(0.5) # Delay for 0.5 seconds
battery_current = sum(battery_current_samples) / len(battery_current_samples)
battery_voltage = sum(battery_voltage_samples) / len(battery_voltage_samples)
battery_power = sum(battery_power_samples) / len(battery_power_samples)
print("Battery Current: " + str(battery_current) + " mA")
print("Battery Voltage:" + str(battery_voltage) + " VDC")
print("Battery Power: " + str(battery_power) + " W")
else:
raise Exception("Failed to connect to INA1")
except Exception as e:
print("Failed to collect battery data from Device")
battery_current = 123
battery_voltage = 123
battery_power = 123
try:
if I2C2_connected:
for _ in range(3):
solar_current_samples.append(ina2.current)
solar_voltage_samples.append(ina2.bus_voltage)
solar_power_samples.append(ina2.power)
time.sleep(0.5) # Delay for 0.5 seconds
solar_current = sum(solar_current_samples) / len(solar_current_samples)
solar_voltage = sum(solar_voltage_samples) / len(solar_voltage_samples)
solar_power = sum(solar_power_samples) / len(solar_power_samples)
print("Solar Current: " + str(solar_current) + " mA")
print("Solar Voltage: " + str(solar_voltage) + " VDC")
print("Solar Power: " + str(solar_power) + " W")
else:
raise Exception("Failed to connect to INA2")
except Exception as e:
print("Failed to collect solar data from Device")
solar_current = 123
solar_voltage = 123
solar_power = 123
try:
if I2C3_connected:
for _ in range(3):
lux_samples.append(bh.luminance() * 0.0079)
time.sleep(0.5) # Delay for 0.5 seconds
lux = sum(lux_samples) / len(lux_samples)
print("Light intensity: {:.2f} w/m2".format(lux))
else:
raise Exception("Failed to connect to BH1750")
except Exception as e:
print("Failed to collect Light data from Device")
lux = 123
try:
temperature_samples = [esp32.raw_temperature() for _ in range(3)]
temperature = (sum(temperature_samples) / len(temperature_samples) - 32) * 5 / 9
print("CPU Temperature: " + str(temperature) + " C")
except Exception as e:
print("Failed to collect temperature data from Device")
temperature = 123
# Create list of sensor data
sensor_data = [
{'name': 'PRESSURE_TEST', 'tags': {'location': 'Test'}, 'fields': {'value': str(pressure)}},
{'name': 'BATT_CURRENT_TEST', 'tags': {'location': 'Test'}, 'fields': {'value': str(battery_current)}},
{'name': 'SOLAR_CURRENT_TEST', 'tags': {'location': 'Test'}, 'fields': {'value': str(solar_current)}},
{'name': 'SOLAR_VOLT_TEST', 'tags': {'location': 'Test'}, 'fields': {'value': str(solar_voltage)}},
{'name': 'BATT_VOLT_TEST', 'tags': {'location': 'Test'}, 'fields': {'value': str(battery_voltage)}},
{'name': 'BATT_POWER_TEST', 'tags': {'location': 'Test'}, 'fields': {'value': str(battery_power)}},
{'name': 'SOLAR_POWER_TEST', 'tags': {'location': 'Test'}, 'fields': {'value': str(solar_power)}},
{'name': 'CPUTEMPERATURE_TEST', 'tags': {'location': 'Test'}, 'fields': {'value': str(temperature)}},
{'name': 'LIGHT_TEST', 'tags': {'location': 'Test'}, 'fields': {'value': str(lux)}},
{'name': 'BEST_X_TEST', 'tags': {'location': 'Test'}, 'fields': {'value': str(best_x)}},
{'name': 'BEST_Y_TEST', 'tags': {'location': 'Test'}, 'fields': {'value': str(best_y)}},
{'name': 'TIMER_TEST', 'tags': {'location': 'Test'}, 'fields': {'value': str(get_remaining_time())}},
]
# Send data for each sensor and track whether each one was successful
all_sent = True
for data in sensor_data:
# Create InfluxDB line protocol string
line_protocol = '{measurement},{tags} {fields}'.format(
measurement=data['name'],
tags=','.join(['{}={}'.format(k, v) for k, v in data['tags'].items()]),
fields=','.join(['{}={}'.format(k, v) for k, v in data['fields'].items()])
)
# Send data to InfluxDB Cloud
headers = {'Authorization': 'Token {}'.format(INFLUXDB_TOKEN), 'Content-Type': 'application/octet-stream'}
try:
response = urequests.post(INFLUXDB_URL, headers=headers, data=line_protocol.encode('utf-8'))
if response.status_code != 204:
print('Error sending data to InfluxDB Cloud for {}: {}'.format(data['name'], response.content))
all_sent = False
response.close()
print('Data sent for {}'.format(data['name']))
except:
print('Error sending data to InfluxDB Cloud for {}'.format(data['name']))
all_sent = False
return all_sent
ssid = None
print("Connecting to WiFi network...")
connected, ssid = connect_to_wifi()  # connect_to_wifi() returns a (success, ssid) tuple
connect_devices()
# Set the watchdog timer to trigger a reset if the device becomes unresponsive longer than setpoint
wdt = machine.WDT(timeout=watchdog_timeout * 60 * 1000)
# Initialize a counter to keep track of consecutive failed data transmissions
failed_transmissions = 0
wifi_attempts = 0
if connected:
find_best_position()
while True:
    # Reset the watchdog timer at the start of each loop iteration
    wdt.feed()
    if not I2C1_connected or not I2C2_connected or not I2C3_connected:
        # Attempt to reconnect devices
        connect_devices()
    # Connect to WiFi network
    if connected:
        print("Connected to WiFi network:", ssid)
        wifi_attempts = 0
    else:
        print("Connecting to WiFi network...")
        connected, ssid = connect_to_wifi()
        if not connected:
            print('Failed to connect to WiFi network')
            wifi_attempts += 1
            sleep(10)
            print('WiFi Connection attempts: ' + str(wifi_attempts))
            # Check if the number of consecutive failed WiFi attempts has reached a certain threshold
            if wifi_attempts >= 5:
                print("Reached the maximum number of consecutive failed WiFi attempts, rebooting the device...")
                machine.reset()
    sleep(5)
    # Collect and send sensor data
    try:
        data_sent = send_sensor_data()
    except Exception as e:
        print("Failed to collect data from I2C Device", str(e))
        failed_transmissions += 1
        sleep(30)
        continue
    if not data_sent:
        print('Failed to send data to InfluxDB Cloud')
        failed_transmissions += 1
        print("Failed transmissions = " + str(failed_transmissions))
        sleep(30)
        if failed_transmissions >= 5:
            print("Reached the maximum number of consecutive failed transmissions, rebooting the device...")
            machine.reset()
        continue
    # Data sent successfully, blink LED
    try:
        timer1.init(period=500, mode=Timer.PERIODIC, callback=blink)
        sleep(10)
    finally:
        timer1.deinit()
    # Reset the failed transmissions counter
    failed_transmissions = 0
    remaining_time = get_remaining_time()
    print("Remaining time:", remaining_time, " Minutes")
    if remaining_time <= 0:
        find_best_position()
    # print("Light sleep...")
    sleep(10)
    # Light sleep for setpoint duration
    machine.lightsleep(light_sleep_time * 60 * 1000)
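The two-stage search used by find_best_position() can be tried off the device with a stand-in for the sensor readings. The sketch below is a simplified illustration only: two_stage_scan and fake_measure are hypothetical names, and the fake score simply peaks at x=30, y=120 instead of reading the INA226 and the light sensor.

```python
def two_stage_scan(measure, x_positions, y_positions, x_center):
    """Sweep y with x held at x_center, then sweep x at the best y found."""
    best_y = max(y_positions, key=lambda y: measure(x_center, y))
    best_x = max(x_positions, key=lambda x: measure(x, best_y))
    return best_x, best_y


def fake_measure(x, y):
    # Hypothetical score that is highest when the panel points at x=30, y=120
    return -((x - 30) ** 2) - ((y - 120) ** 2)


best = two_stage_scan(fake_measure, range(0, 81, 10), range(0, 181, 10), x_center=40)
print(best)  # (30, 120)
```

Scanning one axis at a time needs far fewer servo moves than trying every x/y pair, at the cost of possibly missing the true optimum when the two axes interact strongly.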
# The MIT License (MIT)
#
# Copyright (c) 2017 Dean Miller for Adafruit Industries
# Copyright (c) 2020 Christian Becker
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""
`ina226`
====================================================
micropython driver for the INA226 current sensor.
* Author(s): Christian Becker
"""
# taken from https://github.com/robert-hh/INA219 , modified for the INA226 devices by
# Christian Becker
# June 2020
from micropython import const
# from adafruit_bus_device.i2c_device import I2CDevice
__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/elschopi/TI_INA226_micropython.git"
# Bits
# pylint: disable=bad-whitespace
_READ = const(0x01)
# Config Register (R/W)
_REG_CONFIG = const(0x00)
_CONFIG_RESET = const(0x8000) # Reset Bit
# Constant bits - don't change
_CONFIG_CONST_BITS = const(0x4000)
# Averaging mode
_CONFIG_AVGMODE_MASK = const(0x0e00)
_CONFIG_AVGMODE_1SAMPLES = const(0x0000)
_CONFIG_AVGMODE_4SAMPLES = const(0x0200)
_CONFIG_AVGMODE_16SAMPLES = const(0x0400)
_CONFIG_AVGMODE_64SAMPLES = const(0x0600)
_CONFIG_AVGMODE_128SAMPLES = const(0x0800)
_CONFIG_AVGMODE_256SAMPLES = const(0x0a00)
_CONFIG_AVGMODE_512SAMPLES = const(0x0c00)
_CONFIG_AVGMODE_1024SAMPLES = const(0x0e00)
# Bus voltage conversion time
_CONFIG_VBUSCT_MASK = const(0x01c0)
_CONFIG_VBUSCT_140us = const(0x0000)
_CONFIG_VBUSCT_204us = const(0x0040)
_CONFIG_VBUSCT_332us = const(0x0080)
_CONFIG_VBUSCT_588us = const(0x00c0)
_CONFIG_VBUSCT_1100us = const(0x0100)
_CONFIG_VBUSCT_2116us = const(0x0140)
_CONFIG_VBUSCT_4156us = const(0x0180)
_CONFIG_VBUSCT_8244us = const(0x01c0)
# Shunt voltage conversion time
_CONFIG_VSHUNTCT_MASK = const(0x0038)
_CONFIG_VSHUNTCT_140us = const(0x0000)
_CONFIG_VSHUNTCT_204us = const(0x0008)
_CONFIG_VSHUNTCT_332us = const(0x0010)
_CONFIG_VSHUNTCT_588us = const(0x0018)
_CONFIG_VSHUNTCT_1100us = const(0x0020)
_CONFIG_VSHUNTCT_2116us = const(0x0028)
_CONFIG_VSHUNTCT_4156us = const(0x0030)
_CONFIG_VSHUNTCT_8244us = const(0x0038)
# Operating mode
_CONFIG_MODE_MASK = const(0x0007) # Operating Mode Mask
_CONFIG_MODE_POWERDOWN = const(0x0000)
_CONFIG_MODE_SVOLT_TRIGGERED = const(0x0001)
_CONFIG_MODE_BVOLT_TRIGGERED = const(0x0002)
_CONFIG_MODE_SANDBVOLT_TRIGGERED = const(0x0003)
_CONFIG_MODE_ADCOFF = const(0x0004)
_CONFIG_MODE_SVOLT_CONTINUOUS = const(0x0005)
_CONFIG_MODE_BVOLT_CONTINUOUS = const(0x0006)
_CONFIG_MODE_SANDBVOLT_CONTINUOUS = const(0x0007)
# SHUNT VOLTAGE REGISTER (R)
_REG_SHUNTVOLTAGE = const(0x01)
# BUS VOLTAGE REGISTER (R)
_REG_BUSVOLTAGE = const(0x02)
# POWER REGISTER (R)
_REG_POWER = const(0x03)
# CURRENT REGISTER (R)
_REG_CURRENT = const(0x04)
# CALIBRATION REGISTER (R/W)
_REG_CALIBRATION = const(0x05)
# pylint: enable=bad-whitespace
def _to_signed(num):
if num > 0x7FFF:
num -= 0x10000
return num
class INA226:
"""Driver for the INA226 current sensor"""
def __init__(self, i2c_device, addr=0x40):
self.i2c_device = i2c_device
self.i2c_addr = addr
self.buf = bytearray(2)
# Multiplier in mA used to determine current from raw reading
self._current_lsb = 0
# Multiplier in W used to determine power from raw reading
self._power_lsb = 0
# Set chip to known config values to start
self._cal_value = 4096
self.set_calibration()
def _write_register(self, reg, value):
self.buf[0] = (value >> 8) & 0xFF
self.buf[1] = value & 0xFF
self.i2c_device.writeto_mem(self.i2c_addr, reg, self.buf)
def _read_register(self, reg):
self.i2c_device.readfrom_mem_into(self.i2c_addr, reg & 0xff, self.buf)
value = (self.buf[0] << 8) | (self.buf[1])
return value
@property
def shunt_voltage(self):
"""The shunt voltage (between V+ and V-) in Volts (so +-.327V)"""
value = _to_signed(self._read_register(_REG_SHUNTVOLTAGE))
# The least signficant bit is 10uV which is 0.00001 volts
return value * 0.00001
@property
def bus_voltage(self):
"""The bus voltage (between V- and GND) in Volts"""
raw_voltage = self._read_register(_REG_BUSVOLTAGE)
# voltage in millVolt is register content multiplied with 1.25mV/bit
voltage_mv = raw_voltage * 1.25
# Return Volts instead of milliVolts
return voltage_mv * 0.001
@property
def current(self):
"""The current through the shunt resistor in milliamps."""
# Sometimes a sharp load will reset the INA226, which will
# reset the cal register, meaning CURRENT and POWER will
# not be available ... avoid this by always setting a cal
# value even if it's an unfortunate extra step
self._write_register(_REG_CALIBRATION, self._cal_value)
# Now we can safely read the CURRENT register!
raw_current = _to_signed(self._read_register(_REG_CURRENT))
return raw_current * self._current_lsb
@property
def power(self):
# INA226 stores the calculated power in this register
raw_power = _to_signed(self._read_register(_REG_POWER))
# Calculated power is derived by multiplying raw power value with the power LSB
return raw_power * self._power_lsb
# Example calculations for calibration register value, current LSB and power LSB
# 1. Assuming a 100milliOhm resistor as shunt
# RSHUNT = 0.1
#
# 2. Determine current_lsb
# Assuming a maximum expected current of 3.6A
# current_lsb = MaxExpected_I / (2^15)
# current_lsb = 3.6A / (2^15)
# current_lsb = 0.0001098632813
# -> Rounding to "nicer" numbers:
# current_lsb = 0.0001
#
# 3. Setting the power LSB
# power_lsb = 25 * current_lsb
# power_lsb = 25 * 0.0001
# power_lsb = 0.0025
#
# 4. Determine calibration register value
# cal_value = 0.00512 / (RSHUNT * current_lsb)
# cal_value = 0.00512 / (0.1 * 0.0001)
# cal_value = 512
#
#
def set_calibration(self): # pylint: disable=invalid-name
"""Configures the INA226 to be able to measure up to 36V and 2A
of current. Counter overflow occurs at 3.2A.
..note :: These calculations assume a 0.1 shunt ohm resistor"""
self._current_lsb = .0001
self._cal_value = 512
self._power_lsb = .0025
self._write_register(_REG_CALIBRATION, self._cal_value)
config = (_CONFIG_CONST_BITS |
_CONFIG_AVGMODE_512SAMPLES |
_CONFIG_VBUSCT_588us |
_CONFIG_VSHUNTCT_588us |
_CONFIG_MODE_SANDBVOLT_CONTINUOUS)
self._write_register(_REG_CONFIG, config)
def set_calibration_custom(self, calValue=512, config=0x4127):
# Set the configuration register externally by using the hex value for the config register
# Value can be calculated with spreadsheet
# Calibration value needs to be calculated seperately and passed as parameter too
self._cal_value = calValue
self._write_register(_REG_CALIBRATION, self._cal_value)
self._write_register(_REG_CONFIG, config)
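The calibration arithmetic described in the driver comments can be reproduced on a PC before choosing values for set_calibration_custom(). This is a minimal sketch using the formulas from those comments. The function names are assumptions for illustration and are not part of the driver.

```python
def min_current_lsb(max_expected_current_a):
    """Smallest usable current LSB (A per bit) for a given maximum current."""
    return max_expected_current_a / 2 ** 15


def ina226_calibration(r_shunt_ohms, current_lsb):
    """Return (cal_value, power_lsb) for a shunt resistance and a chosen current LSB."""
    cal_value = round(0.00512 / (r_shunt_ohms * current_lsb))
    power_lsb = 25 * current_lsb
    return cal_value, power_lsb


print(min_current_lsb(3.6))             # about 0.00011, rounded to 0.0001 in the driver
print(ina226_calibration(0.1, 0.0001))  # calibration value 512 for a 0.1 ohm shunt
```

With a different shunt resistor, only r_shunt_ohms changes, and the resulting value is what would be passed as calValue.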
# INA226 Config register calculator
# Lists of possible register settings
liste_AVG = [['1', '0x0000'],['4', '0x0200'],['16', '0x0400'],['64', '0x0600'],['128', '0x0800'],['256', '0x0A00'],['512', '0x0C00'],['1024', '0x0E00'] ]
liste_VBUSCT = [['140','0x0000'],['204','0x0040'],['332','0x0080'],['588','0x00C0'],['1100','0x0100'],['2116','0x0140'],['4156','0x0180'],['8244','0x01C0']]
liste_VSHCT = [['140','0x0000'],['204','0x0008'],['332','0x0010'],['588','0x0018'],['1100','0x0020'],['2116','0x0028'],['4156','0x0030'],['8244','0x0038']]
liste_MODE = [['OFF1','0x0000'],['SHV_TRIG','0x0001'],['BUSV_TRIG','0x0002'],['SHBUSV_TRIG','0x0003'],['OFF2','0x0004'],['SHV_CONT','0x0005'],['BUSV_CONT','0x0006'],['SHBUS_CONT','0x0007']]
constant_bits = 0x4000
# Structure of the config register
# constant_bits + AVG + VBUSCT + VSHCT + MODE
def sucher(wert, liste):
for element in liste:
if element[0] == wert:
# print(element)
return element[1]
def listeguck(liste):
for element in liste:
print('{}: {}'.format(element[0], element[1]))
def registern():
listeguck(liste_AVG)
auswahl_avg_input = input('Please input sample number: ')
auswahl_avg = int(sucher(auswahl_avg_input, liste_AVG), 16)
listeguck(liste_VBUSCT)
auswahl_VBUSCT_input = input('Please input VBUS conversion time: ')
auswahl_VBUSCT = int(sucher(auswahl_VBUSCT_input, liste_VBUSCT), 16)
listeguck(liste_VSHCT)
auswahl_VSHCT_input = input('Please input VSHUNT conversion time: ')
auswahl_VSHCT = int(sucher(auswahl_VSHCT_input, liste_VSHCT), 16)
listeguck(liste_MODE)
auswahl_MODE_input = input('Please input operating mode: ')
auswahl_MODE = int(sucher(auswahl_MODE_input, liste_MODE), 16)
print('{} {} {} {} {}'.format(constant_bits, auswahl_avg, auswahl_VBUSCT, auswahl_VSHCT, auswahl_MODE))
register = hex(constant_bits + auswahl_avg + auswahl_VBUSCT + auswahl_VSHCT + auswahl_MODE)
print('Value for the configuration register: ')
print(register)
return register
# print(type(sucher(input('wert: '), liste_MODE)))
status = True
while status:
try:
registern()
weiter = input('Again? (y/n): ')
if weiter == 'n':
status = False
except:
status = False
print('uh-oh!')
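For a quick, non-interactive check, the same configuration value can be computed directly from the bit fields listed in the tables above. This is a sketch with assumed names (AVG_BITS, CONV_TIMES_US, config_value). It relies on the conversion-time codes being the index 0 to 7 shifted into bits 6-8 (bus) and bits 3-5 (shunt), which matches the hex values in the lists.

```python
AVG_BITS = {1: 0x0000, 4: 0x0200, 16: 0x0400, 64: 0x0600,
            128: 0x0800, 256: 0x0A00, 512: 0x0C00, 1024: 0x0E00}
CONV_TIMES_US = [140, 204, 332, 588, 1100, 2116, 4156, 8244]
CONSTANT_BITS = 0x4000


def config_value(avg, vbus_us, vshunt_us, mode=0x0007):
    """Combine the averaging, conversion time, and mode fields into one register value."""
    vbus_bits = CONV_TIMES_US.index(vbus_us) << 6      # bus voltage conversion time
    vshunt_bits = CONV_TIMES_US.index(vshunt_us) << 3  # shunt voltage conversion time
    return CONSTANT_BITS | AVG_BITS[avg] | vbus_bits | vshunt_bits | mode


print(hex(config_value(1, 1100, 1100)))  # 0x4127, the default in set_calibration_custom()
print(hex(config_value(512, 588, 588)))  # 0x4cdf, the settings used by set_calibration()
```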
from machine import Pin, PWM
class Servo:
# these defaults work for the standard TowerPro SG90
__servo_pwm_freq = 50
__min_u10_duty = 26 - 0 # offset for correction
__max_u10_duty = 123- 0 # offset for correction
min_angle = 0
max_angle = 180
current_angle = 0.001
def __init__(self, pin):
self.__initialise(pin)
def update_settings(self, servo_pwm_freq, min_u10_duty, max_u10_duty, min_angle, max_angle, pin):
self.__servo_pwm_freq = servo_pwm_freq
self.__min_u10_duty = min_u10_duty
self.__max_u10_duty = max_u10_duty
self.min_angle = min_angle
self.max_angle = max_angle
self.__initialise(pin)
def move(self, angle):
# round to 2 decimal places, so we have a chance of reducing unwanted servo adjustments
angle = round(angle, 2)
# do we need to move?
if angle == self.current_angle:
return
self.current_angle = angle
# calculate the new duty cycle and move the motor
duty_u10 = self.__angle_to_u10_duty(angle)
self.__motor.duty(duty_u10)
def __angle_to_u10_duty(self, angle):
return int((angle - self.min_angle) * self.__angle_conversion_factor) + self.__min_u10_duty
def __initialise(self, pin):
self.current_angle = -0.001
self.__angle_conversion_factor = (self.__max_u10_duty - self.__min_u10_duty) / (self.max_angle - self.min_angle)
self.__motor = PWM(Pin(pin))
self.__motor.freq(self.__servo_pwm_freq)
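The angle-to-duty conversion in the Servo class can be checked without hardware. The sketch below repeats the same arithmetic with the class defaults (duty 26 to 123 over 0 to 180 degrees at 50 Hz) and also estimates the pulse width, assuming the 10-bit duty scale (0 to 1023) that PWM.duty() uses on the ESP32. The function names are illustrative.

```python
MIN_DUTY = 26     # 10-bit duty at the minimum angle (class default)
MAX_DUTY = 123    # 10-bit duty at the maximum angle (class default)
MIN_ANGLE = 0
MAX_ANGLE = 180


def angle_to_duty(angle):
    """Map an angle in degrees to a 10-bit PWM duty value, as the Servo class does."""
    factor = (MAX_DUTY - MIN_DUTY) / (MAX_ANGLE - MIN_ANGLE)
    return int((angle - MIN_ANGLE) * factor) + MIN_DUTY


def duty_to_pulse_ms(duty, freq_hz=50):
    """Approximate pulse width in milliseconds for a 10-bit duty value."""
    return duty / 1023 * (1000 / freq_hz)


for angle in (0, 90):
    duty = angle_to_duty(angle)
    print(angle, duty, round(duty_to_pulse_ms(duty), 2))
```

If a servo does not reach its end stops, adjusting the minimum and maximum duty values is the intended correction, which is what the offsets in the class defaults are for.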
Missing at the moment
import network
import urequests
import utime
import time
from time import sleep
import machine
import esp32
from machine import Pin, ADC, lightsleep, deepsleep, Timer, SoftI2C
import ina226
# Configure GPIO pins
gpio_pins = [4, 14, 16, 17, 18, 19, 23, 25, 26, 27, 32, 34, 35, 36, 37, 38, 39]
# Set unused GPIO pins as inputs with pull-down resistors
for pin in gpio_pins:
try:
machine.Pin(pin, machine.Pin.IN, machine.Pin.PULL_DOWN)
except ValueError:
print("Pin", pin, "cannot be configured as GPIO.")
# Disable Bluetooth
machine.Pin(0, machine.Pin.OUT).value(0)
# Initialize I2C
i2c = machine.SoftI2C(scl=machine.Pin(22), sda=machine.Pin(21))
# Initialize INA226 devices
ina1 = None
ina2 = None
I2C1_connected = False
I2C2_connected = False
# Initialize LED pin
led = Pin(2, Pin.OUT)
# Define the timer for blinking LED
timer1 = Timer(0)
def blink(timer):
led.value(not led.value()) # Toggle LED state
# Blink LED
led.value(1)
# Define the ADC pins
adc_pins = [33]
# Create an ADC object for each pin
adc1 = machine.ADC(machine.Pin(33, machine.Pin.IN))
adc1.atten(ADC.ATTN_11DB)
# InfluxDB Cloud settings
INFLUXDB_URL = 'https://eastus-1.azure.cloud2.influxdata.com/api/v2/write?org="<Your Organization>"&bucket="<Your Bucket>"&precision=s'
INFLUXDB_TOKEN = 'Your Token'
# Define the names and passwords of your WiFi networks
WIFI_NETWORKS = [
('SSID1', 'PW1'),
('SSID2', 'PW2'),
('SSID3', 'PW3')
]
def connect_to_wifi():
global ssid, wlan
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
for ssid, password in WIFI_NETWORKS:
try:
wlan.disconnect()
wlan.connect(ssid, password)
start_time = utime.time()
while not wlan.isconnected():
if (utime.time() - start_time) > 10: # Wait for 10 seconds
print("Timeout exceeded while connecting to {}".format(ssid))
break
utime.sleep(0.1)
else:
print("Connected to WiFi network:", ssid)
return True, ssid
except Exception as e:
print("Failed to connect to WiFi network:", ssid, e)
continue
return False, None
def connect_devices():
global I2C1_connected, I2C2_connected, ina1, ina2
try:
ina1 = ina226.INA226(i2c, 0x40)
# ina1.set_calibration_custom(calValue=5120, config=0x4427)
I2C1_connected = True
except Exception as e:
print("Failed to connect to I2C Battery Device 1")
I2C1_connected = False
try:
ina2 = ina226.INA226(i2c, 0x44)
# ina2.set_calibration_custom(calValue=5120, config=0x4427)
I2C2_connected = True
except Exception as e:
print("Failed to connect to I2C Solar Device 2")
I2C2_connected = False
# Constants
light_sleep_time = 1 # minutes for lightsleep time
watchdog_timeout = 5 # minutes for timeout watchdog timer
def send_sensor_data():
    # Sample lists for pressure, current, voltage, power and temperature
    pressure_samples = []
    battery_current_samples = []
    battery_voltage_samples = []
    battery_power_samples = []
    solar_voltage_samples = []
    solar_current_samples = []
    solar_power_samples = []
    temperature_samples = []
    try:
        for _ in range(3):
            adc1_value = adc1.read()
            # Convert the 12-bit ADC count (0-4095) to volts
            voltage1 = adc1_value * (3.9 / 4095)
            # Subtract the 0.5 V offset and scale to percent
            pressure = ((voltage1 - 0.5)) * (2.4/7) * 100.0
            pressure_samples.append(pressure)
            time.sleep(0.5)  # Delay for 0.5 seconds
        # Average the three samples
        pressure = sum(pressure_samples) / len(pressure_samples)
        print("Pressure: " + str(pressure) + " %")
    except Exception as e:
        print("Failed to collect pressure data from Device")
        pressure = 999  # Placeholder value reported on failure
    try:
        if I2C1_connected:
            for _ in range(3):
                battery_current_samples.append(ina1.current)
                battery_voltage_samples.append(ina1.bus_voltage)
                battery_power_samples.append(ina1.power)
                time.sleep(0.5)  # Delay for 0.5 seconds
            battery_current = sum(battery_current_samples) / len(battery_current_samples)
            battery_voltage = sum(battery_voltage_samples) / len(battery_voltage_samples)
            battery_power = sum(battery_power_samples) / len(battery_power_samples)
            print("Battery Current: " + str(battery_current) + " mA")
            print("Battery Voltage: " + str(battery_voltage) + " VDC")
            print("Battery Power: " + str(battery_power) + " W")
        else:
            raise Exception("Failed to connect to INA1")
    except Exception as e:
        print("Failed to collect battery data from Device")
        battery_current = 999
        battery_voltage = 999
        battery_power = 999
    try:
        if I2C2_connected:
            for _ in range(3):
                solar_current_samples.append(ina2.current)
                solar_voltage_samples.append(ina2.bus_voltage)
                solar_power_samples.append(ina2.power)
                time.sleep(0.5)  # Delay for 0.5 seconds
            solar_current = sum(solar_current_samples) / len(solar_current_samples)
            solar_voltage = sum(solar_voltage_samples) / len(solar_voltage_samples)
            solar_power = sum(solar_power_samples) / len(solar_power_samples)
            print("Solar Current: " + str(solar_current) + " mA")
            print("Solar Voltage: " + str(solar_voltage) + " VDC")
            print("Solar Power: " + str(solar_power) + " W")
        else:
            raise Exception("Failed to connect to INA2")
    except Exception as e:
        print("Failed to collect solar data from Device")
        solar_current = 999
        solar_voltage = 999
        solar_power = 999
    try:
        # esp32.raw_temperature() reports degrees Fahrenheit; average, then convert to Celsius
        temperature_samples = [esp32.raw_temperature() for _ in range(3)]
        temperature = (sum(temperature_samples) / len(temperature_samples) - 32) * 5 / 9
        print("CPU Temperature: " + str(temperature) + " C")
    except Exception as e:
        print("Failed to collect temperature data from Device")
        temperature = 999
    # Create list of sensor data
    sensor_data = [
        {'name': 'PRESSURE_TEST', 'tags': {'location': 'Test'}, 'fields': {'value': str(pressure)}},
        {'name': 'BATT_CURRENT_TEST', 'tags': {'location': 'Test'}, 'fields': {'value': str(battery_current)}},
        {'name': 'SOLAR_CURRENT_TEST', 'tags': {'location': 'Test'}, 'fields': {'value': str(solar_current)}},
        {'name': 'SOLAR_VOLT_TEST', 'tags': {'location': 'Test'}, 'fields': {'value': str(solar_voltage)}},
        {'name': 'BATT_VOLT_TEST', 'tags': {'location': 'Test'}, 'fields': {'value': str(battery_voltage)}},
        {'name': 'BATT_POWER_TEST', 'tags': {'location': 'Test'}, 'fields': {'value': str(battery_power)}},
        {'name': 'SOLAR_POWER_TEST', 'tags': {'location': 'Test'}, 'fields': {'value': str(solar_power)}},
        {'name': 'CPUTEMPERATURE_TEST', 'tags': {'location': 'Test'}, 'fields': {'value': str(temperature)}},
    ]
    # Send data for each sensor and track whether each one was successful
    all_sent = True
    for data in sensor_data:
        # Create InfluxDB line protocol string
        line_protocol = '{measurement},{tags} {fields}'.format(
            measurement=data['name'],
            tags=','.join(['{}={}'.format(k, v) for k, v in data['tags'].items()]),
            fields=','.join(['{}={}'.format(k, v) for k, v in data['fields'].items()])
        )
        # Send data to InfluxDB Cloud
        headers = {'Authorization': 'Token {}'.format(INFLUXDB_TOKEN), 'Content-Type': 'application/octet-stream'}
        try:
            response = urequests.post(INFLUXDB_URL, headers=headers, data=line_protocol.encode('utf-8'))
            if response.status_code != 204:
                print('Error sending data to InfluxDB Cloud for {}: {}'.format(data['name'], response.content))
                all_sent = False
            else:
                # Only report success when the server answered 204 No Content
                print('Data sent for {}'.format(data['name']))
            response.close()
        except Exception as e:
            print('Error sending data to InfluxDB Cloud for {}: {}'.format(data['name'], e))
            all_sent = False
    return all_sent
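ssid = None
print("Connecting to WiFi network...")
# connect_to_wifi() returns a (success, ssid) tuple; unpack it so that
# `connected` is a real boolean (a non-empty tuple is always truthy)
connected, ssid = connect_to_wifi()
connect_devices()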
# Set the watchdog timer to trigger a reset if the device becomes unresponsive longer than setpoint
wdt = machine.WDT(timeout=watchdog_timeout * 60 * 1000)
# Initialize a counter to keep track of consecutive failed data transmissions
failed_transmissions = 0
wifi_attempts = 0
while True:
    # Reset the watchdog timer at the start of each loop iteration
    wdt.feed()
    if not I2C1_connected or not I2C2_connected:
        # Attempt to reconnect devices
        connect_devices()
    # Connect to WiFi network
    if connected:
        print("Connected to WiFi network:", ssid)
    if not connected:
        print("Connecting to WiFi network...")
        connected, ssid = connect_to_wifi()
        if not connected:
            print('Failed to connect to WiFi network')
            wifi_attempts += 1
            sleep(10)
            print('WiFi Connection attempts: ' + str(wifi_attempts))
            # Check if the number of consecutive failed WiFi attempts has reached a certain threshold
            if wifi_attempts >= 5:
                print("Reached the maximum number of consecutive failed WiFi attempts, rebooting the device...")
                machine.reset()
            # No connection yet, so skip sending and retry on the next iteration
            continue
    wifi_attempts = 0
    sleep(5)
    # Collect and send sensor data
    try:
        data_sent = send_sensor_data()
    except Exception as e:
        print("Failed to collect data from I2C Device", str(e))
        failed_transmissions += 1
        sleep(30)
        continue
    if not data_sent:
        print('Failed to send data to InfluxDB Cloud')
        failed_transmissions += 1
        print("Failed transmissions = " + str(failed_transmissions))
        sleep(30)
        if failed_transmissions >= 5:
            print("Reached the maximum number of consecutive failed transmissions, rebooting the device...")
            machine.reset()
        continue
    # Data sent successfully, blink LED
    try:
        timer1.init(period=500, mode=Timer.PERIODIC, callback=blink)
        sleep(10)
    finally:
        timer1.deinit()
    # Reset the failed transmissions counter
    failed_transmissions = 0
    # print("Light sleep...")
    sleep(3)
    # Light sleep for setpoint duration
    machine.lightsleep(light_sleep_time * 60 * 1000)
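
The code above sends one HTTPS request per measurement, so eight requests per wake-up. The InfluxDB v2 write endpoint also accepts several line-protocol records in a single request body, separated by newlines, which may shorten the time the radio stays on. Below is a minimal sketch of building such a body, assuming the same measurement names and `location` tag as above. `to_line` and `build_batch` are hypothetical helper names and not part of the original code, and tag values containing spaces or commas would need escaping that is not shown here.

```python
def to_line(measurement, tags, fields):
    """Format one InfluxDB line-protocol record (no timestamp)."""
    tag_str = ",".join("{}={}".format(k, v) for k, v in sorted(tags.items()))
    field_str = ",".join("{}={}".format(k, v) for k, v in sorted(fields.items()))
    return "{},{} {}".format(measurement, tag_str, field_str)


def build_batch(readings, location="Test"):
    """Join (name, value) readings into one newline-separated request body."""
    lines = [
        # float() keeps every field the same type, including the 999 placeholder
        to_line(name, {"location": location}, {"value": float(value)})
        for name, value in readings
    ]
    return "\n".join(lines)


# Example with made-up readings (hypothetical values, for illustration only)
body = build_batch([("PRESSURE_TEST", 41.5), ("BATT_VOLT_TEST", 3.92)])
print(body)
```

The resulting string could be passed as `data=body.encode('utf-8')` to the same `urequests.post` call, with a single check for status code 204 covering the whole batch.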