Hackster is hosting Hackster Holidays, Ep. 6: Livestream & Giveaway Drawing. Watch previous episodes or stream live on Monday!Stream Hackster Holidays, Ep. 6 on Monday!
donutsorelse
Published © GPL3+

The AI Tour Guide

A gadget that automatically gives you a guided tour wherever you are with a simple command or a press of a button.

BeginnerFull instructions provided2 hours47
The AI Tour Guide

Things used in this project

Hardware components

Blues Notecarrier F
×1
Blues Notecard (Cellular)
Blues Notecard (Cellular)
I used the global one
×1
UNIHIKER - IoT Python Programming Single Board Computer with Touchscreen
DFRobot UNIHIKER - IoT Python Programming Single Board Computer with Touchscreen
×1

Software apps and online services

Mind+

Story

Read more

Code

ai_tour_guide

Python
The full code to run an ai tour guide! Input your own keys and product uid and it will work for you right out of the gate!
import os
import speech_recognition as sr
import time
import logging
import pyttsx3
from notecard import notecard
import serial
import requests
import simpleaudio as sa
from pydub import AudioSegment
from pinpong.board import Board
from pinpong.extension.unihiker import button_a, button_b

os.environ["PYTHONWARNINGS"] = "ignore"  # Suppress excess logging

# Configuration
NOTECARD_SERIAL_PORT = "/dev/ttyACM0"
NOTECARD_BAUD_RATE = 9600
PRODUCT_UID = "<your product uid here>"
BLUES_OPENAI_CHAT_ROUTE = "openai_chat"
BLUES_OPENAI_TTS_ROUTE = "openai_vision"
OPENAI_API_KEY = "<your key here>"
GOOGLE_API_KEY = "<your key here>"
force_blues = False  # Set to True to force Blues usage (simulate no Wi-Fi)

# Initialize board and logging
board = Board()
board.begin()
logging.basicConfig(level=logging.INFO)

engine = pyttsx3.init()
engine.setProperty("rate", 150)
engine.setProperty("volume", 1.0)

notecard_port = None


# Helper Functions
def setup_serial_connection(port, baud_rate):
    try:
        return serial.Serial(port, baud_rate)
    except Exception as e:
        logging.error(f"Failed to open serial port: {e}")
        return None


def setup_notecard(serial_port):
    try:
        card = notecard.OpenSerial(serial_port)
        req = {"req": "hub.set", "product": PRODUCT_UID, "mode": "continuous"}
        rsp = card.Transaction(req)
        logging.info(f"Notecard setup response: {rsp}")
        return card
    except Exception as e:
        logging.error(f"Failed to initialize Notecard: {e}")
        return None


def initialize_blues_service():
    global notecard_port
    serial_port = setup_serial_connection(NOTECARD_SERIAL_PORT, NOTECARD_BAUD_RATE)
    if serial_port:
        notecard_port = setup_notecard(serial_port)
    else:
        logging.error("No valid serial port found for Notecard.")


def get_coordinates():
    """Fetch coordinates via Google Geolocation API or Blues Notecard."""
    # return "43.0896, 79.0849" # Test coordinates are Niagara falls - I dont want to tell you guys where I live!
    global force_blues
    if not force_blues:
        try:
            logging.info("Attempting to retrieve coordinates via Google Geolocation API...")
            payload = {"considerIp": "true"}
            response = requests.post(
                f"https://www.googleapis.com/geolocation/v1/geolocate?key={GOOGLE_API_KEY}",
                json=payload,
                timeout=15
            )
            if response.status_code == 200:
                data = response.json()
                lat, lon = data['location']['lat'], data['location']['lng']
                logging.info(f"Coordinates retrieved via Google API: {lat},{lon}")
                return f"{lat},{lon}"
        except Exception as e:
            logging.warning(f"Google Geolocation API failed: {e}")

    if notecard_port:
        try:
            logging.info("Attempting to retrieve coordinates via Blues Notecard...")
            req = {"req": "card.time"}
            rsp = notecard_port.Transaction(req)
            if "lat" in rsp and "lon" in rsp:
                lat, lon = rsp["lat"], rsp["lon"]
                if lat != 0.0 or lon != 0.0:
                    logging.info(f"Coordinates retrieved via Blues Notecard: {lat},{lon}")
                    return f"{lat},{lon}"
        except Exception as e:
            logging.error(f"Failed to fetch coordinates from Blues Notecard: {e}")
    return "0.0,0.0"


def generate_tour(prompt):
    """Send a prompt to OpenAI API via Wi-Fi or Blues Wireless."""
    global force_blues
    if not force_blues:
        try:
            response = requests.post(
                "https://api.openai.com/v1/chat/completions",
                headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
                json={"model": "gpt-4", "messages": [{"role": "user", "content": prompt}], "max_tokens": 1000},
                timeout=15
            )
            if response.status_code == 200:
                # return response.json()["choices"][0]["message"]["content"]
                response = response.json()["choices"][0]["message"]["content"]
                logging.info(response)
                audio_path = openai_tourguide_speech(response)
                if not audio_path:
                    logging.warning("Failed to generate speech. Skipping audio playback.")
                    return
                play_audio(audio_path)
            else:
                logging.error(f"Wi-Fi OpenAI API Error: {response.status_code}, {response.text}")
        except Exception as e:
            logging.warning(f"Wi-Fi OpenAI call failed: {e}")

    if notecard_port:
        try:
            req = {
                "req": "web.post",
                "route": BLUES_OPENAI_CHAT_ROUTE,
                "body": {
                    "model": "gpt-4",
                    "messages": [{"role": "user", "content": prompt}],
                    "max_tokens": 1000
                }
            }
            logging.info("Sending request to OpenAI via Blues...")
            rsp = notecard_port.Transaction(req)
            if rsp.get("result") == 200:
                body = rsp["body"]
                response_text = body["choices"][0]["message"]["content"]
                logging.info(response_text)
                try:
                    logging.info("Using Pico TTS")
                    # Escape problematic characters in the text
                    sanitized_text = response_text.replace('"', '\\"').replace("'", "\\'")
                    os.system(f'pico2wave -w output_pico.wav "{sanitized_text}" && aplay output_pico.wav')
                    logging.info("Done playing tour audio")
                except Exception as e:
                    logging.error(f"Pico TTS call failed: {e}")
            else:
                logging.error(f"Blues OpenAI call failed: {rsp.get('body', {}).get('err', 'Unknown error')}")
        except Exception as e:
            logging.error(f"Blues OpenAI call failed: {e}")
    return None


    if not response:
        logging.warning("Failed to get response from OpenAI Chat API. Skipping TTS generation.")
        return
    logging.info(response)
    if not audio_path:
        logging.warning("Failed to generate speech. Skipping audio playback.")
        return

    play_audio(audio_path)


def openai_tourguide_speech(text):
    """Generate speech using OpenAI's TTS API."""
    try:
        response = requests.post(
            "https://api.openai.com/v1/audio/speech",
            headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
            json={"model": "tts-1", "voice": "alloy", "input": text},
            timeout=15
        )
        if response.status_code == 200:
            with open("speech.mp3", "wb") as f:
                f.write(response.content)
            return "speech.mp3"
        logging.error(f"OpenAI TTS API Error (Wi-Fi): {response.status_code}, {response.text}")
    except Exception as e:
        logging.warning(f"Wi-Fi TTS call failed: {e}")


def play_audio(file_path):
    """Play audio from the given file."""
    try:
        if file_path.endswith(".mp3"):
            audio = AudioSegment.from_file(file_path, format="mp3")
            normalized_audio = audio.apply_gain(-audio.max_dBFS)
            wav_file_path = file_path.replace(".mp3", ".wav")
            normalized_audio.export(wav_file_path, format="wav")
            file_path = wav_file_path

        wave_obj = sa.WaveObject.from_wave_file(file_path)
        play_obj = wave_obj.play()
        play_obj.wait_done()
    except Exception as e:
        logging.error(f"Failed to play audio: {e}")


def start_tour():
    """Handle the complete process of starting a tour."""
    coords = get_coordinates()
    if coords == "0.0,0.0":
        logging.warning("Failed to retrieve valid coordinates. Skipping tour generation.")
        return

    generate_tour(f"You are a tour guide.  Please give a thorough and interesting tour for the following specific coordinates.  Do not repeat the coordinates.  Simply give a guided tour of all the most interesting things about that location as if you are really there: {coords}")
    


def main():
    initialize_blues_service()
    recognizer = sr.Recognizer()
    microphone = sr.Microphone()
    button_a_pressed = False
    button_b_pressed = False

    try:
        while True:
            if button_a.is_pressed():
                if not button_a_pressed:
                    logging.info("Button A pressed: Starting tour...")
                    start_tour()
                    button_a_pressed = True
            else:
                button_a_pressed = False

            if button_b.is_pressed():
                if not button_b_pressed:
                    logging.info("Button B pressed: Starting another action...")
                    start_tour()
                    button_b_pressed = True
            else:
                button_b_pressed = False

            time.sleep(0.1)
    except KeyboardInterrupt:
        logging.info("Shutting down...")


if __name__ == "__main__":
    main()

Credits

donutsorelse

donutsorelse

17 projects • 18 followers
I make different stuff every week of all kinds. Usually I make funny yet useful inventions.

Comments