#!/usr/bin/env python3
"""
ChatGPT for Rotary Phone
-------------------------------------------------------------
https://en.polluxlabs.net
MIT License
Copyright (c) 2025 Frederik Kumbartzki
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import os
import sys
import time
import threading
from queue import Queue
from pathlib import Path
# Audio and speech libraries
os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = "hide"
import pygame
import pyaudio
import numpy as np
import wave
from openai import OpenAI
# OpenAI API Key
from dotenv import load_dotenv
load_dotenv()
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if not OPENAI_API_KEY:
print("Error: OPENAI_API_KEY not found.")
sys.exit(1)
# Hardware libraries
from gpiozero import Button
# Constants and configurations
AUDIO_DIR = "/home/pi/Desktop/callGPT"
AUDIO_FILES = {
"tone": f"{AUDIO_DIR}/a440.mp3",
"try_again": f"{AUDIO_DIR}/tryagain.mp3",
"error": f"{AUDIO_DIR}/error.mp3"
}
DIAL_PIN = 23 # GPIO pin for rotary dial
SWITCH_PIN = 17 # GPIO pin for hook switch
# Audio parameters
AUDIO_FORMAT = pyaudio.paInt16
CHANNELS = 1
SAMPLE_RATE = 16000
CHUNK_SIZE = 1024
SILENCE_THRESHOLD = 500
MAX_SILENCE_CHUNKS = 20 # About 1.3 seconds of silence
DEBOUNCE_TIME = 0.1 # Time in seconds for debouncing button inputs
class AudioManager:
"""Manages audio playback and recording."""
def __init__(self):
pygame.mixer.init(frequency=44100, buffer=2048)
self.playing_audio = False
self.audio_thread = None
# Create temp directory
self.temp_dir = Path(__file__).parent / "temp_audio"
self.temp_dir.mkdir(exist_ok=True)
# Preload sounds
self.sounds = {}
for name, path in AUDIO_FILES.items():
try:
self.sounds[name] = pygame.mixer.Sound(path)
except:
print(f"Error loading {path}")
def play_file(self, file_path, wait=True):
try:
sound = pygame.mixer.Sound(file_path)
channel = sound.play()
if wait and channel:
while channel.get_busy():
pygame.time.Clock().tick(30)
except:
pygame.mixer.music.load(file_path)
pygame.mixer.music.play()
if wait:
while pygame.mixer.music.get_busy():
pygame.time.Clock().tick(30)
def start_continuous_tone(self):
self.playing_audio = True
if self.audio_thread and self.audio_thread.is_alive():
self.playing_audio = False
self.audio_thread.join(timeout=1.0)
self.audio_thread = threading.Thread(target=self._play_continuous_tone)
self.audio_thread.daemon = True
self.audio_thread.start()
def _play_continuous_tone(self):
try:
if "tone" in self.sounds:
self.sounds["tone"].play(loops=-1)
while self.playing_audio:
time.sleep(0.1)
self.sounds["tone"].stop()
else:
pygame.mixer.music.load(AUDIO_FILES["tone"])
pygame.mixer.music.play(loops=-1)
while self.playing_audio:
time.sleep(0.1)
pygame.mixer.music.stop()
except Exception as e:
print(f"Error during tone playback: {e}")
def stop_continuous_tone(self):
self.playing_audio = False
if "tone" in self.sounds:
self.sounds["tone"].stop()
if pygame.mixer.get_init() and pygame.mixer.music.get_busy():
pygame.mixer.music.stop()
class SpeechRecognizer:
"""Handles real-time speech recognition using OpenAI's Whisper API."""
def __init__(self, openai_client):
self.client = openai_client
self.audio = pyaudio.PyAudio()
self.stream = None
def capture_and_transcribe(self):
# Setup audio stream if not already initialized
if not self.stream:
self.stream = self.audio.open(
format=AUDIO_FORMAT,
channels=CHANNELS,
rate=SAMPLE_RATE,
input=True,
frames_per_buffer=CHUNK_SIZE,
)
# Set up queue and threading
audio_queue = Queue()
stop_event = threading.Event()
# Start audio capture thread
capture_thread = threading.Thread(
target=self._capture_audio,
args=(audio_queue, stop_event)
)
capture_thread.daemon = True
capture_thread.start()
# Process the audio
result = self._process_audio(audio_queue, stop_event)
# Cleanup
stop_event.set()
capture_thread.join()
return result
def _capture_audio(self, queue, stop_event):
while not stop_event.is_set():
try:
data = self.stream.read(CHUNK_SIZE, exception_on_overflow=False)
queue.put(data)
except KeyboardInterrupt:
break
def _process_audio(self, queue, stop_event):
buffer = b""
speaking = False
silence_counter = 0
while not stop_event.is_set():
if not queue.empty():
chunk = queue.get()
# Check volume
data_np = np.frombuffer(chunk, dtype=np.int16)
volume = np.abs(data_np).mean()
# Detect speaking
if volume > SILENCE_THRESHOLD:
speaking = True
silence_counter = 0
elif speaking:
silence_counter += 1
# Add chunk to buffer
buffer += chunk
# Process if we've detected end of speech
if speaking and silence_counter > MAX_SILENCE_CHUNKS:
print("Processing speech...")
# Save to temp file
temp_file = Path(__file__).parent / "temp_recording.wav"
self._save_audio(buffer, temp_file)
# Transcribe
try:
return self._transcribe_audio(temp_file)
except Exception as e:
print(f"Error during transcription: {e}")
buffer = b""
speaking = False
silence_counter = 0
return None
def _save_audio(self, buffer, file_path):
with wave.open(str(file_path), "wb") as wf:
wf.setnchannels(CHANNELS)
wf.setsampwidth(self.audio.get_sample_size(AUDIO_FORMAT))
wf.setframerate(SAMPLE_RATE)
wf.writeframes(buffer)
def _transcribe_audio(self, file_path):
with open(file_path, "rb") as audio_file:
transcription = self.client.audio.transcriptions.create(
model="whisper-1",
file=audio_file,
language="en"
)
return transcription.text
def cleanup(self):
if self.stream:
self.stream.stop_stream()
self.stream.close()
self.stream = None
if self.audio:
self.audio.terminate()
self.audio = None
class ResponseGenerator:
"""Generates and speaks streaming responses from OpenAI's API."""
def __init__(self, openai_client, temp_dir):
self.client = openai_client
self.temp_dir = temp_dir
self.answer = ""
def generate_streaming_response(self, user_input, conversation_history=None):
self.answer = ""
collected_messages = []
chunk_files = []
# Audio playback queue and control variables
audio_queue = Queue()
playing_event = threading.Event()
stop_event = threading.Event()
# Start the audio playback thread
playback_thread = threading.Thread(
target=self._audio_playback_worker,
args=(audio_queue, playing_event, stop_event)
)
playback_thread.daemon = True
playback_thread.start()
# Prepare messages
messages = [
{"role": "system", "content": "You are a humorous conversation partner engaged in a natural phone call. Keep your answers concise and to the point."}
]
# Use conversation history if available, but limit to last 4 pairs
if conversation_history and len(conversation_history) > 0:
if len(conversation_history) > 8:
conversation_history = conversation_history[-8:]
messages.extend(conversation_history)
else:
messages.append({"role": "user", "content": user_input})
# Stream the response
stream = self.client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
stream=True
)
# Variables for sentence chunking
sentence_buffer = ""
chunk_counter = 0
for chunk in stream:
if chunk.choices and hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'content'):
content = chunk.choices[0].delta.content
if content:
collected_messages.append(content)
sentence_buffer += content
# Process when we have a complete sentence or phrase
if any(end in content for end in [".", "!", "?", ":"]) or len(sentence_buffer) > 100:
# Generate speech for this chunk
chunk_file_path = self.temp_dir / f"chunk_{chunk_counter}.mp3"
try:
# Generate speech
response = self.client.audio.speech.create(
model="tts-1",
voice="alloy",
input=sentence_buffer,
speed=1.0
)
response.stream_to_file(str(chunk_file_path))
chunk_files.append(str(chunk_file_path))
# Add to playback queue
audio_queue.put(str(chunk_file_path))
# Signal playback thread if it's waiting
playing_event.set()
except Exception as e:
print(f"Error generating speech for chunk: {e}")
# Reset buffer and increment counter
sentence_buffer = ""
chunk_counter += 1
# Process any remaining text
if sentence_buffer.strip():
chunk_file_path = self.temp_dir / f"chunk_{chunk_counter}.mp3"
try:
response = self.client.audio.speech.create(
model="tts-1",
voice="alloy",
input=sentence_buffer,
speed=1.2
)
response.stream_to_file(str(chunk_file_path))
chunk_files.append(str(chunk_file_path))
audio_queue.put(str(chunk_file_path))
playing_event.set()
except Exception as e:
print(f"Error generating final speech chunk: {e}")
# Signal end of generation
audio_queue.put(None) # Sentinel to signal end of queue
# Wait for playback to complete
playback_thread.join()
stop_event.set() # Ensure the thread stops
# Combine all messages
self.answer = "".join(collected_messages)
print(self.answer)
# Clean up temp files
self._cleanup_temp_files(chunk_files)
return self.answer
def _audio_playback_worker(self, queue, playing_event, stop_event):
while not stop_event.is_set():
# Wait for a signal that there's something to play
if queue.empty():
playing_event.wait(timeout=0.1)
playing_event.clear()
continue
# Get the next file to play
file_path = queue.get()
# None is our sentinel value to signal end of queue
if file_path is None:
break
try:
# Play audio and wait for completion
pygame.mixer.music.load(file_path)
pygame.mixer.music.play()
# Wait for playback to complete before moving to next chunk
while pygame.mixer.music.get_busy() and not stop_event.is_set():
pygame.time.Clock().tick(30)
# Small pause between chunks for more natural flow
time.sleep(0.05)
except Exception as e:
print(f"Error playing audio chunk: {e}")
def _cleanup_temp_files(self, file_list):
# Wait a moment to ensure files aren't in use
time.sleep(0.5)
for file_path in file_list:
try:
if os.path.exists(file_path):
os.remove(file_path)
except Exception as e:
print(f"Error removing temp file: {e}")
class RotaryDialer:
"""Handles rotary phone dialing and services."""
def __init__(self, openai_client):
self.client = openai_client
self.audio_manager = AudioManager()
self.speech_recognizer = SpeechRecognizer(openai_client)
self.response_generator = ResponseGenerator(openai_client, self.audio_manager.temp_dir)
# Set up GPIO
self.dial_button = Button(DIAL_PIN, pull_up=True)
self.switch = Button(SWITCH_PIN, pull_up=True)
# State variables
self.pulse_count = 0
self.last_pulse_time = 0
self.running = True
def start(self):
# Set up callbacks
self.dial_button.when_pressed = self._pulse_detected
self.switch.when_released = self._handle_switch_released
self.switch.when_pressed = self._handle_switch_pressed
# Start in ready state
if not self.switch.is_pressed:
# Receiver is picked up
self.audio_manager.start_continuous_tone()
else:
# Receiver is on hook
print("Phone in idle state. Pick up the receiver to begin.")
print("Rotary dial ready. Dial a number when the receiver is picked up.")
try:
self._main_loop()
except KeyboardInterrupt:
print("Terminating...")
self._cleanup()
def _main_loop(self):
while self.running:
self._check_number()
time.sleep(0.1)
def _pulse_detected(self):
if not self.switch.is_pressed:
current_time = time.time()
if current_time - self.last_pulse_time > DEBOUNCE_TIME:
self.pulse_count += 1
self.last_pulse_time = current_time
def _check_number(self):
if not self.switch.is_pressed and self.pulse_count > 0:
self.audio_manager.stop_continuous_tone()
time.sleep(1.5) # Wait between digits
if self.pulse_count == 10:
self.pulse_count = 0 # "0" is sent as 10 pulses
print("Dialed service number:", self.pulse_count)
if self.pulse_count == 1:
self._call_gpt_service()
# Return to dial tone after conversation
if not self.switch.is_pressed: # Only if the receiver wasn't hung up
self._reset_state()
self.pulse_count = 0
def _call_gpt_service(self):
# Conversation history for context
conversation_history = []
first_interaction = True
# For faster transitions
speech_recognizer = self.speech_recognizer
response_generator = self.response_generator
# Preparation for next recording
next_recording_thread = None
next_recording_queue = Queue()
# Conversation loop - runs until the receiver is hung up
while not self.switch.is_pressed:
# If there's a prepared next recording thread, use its result
if next_recording_thread:
next_recording_thread.join()
recognized_text = next_recording_queue.get()
next_recording_thread = None
else:
# Only during first iteration or as fallback
print("Listening..." + (" (Speak now)" if first_interaction else ""))
first_interaction = False
# Start audio processing
recognized_text = speech_recognizer.capture_and_transcribe()
if not recognized_text:
print("Could not recognize your speech")
self.audio_manager.play_file(AUDIO_FILES["try_again"])
continue
print("Understood:", recognized_text)
# Update conversation history
conversation_history.append({"role": "user", "content": recognized_text})
# Start the next recording thread PARALLEL to API response
next_recording_thread = threading.Thread(
target=self._background_capture,
args=(speech_recognizer, next_recording_queue)
)
next_recording_thread.daemon = True
next_recording_thread.start()
# Generate the response
response = response_generator.generate_streaming_response(recognized_text, conversation_history)
# Add response to history
conversation_history.append({"role": "assistant", "content": response})
# Check if the receiver was hung up in the meantime
if self.switch.is_pressed:
break
# If we get here, the receiver was hung up
if next_recording_thread and next_recording_thread.is_alive():
next_recording_thread.join(timeout=0.5)
def _background_capture(self, recognizer, result_queue):
try:
result = recognizer.capture_and_transcribe()
result_queue.put(result)
except Exception as e:
print(f"Error in background recording: {e}")
result_queue.put(None)
def _reset_state(self):
self.pulse_count = 0
self.audio_manager.stop_continuous_tone()
self.audio_manager.start_continuous_tone()
print("Rotary dial ready. Dial a number.")
def _handle_switch_released(self):
print("Receiver picked up - System restarting")
self._restart_script()
def _handle_switch_pressed(self):
print("Receiver hung up - System terminating")
self._cleanup()
self.running = False
# Complete termination after short delay
threading.Timer(1.0, self._restart_script).start()
return
def _restart_script(self):
print("Script restarting...")
self.audio_manager.stop_continuous_tone()
os.execv(sys.executable, ['python'] + sys.argv)
def _cleanup(self):
# Terminate Audio Manager
self.audio_manager.stop_continuous_tone()
# Terminate Speech Recognizer if it exists
if hasattr(self, 'speech_recognizer') and self.speech_recognizer:
self.speech_recognizer.cleanup()
print("Resources have been released.")
def main():
# Initialize OpenAI client
client = OpenAI(api_key=OPENAI_API_KEY)
# Create and start the rotary dialer
dialer = RotaryDialer(client)
dialer.start()
print("Program terminated.")
if __name__ == "__main__":
main()
Comments
Please log in or sign up to comment.