|
|
|
|
|
""" |
|
|
Test that movements don't block audio playback. |
|
|
|
|
|
This test verifies that robot movements execute independently and never |
|
|
interrupt or delay audio processing, ensuring smooth audio playback. |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
import time |
|
|
import numpy as np |
|
|
from typing import Optional |
|
|
import sys |
|
|
from pathlib import Path |
|
|
from unittest.mock import Mock, MagicMock, patch |
|
|
|
|
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent.parent)) |
|
|
|
|
|
from twenty_questions_game.main import ReachyMiniTwentyQuestions |
|
|
|
|
|
|
|
|
class AudioPlaybackSimulator:
    """Simulates continuous audio playback to test for interruptions.

    An instance stands in for an audio sink: ``push_audio_sample`` is called
    once per chunk, each call is timestamped, and any pause between two
    consecutive calls that exceeds ``gap_threshold`` seconds is recorded as
    an interruption.
    """

    def __init__(self, gap_threshold: float = 0.1):
        """Create an empty recorder.

        Args:
            gap_threshold: Longest pause, in seconds, allowed between two
                consecutive chunks before it is counted as an interruption.
                Defaults to 0.1 (100 ms).
        """
        self.gap_threshold = gap_threshold
        # Number of chunks received so far.
        self.audio_chunks_sent = 0
        # Monotonic-clock reading taken at every push, in arrival order.
        self.audio_timestamps = []
        # One dict per detected interruption: 'gap', 'chunk', 'timestamp'.
        self.interruptions = []
        # Clock reading of the most recent push; None until the first one.
        self.last_audio_time = None

    def push_audio_sample(self, chunk):
        """Simulate pushing audio - records timing to detect interruptions.

        Args:
            chunk: The audio data being "played". Its content is ignored;
                only the time of arrival is recorded.
        """
        # A monotonic clock is used so that wall-clock adjustments (NTP,
        # DST, manual changes) can never show up as bogus or negative gaps.
        now = time.monotonic()
        self.audio_chunks_sent += 1
        self.audio_timestamps.append(now)

        if self.last_audio_time is not None:
            gap = now - self.last_audio_time
            if gap > self.gap_threshold:
                self.interruptions.append({
                    'gap': gap,
                    'chunk': self.audio_chunks_sent,
                    'timestamp': now,
                })

        self.last_audio_time = now

    def get_stats(self) -> dict:
        """Get statistics about audio playback.

        Returns:
            A dict that always contains the same keys:
            ``chunks_sent`` (int), ``interruptions`` (int count),
            ``max_gap`` and ``avg_gap`` (seconds between consecutive
            pushes, 0.0 when fewer than two chunks were pushed), and
            ``interruption_details`` (the list of interruption records).
        """
        stats = {
            'chunks_sent': self.audio_chunks_sent,
            'interruptions': len(self.interruptions),
            'max_gap': 0.0,
            'avg_gap': 0.0,
            'interruption_details': self.interruptions,
        }

        stamps = self.audio_timestamps
        # At least two pushes are needed before any gap exists.
        if len(stamps) >= 2:
            gaps = [later - earlier for earlier, later in zip(stamps, stamps[1:])]
            stats['max_gap'] = max(gaps)
            stats['avg_gap'] = sum(gaps) / len(gaps)

        return stats
|
|
|
|
|
|
|
|
async def test_movements_dont_block_audio():
    """Test that movements execute without blocking audio playback.

    Runs three things concurrently on the current event loop: the app's
    speaking-movement coroutine, a simulated audio stream, and a series of
    explicitly triggered movements. The audio stream is recorded by an
    ``AudioPlaybackSimulator``; the test fails if no chunk was pushed or if
    any gap between consecutive chunks reached 100 ms.

    Returns:
        True when every assertion holds.

    Raises:
        AssertionError: If audio was never pushed or was delayed.
    """
    print("🧪 Testing that movements don't block audio...")

    app = ReachyMiniTwentyQuestions()

    # Robot stand-in: audio pushes are routed to the simulator so their
    # timing is recorded; the motion entry points are inert mocks.
    mock_reachy = Mock()
    audio_simulator = AudioPlaybackSimulator()
    mock_reachy.media.push_audio_sample = audio_simulator.push_audio_sample
    mock_reachy.goto_target = Mock()
    mock_reachy.play_move = Mock()

    # Replace the emotions library with a mock that returns a mock move
    # for any lookup.
    app._emotions_library = Mock()
    app._emotions_library.get = Mock(return_value=Mock())
    app._available_emotions = {'enthusiastic1', 'attentive1', 'curious1'}

    # This coroutine is already running on the loop, so ask for the running
    # loop explicitly instead of going through get_event_loop().
    app._loop = asyncio.get_running_loop()

    app.motion_control.start()

    try:
        app.is_speaking = True
        app.speaking_movement_stop = False

        app.speaking_movement_task = asyncio.create_task(
            app._speaking_movements(mock_reachy)
        )

        audio_task = asyncio.create_task(
            simulate_audio_playback(app, mock_reachy, duration=3.0)
        )

        movement_task = asyncio.create_task(
            trigger_movements_during_audio(app, mock_reachy)
        )

        outcomes = await asyncio.gather(audio_task, movement_task, return_exceptions=True)
        # return_exceptions=True hands failures back as values; report them
        # so a crashed task is visible instead of silently discarded.
        for label, outcome in zip(("audio", "movement"), outcomes):
            if isinstance(outcome, BaseException):
                print(f" {label} task raised: {outcome!r}")

        # Short grace period before the statistics are read.
        await asyncio.sleep(0.5)

        stats = audio_simulator.get_stats()

        print("\n📊 Audio Playback Statistics:")
        print(f" Chunks sent: {stats['chunks_sent']}")
        print(f" Interruptions (>100ms gap): {stats['interruptions']}")
        print(f" Max gap: {stats['max_gap']*1000:.2f}ms")
        print(f" Avg gap: {stats['avg_gap']*1000:.2f}ms")

        if stats['interruptions'] > 0:
            print(f"\n⚠️ Detected {stats['interruptions']} audio interruptions (>100ms):")
            # Only the first five are listed to keep the output readable.
            for i, intr in enumerate(stats['interruption_details'][:5], 1):
                print(f" {i}. Gap of {intr['gap']*1000:.2f}ms at chunk {intr['chunk']}")

        assert stats['chunks_sent'] > 0, "No audio chunks were sent"
        assert stats['interruptions'] == 0, f"Audio was interrupted {stats['interruptions']} times - movements are blocking! (gaps >100ms detected)"
        assert stats['max_gap'] < 0.1, f"Max gap of {stats['max_gap']*1000:.2f}ms is too large - audio is being delayed!"

        print("\n✅ Test passed: Movements don't block audio playback!")
        return True

    finally:
        # Cleanup lives in ``finally`` so the background task and the motion
        # controller are stopped even when setup or an assertion fails.
        try:
            app.speaking_movement_stop = True
            speaking_task = getattr(app, 'speaking_movement_task', None)
            if speaking_task and not speaking_task.done():
                speaking_task.cancel()
                try:
                    await speaking_task
                except asyncio.CancelledError:
                    pass
        finally:
            app.motion_control.stop()
            app.motion_control.shutdown(wait=True)
|
|
|
|
|
|
|
|
async def simulate_audio_playback(app, reachy_mini, duration=3.0, *,
                                  sample_rate=48000, chunk_duration=0.05):
    """Simulate continuous audio playback.

    Pushes one chunk of low-amplitude random float32 noise to
    ``reachy_mini.media.push_audio_sample`` and then sleeps for
    ``chunk_duration`` seconds, repeating until ``duration`` seconds have
    elapsed or ``app.is_speaking`` becomes falsy.

    Args:
        app: Object exposing an ``is_speaking`` attribute; playback stops
            as soon as it is falsy.
        reachy_mini: Object exposing ``media.push_audio_sample(chunk)``.
        duration: Maximum playback time in seconds.
        sample_rate: Samples per second used to size each chunk.
        chunk_duration: Length of one chunk in seconds; also the pause
            between two pushes.

    Raises:
        ValueError: If ``sample_rate * chunk_duration`` is less than one
            sample, which would make every chunk empty.
    """
    chunk_size = int(sample_rate * chunk_duration)
    if chunk_size <= 0:
        raise ValueError(
            "sample_rate * chunk_duration must yield at least one sample per chunk"
        )

    # Monotonic clock: the playback window must not stretch or shrink when
    # the system wall clock is adjusted.
    deadline = time.monotonic() + duration
    chunk_count = 0

    while time.monotonic() < deadline and app.is_speaking:
        audio_chunk = np.random.randn(chunk_size).astype(np.float32) * 0.1

        try:
            reachy_mini.media.push_audio_sample(audio_chunk)
        except Exception as e:
            # Best effort: a failing sink ends the simulation instead of
            # crashing the surrounding test.
            print(f"Error in audio simulation: {e}")
            break

        chunk_count += 1

        # Yield to the event loop for one chunk period; this is where any
        # other task that blocks the loop would delay the next push.
        await asyncio.sleep(chunk_duration)

    print(f" Simulated {chunk_count} audio chunks over {duration:.1f}s")
|
|
|
|
|
|
|
|
async def trigger_movements_during_audio(app, reachy_mini):
    """Trigger various movements during audio playback.

    After an initial 0.2 s delay, makes up to five attempts spaced 0.5 s
    apart and stops early once ``app.is_speaking`` is falsy. When the app
    has a truthy ``movement_agent`` attribute, each attempt executes a named
    movement, cycling through three fixed names; otherwise it asks the app
    to check the transcript for movement keywords. A failing attempt is
    printed and does not stop the remaining attempts.

    Args:
        app: Application object providing ``is_speaking`` and the movement
            coroutines used here.
        reachy_mini: Robot handle forwarded unchanged to those coroutines.
    """
    emotion_cycle = ('enthusiastic1', 'attentive1', 'curious1')

    await asyncio.sleep(0.2)

    fired = 0
    attempt = 0
    while attempt < 5 and app.is_speaking:
        try:
            if getattr(app, 'movement_agent', None):
                pending = app._execute_selected_movement(
                    reachy_mini, emotion_cycle[attempt % 3]
                )
            else:
                pending = app._check_transcript_for_movements_keyword(reachy_mini)
            await pending
            # Counted only once the movement call completed without raising.
            fired += 1
        except Exception as e:
            print(f" Error triggering movement: {e}")

        # Pause between attempts, also after a failed one.
        await asyncio.sleep(0.5)
        attempt += 1

    print(f" Triggered {fired} movements during audio playback")
|
|
|
|
|
|
|
|
async def test_recorded_move_playback_non_blocking():
    """Test that recorded move playback doesn't block.

    Plays one mocked recorded move through ``app._play_recorded_move`` while
    a simulated audio stream runs on the same event loop, then checks the
    timing recorded by an ``AudioPlaybackSimulator``.

    Returns:
        True when every assertion holds.

    Raises:
        AssertionError: If no audio was pushed or any gap between
            consecutive chunks reached 100 ms.
    """
    print("\n🧪 Testing recorded move playback doesn't block...")

    app = ReachyMiniTwentyQuestions()

    # Robot stand-in: audio pushes are timed by the simulator and
    # ``play_move`` is an inert mock.
    mock_reachy = Mock()
    audio_simulator = AudioPlaybackSimulator()
    mock_reachy.media.push_audio_sample = audio_simulator.push_audio_sample
    mock_reachy.play_move = Mock()

    # The emotions library returns the same mock move for every lookup.
    mock_move = Mock()
    app._emotions_library = Mock()
    app._emotions_library.get = Mock(return_value=mock_move)

    # This coroutine is already running on the loop, so ask for the running
    # loop explicitly instead of going through get_event_loop().
    app._loop = asyncio.get_running_loop()
    app.motion_control.start()

    try:
        app.is_speaking = True

        audio_task = asyncio.create_task(
            simulate_audio_playback(app, mock_reachy, duration=2.0)
        )

        move_task = asyncio.create_task(
            app._play_recorded_move(mock_reachy, mock_move, "test_move")
        )

        outcomes = await asyncio.gather(audio_task, move_task, return_exceptions=True)
        # return_exceptions=True hands failures back as values; report them
        # so a crashed task is visible instead of silently discarded.
        for label, outcome in zip(("audio", "move"), outcomes):
            if isinstance(outcome, BaseException):
                print(f" {label} task raised: {outcome!r}")

        # Short grace period before the statistics are read.
        await asyncio.sleep(0.2)

        stats = audio_simulator.get_stats()

        print(f" Audio chunks during move: {stats['chunks_sent']}")
        print(f" Interruptions (>100ms): {stats['interruptions']}")
        print(f" Max gap: {stats['max_gap']*1000:.2f}ms")

        # Without this check, a run that pushed no audio at all would pass
        # the two gap assertions below vacuously.
        assert stats['chunks_sent'] > 0, "No audio chunks were sent"
        assert stats['interruptions'] == 0, f"Recorded move playback blocked audio! (detected {stats['interruptions']} gaps >100ms)"
        assert stats['max_gap'] < 0.1, f"Max gap of {stats['max_gap']*1000:.2f}ms indicates blocking!"

        print("✅ Test passed: Recorded moves don't block audio!")
        return True

    finally:
        # Always stop the motion controller, whether or not the test passed.
        app.motion_control.stop()
        app.motion_control.shutdown(wait=True)
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run both tests in order and print a summary.
    print("=" * 60)
    print("Testing Non-Blocking Movement System")
    print("=" * 60)

    async def run_all_tests():
        """Run every test in order, print a summary, exit 1 on any failure."""
        suite = (
            ("Movements don't block audio", test_movements_dont_block_audio),
            ("Recorded moves don't block", test_recorded_move_playback_non_blocking),
        )

        results = []
        for number, (label, test_coro) in enumerate(suite, 1):
            try:
                outcome = await test_coro()
            except Exception as e:
                print(f"\n❌ Test {number} failed: {e}")
                import traceback
                traceback.print_exc()
                # A raised exception (including AssertionError) is a failure.
                outcome = False
            results.append((label, outcome))

        separator = "=" * 60
        print("\n" + separator)
        print("Test Results:")
        print(separator)
        for label, outcome in results:
            if outcome:
                status = "✅ PASS"
            else:
                status = "❌ FAIL"
            print(f"{status}: {label}")

        if not all(outcome for _, outcome in results):
            print("\n⚠️ Some tests failed")
            # Non-zero exit status so callers and CI can detect the failure.
            sys.exit(1)

        print("\n🎉 All tests passed!")

    asyncio.run(run_all_tests())
|
|
|
|
|
|