FPGAs and programmable logic are well suited to accelerating machine learning inference when high performance is needed. However, not every ML application needs the high throughput that these very capable accelerated solutions provide.
Programmable logic solutions such as the Zynq 7000 may still require machine learning for prognostics on data from sensors connected to the programmable logic. For these applications, TinyML can provide a significant performance boost over a traditional ML implementation running on the processors.
The SensorsThink Smart IoT board has been developed with industrial applications in mind. As such, it provides a range of sensors, including accelerometers, gyroscopes, temperature, vibration and humidity sensors, and even thermal imaging using a FLIR Lepton 3.
In this project we will show how to implement a simple machine learning project that uses the accelerometer data to predict the direction of movement, using the Edge Impulse TinyML framework.
Capturing the Data
To get started with the application, we first need to capture the data. This development takes place on a Zynq 7020 running PetaLinux with Python 3 and pip3 enabled.
The PetaLinux project has been updated with a device tree that enables I2C development. To understand more about the development of the PetaLinux project, see the how-to articles here.
To get started with the Edge Impulse framework, we need to be able to connect to the Edge Impulse server and upload samples of data from the accelerometer.
The first thing we need to do in Edge Impulse is create a new project.
Once the new project is created, the keys needed to push data remotely to the Edge Impulse server are shown at the top of the dashboard.
We will use these keys to push our data samples to the Edge Impulse server.
We can push the data to the Edge Impulse server using a Python 3 script; the basic format is provided here by Edge Impulse.
To use it with this application, we need to modify the script to do the following:
- Read the accelerometer using I2C
- Store several samples of each of the X, Y, Z measurements
- Convert the X, Y, Z measurements to acceleration in g
- Format an upload request
- Sign the message
- Attempt to upload it to the Edge Impulse server
To ensure that data is gathered and uploaded sensibly, the script has been developed so that the data tag (label) and the number of samples to take can be defined from the command line.
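As a minimal sketch of that command-line interface, the two arguments could be parsed and validated with the standard argparse module. The function name parse_args and the argument names label and samples are assumptions made for illustration; the project's script reads sys.argv directly.

```python
import argparse


def parse_args(argv=None):
    """Parse the data label and the number of samples (hypothetical helper)."""
    parser = argparse.ArgumentParser(
        description="Capture accelerometer samples and upload them to Edge Impulse"
    )
    parser.add_argument("label", help="data tag for the capture, e.g. a movement direction")
    parser.add_argument("samples", type=int, help="number of accelerometer readings to take")
    args = parser.parse_args(argv)
    if args.samples <= 0:
        # Reject counts that would produce an empty upload
        parser.error("samples must be a positive integer")
    return args


# An explicit list stands in for the real command line here
args = parse_args(["left", "10"])
print(args.label, args.samples)
```

Compared with indexing sys.argv, this reports a usage message when an argument is missing or is not an integer. The script used in this project follows.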
import smbus2
from smbus2 import SMBus, i2c_msg
import json
import time, hmac, hashlib
import requests
import sys

MAXINT = 255  # Must be a power-of-two minus one
NUM_BITS = MAXINT.bit_length()

def merge(a, b):
    # Combine a high byte (a) and a low byte (b) into one 16-bit value
    c = (a << NUM_BITS) | b
    return c

# Keys from the Edge Impulse project dashboard
HMAC_KEY = "xxxx"
API_KEY = "ei_"

# Command line: <label> <number of samples>
label = sys.argv[1]
loop = int(sys.argv[2])

i2c_bus = smbus2.SMBus(0)
Sensor_addr = 0x6a

# Read and print the device ID from register 0x0f
msg = i2c_msg.write(Sensor_addr, [0x0f])
i2c_bus.i2c_rdwr(msg)
msg = i2c_msg.read(Sensor_addr, 0x1)
i2c_bus.i2c_rdwr(msg)
data = list(msg)
print("device ID =", data)

# Configure the accelerometer by writing 0x40 to register 0x10
msg = i2c_msg.write(Sensor_addr, [0x10, 0x40])
i2c_bus.i2c_rdwr(msg)

values = []
def read_axis(reg):
    # Read the 16-bit two's-complement value starting at register reg (low byte first)
    msg = i2c_msg.write(Sensor_addr, [reg])
    i2c_bus.i2c_rdwr(msg)
    msg = i2c_msg.read(Sensor_addr, 0x2)
    i2c_bus.i2c_rdwr(msg)
    data = list(msg)
    raw = merge(data[1], data[0])
    if raw & 0x8000:
        raw -= 65536  # sign bit set: value is negative
    return raw * 0.061e-3  # scale raw counts to g

for i in range(0, loop):
    x_real = read_axis(0x28)  # read x direction
    y_real = read_axis(0x2a)  # read y direction
    z_real = read_axis(0x2c)  # read z direction
    values.append([x_real, y_real, z_real])
    time.sleep(1)  # one reading per second, matching interval_ms in the payload
emptySignature = ''.join(['0'] * 64)
data = {
    "protected": {
        "ver": "v1",
        "alg": "HS256",
        "iat": time.time()  # epoch time, seconds since 1970
    },
    "signature": emptySignature,
    "payload": {
        "device_name": "00:0a:35:00:22:01",
        "device_type": "SensorsThinkIoT",
        "interval_ms": 1000,
        "sensors": [
            { "name": "accX", "units": "G" },
            { "name": "accY", "units": "G" },
            { "name": "accZ", "units": "G" },
        ],
        "values": values
    }
}
# encode in JSON
encoded = json.dumps(data)
# sign message
signature = hmac.new(bytes(HMAC_KEY, 'utf-8'), msg = encoded.encode('utf-8'), digestmod = hashlib.sha256).hexdigest()
# set the signature again in the message, and encode again
data['signature'] = signature
encoded = json.dumps(data)
# and upload the file
res = requests.post(url='https://ingestion.edgeimpulse.com/api/training/data',
                    data=encoded,
                    headers={
                        'Content-Type': 'application/json',
                        'x-file-name': label,
                        'x-api-key': API_KEY
                    })
if (res.status_code == 200):
    print('Uploaded file to Edge Impulse', res.status_code, res.content)
else:
    print('Failed to upload file to Edge Impulse', res.status_code, res.content)
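Each axis is read as two bytes, low byte first, that together hold a signed 16-bit two's-complement count, which is then scaled by 0.061e-3 to give g. The following is a minimal sketch of that conversion in isolation; the helper name raw_to_g and the constant name are assumptions for illustration, and the scale factor is the one used in the script.

```python
SENSITIVITY_G_PER_COUNT = 0.061e-3  # scale factor taken from the script


def raw_to_g(low_byte, high_byte):
    """Convert a low/high register byte pair to acceleration in g."""
    raw = (high_byte << 8) | low_byte  # unsigned value in 0..65535
    if raw & 0x8000:
        raw -= 0x10000  # sign bit set: map 32768..65535 to -32768..-1
    return raw * SENSITIVITY_G_PER_COUNT


def raw_to_g_builtin(low_byte, high_byte):
    """Same conversion using int.from_bytes as a cross-check."""
    raw = int.from_bytes(bytes([low_byte, high_byte]), "little", signed=True)
    return raw * SENSITIVITY_G_PER_COUNT


print(raw_to_g(0x00, 0x40))  # 0x4000 = 16384 counts, a little under 1 g
print(raw_to_g(0xFF, 0xFF))  # 0xFFFF = -1 count, a very small negative value
```

A useful check for any such conversion is that 0xFFFF maps to exactly -1 count and 0x8000 to -32768 counts.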
Provided the SensorsThink Smart IoT board has an internet connection, the data will be uploaded to the Edge Impulse server under the project.
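The signing step in the script computes an HMAC-SHA256 over the JSON message while the signature field holds 64 zeros, then writes the resulting hex digest into that field. A minimal sketch of that scheme with a matching local check is shown here; the function names sign_message and verify_message and the key value are hypothetical, and the verification is only an assumed mirror of the signing step, not a description of what the server does.

```python
import hashlib
import hmac
import json

EMPTY_SIGNATURE = "0" * 64  # placeholder used while the digest is computed


def sign_message(message, hmac_key):
    """Return a copy of message with its signature field filled in."""
    unsigned = dict(message, signature=EMPTY_SIGNATURE)
    encoded = json.dumps(unsigned)
    digest = hmac.new(
        hmac_key.encode("utf-8"), encoded.encode("utf-8"), hashlib.sha256
    ).hexdigest()
    return dict(message, signature=digest)


def verify_message(message, hmac_key):
    """Recompute the digest and compare it in constant time."""
    expected = sign_message(message, hmac_key)["signature"]
    return hmac.compare_digest(expected, message["signature"])


message = {
    "protected": {"ver": "v1", "alg": "HS256", "iat": 0},
    "signature": EMPTY_SIGNATURE,
    "payload": {"values": [[0.0, 0.0, 1.0]]},
}
signed = sign_message(message, "hypothetical-key")
print(verify_message(signed, "hypothetical-key"))  # True
print(verify_message(signed, "wrong-key"))         # False
```

Checking a capture locally like this can help separate key or encoding mistakes from network problems when an upload is rejected.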
Once sufficient data has been uploaded for movement along all axes, we can create the impulse.
Once the impulse is created, we can test it further by uploading data from the SensorsThink Smart IoT board to try out the performance of the impulse against real-world data.
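One way to keep such test captures separate from the training set is to send them to a different ingestion path. The sketch below assumes that the Edge Impulse ingestion service exposes /api/testing/data alongside the /api/training/data path used in the script; that path, and the helper names ingestion_url and upload_headers, are assumptions that should be checked against the Edge Impulse documentation.

```python
INGESTION_HOST = "https://ingestion.edgeimpulse.com"  # host used by the script


def ingestion_url(category="training"):
    """Build the upload URL for a data category (the testing path is assumed)."""
    if category not in ("training", "testing"):
        raise ValueError("category must be 'training' or 'testing'")
    return f"{INGESTION_HOST}/api/{category}/data"


def upload_headers(label, api_key):
    """Headers matching those sent by the script."""
    return {
        "Content-Type": "application/json",
        "x-file-name": label,
        "x-api-key": api_key,
    }


print(ingestion_url("training"))
print(ingestion_url("testing"))
```

In the script, the call would then become requests.post(url=ingestion_url("testing"), data=encoded, headers=upload_headers(label, API_KEY)).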
The next step of course is to generate the application to run on the Zynq during deployment. We will look at that in another project soon!
Wrap Up
This project shows how easily we can connect an embedded Linux system to the Edge Impulse framework and train a TinyML model for deployment.