#!/usr/bin/env python3
"""
Test script for OpenAI Realtime API connection and audio handling.

This script tests:
1. OpenAI API connection
2. Event receiving
3. Audio sending/receiving (if Reachy Mini is available)
4. Audio conversion utilities

Usage:
    python test_openai_connection.py
"""

import asyncio
import base64
import json
import logging
import os
import traceback
from pathlib import Path
from typing import List, Optional

from dotenv import load_dotenv
import websockets

# Load environment variables: the first existing candidate wins; if none
# exists, fall back to python-dotenv's default lookup.
env_paths = [
    Path(__file__).parent / ".env",
    Path.cwd() / ".env",
]
for env_path in env_paths:
    if env_path.exists():
        load_dotenv(env_path)
        print(f"✅ Loaded .env from {env_path}")
        break
else:
    load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# OpenAI settings
OPENAI_MODEL = "gpt-realtime-2025-08-28"
OPENAI_VOICE = "alloy"

# Seconds to wait for the initial session.created event before giving up.
SESSION_CREATED_TIMEOUT_S = 10.0


def _realtime_url() -> str:
    """Return the Realtime API WebSocket URL for the configured model."""
    return f"wss://api.openai.com/v1/realtime?model={OPENAI_MODEL}"


def _connect_realtime(api_key: str):
    """Return a ``websockets.connect(...)`` object for the Realtime API.

    The returned object is not yet connected; use it with ``async with`` so
    the socket is closed on every exit path, including exceptions.
    """
    headers = {
        "Authorization": f"Bearer {api_key}",
        "OpenAI-Beta": "realtime=v1"
    }
    return websockets.connect(
        _realtime_url(),
        additional_headers=headers,
        ping_interval=20,
        ping_timeout=10
    )


def _build_session_update(
    instructions: str,
    *,
    auto_respond: bool,
    max_output_tokens: Optional[int] = None,
) -> dict:
    """Build the ``session.update`` event shared by the connection tests.

    Args:
        instructions: System instructions for the session.
        auto_respond: Value used for both ``create_response`` and
            ``interrupt_response`` of the turn-detection settings.
        max_output_tokens: When given, sent as ``max_response_output_tokens``;
            omitted from the payload otherwise.
    """
    session = {
        "modalities": ["audio", "text"],
        "instructions": instructions,
        "voice": OPENAI_VOICE,
        "input_audio_format": "pcm16",
        "output_audio_format": "pcm16",
        "input_audio_transcription": {
            "model": "whisper-1"
        },
        "turn_detection": {
            "type": "semantic_vad",
            "eagerness": "low",
            "create_response": auto_respond,
            "interrupt_response": auto_respond
        },
        "temperature": 0.8
    }
    if max_output_tokens is not None:
        session["max_response_output_tokens"] = max_output_tokens
    return {"type": "session.update", "session": session}


def _encode_audio_chunks(audio_bytes: bytes, num_chunks: int = 10) -> List[str]:
    """Split raw audio bytes into roughly ``num_chunks`` base64 strings.

    The split is done on the raw bytes (not on the base64 text) so every
    chunk is an independently decodable base64 segment. Input shorter than
    ``num_chunks`` bytes is returned as a single chunk; empty input yields
    an empty list.
    """
    if not audio_bytes:
        return []
    chunk_size = len(audio_bytes) // num_chunks or len(audio_bytes)
    return [
        base64.b64encode(audio_bytes[start:start + chunk_size]).decode('ascii')
        for start in range(0, len(audio_bytes), chunk_size)
    ]


async def test_openai_connection() -> bool:
    """Test basic OpenAI Realtime API connection.

    Connects, waits for ``session.created``, configures the session,
    triggers one response and prints the events received for up to 10
    seconds (or 20 events, or until ``response.done``).

    Returns:
        True when the whole exchange ran without raising, False when the API
        key is missing or any exception occurred.
    """
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        print("❌ OPENAI_API_KEY not set in environment!")
        return False

    print(f"🔑 API Key found: {api_key[:10]}...")

    print("🔌 Connecting to OpenAI Realtime API...")
    print(f" URL: {_realtime_url()}")

    try:
        # `async with` guarantees the socket is closed even if a later step
        # raises (the previous explicit close() only ran on success).
        async with _connect_realtime(api_key) as ws:
            print("✅ Connected to OpenAI!")

            # Wait for session.created
            print("⏳ Waiting for session.created event...")
            response = await asyncio.wait_for(
                ws.recv(), timeout=SESSION_CREATED_TIMEOUT_S
            )
            event = json.loads(response)

            if event.get("type") == "session.created":
                print(f"✅ Session created: {event.get('session', {}).get('id', 'unknown')}")
            else:
                print(f"⚠️ Unexpected event: {event.get('type')}")
                print(f" Event: {json.dumps(event, indent=2)}")

            # Configure session
            print("⚙️ Configuring session...")
            config = _build_session_update(
                "You are a helpful assistant. Respond briefly.",
                auto_respond=True,
                max_output_tokens=500,
            )
            await ws.send(json.dumps(config))
            print("✅ Session configured")

            # Test: Trigger a response
            print("💬 Triggering test response...")
            await ws.send(json.dumps({
                "type": "response.create",
                "response": {
                    "instructions": "Say 'Hello! This is a test. Can you hear me?'"
                }
            }))

            # Listen for events
            print("👂 Listening for events (10 seconds)...")
            events_received = 0
            audio_chunks_received = 0
            transcription_received = False

            async def listen_for_events():
                nonlocal events_received, audio_chunks_received, transcription_received

                async for message in ws:
                    event = json.loads(message)
                    event_type = event.get("type", "unknown")
                    events_received += 1

                    print(f"📨 Event #{events_received}: {event_type}")

                    if event_type == "response.audio.delta":
                        if event.get("delta", ""):
                            audio_chunks_received += 1
                            # Only report every 10th chunk to keep output short.
                            if audio_chunks_received % 10 == 0:
                                print(f" 🔊 Received {audio_chunks_received} audio chunks")

                    elif event_type == "conversation.item.input_audio_transcription.completed":
                        transcript = event.get("transcript", "")
                        print(f" 📝 Transcription: {transcript}")
                        transcription_received = True

                    elif event_type == "response.done":
                        print(" ✅ Response completed")
                        return True

                    elif event_type == "error":
                        error = event.get("error", {})
                        print(f" ❌ Error: {error}")

                    if events_received >= 20:  # Limit events for testing
                        return True

            try:
                await asyncio.wait_for(listen_for_events(), timeout=10.0)
            except asyncio.TimeoutError:
                print("⏱️ Timeout waiting for events")

            # Summary
            print("\n📊 Test Summary:")
            print(f" Events received: {events_received}")
            print(f" Audio chunks: {audio_chunks_received}")
            print(f" Transcription: {'✅' if transcription_received else '❌'}")

        # Leaving the `async with` block closed the connection.
        print("✅ Connection closed")
        return True

    except Exception as e:
        # Top-level boundary of this test: report and mark as failed.
        print(f"❌ Error: {e}")
        traceback.print_exc()
        return False


async def test_audio_transcription() -> bool:
    """Test audio transcription by sending audio to OpenAI.

    Generates a 500 ms amplitude-modulated 440 Hz tone, converts it with
    ``prepare_audio_for_openai``, streams it in base64 chunks, commits the
    input buffer and waits up to 5 seconds for a transcription event.

    Returns:
        True only if a ``...input_audio_transcription.completed`` event was
        received; False otherwise (including on any exception).
    """
    print("\n🧪 Testing audio transcription...")

    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        print(" ❌ OPENAI_API_KEY not set!")
        return False

    try:
        from twenty_questions_game.audio_utils import prepare_audio_for_openai, OPENAI_SAMPLE_RATE  # noqa: F401
        import numpy as np

        transcription_received = False
        transcript_text = ""
        events_received = 0

        print(" 🔌 Connecting to OpenAI...")
        async with _connect_realtime(api_key) as ws:
            # Wait for session.created (bounded, so a silent server cannot
            # hang the test run).
            response = await asyncio.wait_for(
                ws.recv(), timeout=SESSION_CREATED_TIMEOUT_S
            )
            event = json.loads(response)
            if event.get("type") != "session.created":
                print(f" ❌ Unexpected event: {event.get('type')}")
                return False

            # Configure session: don't create a response, just transcribe.
            config = _build_session_update(
                "You are a helpful assistant. Transcribe what you hear.",
                auto_respond=False,
            )
            await ws.send(json.dumps(config))

            # Generate test audio (simple sine wave to simulate speech-like audio)
            # OpenAI requires at least 100ms of audio (2400 samples at 24kHz = 4800 bytes)
            print(" 🎵 Generating test audio...")
            sample_rate = 16000  # Input sample rate
            duration = 0.5  # 500ms (well above 100ms minimum)
            frequency = 440  # A4 note
            samples = int(sample_rate * duration)
            t = np.linspace(0, duration, samples, False)
            # Create a more speech-like signal with modulation
            test_audio = np.sin(2 * np.pi * frequency * t) * (1 + 0.5 * np.sin(2 * np.pi * 5 * t))
            test_audio = (test_audio * 0.3 * 32767).astype(np.int16)  # Scale down to avoid clipping

            # Convert to OpenAI format (24kHz, PCM16)
            audio_bytes = prepare_audio_for_openai(test_audio, sample_rate)

            # Calculate expected samples at 24kHz
            expected_samples_24k = int(len(test_audio) * 24000 / sample_rate)
            expected_bytes = expected_samples_24k * 2  # 2 bytes per int16 sample

            print(f" 📊 Audio: {len(test_audio)} samples @ {sample_rate}Hz -> {len(audio_bytes)} bytes @ 24kHz")
            print(f" 📊 Expected: {expected_samples_24k} samples = {expected_bytes} bytes")

            # Split audio BYTES into chunks (not base64 string!)
            # Each chunk should be a complete base64-encoded segment
            chunks = _encode_audio_chunks(audio_bytes, num_chunks=10)
            if not chunks:
                # Nothing to send; committing an empty buffer cannot succeed.
                print(" ❌ Audio conversion produced no data")
                return False

            print(f" 📤 Sending {len(chunks)} audio chunks ({len(audio_bytes)} total bytes) to OpenAI...")
            for i, chunk in enumerate(chunks):
                await ws.send(json.dumps({
                    "type": "input_audio_buffer.append",
                    "audio": chunk
                }))
                if i < len(chunks) - 1:  # Don't sleep after last chunk
                    await asyncio.sleep(0.01)  # Small delay between chunks

            # Wait a moment for buffer to process
            await asyncio.sleep(0.1)

            # Signal end of input
            print(" ✅ Committing audio buffer...")
            await ws.send(json.dumps({
                "type": "input_audio_buffer.commit"
            }))

            print(" 👂 Waiting for transcription (5 seconds)...")

            async def listen_for_transcription():
                nonlocal transcription_received, transcript_text, events_received

                async for message in ws:
                    event = json.loads(message)
                    events_received += 1
                    event_type = event.get("type", "unknown")

                    if event_type == "conversation.item.input_audio_transcription.completed":
                        transcript_text = event.get("transcript", "")
                        transcription_received = True
                        print(f" 📝 Transcription received: '{transcript_text}'")
                        return

                    if event_type == "conversation.item.input_audio_transcription.failed":
                        error = event.get("error", {})
                        print(f" ❌ Transcription failed: {error}")
                        return

                    if event_type == "error":
                        error = event.get("error", {})
                        print(f" ❌ Error: {error}")
                        return

                    if events_received >= 50:  # Limit events
                        return

            try:
                await asyncio.wait_for(listen_for_transcription(), timeout=5.0)
            except asyncio.TimeoutError:
                print(" ⏱️ Timeout waiting for transcription")

        # Connection is closed here; report the outcome.
        if transcription_received:
            print(f" ✅ Transcription test passed: '{transcript_text}'")
            return True

        print(f" ❌ No transcription received (got {events_received} events)")
        return False

    except Exception as e:
        # Top-level boundary of this test: report and mark as failed.
        print(f" ❌ Error: {e}")
        traceback.print_exc()
        return False


async def test_audio_conversion() -> bool:
    """Test audio conversion utilities.

    Round-trips a 100 ms 440 Hz sine wave through the project's
    ``prepare_audio_for_openai`` -> base64 -> ``decode_audio_from_openai`` ->
    ``prepare_audio_for_reachy`` helpers and prints the resulting sizes.

    Returns:
        True if every helper call succeeded, False on any exception.
    """
    print("\n🧪 Testing audio conversion utilities...")

    try:
        from twenty_questions_game.audio_utils import (  # noqa: F401
            prepare_audio_for_openai,
            decode_audio_from_openai,
            prepare_audio_for_reachy,
            OPENAI_SAMPLE_RATE
        )
        import numpy as np

        # Create test audio (sine wave)
        sample_rate = 16000
        duration = 0.1  # 100ms
        frequency = 440  # A4 note
        samples = int(sample_rate * duration)
        t = np.linspace(0, duration, samples, False)
        test_audio = np.sin(2 * np.pi * frequency * t)
        test_audio = (test_audio * 32767).astype(np.int16)

        print(f" Created test audio: {len(test_audio)} samples at {sample_rate}Hz")

        # Test: Reachy -> OpenAI
        audio_bytes = prepare_audio_for_openai(test_audio, sample_rate)
        print(f" ✅ Reachy->OpenAI: {len(audio_bytes)} bytes")

        # Test: OpenAI -> Reachy
        audio_b64 = base64.b64encode(audio_bytes).decode('ascii')
        audio_decoded = decode_audio_from_openai(audio_b64)
        audio_for_reachy = prepare_audio_for_reachy(audio_decoded, 48000)
        print(f" ✅ OpenAI->Reachy: {len(audio_for_reachy)} samples at 48kHz")

        return True

    except Exception as e:
        # Top-level boundary of this test: report and mark as failed.
        print(f" ❌ Error: {e}")
        traceback.print_exc()
        return False


async def test_with_reachy() -> Optional[bool]:
    """Test with actual Reachy Mini (if available).

    Polls the robot's microphone 50 times at roughly 50 Hz, then plays
    0.1 s of silence.

    Returns:
        True on success, None when an ImportError indicates Reachy Mini is
        not available (reported as skipped), False on any other exception.
    """
    print("\n🤖 Testing with Reachy Mini...")

    try:
        from reachy_mini import ReachyMini

        print(" Connecting to Reachy Mini...")
        reachy = ReachyMini()
        print(" ✅ Connected to Reachy Mini")

        # Test audio
        print(" Testing audio capture...")
        samples_received = 0
        reachy.media.start_recording()
        try:
            for _ in range(50):  # Try for ~1 second at 50Hz
                audio = reachy.media.get_audio_sample()
                if audio is not None and len(audio) > 0:
                    samples_received += 1
                # Pace the polling; without this the loop finishes almost
                # immediately instead of spanning ~1 second.
                await asyncio.sleep(0.02)
        finally:
            # Always stop recording, even if polling raised.
            reachy.media.stop_recording()
        print(f" ✅ Audio capture: {samples_received}/50 samples received")

        # Test playback
        print(" Testing audio playback...")
        import numpy as np
        # Reachy Mini expects float32, normalized -1.0 to 1.0
        test_audio = np.zeros(4800, dtype=np.float32)  # 0.1s at 48kHz
        reachy.media.start_playing()
        try:
            reachy.media.push_audio_sample(test_audio)
            await asyncio.sleep(0.2)
        finally:
            # Always stop playback, even if pushing the sample raised.
            reachy.media.stop_playing()
        print(" ✅ Audio playback test completed")

        return True

    except ImportError:
        print(" ⚠️ Reachy Mini not available (this is OK for testing)")
        return None
    except Exception as e:
        # Top-level boundary of this test: report and mark as failed.
        print(f" ❌ Error: {e}")
        traceback.print_exc()
        return False


async def main():
    """Run all tests sequentially and print a pass/fail/skip summary."""
    banner = "=" * 60

    print(banner)
    print("🧪 OpenAI Realtime API Test Script")
    print(banner)

    # (result key, section title, test coroutine function), in run order.
    test_plan = (
        ('openai', "TEST 1: OpenAI Connection", test_openai_connection),
        ('transcription', "TEST 2: Audio Transcription", test_audio_transcription),
        ('audio_conversion', "TEST 3: Audio Conversion Utilities", test_audio_conversion),
        ('reachy', "TEST 4: Reachy Mini Integration (Optional)", test_with_reachy),
    )

    results = {}
    for key, title, run_test in test_plan:
        print("\n" + banner)
        print(title)
        print(banner)
        results[key] = await run_test()

    # Final Summary
    print("\n" + banner)
    print("📋 FINAL SUMMARY")
    print(banner)

    for test_name, result in results.items():
        # None means the test was skipped; any other falsy value is a failure.
        if result is None:
            status = "⚠️ SKIPPED"
        elif result:
            status = "✅ PASSED"
        else:
            status = "❌ FAILED"
        print(f" {test_name:20s}: {status}")

    print("\n" + banner)


if __name__ == "__main__":
    asyncio.run(main())