Hardware components | ||||||
| × | 1 | ||||
| × | 1 | ||||
| × | 1 | ||||
| × | 1 | ||||
Software apps and online services | ||||||
| ||||||
Hand tools and fabrication machines | ||||||
|
A major problem identified is the difficulty of navigating and recognizing everyday objects in the home because of conditions like macular degeneration, diabetic retinopathy, and other vision-related diseases. These impairments can severely restrict independence, making it difficult to find and identify essential items like medication, appliances, and personal property.
This project is a prototype visual aiding system equipped with an integrated digital camera and audio feedback. It uses YOLOv8 for object detection together with BLIP and FLAN-T5 — popular vision and language models — to identify items and generate descriptions of the surroundings. The descriptions are sent to the UniHiker, which relays the audio via a GUI and Bluetooth text-to-speech, presenting auditory descriptions immediately to the wearer.
Server Side Code
# Server-side receiver dependencies: MQTT transport and Base64 decoding.
# (The original listing had the article's "Python" language label fused
# onto the first import line.)
import paho.mqtt.client as mqtt
import base64
# Incoming-image state shared with the MQTT callback below.
image_data = b""                   # accumulated Base64 payload chunks
image_reception_complete = False   # set once the END_OF_IMAGE marker arrives

# MQTT settings
broker_address = "broker.hivemq.com"
topic_img = "image/test_img"
# Destination for the reassembled image
output_image_file = "/content/drive/MyDrive/Colab Notebooks/images2/reconstructed_image.jpg"


def on_message(client, userdata, message):
    """Accumulate Base64 chunks; flag completion on the END marker."""
    global image_data, image_reception_complete
    payload = message.payload
    print(f"Received chunk of size {len(payload)}")
    if payload != b"END_OF_IMAGE":
        # Ordinary data chunk: append it to the running buffer.
        image_data += payload
    else:
        image_reception_complete = True
        print("Received END_OF_IMAGE message.")
# MQTT client setup
def on_connect(client, userdata, flags, rc):
    """On broker connect, subscribe to the image-chunk topic."""
    print("Connected to MQTT Broker")
    client.subscribe(topic_img)
def on_disconnect(client, userdata, rc):
    """Log that the broker connection was lost."""
    notice = "Disconnected from MQTT Broker"
    print(notice)
def save_image_from_base64(encoded_data, output_file):
    """Decode Base64 image data and write the binary result to output_file.

    Any failure (bad Base64, unwritable path) is reported on stdout
    rather than raised, matching the script's best-effort style.
    """
    try:
        decoded = base64.b64decode(encoded_data)
        with open(output_file, "wb") as img_file:
            img_file.write(decoded)
        print(f"Image successfully saved as {output_file}")
    except Exception as e:
        print(f"Failed to save image: {e}")
# Create the MQTT client, register callbacks, and connect (default port 1883)
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
client.connect(broker_address)

# Process network traffic on a background thread
client.loop_start()

import time

# Block until the END_OF_IMAGE marker arrives.  The original looped on
# `pass`, spinning a CPU core at 100%; a short sleep per iteration is
# equivalent and nearly free.
try:
    while not image_reception_complete:
        time.sleep(0.1)
    client.loop_stop()  # stop the network thread on normal completion too
except KeyboardInterrupt:
    client.loop_stop()  # stop the loop on manual exit

# Reconstruct and save whatever data was received (possibly partial if
# interrupted — preserved from the original behavior)
save_image_from_base64(image_data, output_image_file)

# Disconnect the client
client.disconnect()
import os

# Google Drive directory holding the captured images
folder_path = '/content/drive/MyDrive/Colab Notebooks'
_IMAGE_EXTENSIONS = ('.jpg', '.png', '.jpeg')

# Absolute paths of every image file in the folder (case-sensitive match)
image_list = [
    os.path.join(folder_path, name)
    for name in os.listdir(folder_path)
    if name.endswith(_IMAGE_EXTENSIONS)
]
import concurrent.futures
from ultralytics import YOLO
from PIL import Image

# Pretrained YOLOv8 "nano" checkpoint — the smallest/fastest variant.
# (A YOLOv5 checkpoint such as 'yolov5s.pt' can be swapped in here.)
model = YOLO('yolov8n.pt')
def process_image(image):
    """Run YOLO on one image and return its unique detected class names."""
    detections = model(image)  # model returns a list of result objects
    labels = {
        r.names[int(box.cls)]  # map class index -> human-readable name
        for r in detections
        for box in r.boxes
    }
    return list(labels)
# Worker count for parallel inference; tune to the runtime's performance.
num_threads = 4

# Fan the images out across a thread pool; map() preserves input order.
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as pool:
    results = list(pool.map(process_image, image_list))

# Show the objects detected in each image
for detected in results:
    print(detected)
from transformers import BlipProcessor, BlipForConditionalGeneration
from PIL import Image
import os

# Load the BLIP processor and captioning model (base checkpoint).
# NOTE(review): this rebinds `model`, replacing the YOLO model loaded
# earlier — harmless here because detection has already run.
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")

# Same image folder used for detection above
image_directory = folder_path

# Sorted list of image filenames; the last entry is treated as the most
# recent capture (assumes lexicographic order matches capture order —
# TODO confirm the camera's naming scheme guarantees this).
image_files = sorted([f for f in os.listdir(image_directory) if f.endswith(('.png', '.jpg', '.jpeg'))])
# Caption the newest image in the folder (the last entry after sorting).
if not image_files:
    print("No images found in the directory.")
else:
    latest_image_path = os.path.join(image_directory, image_files[-1])
    image = Image.open(latest_image_path)

    # Tokenize/normalize the image for BLIP, then decode its caption
    inputs = processor(image, return_tensors="pt")
    output = model.generate(**inputs)
    caption = processor.decode(output[0], skip_special_tokens=True)
    print(f"Generated Caption for {image_files[-1]}: {caption}")
from transformers import pipeline

# FLAN-T5 rewrites the caption + detected-object list into a richer
# scene description ("base" is a lighter-weight alternative checkpoint).
llm = pipeline("text2text-generation", model="google/flan-t5-large")

# Build the prompt from the BLIP caption and the YOLO detections
prompt = (
    f"The image caption is: '{caption}'. "
    f"The following objects are detected: {results}. "
    "Please generate a detailed scene description using this information, "
    "also mentioning what objects can be seen."
)

# Generate and show the scene description
generated_text = llm(prompt, max_length=500)[0]['generated_text']
print("Scene Description:", generated_text)
import paho.mqtt.client as mqtt
from time import sleep

# MQTT settings
broker_address = "broker.hivemq.com"  # HiveMQ public broker
topic_description = "image/scene_description"
topic_objects = "image/frequent_objects"

description = generated_text

# Flatten the per-image detection lists into one sequence, then build a
# single comma-separated string for publishing.
flat_results = []
for per_image in results:
    flat_results.extend(per_image)
frequent_objects_str = ', '.join(flat_results)
def on_connect(client, userdata, flags, rc):
    """Log the connection result; rc == 0 means success."""
    if rc != 0:
        print(f"Failed to connect, return code {rc}")
        return
    print("Connected to MQTT Broker!")
def on_publish(client, userdata, mid):
    """Confirm a publish by printing the broker-assigned message ID."""
    confirmation = f"Data published successfully with message ID: {mid}"
    print(confirmation)
# Create an MQTT client instance and register the callbacks
client = mqtt.Client()
client.on_connect = on_connect
client.on_publish = on_publish

# Connect (default port 1883) and run the network loop in a thread
client.connect(broker_address)
client.loop_start()

# Publish the scene description
result_description = client.publish(topic_description, description)
print(f"Published description: {description}")

# Publish the frequent objects
result_objects = client.publish(topic_objects, frequent_objects_str)
print(f"Published frequent objects: {frequent_objects_str}")

# Block until the broker has acknowledged both messages.  The original
# slept a fixed 2 s and hoped delivery had finished; wait_for_publish()
# on the MQTTMessageInfo returned by publish() is the reliable way.
result_description.wait_for_publish()
result_objects.wait_for_publish()

# Stop the network loop and disconnect from the broker
client.loop_stop()
client.disconnect()
Xiao code
# XIAO (MicroPython) image-publisher dependencies.  The original listing
# had the article's "MicroPython" language label fused onto the first
# import line.
import gc
import esp
import os
import ubinascii  # MicroPython's built-in binary/Base64 conversion library
from Wifi import Sta
from umqtt.simple import MQTTClient
from time import sleep

# Silence ESP-IDF debug output on the UART console
esp.osdebug(None)
# const() is not a builtin in MicroPython — it must be imported.
# (The original listing used it without this import.)
from micropython import const

# Constants for Wi-Fi and MQTT setup
UID = const('xiao')
PWD = const('mick')

# Wi-Fi connection setup
sta = Sta()
sta.wlan.disconnect()
AP = const('Blur')
PW = const('Blur1234')
sta.connect(AP, PW)
sta.wait()

if not sta.wlan.isconnected():
    print("Wi-Fi not connected.")
    print("System aborted.")
    # Actually stop here: the original printed "aborted" but fell through
    # to the MQTT upload, which cannot work without Wi-Fi.
    raise SystemExit
else:
    print("Wi-Fi connected.")

# MQTT connection settings
broker_address = "broker.hivemq.com"
mqtt_client_id = "ESP32_Client"
topic_img = "image/test_img"
def connect_to_mqtt():
    """Create, connect, and return an MQTT client (60 s keep-alive)."""
    connection = MQTTClient(mqtt_client_id, broker_address, keepalive=60)
    connection.connect()
    print(f"Connected to {broker_address}")
    return connection
def reconnect_mqtt(client):
    """Best-effort re-connect of an existing MQTT client; never raises."""
    try:
        client.connect()
    except Exception as e:
        print(f"Reconnection failed: {e}")
    else:
        print(f"Reconnected to {broker_address}")
# Read an image file and return its contents Base64-encoded.
def read_and_encode_image(file_path):
    """Return the Base64-encoded contents of file_path, or None.

    On a missing/unreadable file, prints a diagnostic and returns None.
    Uses EAFP (try/open) instead of the original `file_path in
    os.listdir()` check, which only worked for bare filenames in the
    current directory and raced with concurrent file changes.
    """
    try:
        with open(file_path, "rb") as img_file:
            img_data = img_file.read()  # entire image as binary
    except OSError:
        print(f"File {file_path} does not exist.")
        return None
    return ubinascii.b2a_base64(img_data)  # whole image as Base64 (+ trailing newline)
# Publish the Base64 image to topic_img in fixed-size chunks, retrying
# (and reconnecting) on errors, then send an END_OF_IMAGE marker.
def publish_encoded_image_in_chunks(client, encoded_data, chunk_size=4096, max_retries=10):
    """Send encoded_data in chunk_size slices over MQTT.

    Each chunk is retried up to max_retries times, reconnecting the
    client between attempts; a chunk that still fails disconnects the
    client and aborts the whole upload (no END_OF_IMAGE is sent).
    """
    if encoded_data:
        total_length = len(encoded_data)
        # Ceiling division: number of chunks needed to cover the data
        total_chunks = (total_length + chunk_size - 1) // chunk_size
        print(f"Total length of encoded image: {total_length} bytes")
        print(f"Total number of chunks: {total_chunks}")
        for i in range(0, total_length, chunk_size):
            chunk = encoded_data[i:i+chunk_size]  # current slice of the payload
            chunk_number = i // chunk_size + 1  # 1-based index for logging
            attempt = 0
            success = False
            while attempt < max_retries and not success:
                try:
                    client.publish(topic_img, chunk)
                    print(f"Published chunk {chunk_number}/{total_chunks} of size {len(chunk)}")
                    success = True
                except Exception as e:
                    attempt += 1
                    print(f"Error sending chunk {chunk_number}/{total_chunks}, attempt {attempt}/{max_retries}: {e}")
                    if attempt < max_retries:
                        # Re-establish the broker session before retrying
                        reconnect_mqtt(client)
                        sleep(1)  # brief back-off before the retry
            if not success:
                print(f"Failed to send chunk {chunk_number}/{total_chunks} after {max_retries} retries. Aborting.")
                client.disconnect()  # give up on persistent failure
                return
            sleep(0.1)  # pace the broker between chunks
        print(f"Entire image sent in chunks to {topic_img}.")
        # Terminator the server-side receiver waits for
        client.publish(topic_img, "END_OF_IMAGE")
        print(f"Published END_OF_IMAGE message.")
    else:
        print("No image data to send.")
# Connect to MQTT broker
mqtt_client = connect_to_mqtt()
# Read and encode the entire image file
image_file = "test_img.jpg"
encoded_image_data = read_and_encode_image(image_file)
# Publish the encoded image data in larger chunks with retries and reconnections
publish_encoded_image_in_chunks(mqtt_client, encoded_image_data, chunk_size=4096, max_retries=10) # Larger chunk size of 1024 bytes
# Disconnect from the MQTT broker after publishing
mqtt_client.disconnect()
print("Disconnected from MQTT broker.")
# Collect garbage to free up memory
gc.collect()
print("Process completed.")
UNIHIKER GUI
# UniHiker GUI dependencies: Tk UI, Google TTS, audio playback, MQTT.
# (The original listing had the article's "Python" language label fused
# onto the first import line.)
import tkinter as tk
from gtts import gTTS
from playsound import playsound
import os
import paho.mqtt.client as mqtt
import threading

# Latest data received over MQTT (updated by on_message below)
scene_description = ""
frequent_objects = []
def display_list():
    """Replace the text area's contents with the detected-object list."""
    # `frequent_objects` is only read here, so no global declaration is needed
    text_area.delete(1.0, tk.END)
    text_area.insert(tk.END, ''.join(item + '\n' for item in frequent_objects))
def perform_tts():
    """Speak the current scene description aloud via Google TTS.

    Does nothing when no description has been received yet.  The MP3 is
    written to a temporary fixed name, played, then deleted.
    """
    if not scene_description:
        return
    audio_path = "output.mp3"
    gTTS(text=scene_description, lang='en').save(audio_path)
    playsound(audio_path)
    os.remove(audio_path)
# MQTT client callbacks
def on_connect(client, userdata, flags, rc):
    """Subscribe to both data topics once the broker connection is up."""
    if rc == 0:
        print("Connected to MQTT Broker!")
        # Subscribe to both topics after connecting
        client.subscribe("image/scene_description")
        client.subscribe("image/frequent_objects")
    else:
        # Bug fix: the original passed rc as a second argument to print()
        # ('print("...%d\n", rc)') instead of %-formatting it.
        print("Failed to connect, return code %d" % rc)
def on_message(client, userdata, message):
    """Route incoming MQTT payloads to the description or object list."""
    global scene_description, frequent_objects
    topic = message.topic
    mqtt_message = str(message.payload.decode("utf-8"))
    if topic == "image/scene_description":
        scene_description = mqtt_message
        print(f"Scene Description: {scene_description}")
        # Automatically speak the new description
        perform_tts()
    elif topic == "image/frequent_objects":
        # The publisher joins names with ", ", so strip the leading space
        # each item after the first would otherwise carry.
        frequent_objects = [obj.strip() for obj in mqtt_message.split(",")]
        print(f"Frequent Objects: {frequent_objects}")
        display_list()
def mqtt_listener():
    """Connect to the public HiveMQ broker and block, dispatching messages."""
    listener = mqtt.Client()
    listener.on_connect = on_connect
    listener.on_message = on_message
    # Port 1883, 60 s keep-alive
    listener.connect("broker.hivemq.com", 1883, 60)
    listener.loop_forever()
# Build the main window
root = tk.Tk()
root.title("UniHiker GUI")

# Text area showing the most recent detected-object list
text_area = tk.Text(root, height=10, width=40)
text_area.pack()

# Optional manual trigger for speaking the scene description
tts_button = tk.Button(root, text="Speak Scene Description", command=perform_tts)
tts_button.pack()

# Listen for MQTT updates on a daemon thread so the GUI stays responsive
mqtt_thread = threading.Thread(target=mqtt_listener, daemon=True)
mqtt_thread.start()

# Hand control to the Tk event loop
root.mainloop()
Comments