Hackster is hosting Hackster Holidays, Ep. 6: Livestream & Giveaway Drawing. Watch previous episodes or stream live on Monday!Stream Hackster Holidays, Ep. 6 on Monday!
Amitabh SharmaRaghavendra N.VAditya Siddheshwar
Created September 5, 2024 © GPL3+

AI for the Eye: Vision to Voice in Real-Time

An intelligent visual aid that sees identifies and describes the surrounding to the user via a bluetooth audio device

IntermediateWork in progress9 hours27
AI for the Eye: Vision to Voice in Real-Time

Things used in this project

Hardware components

Seeed Studio XIAO ESP32S3 Sense
Seeed Studio XIAO ESP32S3 Sense
×1
UNIHIKER - IoT Python Programming Single Board Computer with Touchscreen
DFRobot UNIHIKER - IoT Python Programming Single Board Computer with Touchscreen
×1
Bluetooth Earphone
×1
Laptop/AI server
×1

Software apps and online services

HiveMQ

Hand tools and fabrication machines

3D Printer (generic)
3D Printer (generic)

Story

Read more

Custom parts and enclosures

UNIHIKER CASE

These is a very good case for the UNIHIKER created by SpaceBod
Link - https://www.printables.com/model/561174-unihiker-case/files

Code

Server Side Code

Python
This is the server side code that acts as a middleman between the Unihiker and xiao esp32s3. It takes the image packets via MQTT from xiao esp32s3, reconstructs the image, and feeds it to pretrained YOLOv8 and FLAN-T5 for processing, it then publishes the generated objects and scene description data to the unihiker.
import paho.mqtt.client as mqtt
import base64

# Variables to hold the incoming data
image_data = b""  # This will store the received Base64 data
image_reception_complete = False  # To track if the entire image has been received

# MQTT settings
broker_address = "broker.hivemq.com"
topic_img = "image/test_img"
output_image_file = "/content/drive/MyDrive/Colab Notebooks/images2/reconstructed_image.jpg"  # File to save the reconstructed image

# The callback for when a message is received from the server
def on_message(client, userdata, message):
    global image_data, image_reception_complete
    print(f"Received chunk of size {len(message.payload)}")

    # Check for the "END_OF_IMAGE" message
    if message.payload == b"END_OF_IMAGE":
        image_reception_complete = True
        print("Received END_OF_IMAGE message.")
    else:
        image_data += message.payload  # Append the incoming chunk to the image data

# MQTT client setup
def on_connect(client, userdata, flags, rc):
    print("Connected to MQTT Broker")
    client.subscribe(topic_img)  # Subscribe to the image topic

def on_disconnect(client, userdata, rc):
    print("Disconnected from MQTT Broker")

# Function to decode and save the image
def save_image_from_base64(encoded_data, output_file):
    try:
        with open(output_file, "wb") as img_file:
            img_file.write(base64.b64decode(encoded_data))  # Decode and save the image
        print(f"Image successfully saved as {output_file}")
    except Exception as e:
        print(f"Failed to save image: {e}")

# Create MQTT client and connect to broker
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect

client.connect(broker_address)

# Start the loop to process received messages
client.loop_start()

# Wait to receive all the chunks, stopping when "END_OF_IMAGE" is received
try:
    while not image_reception_complete:
        pass  # Keep the script running until the entire image is received
except KeyboardInterrupt:
    client.loop_stop()  # Stop the loop on exit

# Once all chunks are received, reconstruct and save the image
save_image_from_base64(image_data, output_image_file)

# Disconnect the client
client.disconnect()

import os

folder_path = '/content/drive/MyDrive/Colab Notebooks'  # Replace 'your_folder' with the correct folder path
image_list = [os.path.join(folder_path, img) for img in os.listdir(folder_path) if img.endswith(('.jpg', '.png', '.jpeg'))]
# image_list.sort()
# print(image_list)

import concurrent.futures
from ultralytics import YOLO
from PIL import Image


model = YOLO('yolov8n.pt')  # You can also use 'yolov5s.pt' for YOLOv5

# Define a function to process a single image and extract detected items
def process_image(image):
    result = model(image)  # result is a list
    items_detected = set()

    for r in result:
        for detection in r.boxes:
            items_detected.add(r.names[int(detection.cls)])  # r.names gives the class names

    return list(items_detected)

# Set the number of threads (adjust this based on runtime performance)
num_threads = 4  # Example: using 4 threads for parallel execution

# Parallelize the processing of images
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
    results = list(executor.map(process_image, image_list))

# Print the results
for res in results:
    print(res)

from transformers import BlipProcessor, BlipForConditionalGeneration
from PIL import Image
import os

# Load the BLIP processor and model
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")

# Define the directory containing your images
image_directory = folder_path

# Get the list of all image files in the directory
image_files = sorted([f for f in os.listdir(image_directory) if f.endswith(('.png', '.jpg', '.jpeg'))])

# Load the last image in the list
if image_files:
    latest_image_path = os.path.join(image_directory, image_files[-1])
    image = Image.open(latest_image_path)

    # Prepare the inputs for BLIP
    inputs = processor(image, return_tensors="pt")

    # Generate a caption for the latest image
    output = model.generate(**inputs)
    caption = processor.decode(output[0], skip_special_tokens=True)

    print(f"Generated Caption for {image_files[-1]}: {caption}")
else:
    print("No images found in the directory.")

from transformers import pipeline

# Load the FLAN-T5 XL model (publicly available)
llm = pipeline("text2text-generation", model="google/flan-t5-large")  # You can change "large" to "base" for smaller models

# Construct the prompt
prompt = f"The image caption is: '{caption}'. The following objects are detected: {results}. Please generate a detailed scene description using this information, also mentioning what objects can be seen."

# Generate the scene description using FLAN-T5
generated_text = llm(prompt, max_length=500)[0]['generated_text']

# Output the generated description
print("Scene Description:", generated_text)

import paho.mqtt.client as mqtt
from time import sleep

# MQTT settings
broker_address = "broker.hivemq.com"  # HiveMQ public broker
topic_description = "image/scene_description"
topic_objects = "image/frequent_objects"

description = generated_text

# Flatten the list
flat_results = [item for sublist in results for item in sublist]

# Join the flattened list into a string
frequent_objects_str = ', '.join(flat_results)

# Callback function when connected to the broker
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected to MQTT Broker!")
    else:
        print(f"Failed to connect, return code {rc}")

# Callback function when a message is published
def on_publish(client, userdata, mid):
    print(f"Data published successfully with message ID: {mid}")

# Create an MQTT client instance
client = mqtt.Client()

# Assign the callback functions
client.on_connect = on_connect
client.on_publish = on_publish

# Connect to the MQTT broker
client.connect(broker_address)

# Start the network loop to maintain the connection
client.loop_start()

# # Example: Values to be sent
# description = "A beautiful scene with a sunset over the ocean and beach."
# frequent_objects_str = "ocean, sunset, clouds, beach"

# Publish the scene description
result_description = client.publish(topic_description, description)
print(f"Published description: {description}")

# Publish the frequent objects
result_objects = client.publish(topic_objects, frequent_objects_str)
print(f"Published frequent objects: {frequent_objects_str}")

# Give some time for the messages to be sent
sleep(2)

# Stop the network loop
client.loop_stop()

# Disconnect from the broker
client.disconnect()

Xiao code

MicroPython
Captures and image and breaks it into packets of intended size to prevent interruptions due to connectivity issues. It then sends the packets via MQTT with HiveMQ as the broker
import gc
import esp
import os
import ubinascii  # MicroPython's built-in binary/hex conversion library
from Wifi import Sta
from umqtt.simple import MQTTClient
from time import sleep

esp.osdebug(None)

# Constants for Wi-Fi and MQTT setup
UID = const('xiao')
PWD = const('mick')

# Wi-Fi connection setup
sta = Sta()
sta.wlan.disconnect()
AP = const('Blur')
PW = const('Blur1234')
sta.connect(AP, PW)
sta.wait()

if not sta.wlan.isconnected():
    print("Wi-Fi not connected.")
    print("System aborted.")
else:
    print("Wi-Fi connected.")

    # Initialize MQTT client
    broker_address = "broker.hivemq.com"
    mqtt_client_id = "ESP32_Client"
    topic_img = "image/test_img"

    def connect_to_mqtt():
        client = MQTTClient(mqtt_client_id, broker_address, keepalive=60)  # Set a longer keep-alive
        client.connect()
        print(f"Connected to {broker_address}")
        return client

    # Function to reconnect the MQTT client
    def reconnect_mqtt(client):
        try:
            client.connect()
            print(f"Reconnected to {broker_address}")
        except Exception as e:
            print(f"Reconnection failed: {e}")

    # Function to read and Base64-encode the entire image file
    def read_and_encode_image(file_path):
        if file_path in os.listdir():
            with open(file_path, "rb") as img_file:
                img_data = img_file.read()  # Read the entire image as binary
                encoded_data = ubinascii.b2a_base64(img_data)  # Encode the entire image as Base64
                return encoded_data
        else:
            print(f"File {file_path} does not exist.")
            return None

    # Function to publish the encoded image data in larger chunks with retry and reconnection handling
    def publish_encoded_image_in_chunks(client, encoded_data, chunk_size=4096, max_retries=10):
        if encoded_data:
            total_length = len(encoded_data)
            total_chunks = (total_length + chunk_size - 1) // chunk_size  # Calculate total number of chunks
            print(f"Total length of encoded image: {total_length} bytes")
            print(f"Total number of chunks: {total_chunks}")

            for i in range(0, total_length, chunk_size):
                chunk = encoded_data[i:i+chunk_size]  # Break the data into larger chunks
                chunk_number = i // chunk_size + 1  # Current chunk number
                attempt = 0
                success = False
                while attempt < max_retries and not success:
                    try:
                        client.publish(topic_img, chunk)
                        print(f"Published chunk {chunk_number}/{total_chunks} of size {len(chunk)}")
                        success = True
                    except Exception as e:
                        attempt += 1
                        print(f"Error sending chunk {chunk_number}/{total_chunks}, attempt {attempt}/{max_retries}: {e}")
                        if attempt < max_retries:
                            reconnect_mqtt(client)  # Try to reconnect to MQTT if not all retries have been exhausted
                        sleep(1)  # Wait before retrying
                if not success:
                    print(f"Failed to send chunk {chunk_number}/{total_chunks} after {max_retries} retries. Aborting.")
                    client.disconnect()  # Disconnect in case of persistent failure
                    return
                sleep(0.1)  # Small delay between sending chunks
            print(f"Entire image sent in chunks to {topic_img}.")
            client.publish(topic_img, "END_OF_IMAGE")
            print(f"Published END_OF_IMAGE message.")
        else:
            print("No image data to send.")

    # Connect to MQTT broker
    mqtt_client = connect_to_mqtt()

    # Read and encode the entire image file
    image_file = "test_img.jpg"
    encoded_image_data = read_and_encode_image(image_file)

    # Publish the encoded image data in larger chunks with retries and reconnections
    publish_encoded_image_in_chunks(mqtt_client, encoded_image_data, chunk_size=4096, max_retries=10)  # Larger chunk size of 1024 bytes

    
    # Disconnect from the MQTT broker after publishing
    mqtt_client.disconnect()
    print("Disconnected from MQTT broker.")

    # Collect garbage to free up memory
    gc.collect()

print("Process completed.")

UNIHIKER GUI

Python
End user interface that directly connects with the specified bluetooth device on boot (added the mac address to bashrc, changes will be have to made there), and plays the audio sent by the AI server to aid the user.
import tkinter as tk
from gtts import gTTS
from playsound import playsound
import os
import paho.mqtt.client as mqtt
import threading

# Global variables to store the latest data from MQTT topics
scene_description = ""
frequent_objects = []

# Function to display the list of frequent objects
def display_list():
    global frequent_objects
    text_area.delete(1.0, tk.END)
    for item in frequent_objects:
        text_area.insert(tk.END, item + '\n')

# Function to perform TTS for the scene description
def perform_tts():
    global scene_description
    if scene_description:
        tts = gTTS(text=scene_description, lang='en')
        tts.save("output.mp3")
        playsound("output.mp3")
        os.remove("output.mp3")

# MQTT client callbacks
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected to MQTT Broker!")
        # Subscribe to both topics after connecting
        client.subscribe("image/scene_description")
        client.subscribe("image/frequent_objects")
    else:
        print("Failed to connect, return code %d\n", rc)

def on_message(client, userdata, message):
    global scene_description, frequent_objects
    
    # Decode message based on topic
    topic = message.topic
    mqtt_message = str(message.payload.decode("utf-8"))
    
    if topic == "image/scene_description":
        scene_description = mqtt_message
        print(f"Scene Description: {scene_description}")
        # Automatically perform TTS when the scene description is updated
        perform_tts()
        
    elif topic == "image/frequent_objects":
        frequent_objects = mqtt_message.split(",")  # Assuming the objects are sent as a comma-separated string
        print(f"Frequent Objects: {frequent_objects}")
        display_list()

# MQTT listener function
def mqtt_listener():
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message

    # Connect to HiveMQ broker
    client.connect("broker.hivemq.com", 1883, 60)

    # Keep the client connected and listening to topics
    client.loop_forever()

# Create the main window
root = tk.Tk()
root.title("UniHiker GUI")

# Create a text area to display the list of frequent objects
text_area = tk.Text(root, height=10, width=40)
text_area.pack()

# Create a button to manually trigger TTS for scene description (optional)
tts_button = tk.Button(root, text="Speak Scene Description", command=perform_tts)
tts_button.pack()

# Start the MQTT listener in a separate thread
mqtt_thread = threading.Thread(target=mqtt_listener)
mqtt_thread.daemon = True
mqtt_thread.start()

# Run the GUI
root.mainloop()

Credits

Amitabh Sharma

Amitabh Sharma

2 projects • 1 follower
Raghavendra N.V

Raghavendra N.V

4 projects • 3 followers
Aditya Siddheshwar

Aditya Siddheshwar

1 project • 1 follower

Comments