Andreas Motzek
Published © CC BY

Small Things can help to fix our Planet

Here is a small thing that can help you use more electricity at times when it is produced from renewable sources.

Intermediate · Full instructions provided · 1 hour · 2,204

Things used in this project

Hardware components

ATOM Matrix ESP32 Development Kit
M5Stack ATOM Matrix ESP32 Development Kit
×1

Software apps and online services

MicroPython Firmware for ESP32 v1.19.1
Download esp32-20220618-v1.19.1.bin from the download page.
Renewable Electricity Production in Germany
This web service returns an estimate of the share of renewable energy sources in electricity production in Germany.

Story

Read more

Code

main.py

Python
Fill in your WLAN credentials in the `ssid_passwords` list and a host name in the `wlan.config(dhcp_hostname = '...')` call.
import uos
import utime
from cooperative_multitasking import Tasks
from network import WLAN, AP_IF, STA_IF
import usocket
import ssl
import ure
import ujson
from machine import Pin
from neopixel import NeoPixel
from font5 import Font5
from neopixel_scroller import NeopixelScroller

# Scheduler from the cooperative_multitasking module; the functions below
# register themselves on it with tasks.now() / tasks.after().
tasks = Tasks()

# Deactivate the access-point interface and drop the reference to it.
wlan = WLAN(AP_IF)
wlan.active(False)
wlan = None
utime.sleep(1)

# Activate the station interface; the rest of the file uses this `wlan`.
wlan = WLAN(STA_IF)
wlan.active(True)
utime.sleep(3)  # NOTE(review): presumably gives the interface time to come up - confirm
wlan.config(dhcp_hostname = '...')  # placeholder: fill in the device's host name

# (SSID, password) pairs that connect_wlan() tries in the order listed.
ssid_passwords = [('...', '...')]
# Host that send_request() connects to on port 443.
service_hostname = 'smard-renewables-service-3hbx73e5ra-ey.a.run.app'

def connect_wlan():
    """Join the first configured WLAN that is currently visible.

    If the station interface is already connected, ``send_request`` is
    scheduled immediately. Otherwise the visible networks are scanned and
    the first entry of ``ssid_passwords`` whose SSID shows up in the scan is
    used to connect; ``send_request`` is then scheduled with a delay of
    15000 so the connection has time to be established. When no configured
    network is visible, or any step fails, this function reschedules itself
    with a delay of 5000.
    """
    try:
        if wlan.isconnected():
            tasks.now(send_request)
            return
        # The first field of every scan result is compared against the
        # configured SSID encoded as bytes.
        visible_ssids = [access_point[0] for access_point in wlan.scan()]
        for ssid, password in ssid_passwords:
            if bytes(ssid, 'ascii') in visible_ssids:
                wlan.connect(ssid, password)
                tasks.after(15000, send_request)
                return
    except Exception:
        # Best effort: any WLAN error is simply retried below. Catching
        # Exception (not a bare except) keeps Ctrl-C usable on the REPL.
        pass
    tasks.after(5000, connect_wlan)


# Connection state shared between send_request() and receive_response():
# the raw socket and the TLS stream wrapped around it (None when idle).
socket = None
stream = None

def send_request():
    """Open a TLS connection to ``service_hostname`` and send ``GET /``.

    On success the socket and TLS stream are kept in the module-level
    ``socket`` / ``stream`` variables and ``receive_response`` is scheduled
    with a delay of 500. On any failure the socket is closed, both globals
    are reset to None and ``connect_wlan`` is scheduled with a delay of
    30000.
    """
    global socket, stream
    try:
        socket = usocket.socket()
        try:
            address = usocket.getaddrinfo(service_hostname, 443)[0][-1]
            socket.connect(address)
            # Pass the host name so the TLS handshake carries SNI.
            stream = ssl.wrap_socket(socket, server_hostname=service_hostname)
            # Send the whole request in one write instead of five small ones.
            stream.write('GET / HTTP/1.0\r\nHost: ' + service_hostname + '\r\n\r\n')
            tasks.after(500, receive_response)
            return
        except Exception:
            socket.close()
            socket = None
            stream = None
    except Exception:
        # Creating or closing the socket failed; fall through to the retry.
        pass
    tasks.after(30000, connect_wlan)

def read_line(stream):
    """Read one line from ``stream`` and return it as text.

    The bytes returned by ``stream.readline()`` are decoded as ASCII; the
    line terminator, if any, is kept.
    """
    raw = stream.readline()
    return str(raw, 'ascii')

def receive_response():
    """Parse the HTTP response on ``stream`` and start the display cycle.

    The first line read is not inspected; every following line up to and
    including the blank line that ends the headers is passed to
    ``set_local_time``. The next line is handed to ``set_share_renewables``
    as the JSON body. On success the scroller is created, scrolling is
    started and ``update_scroller`` is scheduled at delays of 60000, 120000,
    180000 and 240000. On failure ``connect_wlan`` is scheduled with a
    delay of 30000. In both cases the socket is closed and the shared
    ``socket`` / ``stream`` globals are reset to None.
    """
    global socket, stream
    try:
        line = read_line(stream)
        # A line of at most two characters (just "\r\n", or empty at end of
        # stream) terminates the header section.
        while len(line) > 2:
            line = read_line(stream)
            set_local_time(line)
        set_share_renewables(stream)
        tasks.now(create_scroller, priority=1)
        tasks.now(scroll_message)
        for delay in (60000, 120000, 180000, 240000):
            tasks.after(delay, update_scroller, priority=1)
        return
    except Exception:
        # Best effort: a broken response just triggers the retry below.
        pass
    finally:
        # Guard against a missing socket so cleanup itself cannot raise
        # out of the task.
        if socket is not None:
            socket.close()
        socket = None
        stream = None
    tasks.after(30000, connect_wlan)


# Values shared between the response parser and the display code; they start
# out as None and are reset to None by clear_response().
hour = None
minute = None
share_renewables = None
# Matches an HTTP "Date:" header line. Groups: 1 = weekday, 2 = day of month,
# 3 = month, 4 = year, 5 = hour, 6 = minute, 7 = second.
regexp = ure.compile('Date: (Mon|Tue|Wed|Thu|Fri|Sat|Sun), ([0-9]+) (Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec) ([0-9]+) ([0-9]+):([0-9]+):([0-9]+) GMT')
# Number of days from the given weekday to the next Sunday (0 on a Sunday).
offset_by_day_of_week = {'Mon': 6, 'Tue': 5, 'Wed': 4, 'Thu': 3, 'Fri': 2, 'Sat': 1, 'Sun': 0}

def last_sunday(day_of_week, day):
    """Return the day of month of the last Sunday of a 31-day month.

    ``day_of_week`` is the three-letter weekday name of day-of-month
    ``day``. The month is assumed to have 31 days, which holds for the only
    callers' months, March and October.
    """
    offset = offset_by_day_of_week[day_of_week]
    last_sun = day + offset
    # Step forward week by week until past the 31st, then go one week back.
    while last_sun <= 31:
        last_sun += 7
    return last_sun - 7

def is_daylight_saving(day_of_week, day, month, hour):
    """Return True if daylight saving time is in effect at the given UTC time.

    ``day_of_week`` and ``month`` are the three-letter English names used in
    an HTTP Date header, ``day`` is the day of month and ``hour`` the UTC
    hour. Daylight saving time runs from the last Sunday of March, 01:00
    UTC, to the last Sunday of October, 01:00 UTC.
    """
    if month in ('Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep'):
        return True
    if month in ('Jan', 'Feb', 'Nov', 'Dec'):
        return False
    last_sun = last_sunday(day_of_week, day)
    if day < last_sun:
        # Before the switch day: still summer time in October, not yet in March.
        return month == 'Oct'
    if day > last_sun:
        return month == 'Mar'
    # On the switch day the change happens at 01:00 UTC, so hour 1 already
    # belongs to the new regime (the previous `hour > 1` was one hour late).
    return (hour >= 1) == (month == 'Mar')

def set_local_time(line):
    """Set the global ``hour`` and ``minute`` from an HTTP Date header line.

    Lines that do not match the Date header pattern are ignored. The UTC
    hour from the header is shifted by +2 when ``is_daylight_saving``
    reports daylight saving time and by +1 otherwise, and wrapped into the
    range 0-23 so that late-evening UTC times do not yield hour 24 or 25.
    """
    global hour, minute
    match = regexp.match(line)
    if match is None:
        return
    day_of_week = match.group(1)
    day = int(match.group(2))
    month = match.group(3)
    utc_hour = int(match.group(5))
    offset = 2 if is_daylight_saving(day_of_week, day, month, utc_hour) else 1
    hour = (utc_hour + offset) % 24
    minute = int(match.group(6))

def set_share_renewables(stream):
    """Read the JSON body from ``stream`` and store the renewable share.

    The next line of ``stream`` is parsed as JSON and its
    ``'Share Renewables'`` entry is multiplied by 100 and stored in the
    global ``share_renewables`` as an integer, or as None when the entry is
    null. The value is rounded rather than truncated so that floating-point
    error (e.g. 0.57 * 100 == 56.999...) does not show one point too low.
    """
    global share_renewables
    payload = ujson.loads(read_line(stream))
    value = payload['Share Renewables']
    share_renewables = None if value is None else int(round(100 * value))

def clear_response():
    """Reset the global ``hour``, ``minute`` and ``share_renewables`` to None."""
    global hour, minute, share_renewables
    hour = minute = share_renewables = None


# 25 NeoPixels driven from GPIO 27.
# NOTE(review): presumably the 5x5 LED matrix of the ATOM Matrix - confirm.
gpio27 = Pin(27, Pin.OUT)
neopixels = NeoPixel(gpio27, 25)
font = Font5()
# Candidate foreground colours; random_color() picks one of them.
colors = [(30, 0, 0), (25, 25, 0), (0, 30, 0), (0, 0, 50)]
# Current NeopixelScroller; None until create_scroller() runs and again
# after clear_display().
scroller = None
# Number of scroll steps done in the current cycle (see scroll_message()).
count = 0

def create_scroller():
    """Build the display text and install a new scroller for it.

    The text starts with a space, followed by the time as ``H:MM`` when both
    ``hour`` and ``minute`` are known, followed by a space and the share as
    ``N%`` when ``share_renewables`` is known. The new scroller, with a
    randomly chosen foreground colour, is stored in the global ``scroller``.
    """
    global scroller
    parts = [' ']
    if hour is not None and minute is not None:
        # Minutes below 10 get a leading zero.
        padding = '0' if minute < 10 else ''
        parts.append(str(hour) + ':' + padding + str(minute))
    if share_renewables is not None:
        parts.append(' ' + str(share_renewables) + '%')
    scroller = NeopixelScroller(neopixels, ''.join(parts), font, foreground_color=random_color())

def update_scroller():
    """Advance the displayed time by one minute and rebuild the scroller.

    Does nothing while ``hour`` or ``minute`` is unknown (None).
    """
    global minute, hour
    if hour is None or minute is None:
        return
    minute += 1
    if minute >= 60:
        # Carry the overflowing minute into the hour.
        minute = 0
        hour += 1
    if hour >= 24:
        hour = 0
    create_scroller()

def scroll_message():
    """Perform one scroll step and schedule what comes next.

    For the first 899 steps of a cycle the function reschedules itself with
    a delay of 330. On the 900th step the counter is reset and the cycle
    ends: the parsed response is cleared, the display is blanked and a new
    request is sent.
    """
    global count
    scroller.scroll()
    neopixels.write()
    count += 1
    if count >= 900:
        count = 0
        tasks.now(clear_response, priority=1)
        tasks.now(clear_display)
        tasks.now(send_request)
        return
    tasks.after(330, scroll_message)

def clear_display():
    """Drop the current scroller and switch all 25 pixels off."""
    global scroller
    scroller = None
    off = (0, 0, 0)
    for led in range(25):
        neopixels[led] = off
    neopixels.write()

def randint(low, high):
    """Return a random integer between ``low`` and ``high``, both inclusive.

    Four bytes from ``uos.urandom`` are combined, most significant byte
    first, into one integer that is then reduced to the requested range.
    """
    value = 0
    for byte in uos.urandom(4):
        value = value * 256 + byte
    span = high - low + 1
    return low + value % span

def random_color():
    """Return a randomly chosen entry of the global ``colors`` list."""
    last_index = len(colors) - 1
    return colors[randint(0, last_index)]


# Schedule the first WLAN connection attempt, then keep running scheduled
# tasks for as long as the scheduler reports that some are available.
tasks.after(3000, connect_wlan)

while tasks.available():
    tasks.run()

Credits

Andreas Motzek

Andreas Motzek

16 projects • 9 followers
I like algorithms, code, statistics and decisions based on them, solving difficult problems and, if possible, avoiding them elegantly.

Comments