pollux labs
Published under the MIT license

Retro Calling: ChatGPT on Your Rotary Phone

Turn your vintage rotary phone into a ChatGPT hotline. Dial 1 for the future!

Advanced · Full instructions provided · 3 hours · 706 views

Things used in this project

Hardware components

Rotary dial telephone
×1
Raspberry Pi 4 Model B
×1
Lavalier microphone
×1
2.8 mm flat connectors
×1
3.5 mm audio cable
×1
Button
×1
Jumper wires (generic)
×1

Software apps and online services

Microsoft VS Code

Hand tools and fabrication machines

Soldering iron (generic)

Story


Schematics

Wiring diagram
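
A quick way to confirm the wiring before launching the full script is to watch the two GPIO inputs directly. The sketch below is a minimal test, assuming the same pins as in callGPT.py (dial pulse contact on GPIO 23, hook switch on GPIO 17); each dialed digit should print one line per pulse, and lifting or replacing the receiver should print a hook event.

#!/usr/bin/env python3
# Minimal wiring check -- assumes the same pins as callGPT.py:
# GPIO 23 = rotary dial pulse contact, GPIO 17 = hook switch.
from signal import pause
from gpiozero import Button

dial = Button(23, pull_up=True)   # closes once per pulse while a digit unwinds
hook = Button(17, pull_up=True)   # pressed = receiver resting on the hook

dial.when_pressed = lambda: print("dial pulse")
hook.when_pressed = lambda: print("receiver hung up")
hook.when_released = lambda: print("receiver picked up")

print("Dial a digit or lift the receiver (Ctrl+C to quit)")
pause()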

Code

callGPT.py

Python
Upload it to your Raspberry Pi as callGPT.py. The script expects an OPENAI_API_KEY in a .env file alongside it, and the dial tone and prompt sounds in /home/pi/Desktop/callGPT (see the pre-flight check after the listing).
#!/usr/bin/env python3
"""
ChatGPT for Rotary Phone
-------------------------------------------------------------
https://en.polluxlabs.net
MIT License
Copyright (c) 2025 Frederik Kumbartzki
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import os
import sys
import time
import threading
from queue import Queue
from pathlib import Path
# Audio and speech libraries
os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = "hide"
import pygame
import pyaudio
import numpy as np
import wave
from openai import OpenAI
# OpenAI API Key
from dotenv import load_dotenv
load_dotenv()
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    print("Error: OPENAI_API_KEY not found.")
    sys.exit(1)
# Hardware libraries
from gpiozero import Button
# Constants and configurations
AUDIO_DIR = "/home/pi/Desktop/callGPT"
AUDIO_FILES = {
    "tone": f"{AUDIO_DIR}/a440.mp3",
    "try_again": f"{AUDIO_DIR}/tryagain.mp3",
    "error": f"{AUDIO_DIR}/error.mp3"
}
DIAL_PIN = 23  # GPIO pin for rotary dial
SWITCH_PIN = 17  # GPIO pin for hook switch
# Audio parameters
AUDIO_FORMAT = pyaudio.paInt16
CHANNELS = 1
SAMPLE_RATE = 16000
CHUNK_SIZE = 1024
SILENCE_THRESHOLD = 500
MAX_SILENCE_CHUNKS = 20  # About 1.3 seconds of silence
DEBOUNCE_TIME = 0.1  # Time in seconds for debouncing button inputs
class AudioManager:
    """Manages audio playback and recording."""
    def __init__(self):
        pygame.mixer.init(frequency=44100, buffer=2048)
        self.playing_audio = False
        self.audio_thread = None
        # Create temp directory
        self.temp_dir = Path(__file__).parent / "temp_audio"
        self.temp_dir.mkdir(exist_ok=True)
        # Preload sounds
        self.sounds = {}
        for name, path in AUDIO_FILES.items():
            try:
                self.sounds[name] = pygame.mixer.Sound(path)
            except Exception as e:
                print(f"Error loading {path}: {e}")
    def play_file(self, file_path, wait=True):
        try:
            sound = pygame.mixer.Sound(file_path)
            channel = sound.play()
            if wait and channel:
                while channel.get_busy():
                    pygame.time.Clock().tick(30)
        except Exception:
            pygame.mixer.music.load(file_path)
            pygame.mixer.music.play()
            if wait:
                while pygame.mixer.music.get_busy():
                    pygame.time.Clock().tick(30)
    def start_continuous_tone(self):
        self.playing_audio = True
        if self.audio_thread and self.audio_thread.is_alive():
            self.playing_audio = False
            self.audio_thread.join(timeout=1.0)
        self.audio_thread = threading.Thread(target=self._play_continuous_tone)
        self.audio_thread.daemon = True
        self.audio_thread.start()
    def _play_continuous_tone(self):
        try:
            if "tone" in self.sounds:
                self.sounds["tone"].play(loops=-1)
                while self.playing_audio:
                    time.sleep(0.1)
                self.sounds["tone"].stop()
            else:
                pygame.mixer.music.load(AUDIO_FILES["tone"])
                pygame.mixer.music.play(loops=-1)
                while self.playing_audio:
                    time.sleep(0.1)
                pygame.mixer.music.stop()
        except Exception as e:
            print(f"Error during tone playback: {e}")
    def stop_continuous_tone(self):
        self.playing_audio = False
        if "tone" in self.sounds:
            self.sounds["tone"].stop()
        if pygame.mixer.get_init() and pygame.mixer.music.get_busy():
            pygame.mixer.music.stop()
class SpeechRecognizer:
    """Handles real-time speech recognition using OpenAI's Whisper API."""
    def __init__(self, openai_client):
        self.client = openai_client
        self.audio = pyaudio.PyAudio()
        self.stream = None
    def capture_and_transcribe(self):
        # Setup audio stream if not already initialized
        if not self.stream:
            self.stream = self.audio.open(
                format=AUDIO_FORMAT,
                channels=CHANNELS,
                rate=SAMPLE_RATE,
                input=True,
                frames_per_buffer=CHUNK_SIZE,
            )
        # Set up queue and threading
        audio_queue = Queue()
        stop_event = threading.Event()
        # Start audio capture thread
        capture_thread = threading.Thread(
            target=self._capture_audio,
            args=(audio_queue, stop_event)
        )
        capture_thread.daemon = True
        capture_thread.start()
        # Process the audio
        result = self._process_audio(audio_queue, stop_event)
        # Cleanup
        stop_event.set()
        capture_thread.join()
        return result
    def _capture_audio(self, queue, stop_event):
        while not stop_event.is_set():
            try:
                data = self.stream.read(CHUNK_SIZE, exception_on_overflow=False)
                queue.put(data)
            except KeyboardInterrupt:
                break
    def _process_audio(self, queue, stop_event):
        buffer = b""
        speaking = False
        silence_counter = 0
        while not stop_event.is_set():
            if not queue.empty():
                chunk = queue.get()
                # Check volume
                data_np = np.frombuffer(chunk, dtype=np.int16)
                volume = np.abs(data_np).mean()
                # Detect speaking
                if volume > SILENCE_THRESHOLD:
                    speaking = True
                    silence_counter = 0
                elif speaking:
                    silence_counter += 1
                # Add chunk to buffer
                buffer += chunk
                # Process if we've detected end of speech
                if speaking and silence_counter > MAX_SILENCE_CHUNKS:
                    print("Processing speech...")
                    # Save to temp file
                    temp_file = Path(__file__).parent / "temp_recording.wav"
                    self._save_audio(buffer, temp_file)
                    # Transcribe
                    try:
                        return self._transcribe_audio(temp_file)
                    except Exception as e:
                        print(f"Error during transcription: {e}")
                        buffer = b""
                        speaking = False
                        silence_counter = 0
        return None
    def _save_audio(self, buffer, file_path):
        with wave.open(str(file_path), "wb") as wf:
            wf.setnchannels(CHANNELS)
            wf.setsampwidth(self.audio.get_sample_size(AUDIO_FORMAT))
            wf.setframerate(SAMPLE_RATE)
            wf.writeframes(buffer)
    def _transcribe_audio(self, file_path):
        with open(file_path, "rb") as audio_file:
            transcription = self.client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
                language="en"
            )
        return transcription.text
    def cleanup(self):
        if self.stream:
            self.stream.stop_stream()
            self.stream.close()
            self.stream = None
        if self.audio:
            self.audio.terminate()
            self.audio = None
class ResponseGenerator:
    """Generates and speaks streaming responses from OpenAI's API."""
    def __init__(self, openai_client, temp_dir):
        self.client = openai_client
        self.temp_dir = temp_dir
        self.answer = ""
    def generate_streaming_response(self, user_input, conversation_history=None):
        self.answer = ""
        collected_messages = []
        chunk_files = []
        # Audio playback queue and control variables
        audio_queue = Queue()
        playing_event = threading.Event()
        stop_event = threading.Event()
        # Start the audio playback thread
        playback_thread = threading.Thread(
            target=self._audio_playback_worker,
            args=(audio_queue, playing_event, stop_event)
        )
        playback_thread.daemon = True
        playback_thread.start()
        # Prepare messages
        messages = [
            {"role": "system", "content": "You are a humorous conversation partner engaged in a natural phone call. Keep your answers concise and to the point."}
        ]
        # Use conversation history if available, but limit to last 4 pairs
        if conversation_history and len(conversation_history) > 0:
            if len(conversation_history) > 8:
                conversation_history = conversation_history[-8:]
            messages.extend(conversation_history)
        else:
            messages.append({"role": "user", "content": user_input})
        # Stream the response
        stream = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            stream=True
        )
        # Variables for sentence chunking
        sentence_buffer = ""
        chunk_counter = 0
        for chunk in stream:
            if chunk.choices and hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'content'):
                content = chunk.choices[0].delta.content
                if content:
                    collected_messages.append(content)
                    sentence_buffer += content
                    # Process when we have a complete sentence or phrase
                    if any(end in content for end in [".", "!", "?", ":"]) or len(sentence_buffer) > 100:
                        # Generate speech for this chunk
                        chunk_file_path = self.temp_dir / f"chunk_{chunk_counter}.mp3"
                        try:
                            # Generate speech
                            response = self.client.audio.speech.create(
                                model="tts-1",
                                voice="alloy",
                                input=sentence_buffer,
                                speed=1.0
                            )
                            response.stream_to_file(str(chunk_file_path))
                            chunk_files.append(str(chunk_file_path))
                            # Add to playback queue
                            audio_queue.put(str(chunk_file_path))
                            # Signal playback thread if it's waiting
                            playing_event.set()
                        except Exception as e:
                            print(f"Error generating speech for chunk: {e}")
                        # Reset buffer and increment counter
                        sentence_buffer = ""
                        chunk_counter += 1
        # Process any remaining text
        if sentence_buffer.strip():
            chunk_file_path = self.temp_dir / f"chunk_{chunk_counter}.mp3"
            try:
                response = self.client.audio.speech.create(
                    model="tts-1",
                    voice="alloy",
                    input=sentence_buffer,
                    speed=1.2
                )
                response.stream_to_file(str(chunk_file_path))
                chunk_files.append(str(chunk_file_path))
                audio_queue.put(str(chunk_file_path))
                playing_event.set()
            except Exception as e:
                print(f"Error generating final speech chunk: {e}")
        # Signal end of generation
        audio_queue.put(None)  # Sentinel to signal end of queue
        # Wait for playback to complete
        playback_thread.join()
        stop_event.set()  # Ensure the thread stops
        # Combine all messages
        self.answer = "".join(collected_messages)
        print(self.answer)
        # Clean up temp files
        self._cleanup_temp_files(chunk_files)
        return self.answer
    def _audio_playback_worker(self, queue, playing_event, stop_event):
        while not stop_event.is_set():
            # Wait for a signal that there's something to play
            if queue.empty():
                playing_event.wait(timeout=0.1)
                playing_event.clear()
                continue
            # Get the next file to play
            file_path = queue.get()
            # None is our sentinel value to signal end of queue
            if file_path is None:
                break
            try:
                # Play audio and wait for completion
                pygame.mixer.music.load(file_path)
                pygame.mixer.music.play()
                # Wait for playback to complete before moving to next chunk
                while pygame.mixer.music.get_busy() and not stop_event.is_set():
                    pygame.time.Clock().tick(30)
                # Small pause between chunks for more natural flow
                time.sleep(0.05)
            except Exception as e:
                print(f"Error playing audio chunk: {e}")
    def _cleanup_temp_files(self, file_list):
        # Wait a moment to ensure files aren't in use
        time.sleep(0.5)
        for file_path in file_list:
            try:
                if os.path.exists(file_path):
                    os.remove(file_path)
            except Exception as e:
                print(f"Error removing temp file: {e}")
class RotaryDialer:
    """Handles rotary phone dialing and services."""
    def __init__(self, openai_client):
        self.client = openai_client
        self.audio_manager = AudioManager()
        self.speech_recognizer = SpeechRecognizer(openai_client)
        self.response_generator = ResponseGenerator(openai_client, self.audio_manager.temp_dir)
        # Set up GPIO
        self.dial_button = Button(DIAL_PIN, pull_up=True)
        self.switch = Button(SWITCH_PIN, pull_up=True)
        # State variables
        self.pulse_count = 0
        self.last_pulse_time = 0
        self.running = True
    def start(self):
        # Set up callbacks
        self.dial_button.when_pressed = self._pulse_detected
        self.switch.when_released = self._handle_switch_released
        self.switch.when_pressed = self._handle_switch_pressed
        # Start in ready state
        if not self.switch.is_pressed:
            # Receiver is picked up
            self.audio_manager.start_continuous_tone()
        else:
            # Receiver is on hook
            print("Phone in idle state. Pick up the receiver to begin.")
        print("Rotary dial ready. Dial a number when the receiver is picked up.")
        try:
            self._main_loop()
        except KeyboardInterrupt:
            print("Terminating...")
            self._cleanup()
    def _main_loop(self):
        while self.running:
            self._check_number()
            time.sleep(0.1)
    def _pulse_detected(self):
        if not self.switch.is_pressed:
            current_time = time.time()
            if current_time - self.last_pulse_time > DEBOUNCE_TIME:
                self.pulse_count += 1
                self.last_pulse_time = current_time
    def _check_number(self):
        if not self.switch.is_pressed and self.pulse_count > 0:
            self.audio_manager.stop_continuous_tone()
            time.sleep(1.5)  # Wait between digits
            if self.pulse_count == 10:
                self.pulse_count = 0  # "0" is sent as 10 pulses
            print("Dialed service number:", self.pulse_count)
            if self.pulse_count == 1:
                self._call_gpt_service()
                # Return to dial tone after conversation
                if not self.switch.is_pressed:  # Only if the receiver wasn't hung up
                    self._reset_state()
            self.pulse_count = 0
    def _call_gpt_service(self):
        # Conversation history for context
        conversation_history = []
        first_interaction = True
        # For faster transitions
        speech_recognizer = self.speech_recognizer
        response_generator = self.response_generator
        # Preparation for next recording
        next_recording_thread = None
        next_recording_queue = Queue()
        # Conversation loop - runs until the receiver is hung up
        while not self.switch.is_pressed:
            # If there's a prepared next recording thread, use its result
            if next_recording_thread:
                next_recording_thread.join()
                recognized_text = next_recording_queue.get()
                next_recording_thread = None
            else:
                # Only during first iteration or as fallback
                print("Listening..." + (" (Speak now)" if first_interaction else ""))
                first_interaction = False
                # Start audio processing
                recognized_text = speech_recognizer.capture_and_transcribe()
            if not recognized_text:
                print("Could not recognize your speech")
                self.audio_manager.play_file(AUDIO_FILES["try_again"])
                continue
            print("Understood:", recognized_text)
            # Update conversation history
            conversation_history.append({"role": "user", "content": recognized_text})
            # Start the next recording thread PARALLEL to API response
            next_recording_thread = threading.Thread(
                target=self._background_capture,
                args=(speech_recognizer, next_recording_queue)
            )
            next_recording_thread.daemon = True
            next_recording_thread.start()
            # Generate the response
            response = response_generator.generate_streaming_response(recognized_text, conversation_history)
            # Add response to history
            conversation_history.append({"role": "assistant", "content": response})
            # Check if the receiver was hung up in the meantime
            if self.switch.is_pressed:
                break
        # If we get here, the receiver was hung up
        if next_recording_thread and next_recording_thread.is_alive():
            next_recording_thread.join(timeout=0.5)
    def _background_capture(self, recognizer, result_queue):
        try:
            result = recognizer.capture_and_transcribe()
            result_queue.put(result)
        except Exception as e:
            print(f"Error in background recording: {e}")
            result_queue.put(None)
    def _reset_state(self):
        self.pulse_count = 0
        self.audio_manager.stop_continuous_tone()
        self.audio_manager.start_continuous_tone()
        print("Rotary dial ready. Dial a number.")
    def _handle_switch_released(self):
        print("Receiver picked up - System restarting")
        self._restart_script()
    def _handle_switch_pressed(self):
        print("Receiver hung up - System terminating")
        self._cleanup()
        self.running = False
        # Restart the script after a short delay so the phone returns to idle
        threading.Timer(1.0, self._restart_script).start()
    def _restart_script(self):
        print("Script restarting...")
        self.audio_manager.stop_continuous_tone()
        os.execv(sys.executable, ['python'] + sys.argv)
    def _cleanup(self):
        # Terminate Audio Manager
        self.audio_manager.stop_continuous_tone()
        # Terminate Speech Recognizer if it exists
        if hasattr(self, 'speech_recognizer') and self.speech_recognizer:
            self.speech_recognizer.cleanup()
        print("Resources have been released.")
def main():
    # Initialize OpenAI client
    client = OpenAI(api_key=OPENAI_API_KEY)
    # Create and start the rotary dialer
    dialer = RotaryDialer(client)
    dialer.start()
    print("Program terminated.")
if __name__ == "__main__":
    main()
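
Before the first call, it is worth checking that the API key and the three prompt sounds used by the script (a440.mp3, tryagain.mp3 and error.mp3 in /home/pi/Desktop/callGPT) are actually in place. The snippet below is a minimal pre-flight check based on the constants in callGPT.py; adjust AUDIO_DIR if you store the sounds elsewhere.

#!/usr/bin/env python3
# Pre-flight check for callGPT.py: verifies the API key and the prompt sounds.
# Paths and file names mirror the constants in callGPT.py.
import os
from dotenv import load_dotenv

AUDIO_DIR = "/home/pi/Desktop/callGPT"
REQUIRED_SOUNDS = ["a440.mp3", "tryagain.mp3", "error.mp3"]

load_dotenv()
if os.environ.get("OPENAI_API_KEY"):
    print("OPENAI_API_KEY found.")
else:
    print("OPENAI_API_KEY missing - put it in a .env file next to callGPT.py.")

for name in REQUIRED_SOUNDS:
    path = os.path.join(AUDIO_DIR, name)
    print(("OK       " if os.path.exists(path) else "MISSING  ") + path)

Once both checks pass, start the assistant with python3 callGPT.py, lift the receiver and dial 1 to be connected to ChatGPT; hanging up the receiver ends the conversation and restarts the script.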

Credits

pollux labs
12 projects • 17 followers
Fun Microcontroller Projects
Contact
