A major problem for people with visual impairments is navigating the home and recognizing everyday objects, owing to conditions such as macular degeneration, diabetic retinopathy, and other vision-related diseases. These impairments can severely restrict independence, making it difficult to find and identify essential items such as medication, appliances, and personal belongings.
This project is a prototype visual aid system with an integrated digital camera and audio feedback. It uses YOLOv8 (an object detector) and FLAN-T5 (a text-to-text language model), together with the BLIP image-captioning model, to identify items and generate a description of the surroundings. The descriptions are sent to a UniHiker board, which displays the results in a GUI and speaks them via text-to-speech over Bluetooth audio, presenting auditory descriptions to the wearer immediately.
Server Side Code
Python
import paho.mqtt.client as mqtt
import base64
# Module-level state shared with the on_message callback.
image_data = b"" # Buffer the received chunks are appended to; decoded as Base64 once the transfer ends
image_reception_complete = False # Set to True by on_message when the b"END_OF_IMAGE" sentinel arrives
# MQTT settings
broker_address = "broker.hivemq.com"
topic_img = "image/test_img" # Topic the image chunks (and the end sentinel) are received on
output_image_file = "/content/drive/MyDrive/Colab Notebooks/images2/reconstructed_image.jpg" # File to save the reconstructed image
# The callback for when a message is received from the server
def on_message(client, userdata, message):
    """Accumulate one Base64 chunk, or mark the transfer as finished.

    Assigned as the client's ``on_message`` callback. Every payload is
    appended to the module-level ``image_data`` buffer, except the
    ``b"END_OF_IMAGE"`` sentinel, which only sets
    ``image_reception_complete``.

    Args:
        client: The MQTT client instance (unused).
        userdata: User data passed by the client (unused).
        message: Received message; only ``message.payload`` is read.
    """
    global image_data, image_reception_complete
    payload = message.payload
    print(f"Received chunk of size {len(payload)}")
    if payload == b"END_OF_IMAGE":
        # The sentinel is a control message, not image data: appending it
        # would corrupt the Base64 stream that is decoded afterwards.
        image_reception_complete = True
        print("Received END_OF_IMAGE message.")
        return
    image_data += payload  # Append the incoming chunk to the image data
# MQTT client setup
def on_connect(client, userdata, flags, rc):
    """Subscribe to the image topic once the broker accepts the connection.

    Args:
        client: The MQTT client instance; ``subscribe`` is called on it.
        userdata: User data passed by the client (unused).
        flags: Connection flags sent by the broker (unused).
        rc: Connection result code; 0 means success.
    """
    if rc != 0:
        # Same failure report as the publisher's on_connect in this file.
        print(f"Failed to connect, return code {rc}")
        return
    print("Connected to MQTT Broker")
    client.subscribe(topic_img)  # Subscribe to the image topic
def on_disconnect(client, userdata, rc):
    """Log that the connection to the broker has been closed.

    Args:
        client: The MQTT client instance (unused).
        userdata: User data passed by the client (unused).
        rc: Disconnection result code (unused).
    """
    print("Disconnected from MQTT Broker")
# Function to decode and save the image
def save_image_from_base64(encoded_data, output_file):
    """Decode Base64 data and write the resulting bytes to ``output_file``.

    Failures are reported on stdout instead of being raised, so the
    surrounding script keeps running.

    Args:
        encoded_data: Base64-encoded image data (bytes or ASCII str).
        output_file: Path of the file to (over)write with the decoded bytes.
    """
    try:
        # Decode before opening the file so that a corrupt payload does not
        # leave an empty or truncated image behind.
        image_bytes = base64.b64decode(encoded_data)
        with open(output_file, "wb") as img_file:
            img_file.write(image_bytes)
    except (ValueError, TypeError, OSError) as e:
        # binascii.Error (bad Base64) is a ValueError subclass.
        print(f"Failed to save image: {e}")
    else:
        print(f"Image successfully saved as {output_file}")
# Receive one image over MQTT, then decode and save it.
import time

# Create MQTT client and connect to broker
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
client.connect(broker_address, 1883, 60)

# Start the background network loop so the callbacks above get invoked.
client.loop_start()

# Wait to receive all the chunks, stopping when "END_OF_IMAGE" is received.
try:
    while not image_reception_complete:
        time.sleep(0.05)  # Yield the CPU instead of spinning in a busy loop
except KeyboardInterrupt:
    print("Interrupted before the whole image was received.")

# Only a complete Base64 stream can be decoded into a valid image.
if image_reception_complete:
    save_image_from_base64(image_data, output_image_file)

# Disconnect the client, then stop the network loop.
client.disconnect()
client.loop_stop()
import os
# Folder scanned for input images.
# NOTE(review): the receiver in this file saves to ".../Colab Notebooks/images2/"
# and os.listdir() is not recursive, so a freshly received image is not picked
# up from here - confirm which folder is intended.
folder_path = '/content/drive/MyDrive/Colab Notebooks'

# Match extensions case-insensitively (e.g. ".JPG") and sort the paths so the
# processing order does not depend on the arbitrary order of os.listdir().
image_list = sorted(
    os.path.join(folder_path, name)
    for name in os.listdir(folder_path)
    if name.lower().endswith(('.jpg', '.png', '.jpeg'))
)
import concurrent.futures
from ultralytics import YOLO
from PIL import Image
# Load the 'yolov8n.pt' weights; process_image() calls this module-level `model`.
# NOTE(review): the name `model` is rebound to the BLIP captioning model further down this file.
model = YOLO('yolov8n.pt') # You can also use 'yolov5s.pt' for YOLOv5
# Define a function to process a single image and extract detected items
def process_image(image):
    """Return the distinct class names detected in a single image.

    Args:
        image: Whatever the module-level ``model`` accepts; the caller in
            this file passes image file paths.

    Returns:
        A list of unique class names, in no particular order.
    """
    # NOTE(review): this function is run from several threads that share one
    # `model` instance - confirm that this is safe for the detector in use.
    detections = model(image)  # result is a list
    labels = {
        prediction.names[int(box.cls)]  # names maps class index -> class name
        for prediction in detections
        for box in prediction.boxes
    }
    return list(labels)
# Set the number of threads (adjust this based on runtime performance)
num_threads = 4  # Example: using 4 threads for parallel execution

# Parallelize the processing of images; executor.map keeps the results in the
# same order as image_list.
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
    results = list(executor.map(process_image, image_list))

# Print the results (one list of detected class names per image)
for res in results:
    print(res)
from transformers import BlipProcessor, BlipForConditionalGeneration
from PIL import Image
import os
# Load the BLIP processor and model
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")

# Define the directory containing your images
image_directory = folder_path

# Get the list of all image files in the directory, sorted by file name
image_files = sorted(f for f in os.listdir(image_directory) if f.endswith(('.png', '.jpg', '.jpeg')))

# Default so the later prompt construction still works when no image exists.
caption = ""

# Load the last image in the list (last by file name, not by modification time)
if image_files:
    latest_image_path = os.path.join(image_directory, image_files[-1])
    with Image.open(latest_image_path) as opened_image:
        # Normalise to 3-channel RGB so palette/alpha PNGs are handled as well.
        image = opened_image.convert("RGB")

    # Prepare the inputs for BLIP
    inputs = processor(image, return_tensors="pt")

    # Generate a caption for the latest image
    output = model.generate(**inputs)
    caption = processor.decode(output[0], skip_special_tokens=True)
    print(f"Generated Caption for {image_files[-1]}: {caption}")
else:
    print("No images found in the directory.")
from transformers import pipeline
# Load the FLAN-T5 Large checkpoint as a text2text-generation pipeline
# ("google/flan-t5-base" is a smaller alternative).
llm = pipeline("text2text-generation", model="google/flan-t5-large")

# Build the prompt from the BLIP caption and the per-image detection lists.
prompt = (
    f"The image caption is: '{caption}'. "
    f"The following objects are detected: {results}. "
    "Please generate a detailed scene description using this information, "
    "also mentioning what objects can be seen."
)

# Ask FLAN-T5 for the scene description and keep the first generated text.
_llm_outputs = llm(prompt, max_length=500)
generated_text = _llm_outputs[0]['generated_text']

# Output the generated description
print("Scene Description:", generated_text)
import paho.mqtt.client as mqtt
from time import sleep
# Broker and topics used to publish the results.
broker_address = "broker.hivemq.com"  # HiveMQ public broker
topic_description = "image/scene_description"
topic_objects = "image/frequent_objects"

# Text payload for the description topic.
description = generated_text

# Collapse the per-image lists of detected names into one flat list...
flat_results = []
for per_image_labels in results:
    flat_results.extend(per_image_labels)

# ...and serialise it as a single comma-separated string.
frequent_objects_str = ', '.join(flat_results)
# Callback function when connected to the broker
def on_connect(client, userdata, flags, rc):
    """Report whether the connection to the broker succeeded.

    Args:
        client: The MQTT client instance (unused).
        userdata: User data passed by the client (unused).
        flags: Connection flags sent by the broker (unused).
        rc: Connection result code; 0 means success.
    """
    if rc == 0:
        print("Connected to MQTT Broker!")
    else:
        print(f"Failed to connect, return code {rc}")
# Callback function when a message is published
def on_publish(client, userdata, mid):
    """Log the message ID of a message that has been published.

    Args:
        client: The MQTT client instance (unused).
        userdata: User data passed by the client (unused).
        mid: Message ID of the published message.
    """
    print(f"Data published successfully with message ID: {mid}")
# Create an MQTT client instance
client = mqtt.Client()

# Assign the callback functions
client.on_connect = on_connect
client.on_publish = on_publish

# Connect to the MQTT broker
client.connect(broker_address, 1883, 60)

# Start the network loop to maintain the connection
client.loop_start()

# # Example: Values to be sent
# description = "A beautiful scene with a sunset over the ocean and beach."
# frequent_objects_str = "ocean, sunset, clouds, beach"

# Publish the scene description
result_description = client.publish(topic_description, description)
print(f"Published description: {description}")

# Publish the frequent objects
result_objects = client.publish(topic_objects, frequent_objects_str)
print(f"Published frequent objects: {frequent_objects_str}")

# Give some time for the messages to be sent before tearing down the loop
sleep(2)

# Stop the network loop
client.loop_stop()

# Disconnect from the broker
client.disconnect()
Xiao code
MicroPython
import gc
import esp
import os
import ubinascii # MicroPython's built-in binary/hex conversion library
from Wifi import Sta
from umqtt.simple import MQTTClient
from time import sleep
# Constants for Wi-Fi and MQTT setup
# NOTE(review): credentials are hard-coded in source - consider moving them
# to a separate, untracked configuration file.
UID = const('xiao')
PWD = const('mick')

# Wi-Fi connection setup
sta = Sta()
AP = const('Blur')
PW = const('Blur1234')
sta.connect(AP, PW)

if not sta.wlan.isconnected():
    print("Wi-Fi not connected.")
    print("System aborted.")
    # Nothing below can work without a network connection, so stop here.
    raise SystemExit
print("Wi-Fi connected.")

# Initialize MQTT client
broker_address = "broker.hivemq.com"
mqtt_client_id = "ESP32_Client"
topic_img = "image/test_img"
def connect_to_mqtt():
    """Create an MQTT client, connect it to the broker and return it.

    Uses the module-level ``mqtt_client_id`` and ``broker_address``.

    Returns:
        The connected ``MQTTClient`` instance.
    """
    client = MQTTClient(mqtt_client_id, broker_address, keepalive=60)  # Set a longer keep-alive
    client.connect()  # Without this call nothing can be published
    print(f"Connected to {broker_address}")
    return client
# Function to reconnect the MQTT client
def reconnect_mqtt(client):
    """Try to re-establish the connection of an existing MQTT client.

    Errors are reported on stdout rather than raised, so a retry loop in the
    caller can keep going.

    Args:
        client: Client object exposing ``connect()``.

    Returns:
        True if ``connect()`` succeeded, False otherwise.
    """
    try:
        client.connect()
    except Exception as e:
        # Kept broad on purpose: this is a best-effort helper.
        print(f"Reconnection failed: {e}")
        return False
    print(f"Reconnected to {broker_address}")
    return True
# Function to read and Base64-encode the entire image file
def read_and_encode_image(file_path):
    """Read a whole image file and return its content Base64-encoded.

    Args:
        file_path: Path of the image file to read.

    Returns:
        The Base64-encoded file content as bytes, or None if the file
        cannot be opened.
    """
    # Open the file directly rather than checking os.listdir() first: that
    # check only worked for bare names in the current directory.
    try:
        with open(file_path, "rb") as img_file:
            img_data = img_file.read()  # Read the entire image as binary
    except OSError:
        print(f"File {file_path} does not exist.")
        return None
    return ubinascii.b2a_base64(img_data)  # Encode the entire image as Base64
# Function to publish the encoded image data in larger chunks with retry and reconnection handling
def publish_encoded_image_in_chunks(client, encoded_data, chunk_size=4096, max_retries=10):
    """Publish Base64 image data in fixed-size chunks, then an end marker.

    Each chunk is published to ``topic_img``. A failed publish is retried up
    to ``max_retries`` times with a reconnect attempt in between. Once every
    chunk is out, the string "END_OF_IMAGE" is published to the same topic.

    Args:
        client: MQTT client exposing ``publish(topic, msg)`` and ``disconnect()``.
        encoded_data: Base64-encoded image bytes; None or empty sends nothing.
        chunk_size: Maximum number of bytes per published chunk.
        max_retries: Maximum number of publish attempts per chunk.

    Returns:
        True if all chunks and the end marker were published, False if there
        was nothing to send or a chunk could not be delivered.
    """
    if not encoded_data:
        print("No image data to send.")
        return False

    total_length = len(encoded_data)
    total_chunks = (total_length + chunk_size - 1) // chunk_size  # Ceiling division
    print(f"Total length of encoded image: {total_length} bytes")
    print(f"Total number of chunks: {total_chunks}")

    for start in range(0, total_length, chunk_size):
        chunk = encoded_data[start:start + chunk_size]
        chunk_number = start // chunk_size + 1  # 1-based, for log messages
        success = False
        attempt = 0
        while attempt < max_retries and not success:
            try:
                client.publish(topic_img, chunk)
            except Exception as e:
                attempt += 1
                print(f"Error sending chunk {chunk_number}/{total_chunks}, attempt {attempt}/{max_retries}: {e}")
                if attempt < max_retries:
                    reconnect_mqtt(client)  # Try to reconnect before the next attempt
                    sleep(1)  # Wait before retrying
            else:
                print(f"Published chunk {chunk_number}/{total_chunks} of size {len(chunk)}")
                success = True

        if not success:
            print(f"Failed to send chunk {chunk_number}/{total_chunks} after {max_retries} retries. Aborting.")
            try:
                client.disconnect()  # Disconnect in case of persistent failure
            except Exception as e:
                # The connection is most likely already gone; report and go on.
                print(f"Disconnect failed: {e}")
            # Stop here: sending the end marker after a missing chunk would
            # make the receiver decode an incomplete image.
            return False

        sleep(0.1)  # Small delay between sending chunks

    print(f"Entire image sent in chunks to {topic_img}.")
    client.publish(topic_img, "END_OF_IMAGE")
    print("Published END_OF_IMAGE message.")
    return True
# Connect to MQTT broker
mqtt_client = connect_to_mqtt()

# Read and encode the entire image file
image_file = "test_img.jpg"
encoded_image_data = read_and_encode_image(image_file)

# Publish the encoded image data in 4096-byte chunks with retries and reconnections
publish_encoded_image_in_chunks(mqtt_client, encoded_image_data, chunk_size=4096, max_retries=10)

# Disconnect from the MQTT broker after publishing
try:
    mqtt_client.disconnect()
    print("Disconnected from MQTT broker.")
except OSError as e:
    # The publish step disconnects on persistent failure, so the connection
    # may already be closed at this point.
    print(f"Disconnect failed: {e}")

# Collect garbage to free up memory
gc.collect()
print("Process completed.")
Python
import tkinter as tk
from gtts import gTTS
from playsound import playsound
import os
import paho.mqtt.client as mqtt
import threading
# Global variables to store the latest data from MQTT topics.
# Both are written by on_message and read by perform_tts / display_list.
scene_description = "" # Text received on "image/scene_description"
frequent_objects = [] # Object names parsed from the "image/frequent_objects" payload
# Function to display the list of frequent objects
def display_list():
    """Replace the text area's content with one frequent object per line.

    Reads the module-level ``frequent_objects`` list and writes to the
    module-level ``text_area`` widget.
    """
    text_area.delete(1.0, tk.END)  # Clear whatever was shown before
    for item in frequent_objects:
        text_area.insert(tk.END, item + '\n')
# Function to perform TTS for the scene description
def perform_tts():
    """Speak the current scene description, if there is one.

    The text in the module-level ``scene_description`` is synthesised with
    gTTS into a temporary MP3 file, played with ``playsound``, and the file
    is removed afterwards. Does nothing when the description is empty.
    """
    if not scene_description:
        return

    import tempfile

    audio_path = os.path.join(tempfile.gettempdir(), "scene_description.mp3")
    tts = gTTS(text=scene_description, lang='en')
    tts.save(audio_path)
    try:
        playsound(audio_path)
    finally:
        # Always clean up the temporary audio file, even if playback fails.
        if os.path.exists(audio_path):
            os.remove(audio_path)
# MQTT client callbacks
def on_connect(client, userdata, flags, rc):
    """Subscribe to the result topics once the broker accepts the connection.

    Args:
        client: The MQTT client instance; ``subscribe`` is called on it.
        userdata: User data passed by the client (unused).
        flags: Connection flags sent by the broker (unused).
        rc: Connection result code; 0 means success.
    """
    if rc != 0:
        # The original passed rc as a second print() argument, so the "%d"
        # placeholder was never filled in.
        print(f"Failed to connect, return code {rc}")
        return
    print("Connected to MQTT Broker!")
    # Subscribe to both topics after connecting (the ones on_message handles)
    client.subscribe("image/scene_description")
    client.subscribe("image/frequent_objects")
def on_message(client, userdata, message):
    """Store an incoming MQTT message and refresh the matching output.

    A message on "image/scene_description" replaces ``scene_description`` and
    is spoken straight away; a message on "image/frequent_objects" is parsed
    as a comma-separated list into ``frequent_objects`` and shown in the GUI.

    Args:
        client: The MQTT client instance (unused).
        userdata: User data passed by the client (unused).
        message: Received message; ``topic`` and ``payload`` are read.
    """
    global scene_description, frequent_objects
    # Decode message based on topic
    topic = message.topic
    mqtt_message = message.payload.decode("utf-8")
    if topic == "image/scene_description":
        scene_description = mqtt_message
        print(f"Scene Description: {scene_description}")
        # Automatically perform TTS when the scene description is updated
        perform_tts()
    elif topic == "image/frequent_objects":
        # The sender joins names with ", ", so strip the blanks around each
        # entry and drop empty ones (e.g. from an empty payload).
        frequent_objects = [name.strip() for name in mqtt_message.split(",") if name.strip()]
        print(f"Frequent Objects: {frequent_objects}")
        # NOTE(review): this runs on the MQTT thread while the widget belongs
        # to the Tk main loop - confirm that a direct call is acceptable here.
        display_list()
# MQTT listener function
def mqtt_listener():
    """Connect to the broker and process MQTT traffic until disconnected.

    Intended as the target of a background thread: the call blocks inside
    the client's network loop.
    """
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message
    # Connect to HiveMQ broker
    client.connect("broker.hivemq.com", 1883, 60)
    # Keep the client connected and listening to topics
    client.loop_forever()
# Create the main window
root = tk.Tk()
root.title("UniHiker GUI")

# Create a text area to display the list of frequent objects
text_area = tk.Text(root, height=10, width=40)
text_area.pack()  # A widget is not shown until a geometry manager places it

# Create a button to manually trigger TTS for scene description (optional)
tts_button = tk.Button(root, text="Speak Scene Description", command=perform_tts)
tts_button.pack()

# Start the MQTT listener in a separate thread; as a daemon thread it does not
# keep the process alive once the window is closed.
mqtt_thread = threading.Thread(target=mqtt_listener)
mqtt_thread.daemon = True
mqtt_thread.start()

# Run the GUI
root.mainloop()