Hackster is hosting Hackster Holidays, Ep. 7: Livestream & Giveaway Drawing. Watch previous episodes or stream live on Friday! Stream Hackster Holidays, Ep. 7 on Friday!
CatMan
Published

Talk your way into Where (Your) Things Are. Star Treky Way!

An easy, techie, real time way of recording and retrieving anything’s location as you store it or look for it, using your voice.

Intermediate · Full instructions provided · 20 hours · 546
Talk your way into Where (Your) Things Are. Star Treky Way!

Things used in this project

Hardware components

LOLIN32 Lite ESP32 Dev Board
×1
MAX98357 I2S 3W Class D Amplifier
×1
INMP441 I2S microphone breakout
×1
Minisforum Venus UM790 Pro with AMD Ryzen™ 9
×1
Speaker, Micro
Speaker, Micro
×1

Software apps and online services

AMD Ryzen AI Software
Windows 11
Arduino IDE
Arduino IDE
spaCy · Industrial-strength Natural Language Processing
MiniConda

Hand tools and fabrication machines

Soldering iron (generic)
Soldering iron (generic)
Solder Flux, Soldering
Solder Flux, Soldering
Wire Stripper & Cutter, 22-10 AWG / 0.64-2.6mm Capacity Single & Stranded Wires
Wire Stripper & Cutter, 22-10 AWG / 0.64-2.6mm Capacity Single & Stranded Wires
Wires of different colors

Story

Read more

Schematics

ESP32 I2S Voice To AI Communicator

This is my attempt at a wearable badge that has a push-button, a microphone and a speaker. When you push the button, you can speak to it and it sends your voice wirelessly to a PC for processing. When you release the button it stops listening and the processing begins. The PC can send an audio reply that will be played back through the built in speaker.

Badge Top

Wiring like this will allow me to arrange the boards in an enclosure, when I get around to it.

Badge Bottom

Code

Badge to AI Communicator with PTT (Push To Talk)

Arduino
Push button and speak to it. When button is released it will initiate voice processing. It can also receive audio that it will play through the speaker.
#include <WiFi.h>
#include <WiFiClient.h>
#include <WiFiUdp.h>
#include <driver/i2s.h>

// WiFi credentials (placeholders; replace with your own network's values).
const char *ssid = "mySSID";
const char *password = "MyPWD";

// GPIO assignments. The I2S pins are shared by the microphone (data in)
// and the amplifier (data out) on the single I2S_NUM_0 peripheral.
const int i2sBCK = 17;       // I2S Bit Clock (BCK) pin
const int i2sLRCK = 15;      // I2S Left/Right Clock (LRCK) pin
const int i2sDataIn = 27;    // I2S Data Input (DI) pin
const int i2sDataOut = 16;   // I2S Data Output (DO) pin
const int ledPin = 22;        // Status LED; loop() drives it LOW for "on" and HIGH for "off"
const int buttonPin = 14;     // Change this to your GPIO input pin for the button
volatile bool buttonPressed = false;          // Toggled by buttonISR(), polled by loop()
volatile unsigned long lastDebounceTime = 0;  // millis() timestamp of the last button edge accepted by buttonISR()
unsigned long debounceDelay = 50;    // the debounce time in ms; edges arriving sooner than this are ignored

WiFiClient client;         // Outgoing connection used to stream microphone audio to the PC
WiFiServer server(12347);  // Server on port 12347; accepts incoming audio to play back
const char *host = "192.168.0.32";  // MinisForum (the PC that receives the audio)
const uint16_t port = 12346;        // port number used in Python code

// Interrupt handler for the push-to-talk button (attached on CHANGE in setup()).
// Flips buttonPressed on every edge that arrives more than debounceDelay ms
// after the previously accepted edge; edges inside that window are dropped.
//
// NOTE(review): the state is toggled rather than read from the pin, so if a
// real edge falls inside the debounce window (e.g. a very short tap) the flag
// may end up inverted relative to the physical button - confirm on hardware.
void IRAM_ATTR buttonISR() {
  const unsigned long now = millis();
  if ((now - lastDebounceTime) <= debounceDelay) {
    return;  // Still inside the debounce window: ignore this edge
  }
  buttonPressed = !buttonPressed;
  lastDebounceTime = now;
}
// Fixed network settings passed to WiFi.config() in setup(). The badge uses a
// static address so that the Python sender can connect to it at a known IP.
IPAddress staticIP(192, 168, 0, 148);
IPAddress gateway(192, 168, 0, 1);
IPAddress subnet(255, 255, 255, 0);

// One-time initialisation: serial console, GPIO (button interrupt + LED),
// WiFi with a static IP, the I2S peripheral (microphone in / amplifier out),
// and the TCP server that receives audio to play back.
void setup() {
  Serial.begin(115200);
  pinMode(i2sDataOut, OUTPUT);  // So it's not three-stated when sending audio, or there's noise

  // Button uses the internal pull-up; every edge (press or release) calls buttonISR()
  pinMode(buttonPin, INPUT_PULLUP);
  attachInterrupt(buttonPin, buttonISR, CHANGE);
  pinMode(ledPin, OUTPUT);  // Set the LED pin as output

  // Apply the static address before connecting, then block until the link is up
  WiFi.config(staticIP, gateway, subnet);
  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED) {
    delay(1000);
    Serial.println("Connecting to WiFi...");
  }
  Serial.print("Connected to WiFi: "); Serial.println(WiFi.localIP());

  // Single I2S port configured for both receive (microphone) and transmit (amplifier):
  // 16-bit samples, left channel only, 22050 samples per second.
  i2s_config_t i2sConfig = {
    .mode = (i2s_mode_t)(I2S_MODE_MASTER | I2S_MODE_RX | I2S_MODE_TX),
    .sample_rate = 22050,  // This should fit pyttsx4 with "save_to_file"
    .bits_per_sample = I2S_BITS_PER_SAMPLE_16BIT,
    .channel_format = I2S_CHANNEL_FMT_ONLY_LEFT,  // due to the way my hardware is connected
    .communication_format = I2S_COMM_FORMAT_I2S,
    .intr_alloc_flags = ESP_INTR_FLAG_LEVEL1,
    .dma_buf_count = 2,
    .dma_buf_len = 1024,
    .use_apll = false,
    .tx_desc_auto_clear = false,
    .fixed_mclk = 0
  };
  // Route the I2S signals to the GPIOs declared at the top of the sketch
  i2s_pin_config_t pinConfig = {
    .bck_io_num = i2sBCK,
    .ws_io_num = i2sLRCK,
    .data_out_num = i2sDataOut,
    .data_in_num = i2sDataIn
  };
  // The driver must be installed before the pins can be assigned
  i2s_driver_install(I2S_NUM_0, &i2sConfig, 0, NULL);
  i2s_set_pin(I2S_NUM_0, &pinConfig);

  server.begin();  // Start the server

  Serial.println("Setup complete");
}

void loop() {
  if (buttonPressed) {
    Serial.println("Button pressed. Sending audio...");
    if (client.connect(host, port)) {
      digitalWrite(ledPin, LOW);  // Turn the LED on
      i2s_start(I2S_NUM_0);
      size_t bytesWritten = 0;
      uint8_t buffer[1024];
      while (buttonPressed) {
        i2s_read(I2S_NUM_0, buffer, sizeof(buffer), &bytesWritten, portMAX_DELAY);

          // Apply gain
        for (int i = 0; i < bytesWritten / 2; i++) {
            int16_t sample = ((int16_t*)buffer)[i];
            sample = min(max(sample * 2.0, -32768.0), 32767.0);
            ((int16_t*)buffer)[i] = sample;
        }
  
        client.write(buffer, bytesWritten);
      }
      i2s_stop(I2S_NUM_0);
      client.stop();
      digitalWrite(ledPin, HIGH);
      Serial.println("Audio Sending stopped. Waiting to receive.");
    }
  }
  else {
    digitalWrite(ledPin, HIGH);  // Turn the LED off
    WiFiClient client = server.available();  // Listen for incoming clients
    if (client) {
//      Serial.println("Client connected. Receiving audio...");
      i2s_start(I2S_NUM_0);
      size_t bytesRead = 0;
      uint8_t buffer[1024];
      while (client.connected() && !buttonPressed) {
        if (client.available()) {
          client.read(buffer, sizeof(buffer));
          i2s_write(I2S_NUM_0, buffer, sizeof(buffer), &bytesRead, portMAX_DELAY);
        }
      }
      i2s_stop(I2S_NUM_0);
      client.stop();
      Serial.println("Audio receiving stopped");
    }
  }
}

Socket Audio To File From ESP32 With Headers, Length 22050sps

Python
This runs a server waiting to receive audio from the ESP32 badge. It adds a header and saves it to an incrementally named file that will be processed by another program.
import socket
import os
import wave
import struct

def receive_audio_data(port, base_output_file):
    """Run a TCP server that saves each incoming audio stream as a WAV file.

    Listens on all interfaces on ``port``. For every connection, all bytes
    are read until the peer closes the socket; if anything was received it is
    written, prefixed by a WAV header (22050 samples/s, mono, 16 bit), to a
    file whose name is derived from ``base_output_file`` by
    ``generate_output_file``. The function loops forever and never returns.

    Args:
        port: TCP port to listen on.
        base_output_file: Path of the first output file; later recordings get
            a numeric suffix.
    """
    # Create a TCP/IP socket
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("", port))  # Bind the socket to the server address and port
        s.listen(1)         # Listen for incoming connections
        s.settimeout(1)     # Wake up every second so Ctrl+C is noticed while idle
        print("Waiting for a connection...")

        while True:
            try:
                connection, client_address = s.accept()
            except socket.timeout:
                continue
            print("Connection from:", client_address)

            # Collect the chunks in a list and join once; repeated `bytes +=`
            # copies everything received so far on every chunk.
            chunks = []
            with connection:  # Always close the connection, even on error
                while True:
                    data = connection.recv(1024)
                    if not data:
                        break
                    chunks.append(data)
            audio_data = b"".join(chunks)

            if audio_data:  # Check if any data was received
                # Generate a filename with a sequential number
                output_file = generate_output_file(base_output_file)
                print(output_file)

                wav_header = generate_wav_header(len(audio_data), sample_rate=22050, channels=1, bit_depth=16)
                with open(output_file, "wb") as f:
                    # Single write: header and samples go out together so a
                    # process watching for new files sees less of a partial file.
                    f.write(wav_header + audio_data)

def generate_output_file(base_output_file):
    """Return the first path derived from ``base_output_file`` that does not exist.

    The base path itself is returned when it is free. Otherwise ``_1``, ``_2``,
    ... is inserted before the extension until an unused name is found.
    """
    stem, ext = os.path.splitext(base_output_file)
    candidate = base_output_file
    suffix = 0
    while os.path.exists(candidate):
        suffix += 1
        candidate = f"{stem}_{suffix}{ext}"  # try the next sequential number
    return candidate

def generate_wav_header(audio_data_size, sample_rate, channels, bit_depth):
    """Build the 44-byte little-endian header for an uncompressed PCM WAV file.

    Args:
        audio_data_size: Number of bytes of sample data that will follow.
        sample_rate: Samples per second.
        channels: Number of audio channels.
        bit_depth: Bits per sample.

    Returns:
        The header as ``bytes`` (RIFF descriptor, ``fmt `` chunk, ``data`` chunk header).
    """
    byte_rate = sample_rate * channels * bit_depth // 8
    block_align = channels * bit_depth // 8
    riff_part = struct.pack('<4sI4s', b'RIFF', 36 + audio_data_size, b'WAVE')
    # 16 = size of the fmt chunk body, 1 = PCM format tag
    fmt_part = struct.pack('<4sIHHIIHH', b'fmt ', 16, 1, channels, sample_rate,
                           byte_rate, block_align, bit_depth)
    data_part = struct.pack('<4sI', b'data', audio_data_size)
    return riff_part + fmt_part + data_part

# Script entry point: start the receiving server with this project's settings.
if __name__ == "__main__":
    port = 12346  # TCP port to listen on; the ESP32 sketch connects to this same port number
    # First output file; later recordings get _1, _2, ... appended by generate_output_file
    base_output_file = "C:\\Temp\\WTAp_sounds\\received_audio.wav"

    receive_audio_data(port, base_output_file) # start receiving audio data (runs until interrupted)

Send Audio To ESP32 On New File

Python
This program sends audio files to be played by the ESP32 badge's speaker. It waits for new files created by the main program, and when found, it sends it.
# Monitor folder and if new file appears send it to ESP32

import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

import socket
import wave

class MyHandler(FileSystemEventHandler):
    """Watchdog handler that sends every newly created file to the ESP32."""

    def on_created(self, event):
        """Send the created file with SendAudio; directory events are ignored."""
        if event.is_directory:
            # A new sub-folder is not an audio file and cannot be opened as one
            return
        SendAudio(event.src_path)

def monitor_directory(path):
    """Watch ``path`` (non-recursively) and dispatch events to MyHandler.

    Blocks until interrupted with Ctrl+C, then stops the observer and waits
    for its thread to finish.
    """
    handler = MyHandler()
    watcher = Observer()
    watcher.schedule(handler, path, recursive=False)
    watcher.start()

    try:
        # Nothing to do here; events are handled on the observer's thread
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        watcher.stop()
    watcher.join()

def SendAudio(filename, host='192.168.0.148', port=12347):
    """Stream the sample data of a WAV file to the ESP32 over TCP.

    Only the raw frames are sent (no WAV header), 1024 frames at a time.

    Args:
        filename: Path of the WAV file to send.
        host: Address of the ESP32 (defaults to the badge's static IP).
        port: TCP port the ESP32 listens on.
    """
    while True:
        try:  # We may have to wait until the "file writer" finishes
            waveFile = wave.open(filename, 'rb')
            break  # If the file opens successfully, break the loop
        except PermissionError:
            time.sleep(0.1)  # File is still being written: wait 0.1 second and try again

    # Context managers close both the file and the socket, even if sending fails
    with waveFile, socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((host, port))
        data = waveFile.readframes(1024)
        while data:
            # sendall() keeps sending until the whole chunk is out; send() may
            # transmit only part of it and silently drop the rest.
            sock.sendall(data)
            data = waveFile.readframes(1024)

#This is where ACTION starts
# Raw string: the backslashes in the Windows path are kept literally instead of
# being parsed as (invalid) escape sequences such as "\T" and "\W".
monitor_directory(r"C:\Temp\WTAp_sounds\ToESP32")

Main Program: ThingsVoiceDatabase.py

Python
When an audio file appears in the watched directory, it sends it to Whisper and receives back the text.
It processes the text using spaCy in SpacyCat.py, takes actions and prepares an audio response using TTS in LibCat.py.
# First get SocketAudioToFileFromESP32_WithLength.py running in another terminal
# conda activate MinisWhisper
# cd ...\SpacyWhereThingsAre
# python ThingsVoiceDatabase.py
# (in env) python SendToESP32OnNewFile.py
# (env or not) python SocketAudioToFileFromESP32_WithLength22050.py

import time
import os
import json
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import whisper
model = whisper.load_model("tiny.en")
import SpacyCat as MyClassifier
import LibCat

# File that receives the spoken replies. Raw string so the Windows backslashes
# are kept literally instead of being parsed as (invalid) escape sequences.
TTSfileName = r'C:\Temp\WTAp_sounds\ToESP32\TTStoESP32.wav'

# Load the things database from disk, or create it with a sample entry.
# ThingsDB maps an object name to a [room, place] list.
DBfile = "ThingsDBfile.json"
if os.path.exists(DBfile):
    with open(DBfile, 'r') as read_file:
        ThingsDB = json.load(read_file)
    print(ThingsDB)
    LibCat.TTStoFile("Your wish is my command!", TTSfileName) # Say the sentence
else:
    ThingsDB = {"object": ["room", "place"]} # Initialize our 'database' with a sample string
    with open(DBfile, 'w') as outfile:
        json.dump(ThingsDB, outfile)


class NewFileHandler(FileSystemEventHandler):
    """Watchdog handler that passes every newly created file to CallWhisper."""

    def on_created(self, event):
        """Transcribe and process the created file; directory events are ignored."""
#        print(f'New file {event.src_path} has been created!')
        if event.is_directory:
            # A new sub-folder is not an audio recording
            return
        CallWhisper(event.src_path)

def monitor_directory(path):
    """Watch ``path`` (non-recursively) and dispatch events to NewFileHandler.

    Blocks until interrupted with Ctrl+C, then stops the observer and waits
    for its thread to finish.
    """
    handler = NewFileHandler()
    watcher = Observer()
    watcher.schedule(handler, path, recursive=False)
    watcher.start()
    try:
        # Idle here; file events are handled on the observer's thread
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        watcher.stop()
    watcher.join()

def WriteDBfile():
    """Write ThingsDB to DBfile as JSON with one entry per line.

    The layout is `{`, then one ` "key": [room, place]` line per entry
    (comma-separated), then `}`. Keys are JSON-encoded as well as values, so a
    key containing a quote or backslash cannot corrupt the file.
    """
    # str() keeps the text that an f-string would have produced for the key
    entries = [f' {json.dumps(str(key))}: {json.dumps(value)}' for key, value in ThingsDB.items()]
    with open(DBfile, 'w') as outfile:
        outfile.write("{\n")
        if entries:
            outfile.write(",\n".join(entries) + "\n")
        outfile.write("}\n")

def CallWhisper(filename):
    """Transcribe an audio file, apply the spoken command to ThingsDB and reply.

    The transcription is classified by ``MyClassifier.process_text`` into an
    operation ('Create', 'Read', 'Update' or 'Delete'), an object, a room and
    a place. The database is updated accordingly, a spoken reply is written
    with ``LibCat.TTStoFile`` and the database file is rewritten.

    If the sentence cannot be understood, an apology is spoken and the
    database is left untouched.

    Args:
        filename: Path of the audio file to transcribe.
    """
#    print(filename)
    result = model.transcribe(filename)["text"].strip() # strip removes spaces at start/end
    print(result)
    try:
        operation, obj, room, place = MyClassifier.process_text(result)
    except Exception as err:
        # This runs inside a file-watcher callback: report the problem instead
        # of letting one unparseable sentence raise out of the handler.
        print("Could not interpret the sentence:", err)
        operation, obj, room, place = None, None, None, None
#p    print(operation, obj, room, place)

    if not obj or operation not in ('Create', 'Read', 'Update', 'Delete'):
        TTStext = "Sorry, I did not understand that."
        print(TTStext)
        LibCat.TTStoFile(TTStext, TTSfileName)
        return

# Do Database Operations
    if operation == 'Create': # If create then we just add it to the database
        ThingsDB[obj] = [room, place]
        TTStext = f"Created: '{obj}' in {ThingsDB[obj]}"

    else: ###### Find the object
        # Every stored key that contains the spoken object name (case-insensitive)
        DictWithObj = {key: value for key, value in ThingsDB.items() if obj.lower() in key.lower()}
        print("DictWithObj: ", DictWithObj, 'len(DictWithObj) =', len(DictWithObj))
        if len(DictWithObj) == 0:
            TTStext = f"Object '{obj}' not found!"
        else:
            obj = list(DictWithObj.keys())[0]  # Use the first matching key

            if operation == 'Read':
                TTStext = f"Found {obj} in {ThingsDB[obj][0]}; {ThingsDB[obj][1]}"
            elif operation == 'Update':
                del ThingsDB[obj]
                ThingsDB[obj] = [room, place]
                TTStext = f"Moved {obj} to {ThingsDB[obj]}"
            else:  # 'Delete'
                TTStext = f"Removed {obj} from database."
                del ThingsDB[obj]
    # room/place may be None (process_text returns None for both unless the
    # operation is 'Create'); do not speak the word "None"
    TTStext = TTStext.replace("None", "")
    print(TTStext)
    LibCat.TTStoFile(TTStext, TTSfileName)

    WriteDBfile()
    print(ThingsDB)

#This is where ACTION starts
# Raw string: the backslashes in the Windows path are kept literally instead of
# being parsed as (invalid) escape sequences such as "\T" and "\W".
monitor_directory(r"C:\Temp\WTAp_sounds")

LibCat.py

Python
Small library, mainly for TTS using "pyttsx4"
import pyttsx4  # For TTStoFile 
import os


def GenerateOutputFileName(BaseOutputFullPathFileName):
    """Return the first path derived from the given one that does not exist yet.

    The given path is returned unchanged when it is free. Otherwise ``_1``,
    ``_2``, ... is inserted before the extension until an unused name is found.
    """
    stem, ext = os.path.splitext(BaseOutputFullPathFileName)
    candidate = BaseOutputFullPathFileName
    suffix = 0
    while os.path.exists(candidate):
        suffix += 1
        candidate = f"{stem}_{suffix}{ext}"  # try the next sequential number
    return candidate

def TTStoFile(text, FullFileRootWithPath):
    """Synthesize ``text`` to a new audio file using the pyttsx4 'sapi5' driver.

    The file name is taken from GenerateOutputFileName, so an existing file is
    never overwritten. The file will be spoken if SendToESP32OnNewFile.py is
    running.

    Args:
        text: Sentence to speak.
        FullFileRootWithPath: Preferred output path; a numeric suffix is added
            when that file already exists.
    """
    target_path = GenerateOutputFileName(FullFileRootWithPath)
    speech_engine = pyttsx4.init(driverName='sapi5')
    speech_engine.setProperty('rate', 140)  # Default 200, too fast
    speech_engine.save_to_file(text, target_path)
    speech_engine.runAndWait()  # Blocks until the queued save has been processed

Mainly Classifier using "spaCy"

Python
NLP using spaCy to find subject, room, remainder of location. Can also be run by itself.
# SpacyCat.py
# conda activate MinisWhisper
# cd ...\SpacyWhereThingsAre

import spacy
import re

# Sample 'database' entry in the object -> [room, place] layout.
# NOTE(review): no function visible in this file reads `database` - confirm it is still needed.
database = {"object": ["room", "place"]}

# Load the medium English spaCy model once at import time; the parsing
# functions below all share this `nlp` pipeline.
nlp = spacy.load("en_core_web_md")  # en_core_web_md

def GetSubject(l_text):
    """Extract the phrase the sentence is about and return it with the rest.

    The subject is the subtree of the last token that is a (passive) nominal
    subject, an attribute of the root/clausal complement (inverted sentences
    such as "where is X"), or the direct object of the root (imperatives such
    as "find X").

    Args:
        l_text: The sentence to analyse.

    Returns:
        A ``(subject, remainder)`` tuple: the subject phrase without a leading
        "the", and ``l_text`` with the subject phrase removed.

    Raises:
        ValueError: If no subject can be found in the sentence.
    """
    doc = nlp(l_text)
    subject_head = None
    for token in doc:
        if token.dep_ in ('nsubj', 'nsubjpass'):
            subject_head = token
        # Check for sentences with inverted subject and verb
        elif token.dep_ == 'attr' and token.head.dep_ in ('ROOT', 'ccomp'):
            subject_head = token
        # Check for sentences with imperative structure
        elif token.dep_ == 'dobj' and token.head.dep_ == 'ROOT':
            subject_head = token
    if subject_head is None:
        raise ValueError(f"No subject found in: {l_text!r}")
    # Take the subject as a slice of the original text. Joining the tokens with
    # spaces turns "john's keys" into "john 's keys", which is then not found
    # in l_text and therefore never removed from the remainder.
    MySubject = doc[subject_head.left_edge.i : subject_head.right_edge.i + 1].text
    MyRemainder = l_text.replace(MySubject, '').strip()
    if MySubject.lower().startswith('the '):
        MySubject = MySubject[4:].strip()
#p    print ("MySubject: ", MySubject)
#p    print ("MyRemainder: ", MyRemainder)
    return MySubject, MyRemainder

rooms = ['room', 'den', 'garage', 'basement', 'hallway', 'landing', 'door', 'kitchen', 'office']
def FindRoom(l_text):
    """Find the room mentioned in a phrase and return it with the rest.

    A noun chunk counts as a room when its lower-cased text contains one of
    the words in ``rooms`` (so "bedroom" and "the living room" both match).
    When several chunks match, the last one wins.

    Args:
        l_text: Phrase describing a location.

    Returns:
        A ``(room, remainder)`` tuple. ``room`` is the matching noun chunk, or
        None when no room is mentioned; ``remainder`` is ``l_text`` with the
        room (and a preceding "in") removed.
    """
    doc = nlp(l_text)
    l_Room = None
    for chunk in doc.noun_chunks:    # Iterate over the noun chunks
        if any(room in chunk.text.lower() for room in rooms): # If the chunk contains a room
            l_Room = chunk.text
#p    print ("l_Room: ", l_Room)
    if l_Room is None:
        # No room mentioned: the whole phrase is the place
        return None, l_text.strip()
    # Escape the room text so it is matched literally, not as a regex
    l_text = re.sub(r'\bin ' + re.escape(l_Room) + r'\b', '', l_text) #remove l_Room and preposition
    return l_Room, l_text.replace(l_Room, '').strip()

# Keyword -> database operation. Order is the matching priority; the two-word
# "where is"/"where are" come before "where" so the whole phrase is removed.
mapping = {"remove": "Delete", "delete": "Delete", "change": "Update", "move": "Update",
           "where is": "Read", "where are": "Read", "where": "Read", "find": "Read", "is": "Create", "are": "Create"}
def GetOperation(l_text):
    """Determine the database operation a phrase asks for.

    The keywords of ``mapping`` are tried in order and matched as whole words
    only, so "are" does not match inside "garage" and "is" does not match
    inside "this".

    Args:
        l_text: Phrase to inspect (expected in lower case).

    Returns:
        A ``(operation, remainder)`` tuple. ``operation`` is 'Create', 'Read',
        'Update' or 'Delete', or None when no keyword is present;
        ``remainder`` is ``l_text`` with the matched keyword removed and
        surrounding whitespace stripped.
    """
    for key, operation in mapping.items():
        pattern = r'\b' + re.escape(key) + r'\b'
        if re.search(pattern, l_text):
#            print(f"'{l_text}' returns ('{operation}', '{l_text.strip()}')")
            return operation, re.sub(pattern, '', l_text).strip()
    # Always return a pair so callers can unpack the result
    return None, l_text.strip()

def ToLowerExceptNames(l_text):
    """Lower-case a sentence while keeping tokens tagged as PERSON unchanged.

    The sentence is rebuilt from its tokens with single spaces, except that a
    possessive "'s" is attached to the preceding word and the punctuation
    marks , . ; : ' are attached to the text before them.
    """
    words = [tok.text if tok.ent_type_ == 'PERSON' else tok.text.lower()
             for tok in nlp(l_text)]
    last_index = len(words) - 1
    sentence = ''
    for pos, word in enumerate(words):
        next_is_possessive = pos < last_index and words[pos + 1] == "'s"
        if next_is_possessive:
            sentence += word                      # no space: "'s" follows directly
        elif word in (',', '.', ';', ':', "'"):
            sentence = sentence.rstrip() + word + ' '  # glue punctuation to the left
        else:
            sentence += word + ' '
    return sentence.strip()


def clean_sentence(l_text):
    """Tidy a location phrase.

    Strips commas, periods and spaces from both ends, drops any prepositions
    left dangling at the end, collapses runs of whitespace to single spaces
    and strips trailing commas/periods/spaces once more.
    """
    dangling = ("in", "on", "at", "under", "over", "by")
    tokens = l_text.strip(",. ").split()
    keep = len(tokens)
    # Walk back from the end past every trailing preposition
    while keep > 0 and tokens[keep - 1] in dangling:
        keep -= 1
    # A removed preposition may expose punctuation, so strip the end again
    return " ".join(tokens[:keep]).rstrip(",. ")

def process_text(l_text):
    """Classify a sentence into ``(operation, obj, room, place)``.

    The sentence is lower-cased (except names), the subject is extracted as
    ``obj`` and the operation is determined from what is left. Room and place
    are only extracted for 'Create'; for every other operation both are None.
    """
    normalized = ToLowerExceptNames(l_text)
#p    print ("RemainderLower: ", normalized)
    obj, rest = GetSubject(normalized)
    operation, rest = GetOperation(rest)
    if operation != 'Create':
        return operation, obj, None, None
    room, rest = FindRoom(rest)
    return operation, obj, room, clean_sentence(rest)
    
##### MAIN LOOP
if __name__ == "__main__":
    # Interactive test loop: type a sentence to see each parsing step, 'x' to quit.
    while True:
        typed = input("> ")
        if typed == 'x':
            break

        rest = ToLowerExceptNames(typed)
        print(typed)

        subject, rest = GetSubject(rest)
        print('Subject, Remainder: ', subject, ';', rest)

        action, rest = GetOperation(rest)
        print('Operation, Remainder: ', action,';', rest)

        # Room and place are only meaningful when something is being stored
        if action == 'Create':
            found_room, rest = FindRoom(rest)
            print('Room, Remainder: ', found_room,';', rest)

            location = clean_sentence(rest)
            print('Location: ', location)
        

Credits

CatMan
3 projects • 3 followers

Comments