Sudhir Kshirsagar
Published

AI Assisted Coding in an IoT-monitored Secure Environment

Speech-to-text driven Generative AI creates code that can be read back through text-to-speech. IoT and NB-IoT are used to monitor health.

149
AI Assisted Coding in an IoT-monitored Secure Environment

Things used in this project

Story

Read more

Schematics

Hardware Components from Three Sponsors

Shows the XIAO Sense driving the Blues Notecard, collecting data from the BME680 and receiving Bluetooth Heart Rate (simulated) data from Nordic NRF52840DK

Arduino IDE in Action

Shows monitoring, BLE and Telemetry

Notehub Events

Shows the monitoring data that has been successfully sent to Notehub

Code Editor in Action

Shows how code is generated through an audio request to ChatGPT

Code

AI Assisted Code Editor: Speech-to-ChatGPT prompt generates code and outputs Code-to-Speech

Python
A code editor written in Python primarily for the visually impaired programmers that integrates Generative AI, speech-to-text and text-to-speech technologies.
import tkinter as tk
from os import linesep
from input_dialog import InputDialog
from stt import speech_to_text
from tts import text_to_speech
from open_ai_wrapper import OpenAIWrapper


class TextEditor(tk.Tk):
  tts_enabled = True
  gpt_enabled = True
  gpt_tts_enabled = False
  auto_insert_gpt_response = True
  auto_copy_gpt_response = True
  openai_wrapper: OpenAIWrapper = None

  def __init__(self):
    super().__init__()
    self.title("Simple Text Editor")
    self.geometry("600x400")

    # Create a Text widget for the editor
    self.editor_text_area = tk.Text(self, wrap=tk.WORD)
    self.editor_text_area.pack(expand=True, fill="both")

    # Bind Ctrl+P to open the input dialog
    self.bind_all("<Control-p>", self.show_input_dialog)
    self.bind_all("<Control-h>", self.read_help)
    # self.bind_all("<Control-></Control->", self.read_help)
    self.bind_all("<Control-t>", self.toggle_tts)
    self.bind_all("<Control-g>", self.toggle_gpt)
    self.bind_all("<Control-d>", self.request_documentation)
    self.bind_all("<Control-m>", self.speech_to_text_gpt)
    self.bind_all("<Control-n>", self.speech_to_text_editor)
    self.bind_all("<Control-b>", self.toggle_assistant_beta)
    self.bind_all("<Control-r>", self.toggle_gpt_response_tts)
    # self.bind_all("<Control-space>", self.interrupt_tts)
    # TODO: Read current line/line number

    self.openai_wrapper = OpenAIWrapper()

  # def interrupt_tts(self, event=None):
  #   print("TODO")

  def toggle_gpt_response_tts(self, event=None):
    new_status = "off" if self.gpt_tts_enabled else "on"
    message = f"Setting Chat GPT Response as text-to-speech to {new_status}"
    print(message)
    text_to_speech(message)
    self.gpt_tts_enabled = not self.gpt_tts_enabled

  def toggle_assistant_beta(self, event=None):
    new_status = "off" if self.openai_wrapper.is_beta_enabled() else "on"
    message = f"Setting Chat GPT Assistant Beta to {new_status}"
    print(message)
    text_to_speech(message)
    self.openai_wrapper.toggle_beta()

  def speech_to_text_gpt(self, event=None):
    text = speech_to_text(1,
                          voice_function=text_to_speech,
                          q="Please speak your GPT prompt into the microphone")
    print("#" * 50)
    print("speech_to_text_gpt")
    print(text)
    print("#" * 50)
    self.process_response(self.openai_wrapper.ask_openai(text))

  def speech_to_text_editor(self, event=None):
    text = speech_to_text(1,
                          voice_function=text_to_speech,
                          q="Please speak the text into the microphone")
    self.insert_text(text)

  def toggle_gpt(self, event=None):
    new_status = "off" if self.gpt_enabled else "on"
    message = f"Setting Chat GPT to {new_status}"
    print(message)
    text_to_speech(message)
    self.gpt_enabled = not self.gpt_enabled

  def toggle_tts(self, event=None):
    new_status = "off" if self.tts_enabled else "on"
    message = f"Setting text-to-speech {new_status}"
    print(message)
    text_to_speech(message)
    self.tts_enabled = not self.tts_enabled

  def read_help(self, event=None):
    help = """
      To toggle regular text-to-speech on or off, use the key command
        Control plus T.
      To toggle Chat GPT on or off, use the key command Control plus G.
      To open the Chat GPT prompt window, use the key command Control plus P.
      To send your prompt to Chat GPT, use the key command Control plus Enter.
      To toggle whether Chat GPT"s response should automatically insert into
        the text editor area, use the key command Control plus O.
    """
    text_to_speech(help)

  def insert_text(self, text):
      # TODO: Insert at beginning or end of line
      #       to avoid inserting in middle of an existing line
      self.editor_text_area.insert(tk.INSERT, text)

  def request_documentation(self, event=None):
    try:
      sel_text = self.editor_text_area.get(tk.SEL_FIRST, tk.SEL_LAST)
    except tk.TclError:
      # If no text is selected, get all content in the text window
      sel_text = self.editor_text_area.get("1.0", tk.END).strip()
      text_to_speech("No text selected. Using all content.")

    prompt = f"Write the docstrings for this function:{linesep}{sel_text}"
    prev_setting = self.auto_insert_gpt_response
    # We should always insert this response:
    self.auto_insert_gpt_response = True
    # Send the prompt to GPT:
    self.process_response(self.openai_wrapper.ask_openai(prompt))
    # Flip auto_insert back to previous setting:
    self.auto_insert_gpt_response = prev_setting

  def process_response(self, gpt_resp):
    if self.gpt_tts_enabled:
      text_to_speech(gpt_resp)
    # Copy the response to the user"s clipboard:
    if self.auto_copy_gpt_response:
      self.clipboard_clear()
      self.clipboard_append(gpt_resp)
      self.update()
    # Insert the
    if self.auto_insert_gpt_response:
      # Add line separators so the auto-insert code isn"t mangled:
      gpt_response = gpt_resp
      # TODO: For some reason each newline also inserts a space first.
      gpt_response = f"{linesep}{gpt_resp}{linesep}"
      self.insert_text(gpt_response)

  def show_input_dialog(self, event=None):
    dialog = InputDialog(self, title="Input Dialog")
    input_text = dialog.get_input_text()
    if input_text:
      # Let the user know that the prompt has been submitted:
      if self.tts_enabled:
        text_to_speech("Sending your prompt to Chat GPT!")
      # Make the call to GPT:
      if self.gpt_enabled:
        self.process_response(self.openai_wrapper.ask_openai(input_text))


if __name__ == "__main__":
  app = TextEditor()
  app.mainloop()

Arduino Sketch that runs on Seeedstudio XIAO ESP32S3 Sense MCU

C/C++
The MCU collects the pressure, temperature, humidity, gas concentration and altitude from the BME680 over I2C and collects the heart rate over BLE from NRF52840DK. It sends the collected data to the Blues Notehub using the NB-IoT Notecard connected using the serial port.
#include <Adafruit_BME680.h>

#define serialDebug Serial
#define serialNotecard Serial1

#define NOTE_PRODUCT_UID "com.gqc.sudhir:build2gether_2.0"

Adafruit_BME680 bmeSensor;
#define SEALEVELPRESSURE_HPA (1013.25)

#define RX_PIN D7
#define TX_PIN D6
#define BAUD 9600

float pressure,  altitude;
int gas_kohm;

uint16_t heartRate;

//BLE 

#include "BLEDevice.h"
//#include "BLEScan.h"
#include <ArduinoJson.h>

// The remote service we wish to connect to.
//static BLEUUID serviceUUID("4fafc201-1fb5-459e-8fcc-c5c9c331914b");

// The remote service we wish to connect to (Heart Rate Service).
static BLEUUID serviceUUID((uint16_t)0x180D);
// The characteristic of the remote service we are interested in.
//static BLEUUID charUUID("beb5483e-36e1-4688-b7f5-ea07361b26a8");

// The characteristic of the remote service we are interested in (Heart Rate Measurement).
static BLEUUID charUUID((uint16_t)0x2A37);

static boolean doConnect = false;
static boolean connected = false;
static boolean doScan = false;
static BLERemoteCharacteristic *pRemoteCharacteristic;
static BLEAdvertisedDevice *myDevice;

static void notifyCallback(BLERemoteCharacteristic *pBLERemoteCharacteristic, uint8_t *pData, size_t length, bool isNotify) {
  Serial.print("Notify callback for characteristic ");
  Serial.print(pBLERemoteCharacteristic->getUUID().toString().c_str());
  Serial.print(" of data length ");
  Serial.println(length);
  Serial.print("data: ");
  Serial.write(pData, length);
  Serial.println();

  // Parse the heart rate measurement data
  uint8_t flags = pData[0];
  bool isHeartRateIn16Bits = flags & 0x01;

 
  if (isHeartRateIn16Bits) {
        heartRate = (pData[2] << 8) | pData[1];
    } else {
        heartRate = pData[1];
  }

  // Create a JSON object
  StaticJsonDocument<200> doc;
  doc["heart_rate"] = heartRate;

  // Optionally, add more fields to the JSON if needed
  // For example, you might add energy expended, RR intervals, etc.

  // Serialize JSON to a string
  char jsonOutput[128];
  serializeJson(doc, jsonOutput);

  // Print the JSON string to Serial
  Serial.println("JSON data:");
  Serial.println(jsonOutput);

  }

class MyClientCallback : public BLEClientCallbacks {
  void onConnect(BLEClient *pclient) {}

  void onDisconnect(BLEClient *pclient) {
    connected = false;
    Serial.println("onDisconnect");
  }
};

bool connectToServer() {
  Serial.print("Forming a connection to ");
  Serial.println(myDevice->getAddress().toString().c_str());

  BLEClient *pClient = BLEDevice::createClient();
  Serial.println(" - Created client");

  pClient->setClientCallbacks(new MyClientCallback());

  // Connect to the remove BLE Server.
  pClient->connect(myDevice);  // if you pass BLEAdvertisedDevice instead of address, it will be recognized type of peer device address (public or private)
  Serial.println(" - Connected to server");
  pClient->setMTU(517);  //set client to request maximum MTU from server (default is 23 otherwise)

  // Obtain a reference to the service we are after in the remote BLE server.
  BLERemoteService *pRemoteService = pClient->getService(serviceUUID);
  if (pRemoteService == nullptr) {
    Serial.print("Failed to find our service UUID: ");
    Serial.println(serviceUUID.toString().c_str());
    pClient->disconnect();
    return false;
  }
  Serial.println(" - Found our service");

  // Obtain a reference to the characteristic in the service of the remote BLE server.
  pRemoteCharacteristic = pRemoteService->getCharacteristic(charUUID);
  if (pRemoteCharacteristic == nullptr) {
    Serial.print("Failed to find our characteristic UUID: ");
    Serial.println(charUUID.toString().c_str());
    pClient->disconnect();
    return false;
  }
  Serial.println(" - Found our characteristic");

  // Read the value of the characteristic.
  if (pRemoteCharacteristic->canRead()) {
    String value = pRemoteCharacteristic->readValue();
    Serial.print("The characteristic value was: ");
    Serial.println(value.c_str());
  }

  if (pRemoteCharacteristic->canNotify()) {
    pRemoteCharacteristic->registerForNotify(notifyCallback);
  }

  connected = true;
  return true;
}
/**
 * Scan for BLE servers and find the first one that advertises the service we are looking for.
 */
class MyAdvertisedDeviceCallbacks : public BLEAdvertisedDeviceCallbacks {
  /**
   * Called for each advertising BLE server.
   */
  void onResult(BLEAdvertisedDevice advertisedDevice) {
    Serial.print("BLE Advertised Device found: ");
    Serial.println(advertisedDevice.toString().c_str());

    // We have found a device, let us now see if it contains the service we are looking for.
    if (advertisedDevice.haveServiceUUID() && advertisedDevice.isAdvertisingService(serviceUUID)) {

      BLEDevice::getScan()->stop();
      myDevice = new BLEAdvertisedDevice(advertisedDevice);
      doConnect = true;
      doScan = true;

    }  // Found our server
  }  // onResult
};  // MyAdvertisedDeviceCallbacks

void setup() {
  // Initialize Serial Debug
  serialDebug.begin(115200);
  while (!serialDebug) {
    ; // wait for serial port to connect. Needed for native USB
  }
  serialDebug.println("Starting...");
  BLEDevice::init("");

  // Retrieve a Scanner and set the callback we want to use to be informed when we
  // have detected a new device.  Specify that we want active scanning and start the
  // scan to run for 5 seconds.
  BLEScan *pBLEScan = BLEDevice::getScan();
  pBLEScan->setAdvertisedDeviceCallbacks(new MyAdvertisedDeviceCallbacks());
  pBLEScan->setInterval(1349);
  pBLEScan->setWindow(449);
  pBLEScan->setActiveScan(true);
  pBLEScan->start(5, false);

  // Initialize Notecard Serial
  //serialNotecard.begin(9600);
  serialNotecard.begin(BAUD,SERIAL_8N1,RX_PIN,TX_PIN);
  serialNotecard.println("\n");

  // Configure the Notecard
  serialNotecard.println("{\"req\":\"hub.set\",\"product\":\"" NOTE_PRODUCT_UID "\",\"mode\":\"continuous\"}");

  // Await and log Notecard configuration response
  delay(50); // wait for the Notecard to respond
  while (serialNotecard.available() > 0) {
    char incomingByte = serialNotecard.read();
    if (incomingByte != '\r' && incomingByte != '\n') {
      serialDebug.print(incomingByte);
    }
  }
  serialDebug.println();

  // Initialize and configure the BME680
  if (!bmeSensor.begin()) {
    serialDebug.println("Could not find a valid BME680 sensor...");
    while(false); // halt sketch
  } else {
    serialDebug.println("BME680 Connected...");
  }
  bmeSensor.setTemperatureOversampling(BME680_OS_8X);
  bmeSensor.setHumidityOversampling(BME680_OS_2X);

  
  // Set up oversampling and filter initialization
  //bme.setTemperatureOversampling(BME680_OS_8X);
  bmeSensor.setHumidityOversampling(BME680_OS_2X);
  bmeSensor.setPressureOversampling(BME680_OS_4X);
  bmeSensor.setIIRFilterSize(BME680_FILTER_SIZE_3);
  bmeSensor.setGasHeater(320, 150); // 320*C for 150 ms
}

void loop() {
  // If the flag "doConnect" is true then we have scanned for and found the desired
  // BLE Server with which we wish to connect.  Now we connect to it.  Once we are
  // connected we set the connected flag to be true.
  if (doConnect == true) {
    if (connectToServer()) {
      Serial.println("We are now connected to the BLE Server.");
    } else {
      Serial.println("We have failed to connect to the server; there is nothing more we will do.");
    }
    doConnect = false;
  }

  // If we are connected to a peer BLE Server, update the characteristic each time we are reached
  // with the current time since boot.
  if (connected) {
    String newValue = "Time since boot: " + String(millis() / 1000);
    Serial.println("Setting new characteristic value to \"" + newValue + "\"");

    // Set the characteristic's value to be the array of bytes that is actually a string.
    pRemoteCharacteristic->writeValue(newValue.c_str(), newValue.length());
  } else if (doScan) {
    BLEDevice::getScan()->start(0);  // this is just example to start scan after disconnect, most likely there is better way to do it in arduino
  }



  // Sample the BME680 sensor
  if (!bmeSensor.performReading()) {
    serialDebug.println("Failed to obtain a reading...");
    delay(3000);
    return;
  }

  // Print readings to the console
  serialDebug.print("Temperature = ");
  serialDebug.print(bmeSensor.temperature);
  serialDebug.println(" *C");
  serialDebug.print("Humidity = ");
  serialDebug.print(bmeSensor.humidity);
  serialDebug.println(" %");

  // added more measurements
  serialDebug.print("Pressure = ");
  pressure=bmeSensor.pressure/100.0;
  serialDebug.print(pressure);
  serialDebug.println(" hPa");
  gas_kohm=bmeSensor.gas_resistance / 1000.0;
  serialDebug.print("Gas = ");
  serialDebug.print(gas_kohm);
  serialDebug.println(" KOhms");
  altitude=bmeSensor.readAltitude(SEALEVELPRESSURE_HPA);
  serialDebug.print("Approx. Altitude = ");
  serialDebug.print(altitude);
  serialDebug.println(" m");

  // Create the JSON request string
  char json_request[300];
  snprintf(json_request, sizeof(json_request),
    "{"
      "\"req\":\"note.add\","
      "\"file\":\"sensors.qo\","
      "\"start\":true,"
      "\"body\":{"
        "\"temp\":%d.%02d,"
        "\"humidity\":%d.%02d,"
        "\"pressure\":%d.%02d,"
        "\"gas_kohm\":%d,"
        "\"altitude\":%d.%02d,"
        "\"heart_rate\":%d"
      "}"
    "}",
    (int)bmeSensor.temperature, abs(((int)(bmeSensor.temperature*100.0)%100)),
    (int)bmeSensor.humidity, (int)(bmeSensor.humidity*100.0)%100,
    (int)(pressure), int((pressure-(int)(pressure))*100),
    (int)(gas_kohm),
    (int)(altitude), abs(int((altitude-(int)(altitude))*100)),
    (int)(heartRate)
    
  );
  serialDebug.println(json_request);
  // Send request to Notecard
  serialNotecard.println(json_request);
  delay(50); // wait for the Notecard to respond
  while (serialNotecard.available() > 0) {
    char incomingByte = serialNotecard.read();
    if (incomingByte != '\r' && incomingByte != '\n') {
      serialDebug.print(incomingByte);
    }
  }
  serialDebug.println();

  delay(15000);
}

Credits

Sudhir Kshirsagar

Sudhir Kshirsagar

6 projects • 2 followers
Thanks to Jacob Specht.

Comments