import pygame
from pygame.locals import *
from time import sleep
import time
import sys, time
import cloud4rpi
import rpi
import RPi.GPIO as GPIO
import board
import busio
import VL53L0X
import adafruit_ads1x15.ads1115 as ADS
from adafruit_ads1x15.analog_in import AnalogIn
# --- VL53L0X time-of-flight sensors ---------------------------------------
# Two VL53L0X sensors are brought up one at a time using their shutdown
# pins, so that each object below can be started under its own address.
# Pin numbers are BCM GPIO numbers (see GPIO.setmode below).
# GPIO for Sensor 1 shutdown pin
sensor1_shutdown = 27
# GPIO for Sensor 2 shutdown pin
sensor2_shutdown = 22
GPIO.setwarnings(False)
# Set up the GPIO shutdown pins of each VL53L0X as outputs
GPIO.setmode(GPIO.BCM)
GPIO.setup(sensor1_shutdown, GPIO.OUT)
GPIO.setup(sensor2_shutdown, GPIO.OUT)
# Set all shutdown pins low to turn off each VL53L0X
GPIO.output(sensor1_shutdown, GPIO.LOW)
GPIO.output(sensor2_shutdown, GPIO.LOW)
# Keep all low for 500 ms or so to make sure they reset
time.sleep(0.5)
# Create one object per VL53L0X, passing the address to give to each.
# tof  -> read by SLTLEVEL1(), tof1 -> read by SLTLEVEL2().
tof = VL53L0X.VL53L0X(address=0x2B)
tof1 = VL53L0X.VL53L0X(address=0x2D)
# Set shutdown pin high for the first VL53L0X, wait for it to wake, then
# start ranging while the second sensor is still held in shutdown.
# NOTE(review): presumably the library assigns the new I2C address when
# ranging starts - confirm against the VL53L0X library in use.
GPIO.output(sensor1_shutdown, GPIO.HIGH)
time.sleep(0.5)
tof.start_ranging(VL53L0X.VL53L0X_BETTER_ACCURACY_MODE)
# Set shutdown pin high for the second VL53L0X then
# call to start ranging
GPIO.output(sensor2_shutdown, GPIO.HIGH)
time.sleep(0.5)
tof1.start_ranging(VL53L0X.VL53L0X_BETTER_ACCURACY_MODE)
# --- ADS1115 analog-to-digital converter ----------------------------------
# Create the I2C bus
i2c = busio.I2C(board.SCL, board.SDA)
# Create the ADC object using the I2C bus (device address 0x48)
ads1 = ADS.ADS1115(i2c, address=0x48)
# Create single-ended inputs on channels 0 through 3.
# Only chan0 is read in the visible code (by PRESS()); chan1-chan3 are
# created but not referenced elsewhere in this file as shown.
chan0 = AnalogIn(ads1, ADS.P0)
chan1 = AnalogIn(ads1, ADS.P1)
chan2 = AnalogIn(ads1, ADS.P2)
chan3 = AnalogIn(ads1, ADS.P3)
# --- Cloud4RPi and timing configuration -----------------------------------
# Put your device token here. To get the token,
# sign up at https://cloud4rpi.io and create a device.
# The value below is a placeholder and must be replaced before running.
DEVICE_TOKEN = 'YOUR CODE HERE'
# Constants
# NOTE(review): Tank_H and Sump_H are not referenced anywhere in the visible
# code; the level formulas use their own literal numbers instead.
Tank_H = 30 #Tank height in inches
Sump_H = 16 #Sump Depth in inches
# Change these values depending on your requirements.
# main() counts these intervals down by POLL_INTERVAL once per loop pass.
DATA_SENDING_INTERVAL = 600 # secs
DIAG_SENDING_INTERVAL = 600 # secs
# Time main() sleeps on every pass of its loop.
POLL_INTERVAL = 0.5 # 500 ms
# --- pygame display, fonts, colors and images ------------------------------
pygame.init()
# Window size in pixels (width, height)
screenDimentions = (1024, 600)
# Module-level display surface; the drawing helpers below blit onto it.
screen = pygame.display.set_mode(screenDimentions)
pygame.display.set_caption('WATER SYSTEM MONITOR')
# NOTE(review): view_mode, done and Alarm are not read anywhere in the
# visible code (main() assigns its own local view_mode).
view_mode = 'Normal'
done = False
# setup font size
FONTSIZE = 22
# Vertical offset used by main() when positioning text lines
LINEHEIGHT = 18
# Default system font used for all on-screen text
basicFont = pygame.font.SysFont(None, FONTSIZE)
Alarm = ''
# setup font colors (RGB tuples)
# NOTE(review): BLACK holds the RGB value of white and WHITE holds the RGB
# value of black. Text rendered "WHITE on BLACK" therefore appears as black
# text on a white background. Confirm whether this is intentional before
# renaming, because the current rendering depends on these values.
BLACK = (255,255,255)
WHITE = (0,0,0)
GREY = (200,200,200)
DKGREY = (169,169,169)
RED = (255, 0, 0)
BLUE = (0, 0, 255)
# NOTE(review): despite its name this is a dark blue, not a grey.
GRAY = (0, 0, 55)
# Background image, drawn once here and again at the start of every frame
bg = pygame.image.load('bg4.png')
screen.blit(bg,(0,0))
# Image blitted at (0, 0) by main() when certain alarms are active
alarm_bg = pygame.image.load('Alarm.png')
# Button Class
class Button():
    """A rectangular text button rendered onto the module-level ``screen``.

    The button keeps its own small surface. Its background switches to GREY
    while the mouse pointer is over it and returns to the normal color
    otherwise. ``action`` is a zero-argument callable run by ``call_back``.
    """

    def __init__(self, txt, location, action, bg=DKGREY, fg=WHITE, size=(80, 30), font_name="Segoe Print", font_size=16):
        """Build the label and the button surface.

        txt        -- label text
        location   -- (x, y) of the button's center on the screen
        action     -- callable invoked by call_back()
        bg, fg     -- normal background color and text color
        size       -- (width, height) of the button in pixels
        font_name, font_size -- system font used for the label
        """
        # Colors: ``color`` is the resting background, ``bg`` the one
        # currently shown (mouseover() may change it).
        self.color = bg
        self.bg = bg
        self.fg = fg
        self.size = size

        # Pre-render the label and center it inside the button.
        self.font = pygame.font.SysFont(font_name, font_size)
        self.txt = txt
        self.txt_surf = self.font.render(self.txt, 1, self.fg)
        label_center = [extent // 2 for extent in self.size]
        self.txt_rect = self.txt_surf.get_rect(center=label_center)

        # The button's own surface and its placement on the screen.
        self.surface = pygame.surface.Surface(size)
        self.rect = self.surface.get_rect(center=location)
        self.call_back_ = action

    def draw(self):
        """Refresh the hover state, then paint the button onto ``screen``."""
        self.mouseover()
        face = self.surface
        face.fill(self.bg)
        face.blit(self.txt_surf, self.txt_rect)
        screen.blit(face, self.rect)

    def mouseover(self):
        """Pick the background color depending on the pointer position."""
        pointer = pygame.mouse.get_pos()
        hovered = self.rect.collidepoint(pointer)
        self.bg = GREY if hovered else self.color

    def call_back(self):
        """Run the action attached to this button."""
        handler = self.call_back_
        handler()
def DrawBar(pos, size, borderC, barC, progress=1.0):
    """Draw a horizontal progress bar on the module-level ``screen``.

    A 1-pixel border of color ``borderC`` is drawn around the rectangle at
    ``pos`` with ``size``; inside it, inset by 3 pixels on every side, a
    filled bar of color ``barC`` covers ``progress`` of the inner width.

    pos      -- (x, y) of the top-left corner
    size     -- (width, height) of the outer border
    borderC  -- border color
    barC     -- fill color of the inner bar
    progress -- filled fraction, clamped to 0.0 .. 1.0 (default: full)

    The previous implementation computed the inner size as a single number
    and never drew the inner bar, so ``barC`` had no effect.
    """
    pygame.draw.rect(screen, borderC, (*pos, *size), 1)

    fraction = min(max(progress, 0.0), 1.0)
    innerPos = (pos[0] + 3, pos[1] + 3)
    # Guard against sizes smaller than the 6-pixel total inset.
    inner_width = max(size[0] - 6, 0)
    inner_height = max(size[1] - 6, 0)
    innerSize = (int(round(inner_width * fraction)), inner_height)
    pygame.draw.rect(screen, barC, (*innerPos, *innerSize))
def mousebuttondown():
    """Handle a mouse-button-down event.

    At present this only queries the pointer position; dispatching the
    click to on-screen buttons is disabled (see the commented code below).
    """
    pygame.mouse.get_pos()
    # Disabled click dispatch, kept for when a ``buttons`` list exists:
    # for button in buttons:
    #     if button.rect.collidepoint(pos):
    #         button.call_back()
def drawText(surface, text, color, rect, font, aa=False, bkg=None):
    """Draw word-wrapped ``text`` inside ``rect`` on ``surface``.

    Lines are laid out from the top of ``rect`` downwards. Each line holds
    as many characters as fit in the rectangle's width and is broken at the
    last space where possible; a single word that is wider than the
    rectangle is split mid-word instead.

    surface -- target surface the text is blitted onto
    text    -- string to draw
    color   -- text color
    rect    -- area to draw in (anything ``Rect`` accepts)
    font    -- pygame font object used for measuring and rendering
    aa      -- antialias flag, used when no ``bkg`` is given
    bkg     -- optional background color; it is rendered behind the text and
               then made transparent via the color key

    Returns the part of ``text`` that did not fit (empty string if all of it
    was drawn).

    Fixes over the previous version: layout started at ``rect.bottom`` so
    nothing was ever drawn, the text went to the global ``screen`` rather
    than ``surface``, and a word without any space produced empty lines.
    """
    rect = Rect(rect)
    y = rect.top
    lineSpacing = -2
    # get the height of the font
    fontHeight = font.size("Tg")[1]
    while text:
        i = 1
        # stop when the next row of text would fall outside the area
        if y + fontHeight > rect.bottom:
            break
        # determine how many characters fit in the width of the area
        while font.size(text[:i])[0] < rect.width and i < len(text):
            i += 1
        # if the text has to wrap, move the break back to the last space
        if i < len(text):
            wrap_at = text.rfind(" ", 0, i) + 1
            # rfind() gives -1 (wrap_at == 0) when there is no space; keep
            # the hard break in that case so the loop always makes progress
            if wrap_at > 0:
                i = wrap_at
        # render the line and blit it to the surface
        if bkg:
            image = font.render(text[:i], 1, color, bkg)
            image.set_colorkey(bkg)
        else:
            image = font.render(text[:i], aa, color)
        surface.blit(image, (rect.left, y))
        y += fontHeight + lineSpacing
        # remove the text we just blitted
        text = text[i:]
    return text
class Bar():
    """Vertical level indicator that fills from the bottom upwards.

    ``value`` is a percentage: 0 draws an empty bar, 100 a full one. The
    numeric value is printed next to the bar as ``"<value> %"``.
    """

    def __init__(self, rect, bar = BLUE, outline = GRAY):
        """Create an empty bar.

        rect    -- (x, y, width, height) of the bar on the target surface
        bar     -- fill color
        outline -- color of the 2-pixel outline
        """
        self.rect = pygame.Rect(rect)
        self.bar = bar
        self.outline = outline
        self.value = 0

    def draw(self, surf):
        """Draw the bar on ``surf`` with its label offset by (30, 125)."""
        self._render(surf, (30, 125))

    def draw1(self, surf):
        """Draw the bar on ``surf`` with its label offset by (230, 175)."""
        self._render(surf, (230, 175))

    def _render(self, surf, label_offset):
        """Draw fill, outline and value label; shared by draw() and draw1().

        ``label_offset`` is the (dx, dy) from the bar's top-left corner to
        the bottom-left corner of the label.
        """
        # Clamp only the drawn fill so out-of-range readings cannot paint
        # outside the outline; the label still shows the real value.
        fill_percent = min(max(self.value, 0), 100)
        length = round(fill_percent * self.rect.height / 100)
        top = self.rect.height - length
        pygame.draw.rect(surf, self.bar, (self.rect.x, self.rect.y + top, self.rect.width, length))
        pygame.draw.rect(surf, self.outline, self.rect, 2)
        txt = basicFont.render((str(round(self.value, 2)) + ' %'), True, GRAY)
        txt_rect = txt.get_rect(bottomleft = (self.rect.x + label_offset[0], self.rect.y + label_offset[1]))
        # Blit to the surface that was passed in, not the global screen.
        surf.blit(txt, txt_rect)
# Level indicators, each given as (x, y, width, height).
# main() feeds them as follows:
#   bar  <- SLTLEVEL1()   (salt level tank 1)
#   bar2 <- SLTLEVEL2()   (salt level tank 2)
#   bar1 <- SUMPLEVEL()   (sump level, drawn with draw1())
bar = Bar((450, 200, 20, 145))
bar2 = Bar((570, 200, 20, 145))
bar1 = Bar((50, 392, 207, 198))
class UltraSonic():
    """Trigger/echo ultrasonic distance sensor read through RPi.GPIO.

    Pin numbers are BCM GPIO numbers. ``offset`` is a calibration value in
    centimetres that is added to every in-range measurement.
    """

    def __init__(self, TRIG, ECHO, offset = 0.5):
        """Create a new sensor instance and configure its two GPIO pins.

        TRIG   -- BCM pin driving the sensor's trigger input
        ECHO   -- BCM pin reading the sensor's echo output
        offset -- calibration offset in cm added to valid readings
        """
        self.TRIG = TRIG
        self.ECHO = ECHO
        self.offset = offset  # Sensor calibration factor
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.TRIG, GPIO.OUT)  # Set pin as GPIO output
        GPIO.setup(self.ECHO, GPIO.IN)  # Set pin as GPIO input

    def __str__(self):
        """Return a one-line description of the sensor configuration."""
        return "Ultrasonic Sensor: TRIG - {0}, ECHO - {1}, Offset: {2} cm".format(self.TRIG, self.ECHO, self.offset)

    def ping(self):
        """Take one distance measurement.

        Returns the distance in cm (rounded to two decimals, plus the
        calibration offset), or 0 when the result is outside the accepted
        2 .. 400 cm window, which is also what a timed-out echo produces.
        Each call blocks for at least 0.2 s.
        """
        maxTime = 0.04  # give up waiting for either echo edge after 40 ms

        GPIO.output(self.TRIG, GPIO.LOW)  # Set TRIG LOW
        sleep(0.2)  # Min gap between measurements

        # Create 10 us pulse on TRIG
        GPIO.output(self.TRIG, GPIO.HIGH)  # Set TRIG HIGH
        sleep(0.00001)  # Delay 10 us
        GPIO.output(self.TRIG, GPIO.LOW)  # Set TRIG LOW

        # Measure return echo pulse duration.
        # Wait while ECHO is still LOW; the last timestamp taken before it
        # goes HIGH is the start of the pulse.
        pulse_start = time.time()
        timeout = pulse_start + maxTime
        while GPIO.input(self.ECHO) == GPIO.LOW and pulse_start < timeout:
            pulse_start = time.time()  # Save pulse start time

        # Wait while ECHO stays HIGH; the last timestamp taken before it
        # drops LOW again is the end of the pulse.
        pulse_end = time.time()
        timeout = pulse_end + maxTime
        while GPIO.input(self.ECHO) == GPIO.HIGH and pulse_end < timeout:
            pulse_end = time.time()  # Save pulse end time

        pulse_duration = pulse_end - pulse_start

        # Distance = 17160.5 * Time (unit cm) at sea level and 20C
        distance = pulse_duration * 17160.5  # Calculate distance
        distance = round(distance, 2)  # Round to two decimal points

        if distance > 2 and distance < 400:  # Check distance is in sensor range
            distance = distance + self.offset
        else:
            distance = 0  # Nothing detected by sensor
        return distance

    def calibrate(self):
        """Interactively adjust ``offset`` from the console.

        Repeatedly measures, shows the distance, and asks for a new offset.
        Entering ``q`` ends the loop; input that is not a number is reported
        and ignored.

        The previous version compared against an undefined ``__QUIT`` name
        and changed the module-level ``sensor`` instead of this instance.
        """
        while True:
            distance = self.ping()
            print("Distance: ", distance, " cm")
            response = input("Enter Offset (q = quit): ")
            if response.strip().lower() == "q":
                break
            try:
                self.offset = float(response)
            except ValueError:
                print("Invalid offset: {0!r}".format(response))
                continue
            print(self)

    @staticmethod
    def low_pass_filter(value, previous_value, beta):
        """Single-pole IIR low-pass filter step.

        value          -- new sample X(n)
        previous_value -- previous filter output Y(n-1)
        beta           -- smoothing parameter, 0 < beta < 1; larger values
                          follow the new sample more closely

        Computes
            Y(n) = (1 - beta) * Y(n-1) + beta * X(n)
                 = Y(n-1) - beta * (Y(n-1) - X(n))
        and returns Y(n).
        """
        smooth_value = previous_value - (beta * (previous_value - value))
        return smooth_value
# Create the ultrasonic sensor instance used by filtered_value():
# TRIG on BCM GPIO 21, ECHO on BCM GPIO 20, default offset.
sensor = UltraSonic(21, 20)
# Log the sensor configuration at start-up
print(sensor)
def SLTLEVEL1():
    """Return the salt level of tank 1 as a percentage (one decimal).

    Reads the first time-of-flight sensor (``tof``). A reading of 1000 maps
    to 0 % and a reading of 300 maps to 100 %.
    NOTE(review): the reading is presumably in millimetres - confirm
    against the VL53L0X library.
    """
    reading = tof.get_distance()
    remaining = 1000 - reading
    return round(remaining / 700 * 100, 1)
def SLTLEVEL2():
    """Return the salt level of tank 2 as a percentage (one decimal).

    Reads the second time-of-flight sensor (``tof1``). A reading of 1000
    maps to 0 % and a reading of 300 maps to 100 %.
    NOTE(review): the reading is presumably in millimetres - confirm
    against the VL53L0X library.
    """
    reading = tof1.get_distance()
    remaining = 1000 - reading
    return round(remaining / 700 * 100, 1)
def PRESS():
    """Return the water pressure derived from ADC channel 0 (one decimal).

    The channel voltage is shifted by 0.1 V and scaled by 27; main()
    displays the result with the unit "PSI".
    """
    volts = chan0.voltage
    return round((volts - 0.1) * 27, 1)
def filtered_value():
    """Return a low-pass filtered ultrasonic distance in cm (two decimals).

    Takes one reading from the module-level ``sensor`` and blends it with
    the previous filter output using ``sensor.low_pass_filter`` (beta 0.75).
    The filter output is remembered between calls on the function itself;
    the very first call seeds the filter with the raw reading.

    The previous version reset the "previous value" to 2 on every call, so
    no smoothing took place and every result was 0.5 + 0.75 * reading.

    A reading of 0 (``ping`` found nothing in range) is fed into the filter
    like any other value, so a sensor that keeps failing still drives the
    output towards 0 instead of freezing at the last good value.
    """
    beta = .75
    reading = sensor.ping()
    previous = getattr(filtered_value, "_previous", None)
    if previous is None:
        # First call: nothing to smooth against yet.
        smoothed = reading
    else:
        smoothed = sensor.low_pass_filter(reading, previous, float(beta))
    # Keep the unrounded output as the filter state for the next call.
    filtered_value._previous = smoothed
    return round(smoothed, 2)
def SUMPLEVEL():
    """Return the sump level as a percentage (one decimal).

    Uses the filtered ultrasonic distance: a distance of 15 maps to 100 %
    and a distance of 30 maps to 0 %.
    """
    reading = filtered_value()
    headroom = 15 - (reading - 15)
    return round(headroom / 15 * 100, 1)
def MAX():
    """Return the constant 100, published by main() as 'Trend Max'."""
    return 100
def MIN():
    """Return the constant 0.0, published by main() as 'Trend Min'."""
    return 0.0
def main():
    """Run the monitor: draw the display and publish data to Cloud4RPi.

    Declares the variables and diagnostics on the Cloud4RPi device, then
    loops forever. Every pass redraws the background, the three level bars
    and the supply pressure, overlays any active alarms, publishes data and
    diagnostics when their countdowns expire, sleeps POLL_INTERVAL and
    processes pygame events. Closing the window ends the loop.

    On exit (window closed or an exception, which is logged) the GPIO pins
    are released, pygame is shut down and the process exits.

    Changes over the previous version:
    * ``GPIO.cleanup`` was only referenced, never called.
    * Each sensor is read once per pass and the value reused, so the bars
      and the alarms always agree and the slow ultrasonic ping is not
      repeated for the display.
    * The alarm state is 'Alarm' when ANY alarm is active; before, every
      check overwrote the result of the one before it.
    """
    # Put variable declarations here
    # Available types: 'bool', 'numeric', 'string', 'location'
    variables = {
        'Salt Level Tank 1': {
            'type': 'numeric',
            'bind': SLTLEVEL1
        },
        'Salt Level Tank 2': {
            'type': 'numeric',
            'bind': SLTLEVEL2
        },
        'Sump Level': {
            'type': 'numeric',
            'bind': SUMPLEVEL
        },
        'Water Pressure': {
            'type': 'numeric',
            'bind': PRESS
        },
        'Trend Max': {
            'type': 'numeric',
            'bind': MAX
        },
        'Trend Min': {
            'type': 'numeric',
            'bind': MIN
        },
        'CPU Temp': {
            'type': 'numeric',
            'bind': rpi.cpu_temp
        },
    }
    diagnostics = {
        'CPU Temp': rpi.cpu_temp
    }
    device = cloud4rpi.connect(DEVICE_TOKEN)
    try:
        device.declare(variables)
        device.declare_diag(diagnostics)
        device.publish_config()
        # Adds a 1 second delay to ensure device variables are created
        sleep(1)

        # Countdown timers (seconds); 0 means "publish on the first pass".
        data_timer = 0
        diag_timer = 0
        # All alarm messages are drawn at this same position.
        # NOTE(review): with several alarms active the later message is
        # drawn over the earlier one - confirm whether they should stack.
        alarm_pos = (75, 35 - LINEHEIGHT)

        while True:
            screen.blit(bg, (0, 0))

            # NOTE: on-screen buttons (Button class) are not wired up yet;
            # the handlers they would call do not exist in this file.

            # Read every sensor once per pass.
            salt1 = SLTLEVEL1()
            salt2 = SLTLEVEL2()
            sump = SUMPLEVEL()
            pressure = PRESS()

            # Draw level indicators
            bar.value = salt1
            bar2.value = salt2
            bar1.value = sump
            bar.draw(screen)
            bar2.draw(screen)
            bar1.draw1(screen)

            # Supply pressure read-out
            text = basicFont.render("SUPPLY", True, WHITE, BLACK)
            screen.blit(text, (525, 40 + LINEHEIGHT))
            text = basicFont.render((str(pressure) + ' PSI'), True, WHITE, BLACK)
            screen.blit(text, (525, 57 + LINEHEIGHT))

            # Alarms: (condition, message, blit the alarm background first).
            # The low-pressure alarm never drew the alarm background; that
            # is kept as it was.
            alarms = (
                (salt1 <= 20, "LOW SALT LEVEL1 ALARM", True),
                (salt2 <= 20, "LOW SALT LEVEL2 ALARM", True),
                (sump >= 85, "HIGH SUMP LEVEL ALARM", True),
                (pressure <= 20, "LOW WATER PRESSURE ALARM", False),
            )
            alarm_active = False
            for tripped, message, show_background in alarms:
                if not tripped:
                    continue
                alarm_active = True
                if show_background:
                    screen.blit(alarm_bg, (0, 0))
                text = basicFont.render(message, True, WHITE, BLACK)
                screen.blit(text, alarm_pos)
            # Not consumed anywhere yet; kept so the overall state is
            # available once view switching is implemented.
            view_mode = 'Alarm' if alarm_active else 'Normal'

            if data_timer <= 0:
                device.publish_data()
                data_timer = DATA_SENDING_INTERVAL
            if diag_timer <= 0:
                device.publish_diag()
                diag_timer = DIAG_SENDING_INTERVAL

            sleep(POLL_INTERVAL)
            diag_timer -= POLL_INTERVAL
            data_timer -= POLL_INTERVAL

            pygame.display.flip()
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    # Leave through the finally block, which cleans up
                    # GPIO and pygame and exits the process.
                    return
                if event.type == pygame.MOUSEBUTTONDOWN:
                    mousebuttondown()
    except Exception as e:
        error = cloud4rpi.get_error_message(e)
        cloud4rpi.log.exception("ERROR! %s %s", error, sys.exc_info()[0])
    finally:
        GPIO.cleanup()
        pygame.quit()
        sys.exit()
# Run the monitor only when this file is executed as a script.
if __name__ == '__main__':
    main()
# Comments