Kulshrest Tiwari
Published © GPL3+

RGB MotionSync

Motion-driven RGB LED with Pico-based accelerometer integration.

Beginner • Full instructions provided • 1 hour • 22

Things used in this project

Hardware components

SB Components Dual Squary Display 1.54
×1

Software apps and online services

Thonny IDE

Story

Read more

Code

led_motion_detection.py

Python
application code
# Import necessary modules
from machine import Pin, SPI, I2C  # For interfacing with hardware components like pins, SPI, and I2C
from time import sleep            # For introducing delays in the program
import st7789                     # Library for interfacing with ST7789 displays
from dualdisplay import IMU       # Importing IMU class for reading accelerometer data
import neopixel                   # Library for controlling NeoPixel LEDs

# Initialize SPI communication for two displays
spi1 = SPI(0, baudrate=40000000, sck=Pin(18), mosi=Pin(19))  # SPI bus 0 -> Display 1
spi2 = SPI(1, baudrate=40000000, sck=Pin(10), mosi=Pin(11))  # SPI bus 1 -> Display 2

# Initialize ST7789 displays (240x240) with their respective SPI buses and pins
tft1 = st7789.ST7789(spi1, 240, 240, dc=Pin(6, Pin.OUT), cs=Pin(17, Pin.OUT),
                     backlight=Pin(7, Pin.OUT), rotation=0)  # Display 1
tft2 = st7789.ST7789(spi2, 240, 240, reset=Pin(12, Pin.OUT), cs=Pin(13, Pin.OUT),
                     dc=Pin(14, Pin.OUT), backlight=Pin(15, Pin.OUT), rotation=0)  # Display 2

# Initialize both displays
tft1.init()
tft2.init()

# Display an image (logo.jpg) on both displays at coordinates (0,0)
tft1.jpg("img/logo.jpg", 0, 0, st7789.FAST)
tft2.jpg("img/logo.jpg", 0, 0, st7789.FAST)

# Initialize a GPIO pin (Pin 28) to control NeoPixel LEDs
led = Pin(28, Pin.OUT)
num = 4  # Number of NeoPixel LEDs
np = neopixel.NeoPixel(led, num)  # Initialize NeoPixel LED strip

# Initialize IMU (Inertial Measurement Unit)
imudata = IMU()

# Infinite loop: sample the IMU once per second and mirror the motion on the LEDs
while True:
    # Read_XYZ() returns six values: indices 0..2 are the accelerometer axes
    # and indices 3..5 are the gyroscope axes (see dualdisplay.IMU.Read_XYZ).
    data = imudata.Read_XYZ()

    # NOTE: indices 3..5 are the *gyroscope* readings, not the accelerometer.
    # Take the magnitude of each axis as an integer.
    data1 = int(abs(data[3]))  # gyro X-axis magnitude
    data2 = int(abs(data[4]))  # gyro Y-axis magnitude
    data3 = int(abs(data[5]))  # gyro Z-axis magnitude

    # Print the (unclamped) values for debugging purposes
    print("x= " + str(data1))
    print("y= " + str(data2))
    print("z= " + str(data3))

    # Limit values to a maximum of 255 (to match NeoPixel color range 0-255)
    data1 = min(data1, 255)
    data2 = min(data2, 255)
    data3 = min(data3, 255)

    # Use the three magnitudes as the (R, G, B) color of every LED on the strip
    for i in range(num):
        np[i] = (data1, data2, data3)

    # Write the updated color values to the NeoPixel LEDs
    np.write()

    # Wait for 1 second before the next reading
    sleep(1)

dualdisplay

Python
library
"""Dual Display : Library"""
from machine import Pin, ADC, PWM, UART, I2C,SPI
from sys import implementation
from framebuf import FrameBuffer, RGB565  # type: ignore
from neopixel import NeoPixel
import time,utime
from time import sleep
from micropython import const
from math import floor, modf
import math

# NOTE(review): Temp and BUFFSIZE are not referenced by any class in this
# listing; presumably leftovers from another module -- TODO confirm before removing.
Temp = '0123456789ABCDEF*'
BUFFSIZE = 1100

# NOTE(review): pi, a, ee and x_pi are not referenced by any class in this
# listing. They look like constants for a geographic coordinate conversion
# (a and ee presumably being an ellipsoid's semi-major axis and squared
# eccentricity) -- TODO confirm.
pi = 3.14159265358979324
a = 6378245.0
ee = 0.00669342162296594323
x_pi = 3.14159265358979324 * 3000.0 / 180.0


# Maximum number of polling iterations used by the SDCard class when waiting
# for a command response, a data token, or card initialisation.
_CMD_TIMEOUT = const(100)

# Bits of the response byte returned by SDCard.cmd(); only the two active
# ones are compared against in SDCard.init_card().
_R1_IDLE_STATE = const(1 << 0)
# R1_ERASE_RESET = const(1 << 1)
_R1_ILLEGAL_COMMAND = const(1 << 2)
# R1_COM_CRC_ERROR = const(1 << 3)
# R1_ERASE_SEQUENCE_ERROR = const(1 << 4)
# R1_ADDRESS_ERROR = const(1 << 5)
# R1_PARAMETER_ERROR = const(1 << 6)
# Token bytes used by SDCard: _TOKEN_CMD25 precedes each block of a
# multi-block write, _TOKEN_STOP_TRAN ends a multi-block write, and
# _TOKEN_DATA precedes a single written block and marks the start of read data.
_TOKEN_CMD25 = const(0xFC)
_TOKEN_STOP_TRAN = const(0xFD)
_TOKEN_DATA = const(0xFE)


# GPIO numbers used by IMU.__init__ for the SDA/SCL lines of I2C bus 0
I2C_SDA = 20
I2C_SCL = 21


''' Class to read onboard Buttons value '''
class BUTTON():
    def __init__(self, button_pin_number):
        """Set up one push button as a digital input.

        Args:
            button_pin_number (int): 1, 2 or 3 selects an onboard button
                                     (GPIO22, GPIO9, GPIO8 respectively);
                                     any other value is used directly as the
                                     GPIO number of an external button.
        """
        # (onboard button number, GPIO number) pairs
        onboard_buttons = ((1, 22), (2, 9), (3, 8))

        for number, gpio in onboard_buttons:
            if button_pin_number == number:
                break
        else:
            # Not an onboard button: treat the argument as a raw GPIO number
            gpio = button_pin_number

        # Input mode, so the pin level can be read back
        self.button_pin = Pin(gpio, Pin.IN)

    def read(self):
        """Return the current pin level of the button.

            -> 0 (Pressed)
            -> 1 (not Pressed)
        """
        level = self.button_pin.value()
        return level
    
''' Class to control onboard RGB WS2812 LED '''
class RGBLED:
    def __init__(self, led_count = 4, rgb_pin=28 ):
        """Initialize RGB LED

        Args:
            led_count (Optional int): number of LEDs on the strip, default 4
            rgb_pin (Optional int): GPIO number driving the LEDs, default 28
        """
        # Define the number of LEDs and the GPIO pin connected
        self.led_count = led_count
        self.rgb_pin = Pin(rgb_pin, Pin.OUT) # set GPIO as output to drive NeoPixels
        self.np = NeoPixel(self.rgb_pin, self.led_count )   # create NeoPixel driver

    # Function to adjust brightness of a color
    def adjust_brightness(self, color, brightness):
        """Scale an (r, g, b) color by a brightness factor.

        Args:
            color (tuple): three channel values (r, g, b)
            brightness (float): multiplier applied to every channel

        Returns:
            tuple: (r, g, b) as integers, each clamped to 0..255 so that a
            factor above 1 (or a negative one) cannot produce a value that
            is not a valid 8-bit channel.
        """
        r, g, b = color
        r = max(0, min(255, int(r * brightness)))
        g = max(0, min(255, int(g * brightness)))
        b = max(0, min(255, int(b * brightness)))
        return r, g, b

    def pixelon(self, ledpos, color=(255, 255, 255), brightness=0.5):
        """Light the LED at index ledpos with color scaled by brightness."""
        self.np[ledpos] = self.adjust_brightness(color,brightness)
        self.np.write()   # write data to all pixels

    def pixeloff(self, ledpos):
        """Switch off the LED at index ledpos."""
        self.np[ledpos] = (0, 0, 0)
        self.np.write()   # write data to all pixels

    def on(self,color=(255, 255, 255), brightness=0.5):
        """Light every LED with the same color scaled by brightness."""
        scaled = self.adjust_brightness(color, brightness)
        for i in range(self.led_count):
            self.np[i] = scaled
        self.np.write()   # one transfer for the whole strip

    def off(self):
        """Switch off every LED."""
        for i in range(self.led_count):
            self.np[i] = (0, 0, 0)
        self.np.write()   # one transfer for the whole strip

    def readpixel(self,ledpos):
        """Return the (r, g, b) value currently stored for the LED at ledpos."""
        r, g, b = self.np[ledpos]     # get pixel colour
        return r, g, b

    def chaser(self,startpos=0, endpos=4, delay=0.1, color=(255,255,255), brightness=0.2):
        """Light LEDs startpos..endpos-1 one by one, then switch them off in reverse.

        Args:
            startpos (int): index of the first LED
            endpos (int): index one past the last LED
            delay (float): pause in seconds after each LED change
            color (tuple): (r, g, b) color used for every LED
            brightness (float): brightness factor applied to color
        """
        for i in range(startpos,endpos):
            self.pixelon(i, color, brightness)
            sleep(delay)

        for i in reversed(range(startpos, endpos)):
            self.pixeloff(i)
            sleep(delay)
            
        
''' Class to perform Operation for onboard SDcard '''
class SDCard:
    """SD card driver over an SPI bus, exposing readblocks/writeblocks/ioctl.

    Constructing an instance stores the bus and chip-select pin, allocates
    the scratch buffers and immediately runs init_card(), so construction
    raises OSError when the card cannot be initialised.
    """

    def __init__(self, spi, cs):
        """Store the SPI bus and chip-select pin, then initialise the card.

        Args:
            spi: SPI bus object (must provide init/write/read/readinto/write_readinto)
            cs: chip-select pin object (callable with 0/1, and providing init/OUT)
        """
        self.spi = spi
        self.cs = cs

        self.cmdbuf = bytearray(6)      # one command frame
        self.dummybuf = bytearray(512)  # 0xFF filler clocked out while reading
        self.tokenbuf = bytearray(1)    # single-byte receive buffer
        for i in range(512):
            self.dummybuf[i] = 0xFF
        self.dummybuf_memoryview = memoryview(self.dummybuf)

        # initialise the card
        self.init_card()

    def init_spi(self, baudrate):
        """(Re)initialise the SPI bus at the given baudrate, phase 0, polarity 0.

        If the bus object has a MASTER attribute it is passed as the first
        positional argument to init(); otherwise init() is called without it.
        """
        try:
            master = self.spi.MASTER
        except AttributeError:
            # on ESP8266
            self.spi.init(baudrate=baudrate, phase=0, polarity=0)
        else:
            # on pyboard
            self.spi.init(master, baudrate=baudrate, phase=0, polarity=0)

    def init_card(self):
        """Run the card start-up sequence and record its size in self.sectors.

        Raises:
            OSError: if the card does not answer CMD0, its version cannot be
                     determined, CMD9 fails, the CSD format is unknown, or
                     the block size cannot be set to 512.
        """
        # init CS pin
        self.cs.init(self.cs.OUT, value=1)

        # init SPI bus; use low data rate for initialisation
        self.init_spi(100000)

        # clock card at least 100 cycles with cs high
        # (16 bytes x 8 bits = 128 clocks)
        for i in range(16):
            self.spi.write(b"\xff")

        # CMD0: init card; should return _R1_IDLE_STATE (allow 5 attempts)
        for _ in range(5):
            if self.cmd(0, 0, 0x95) == _R1_IDLE_STATE:
                break
        else:
            raise OSError("no SD card")

        # CMD8: determine card version
        r = self.cmd(8, 0x01AA, 0x87, 4)
        if r == _R1_IDLE_STATE:
            self.init_card_v2()
        elif r == (_R1_IDLE_STATE | _R1_ILLEGAL_COMMAND):
            self.init_card_v1()
        else:
            raise OSError("couldn't determine SD card version")

        # get the number of sectors
        # CMD9: response R2 (R1 byte + 16-byte block read)
        # release=False keeps CS low so readinto() can fetch the CSD block
        if self.cmd(9, 0, 0, 0, False) != 0:
            raise OSError("no response from SD card")
        csd = bytearray(16)
        self.readinto(csd)
        if csd[0] & 0xC0 == 0x40:  # CSD version 2.0
            self.sectors = ((csd[8] << 8 | csd[9]) + 1) * 1024
        elif csd[0] & 0xC0 == 0x00:  # CSD version 1.0 (old, <=2GB)
            c_size = csd[6] & 0b11 | csd[7] << 2 | (csd[8] & 0b11000000) << 4
            c_size_mult = ((csd[9] & 0b11) << 1) | csd[10] >> 7
            self.sectors = (c_size + 1) * (2 ** (c_size_mult + 2))
        else:
            raise OSError("SD card CSD format not supported")
        # print('sectors', self.sectors)

        # CMD16: set block length to 512 bytes
        if self.cmd(16, 512, 0) != 0:
            raise OSError("can't set 512 block size")

        # set to high data rate now that it's initialised
        self.init_spi(1320000)

    def init_card_v1(self):
        """Initialise a v1 card: repeat CMD55 + CMD41 until CMD41 returns 0.

        Sets self.cdv = 512 on success (cdv multiplies the block number to
        form the address sent with read/write commands).

        Raises:
            OSError: after _CMD_TIMEOUT unsuccessful attempts.
        """
        for i in range(_CMD_TIMEOUT):
            self.cmd(55, 0, 0)
            if self.cmd(41, 0, 0) == 0:
                self.cdv = 512
                # print("[SDCard] v1 card")
                return
        raise OSError("timeout waiting for v1 card")

    def init_card_v2(self):
        """Initialise a v2 card: repeat CMD58, CMD55, CMD41 until CMD41 returns 0.

        Waits 50 ms before each attempt. Sets self.cdv = 1 on success (cdv
        multiplies the block number to form the address sent with
        read/write commands).

        Raises:
            OSError: after _CMD_TIMEOUT unsuccessful attempts.
        """
        for i in range(_CMD_TIMEOUT):
            time.sleep_ms(50)
            self.cmd(58, 0, 0, 4)
            self.cmd(55, 0, 0)
            if self.cmd(41, 0x40000000, 0) == 0:
                self.cmd(58, 0, 0, 4)
                self.cdv = 1
                # print("[SDCard] v2 card")
                return
        raise OSError("timeout waiting for v2 card")

    def cmd(self, cmd, arg, crc, final=0, release=True, skip1=False):
        """Send one 6-byte command frame and wait for the response byte.

        Args:
            cmd (int): command index; sent OR'ed with 0x40
            arg (int): 32-bit argument, sent most significant byte first
            crc (int): last byte of the frame
            final (int): number of extra bytes to clock (and discard) after
                         the response byte
            release (bool): when True, raise CS and clock one more byte
                            after a response is received
            skip1 (bool): when True, read and discard one byte before
                          polling for the response

        Returns:
            int: the response byte, or -1 if none arrived within
                 _CMD_TIMEOUT reads (CS is released in that case).
        """
        self.cs(0)

        # create and send the command
        # NOTE(review): buf[1..4] are assigned un-masked shifts of arg; this
        # presumably relies on bytearray stores keeping only the low 8 bits
        # on this runtime -- TODO confirm.
        buf = self.cmdbuf
        buf[0] = 0x40 | cmd
        buf[1] = arg >> 24
        buf[2] = arg >> 16
        buf[3] = arg >> 8
        buf[4] = arg
        buf[5] = crc
        self.spi.write(buf)

        if skip1:
            self.spi.readinto(self.tokenbuf, 0xFF)

        # wait for the response (response[7] == 0)
        for i in range(_CMD_TIMEOUT):
            self.spi.readinto(self.tokenbuf, 0xFF)
            response = self.tokenbuf[0]
            if not (response & 0x80):
                # this could be a big-endian integer that we are getting here
                # (the extra bytes are clocked out but their contents are not kept)
                for j in range(final):
                    self.spi.write(b"\xff")
                if release:
                    self.cs(1)
                    self.spi.write(b"\xff")
                return response

        # timeout
        self.cs(1)
        self.spi.write(b"\xff")
        return -1

    def readinto(self, buf):
        """Read one data block from the card into buf and release the card.

        Waits for the _TOKEN_DATA start token (up to _CMD_TIMEOUT polls,
        1 ms apart), reads len(buf) bytes, then clocks two more bytes for
        the checksum, which is not verified.

        Raises:
            OSError: if the start token does not arrive in time.
        """
        self.cs(0)

        # read until the data start token (_TOKEN_DATA, 0xFE)
        for i in range(_CMD_TIMEOUT):
            self.spi.readinto(self.tokenbuf, 0xFF)
            if self.tokenbuf[0] == _TOKEN_DATA:
                break
            time.sleep_ms(1)
        else:
            self.cs(1)
            raise OSError("timeout waiting for response")

        # read data
        # (0xFF filler from dummybuf is written while the reply fills buf;
        # the filler view is trimmed so both buffers have the same length)
        mv = self.dummybuf_memoryview
        if len(buf) != len(mv):
            mv = mv[: len(buf)]
        self.spi.write_readinto(mv, buf)

        # read checksum
        self.spi.write(b"\xff")
        self.spi.write(b"\xff")

        self.cs(1)
        self.spi.write(b"\xff")

    def write(self, token, buf):
        """Send one data block (token + buf + two filler bytes) to the card.

        If the card's data-response byte does not match (low 5 bits != 0x05)
        the card is released and the method returns without raising.
        Otherwise it polls until the card stops returning 0 before releasing.
        """
        self.cs(0)

        # send: start of block, data, checksum
        # (spi.read(1, token) is used here to clock the token byte out)
        self.spi.read(1, token)
        self.spi.write(buf)
        self.spi.write(b"\xff")
        self.spi.write(b"\xff")

        # check the response
        if (self.spi.read(1, 0xFF)[0] & 0x1F) != 0x05:
            self.cs(1)
            self.spi.write(b"\xff")
            return

        # wait for write to finish
        # NOTE(review): this loop has no timeout
        while self.spi.read(1, 0xFF)[0] == 0:
            pass

        self.cs(1)
        self.spi.write(b"\xff")

    def write_token(self, token):
        """Send a bare token byte and wait until the card stops returning 0."""
        self.cs(0)
        self.spi.read(1, token)
        self.spi.write(b"\xff")
        # wait for write to finish
        # NOTE(review): this loop has no timeout
        while self.spi.read(1, 0xFF)[0] == 0x00:
            pass

        self.cs(1)
        self.spi.write(b"\xff")

    def readblocks(self, block_num, buf):
        """Read len(buf) // 512 consecutive blocks starting at block_num into buf.

        Args:
            block_num (int): first block to read
            buf: writable buffer whose length is a non-zero multiple of 512

        Raises:
            AssertionError: if the buffer length is not a non-zero multiple of 512
            OSError: OSError(5) if a read command is rejected
        """
        nblocks = len(buf) // 512
        assert nblocks and not len(buf) % 512, "Buffer length is invalid"
        if nblocks == 1:
            # CMD17: set read address for single block
            if self.cmd(17, block_num * self.cdv, 0, release=False) != 0:
                # release the card
                self.cs(1)
                raise OSError(5)  # EIO
            # receive the data and release card
            self.readinto(buf)
        else:
            # CMD18: set read address for multiple blocks
            if self.cmd(18, block_num * self.cdv, 0, release=False) != 0:
                # release the card
                self.cs(1)
                raise OSError(5)  # EIO
            offset = 0
            mv = memoryview(buf)
            while nblocks:
                # receive the data and release card
                self.readinto(mv[offset : offset + 512])
                offset += 512
                nblocks -= 1
            # CMD12 ends the multi-block read; any non-zero response is an error
            if self.cmd(12, 0, 0xFF, skip1=True):
                raise OSError(5)  # EIO

    def writeblocks(self, block_num, buf):
        """Write len(buf) // 512 consecutive blocks starting at block_num from buf.

        Args:
            block_num (int): first block to write
            buf: buffer whose length is a non-zero multiple of 512

        Raises:
            AssertionError: if the buffer length is not a non-zero multiple of 512
            OSError: OSError(5) if a write command is rejected
        """
        nblocks, err = divmod(len(buf), 512)
        assert nblocks and not err, "Buffer length is invalid"
        if nblocks == 1:
            # CMD24: set write address for single block
            if self.cmd(24, block_num * self.cdv, 0) != 0:
                raise OSError(5)  # EIO

            # send the data
            self.write(_TOKEN_DATA, buf)
        else:
            # CMD25: set write address for first block
            if self.cmd(25, block_num * self.cdv, 0) != 0:
                raise OSError(5)  # EIO
            # send the data
            offset = 0
            mv = memoryview(buf)
            while nblocks:
                self.write(_TOKEN_CMD25, mv[offset : offset + 512])
                offset += 512
                nblocks -= 1
            self.write_token(_TOKEN_STOP_TRAN)

    def ioctl(self, op, arg):
        """Answer control requests; only op 4 is handled.

        Returns:
            self.sectors for op == 4; None (implicitly) for any other op.
        """
        if op == 4:  # get number of blocks
            return self.sectors

''' Class to Read IMU QMI8658G Data '''
class IMU(object):
    """Reader for the QMI8658 accelerometer/gyroscope on I2C bus 0.

    The bus is opened on the pins given by I2C_SCL / I2C_SDA at 100 kHz.
    """

    def __init__(self,address=0X6B):
        """Open the I2C bus, verify the chip identity and apply the configuration.

        Args:
            address (int): I2C address of the sensor, default 0x6B

        Raises:
            OSError: if the identity register does not hold the expected value.
        """
        self._address = address
        self._bus = I2C(id=0,scl=Pin(I2C_SCL),sda=Pin(I2C_SDA),freq=100_000)
        if not self.WhoAmI():
            # __init__ cannot return a value, so a missing or unexpected
            # device is reported with an exception.
            raise OSError("QMI8658 IMU not found")
        self._revision = self.Read_Revision()
        self.Config_apply()

    def _read_byte(self,cmd):
        """Return the single byte stored in register cmd."""
        rec=self._bus.readfrom_mem(int(self._address),int(cmd),1)
        return rec[0]

    def _read_block(self, reg, length=1):
        """Return length bytes read starting at register reg."""
        rec=self._bus.readfrom_mem(int(self._address),int(reg),length)
        return rec

    def _read_u16(self,cmd):
        """Return the unsigned 16-bit value held in registers cmd (low) and cmd+1 (high)."""
        LSB = self._bus.readfrom_mem(int(self._address),int(cmd),1)
        MSB = self._bus.readfrom_mem(int(self._address),int(cmd)+1,1)
        return (MSB[0] << 8) + LSB[0]

    def _write_byte(self,cmd,val):
        """Write the single byte val to register cmd."""
        self._bus.writeto_mem(int(self._address),int(cmd),bytes([int(val)]))

    def WhoAmI(self):
        """Return True when register 0x00 holds the expected identity value 0x05."""
        return self._read_byte(0x00) == 0x05

    def Read_Revision(self):
        """Return the byte stored in register 0x01."""
        return self._read_byte(0x01)

    def Config_apply(self):
        """Write the fixed configuration to control registers 0x02..0x08."""
        # REG CTRL1
        self._write_byte(0x02,0x60)
        # REG CTRL2 : QMI8658AccRange_8g  and QMI8658AccOdr_1000Hz
        self._write_byte(0x03,0x23)
        # REG CTRL3 : QMI8658GyrRange_512dps and QMI8658GyrOdr_1000Hz
        self._write_byte(0x04,0x53)
        # REG CTRL4 : No
        self._write_byte(0x05,0x00)
        # REG CTRL5 : Enable Gyroscope And Accelerometer Low-Pass Filter
        self._write_byte(0x06,0x11)
        # REG CTRL6 : Disables Motion on Demand.
        self._write_byte(0x07,0x00)
        # REG CTRL7 : Enable Gyroscope And Accelerometer
        self._write_byte(0x08,0x03)

    def Read_Raw_XYZ(self):
        """Return the six raw signed 16-bit readings.

        Returns:
            list: [acc_x, acc_y, acc_z, gyro_x, gyro_y, gyro_z], each in
                  the range -32768..32767.
        """
        # One 12-byte read starting at 0x35 covers all six little-endian
        # 16-bit values (accelerometer first, gyroscope from 0x3b onwards).
        raw_xyz=self._read_block(0x35,12)
        xyz=[0,0,0,0,0,0]
        for i in range(6):
            value = (raw_xyz[(i*2)+1]<<8)|(raw_xyz[i*2])
            # Two's complement: 0x8000..0xFFFF represent -32768..-1
            if value >= 0x8000:
                value -= 0x10000
            xyz[i] = value
        return xyz

    def Read_XYZ(self):
        """Return the six readings scaled by the configured full-scale ranges.

        Returns:
            list: [acc_x, acc_y, acc_z, gyro_x, gyro_y, gyro_z] as floats;
                  accelerometer raw values are divided by 4096 (8 g range)
                  and gyroscope raw values by 64 (512 dps range).
        """
        xyz=[0,0,0,0,0,0]
        raw_xyz=self.Read_Raw_XYZ()
        #QMI8658AccRange_8g
        acc_lsb_div=(1<<12)
        #QMI8658GyrRange_512dps
        gyro_lsb_div = 64
        for i in range(3):
            xyz[i]=raw_xyz[i]/acc_lsb_div
            xyz[i+3]=raw_xyz[i+3]*1.0/gyro_lsb_div
        return xyz

Credits

Kulshrest Tiwari
5 projects • 1 follower
Embedded software engineer

Comments