import network
import time
import urequests
import esp32
import machine
from DFRobot_MAX17043 import DFRobot_MAX17043
from time import sleep
# Latest sensor readings and connection state, kept as module-level globals
# that the helper functions below update and the main loop reads.
battery_percent = 0
battery_voltage = 0
level = 0
temperature = 0
connected = False
ssid = None
# Consecutive-failure counters; the main loop reboots the board when either
# one reaches 5.
failed_transmissions = 0
wifi_attempts = 0
sleep_time = 60 # minutes passed to deepsleep() after a successful transmission
watchdog_timeout = 10 # minutes; converted to milliseconds for machine.WDT
# Level transmitter calibration values, used by LPT1_level() as:
#   level = (adc - adc_min) / (adc_max - adc_min) * comp1 + comp2
adc_min = 1825
adc_max = 1995
comp1 = 72.5
comp2 = 20
# GPIO numbers. sda_pin and scl_pin are not referenced by the code visible in
# this file; i2c_pin and lvl_pin select the two power-switch outputs below.
sda_pin = 23
scl_pin = 19
i2c_pin = 26
lvl_pin = 25
# Output pins driven to 1 in connect_devices() and to 0 in disable_i2c() /
# disable_adc() to switch power to the I2C device and the level transmitter.
i2c_pwr_pin = machine.Pin(i2c_pin, machine.Pin.OUT)
lvl_pwr_pin = machine.Pin(lvl_pin, machine.Pin.OUT)
def interruptCallBack(channel):
    """Acknowledge a low-power alert from the battery fuel gauge.

    Clears the alert on the module-level ``gauge`` object and then logs that
    the interrupt fired. ``channel`` is accepted but not used.
    """
    # NOTE(review): no code visible in this file registers this handler;
    # presumably it is meant to be attached to the gauge's alert pin - confirm.
    gauge.clear_interrupt()
    print('Low power alert interrupt!')
# Placeholders for the hardware handles and the fuel-gauge status flag.
# connect_devices() assigns adc1 and I2C_connected; i2c is declared global
# there but is not assigned anywhere in the visible code.
i2c = None
adc1 = None
I2C_connected = False
def disable_adc():
    """Switch off power to the level transmitter (ADC input) and log it."""
    # connect_devices() drives this pin to 1 to enable the supply, so 0
    # turns it back off.
    lvl_pwr_pin.value(0)
    print("ADC Disabled")
def disable_i2c():
    """Switch off power to the I2C device and log it."""
    # connect_devices() drives this pin to 1 to enable the supply, so 0
    # turns it back off.
    i2c_pwr_pin.value(0)
    print("I2C Disabled")
# InfluxDB Cloud settings.
# INFLUXDB_URL is the address send_sensor_data() POSTs the line-protocol batch
# to; INFLUXDB_TOKEN is sent in the "Authorization: Token ..." header.
INFLUXDB_URL = ''
INFLUXDB_TOKEN = ''
# WiFi credentials as (ssid, password) pairs. connect_to_wifi() tries them in
# the order listed and stops at the first network that connects.
WIFI_NETWORKS = [
('', ''),
('', ''),
('', '')
]
def connect_to_wifi():
    """Try each configured WiFi network in turn until one connects.

    Activates the station interface and walks ``WIFI_NETWORKS`` in order,
    allowing each network roughly 10 seconds to connect before moving on.

    Side effects:
        Rebinds the module-level ``wlan`` to the station interface and
        ``ssid`` to the network currently being tried (so after a total
        failure it holds the last network attempted).

    Returns:
        ``(True, ssid)`` for the first network that connects, or ``False``
        when none of them do. The main loop only tests the truthiness of the
        result.
    """
    global ssid, wlan
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    for ssid, password in WIFI_NETWORKS:
        try:
            # Drop any previous association before trying the next network.
            wlan.disconnect()
            wlan.connect(ssid, password)
            start_time = time.time()
            while not wlan.isconnected():
                if (time.time() - start_time) > 10:  # Wait for 10 seconds
                    raise Exception("Timeout exceeded while connecting to {}".format(ssid))
                # Poll at a modest rate instead of spinning flat out; the
                # original busy-wait kept the CPU fully loaded for the whole
                # timeout on a battery-powered board.
                time.sleep(0.1)
            print("Connected to WiFi network:", ssid)
            return True, ssid
        except Exception as e:
            print("Failed to connect to WiFi network:", e)
    return False
def connect_devices():
    """Power up the sensors and initialise the ADC and the battery gauge.

    Drives both power-switch pins high, creates the ADC on GPIO 33 with
    11 dB attenuation, then constructs the DFRobot_MAX17043 gauge and calls
    its ``begin()``.

    Side effects:
        Assigns the module-level ``adc1``, ``gauge``, ``rslt`` and
        ``I2C_connected``.

    Returns:
        True when the gauge was constructed and ``begin()`` returned without
        raising, otherwise False.
    """
    global I2C_connected, i2c, adc1, gauge, rslt
    # Enable I2C power and give the device time to start.
    i2c_pwr_pin.value(1)
    sleep(2)
    print("I2C Enabled")
    # Enable ADC power using the module-level pin (the original re-created
    # the same pin as a local that shadowed it).
    lvl_pwr_pin.value(1)
    # Initialize ADC
    adc1 = machine.ADC(machine.Pin(33, machine.Pin.IN))
    adc1.atten(machine.ADC.ATTN_11DB)
    print("ADC Enabled")
    sleep(1)
    try:
        gauge = DFRobot_MAX17043()
        # NOTE(review): the return code is stored but not checked; presumably
        # a non-zero value signals an init failure - verify against the driver.
        rslt = gauge.begin()
        I2C_connected = True
    except Exception as e:
        print("Failed to connect to I2C Battery Device", e)
        # Fix: the original assigned the misspelled "I2C1_connected", so a
        # failed re-init left a stale True in the real flag.
        I2C_connected = False
    if I2C_connected:
        print("All I2C devices connected successfully")
    return I2C_connected
def LPT1_level():
    """Read the level transmitter and convert the raw ADC count to percent.

    The raw count is scaled linearly between ``adc_min`` and ``adc_max``,
    multiplied by ``comp1``, offset by ``comp2`` and finally clamped to the
    10..85 range.

    Side effects:
        Updates the module-level ``level`` and ``adc1_value``. ``adc1_value``
        is ``None`` when the ADC could not be read.

    Returns:
        The clamped level, or the sentinel 999 when reading or converting
        the value raised.
    """
    global level, adc1_value
    # Bind adc1_value before the read so it always exists afterwards; the
    # original left it undefined when the very first read failed, which made
    # print_data() raise NameError.
    adc1_value = None
    try:
        adc1_value = adc1.read()
        span = adc_max - adc_min
        scaled = ((adc1_value - adc_min) / span) * comp1 + comp2
        # Ensure the calculated level is within the 10% to 85% range
        level = min(max(10, scaled), 85)
    except Exception:
        print("Failed to collect level data from Device")
        level = 999
    return level
def battery():
    """Read state of charge and voltage from the battery fuel gauge.

    Side effects:
        Updates the module-level ``battery_percent`` and ``battery_voltage``.
        If the gauge is flagged as not connected, or either read raises, a
        message is printed and both values are set to the sentinel 999.

    Returns:
        The tuple ``(battery_voltage, battery_percent)``.
    """
    global battery_percent, battery_voltage
    try:
        # Treat a missing gauge exactly like a failed read.
        if not I2C_connected:
            raise Exception("Failed to connect to INA1")
        battery_percent = gauge.read_percentage()
        battery_voltage = gauge.read_voltage()
    except Exception:
        print("Failed to collect battery data from Device")
        battery_percent = battery_voltage = 999
    return battery_voltage, battery_percent
def cputemp():
    """Read the ESP32 internal temperature sensor.

    The raw reading is converted with ``(raw - 32) * 5 / 9``, i.e. it is
    treated as Fahrenheit and turned into Celsius.

    Side effects:
        Updates the module-level ``temperature``.

    Returns:
        The converted temperature, or the sentinel 999 when the read raised.
    """
    global temperature
    try:
        raw_reading = esp32.raw_temperature()
        temperature = (raw_reading - 32) * 5 / 9
    except Exception:
        temperature = 999
    return temperature
def deepsleep(sleep_time_minutes):
    """Shut down the radio and sensor power, then enter deep sleep.

    Args:
        sleep_time_minutes: Sleep duration in minutes; converted to
            milliseconds for ``machine.deepsleep``.
    """
    # Turn the WiFi station interface off.
    network.WLAN(network.STA_IF).active(False)
    print("WiFi Disabled")
    # Remove power from the external sensors.
    disable_i2c()
    disable_adc()
    # 1 minute = 60000 milliseconds
    duration_ms = sleep_time_minutes * 60000
    print(" Enering Deepsleep...")
    # Short pause before sleeping (gives the log line time to go out).
    sleep(3)
    machine.deepsleep(duration_ms)
def print_data():
    """Print the most recent readings held in the module-level globals.

    Reads ``level``, ``adc1_value``, ``battery_percent``, ``battery_voltage``
    and ``temperature``. ``adc1_value`` is reported as ``None`` when it has
    not been set yet.
    """
    # adc1_value is only created by LPT1_level(); tolerate it being absent
    # instead of crashing the main loop with NameError.
    try:
        raw_adc = adc1_value
    except NameError:
        raw_adc = None
    print("LPT Level: {} %".format(level))
    print("ADC Value: {}".format(raw_adc))
    print("Battery Percent: {} %".format(battery_percent))
    # Fix: the label said "mA", which is a unit of current.
    # NOTE(review): millivolts assumed from the MAX17043 driver docs - confirm.
    print("Battery Voltage: {} mV".format(battery_voltage))
    print("CPU Temperature: {} C".format(temperature))
def _mean(values):
    """Return the arithmetic mean of a non-empty sequence of numbers."""
    return sum(values) / len(values)


def send_sensor_data():
    """Sample the sensors, batch the averages as line protocol and POST them.

    Three sampling rounds are taken. Each round reads the level six times
    (three back-to-back, then three spaced 0.5 s apart), the battery gauge
    three times and the CPU temperature three times, averages each group and
    appends one ``Sensor_Data,location=LPT1 <field>=<value>`` record per
    field. All records are sent to ``INFLUXDB_URL`` in a single request.

    NOTE(review): the pasted source lost its indentation; the averaging and
    record-building steps are taken to sit inside the round loop - confirm.

    Returns:
        True when the server answers with status 204; False on any other
        status or when the request raises.
    """
    print("Sending sensor data")
    measurement = 'Sensor_Data'
    tags = 'location=LPT1'
    records = []
    for _round in range(3):
        level_samples = [LPT1_level() for _ in range(3)]
        battery_samples = [battery() for _ in range(3)]
        temperature_samples = [cputemp() for _ in range(3)]
        for _extra in range(3):
            level_samples.append(LPT1_level())
            sleep(0.5)
        # battery() returns (voltage, percent) pairs; split them into columns.
        voltages, percents = zip(*battery_samples)
        averaged = (
            ('LPT1_Level', _mean(level_samples)),
            ('Battery_Percent', _mean(percents)),
            ('Battery_Voltage', _mean(voltages)),
            ('CPUTemperature', _mean(temperature_samples)),
        )
        # One line-protocol record per field, same layout as before.
        for field_name, value in averaged:
            records.append(
                '{},{} {}={}'.format(measurement, tags, field_name, str(value))
            )
    payload = '\n'.join(records).encode('utf-8')
    # Send data to InfluxDB Cloud in a single request
    headers = {
        'Authorization': 'Token {}'.format(INFLUXDB_TOKEN),
        'Content-Type': 'application/octet-stream',
    }
    try:
        response = urequests.post(INFLUXDB_URL, headers=headers, data=payload)
        try:
            accepted = response.status_code == 204
            if not accepted:
                print('Error sending data to InfluxDB Cloud: {}'.format(response.content))
        finally:
            # Fix: always release the socket. The original returned early on
            # a non-204 reply without closing the response.
            response.close()
    except Exception as e:
        print('Error sending data to InfluxDB Cloud: {}'.format(e))
        return False
    if not accepted:
        return False
    print("All Data Sent without Errors")
    return True
# Main script: connect, sample, transmit, then deep-sleep until the next cycle.

# Watchdog timer; the timeout is given in milliseconds and the loop below
# feeds it once per iteration.
wdt = machine.WDT(timeout=watchdog_timeout * 60 * 1000)

# Initial connection attempt and sensor power-up.
connected = connect_to_wifi()
connect_devices()

# The original wrapped this in a second, outer "while True"; the inner loop
# never breaks, so a single loop is equivalent.
while True:
    # Reset the watchdog timer at the start of each loop iteration
    wdt.feed()

    if not I2C_connected:
        # Attempt to reconnect devices
        connect_devices()

    if not connected:
        print("Connecting to WiFi network...")
        # Fix: keep the result. The original discarded it, so "connected"
        # never became True and the board rebooted after 5 passes even when
        # the retry had succeeded.
        connected = connect_to_wifi()
        if not connected:
            print('Failed to connect to WiFi network')
            wifi_attempts += 1
            print('WiFi Connection attempts:', wifi_attempts)
            if wifi_attempts >= 5:
                print("Reached the maximum number of consecutive failed WiFi attempts, rebooting the device...")
                machine.reset()
            sleep(10)
        continue

    wifi_attempts = 0
    sleep(3)
    try:
        data_sent = send_sensor_data()
    except Exception as e:
        print("Failed to collect data from I2C Device:", str(e))
        data_sent = False
    else:
        if not data_sent:
            print('Failed to send data to InfluxDB Cloud')

    if not data_sent:
        failed_transmissions += 1
        print("Failed transmissions:", failed_transmissions)
        # Fix: apply the reboot threshold to both failure paths. The original
        # skipped it when send_sensor_data() raised, so the loop could retry
        # forever while still feeding the watchdog.
        if failed_transmissions >= 5:
            print("Reached the maximum number of consecutive failed transmissions, rebooting the device...")
            machine.reset()
        sleep(30)
        continue

    print_data()
    failed_transmissions = 0
    # Machine sleep for setpoint duration
    deepsleep(sleep_time)
# Comments