| """Avatar Backend - Coqui XTTS v2 with RHUBARB LIP SYNC (Production Quality)""" | |
| import os | |
| import uuid | |
| import time | |
| import wave | |
| import subprocess | |
| import json as json_lib | |
| from fastapi import FastAPI, Form, WebSocket | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import JSONResponse | |
| from pydub import AudioSegment | |
| from typing import List, Optional | |
| from dotenv import load_dotenv | |
| import torch | |
| from TTS.api import TTS | |
| load_dotenv() | |
| OUT_DIR = "/tmp/avatar_static" | |
| os.makedirs(OUT_DIR, exist_ok=True) | |
| # Check if Rhubarb is available | |
| RHUBARB_AVAILABLE = False | |
| RHUBARB_PATH = "rhubarb" # Change this if Rhubarb is in a specific location | |
| try: | |
| result = subprocess.run([RHUBARB_PATH, "--version"], capture_output=True, timeout=2) | |
| if result.returncode == 0: | |
| RHUBARB_AVAILABLE = True | |
| print(f"[TTS] β Rhubarb Lip Sync found: {result.stdout.decode().strip()}") | |
| except: | |
| print("[TTS] β οΈ Rhubarb not found - using enhanced fallback") | |
| print("[TTS] π‘ Install from: https://github.com/DanielSWolf/rhubarb-lip-sync/releases") | |
# XTTS v2 Standard Speakers
VOICE_MAP = {
    "female": "Ana Florence",
    "male": "Damien Black"
}

app = FastAPI()
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
app.mount("/static", StaticFiles(directory=OUT_DIR), name="static")

active_connections: List[WebSocket] = []

# Initialize Coqui XTTS v2
print("[TTS] 🚀 Initializing Coqui XTTS v2...")
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"[TTS] 🖥️ Device: {device}")

try:
    tts = TTS("tts_models/multilingual/multi-dataset/xtts_v2").to(device)
    print("[TTS] ✅ XTTS v2 model loaded and ready")
except Exception as e:
    print(f"[TTS] ❌ FATAL: Could not load XTTS model: {e}")
    tts = None
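# Note: the loaded multi-speaker model should expose its built-in speaker names
# via `tts.speakers` (per the Coqui TTS API); any of those names can be
# substituted into VOICE_MAP above.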
# ============ RHUBARB LIP SYNC INTEGRATION ============

def rhubarb_to_arkit(mouth_shape: str) -> dict:
    """
    Convert Rhubarb mouth shapes (A-H, X) to ARKit blend shapes.
    Rhubarb documentation: https://github.com/DanielSWolf/rhubarb-lip-sync
    Based on Preston Blair's mouth shapes for animation.
    """
    mappings = {
        'X': {},  # Silence - mouth closed
        'A': {  # Closed mouth (m, b, p)
            "mouthPucker": 0.4,
            "mouthPressLeft": 0.2,
            "mouthPressRight": 0.2
        },
        'B': {  # Slightly open (k, s, t, d, n)
            "jawOpen": 0.3,
            "mouthSmile": 0.2
        },
        'C': {  # Open (e as in bed, ae as in cat)
            "jawOpen": 0.6,
            "mouthSmile": 0.3
        },
        'D': {  # Wide (aa as in father)
            "jawOpen": 0.8,
            "mouthShrugLower": 0.2
        },
        'E': {  # Slightly rounded (eh as in meh, uh)
            "jawOpen": 0.4,
            "mouthFunnel": 0.3
        },
        'F': {  # Puckered (oo as in boot, w)
            "mouthPucker": 0.7,
            "mouthFunnel": 0.5,
            "jawOpen": 0.2
        },
        'G': {  # Teeth visible (f, v)
            "mouthPressLeft": 0.6,
            "mouthPressRight": 0.6,
            "mouthRollUpper": 0.4,
            "jawOpen": 0.2
        },
        'H': {  # Very wide (ee as in see)
            "mouthSmile": 0.7,
            "jawOpen": 0.4
        }
    }
    return mappings.get(mouth_shape, {})
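# Example (illustrative): a Rhubarb 'F' cue (as in "boot") maps to a
# pucker/funnel pose:
#   rhubarb_to_arkit('F')  ->  {"mouthPucker": 0.7, "mouthFunnel": 0.5, "jawOpen": 0.2}
# Unknown shapes fall back to an empty dict (neutral mouth).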
def generate_visemes_rhubarb(audio_path: str, text: str = None) -> Optional[List[dict]]:
    """
    Generate visemes using Rhubarb Lip Sync (PROFESSIONAL QUALITY).

    Rhubarb analyzes the actual audio waveform and phonemes, not just
    text characters, so its timing is far more accurate.

    Returns:
        List of viseme keyframes with precise timing, or None if failed.
    """
    if not RHUBARB_AVAILABLE:
        return None

    dialog_path = None
    try:
        # Build Rhubarb command
        cmd = [RHUBARB_PATH, "-f", "json", audio_path]

        # Optional: provide dialog text for better recognition
        if text:
            # Create temporary dialog file
            dialog_path = audio_path.replace('.wav', '.txt').replace('.mp3', '.txt')
            with open(dialog_path, 'w', encoding='utf-8') as f:
                f.write(text)
            cmd.extend(["--dialogFile", dialog_path])

        print(f"[Rhubarb] 🎬 Analyzing audio: {os.path.basename(audio_path)}")
        start = time.time()

        # Run Rhubarb
        result = subprocess.run(
            cmd,
            capture_output=True,
            timeout=30,
            text=True
        )

        if result.returncode != 0:
            print(f"[Rhubarb] ❌ Failed: {result.stderr}")
            return None

        # Parse Rhubarb JSON output
        rhubarb_data = json_lib.loads(result.stdout)

        # Convert to our viseme format
        visemes = []
        for cue in rhubarb_data.get("mouthCues", []):
            start_time = cue["start"]
            mouth_shape = cue["value"]
            blend = rhubarb_to_arkit(mouth_shape)
            visemes.append({"t": round(start_time, 3), "blend": blend})

        elapsed = time.time() - start
        print(f"[Rhubarb] ✅ Generated {len(visemes)} visemes in {elapsed:.2f}s")
        return visemes

    except subprocess.TimeoutExpired:
        print("[Rhubarb] ⚠️ Timeout")
        return None
    except Exception as e:
        print(f"[Rhubarb] ⚠️ Error: {e}")
        return None
    finally:
        # Clean up the temporary dialog file even if Rhubarb failed
        if dialog_path and os.path.exists(dialog_path):
            os.remove(dialog_path)
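# For reference, `rhubarb -f json` emits output of this shape (per the Rhubarb
# docs; the numbers below are made-up sample values, not real output):
#
#   {
#     "metadata": {"soundFile": "abc123.wav", "duration": 1.25},
#     "mouthCues": [
#       {"start": 0.00, "end": 0.05, "value": "X"},
#       {"start": 0.05, "end": 0.27, "value": "D"},
#       ...
#     ]
#   }
#
# The loop above turns each cue into {"t": <start>, "blend": <ARKit dict>}.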
# ============ ENHANCED FALLBACK VISEME GENERATION ============

def detect_phonemes(word: str, language: str) -> list:
    """Detect phonemes in a word with language-specific rules."""
    word = word.lower()
    phonemes = []
    i = 0

    # Language-specific digraphs/trigraphs
    if language == "nl":
        special = {
            'sch': 'sch', 'ch': 'ch', 'ng': 'ng', 'nk': 'nk',
            'ij': 'ij', 'ei': 'ei', 'ui': 'ui', 'eu': 'eu',
            'ou': 'ou', 'au': 'au', 'aa': 'aa', 'ee': 'ee',
            'oo': 'oo', 'uu': 'uu'
        }
    else:  # English
        special = {
            'th': 'th', 'sh': 'sh', 'ch': 'ch', 'ph': 'ph',
            'wh': 'wh', 'ng': 'ng', 'oo': 'oo', 'ee': 'ee',
            'ea': 'ea', 'ou': 'ou', 'ow': 'ow', 'ai': 'ai',
            'ay': 'ay'
        }

    while i < len(word):
        matched = False
        # Check 3-char, then 2-char patterns
        for length in [3, 2]:
            if i + length <= len(word):
                substr = word[i:i+length]
                if substr in special:
                    phonemes.append(special[substr])
                    i += length
                    matched = True
                    break
        if not matched:
            phonemes.append(word[i])
            i += 1
    return phonemes
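# Examples (illustrative): multi-letter patterns are matched greedily, longest first:
#   detect_phonemes("sheep", "en")  ->  ['sh', 'ee', 'p']
#   detect_phonemes("huis", "nl")   ->  ['h', 'ui', 's']
# Anything not in the digraph table falls through as a single character.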
def phoneme_to_blend(phoneme: str) -> dict:
    """
    COMPREHENSIVE phoneme to ARKit blend shape mapping.
    Supports English and Dutch phonemes.
    """
    # === VOWELS ===
    # Open vowels
    if phoneme in ['a', 'aa', 'ah', 'ä']:
        return {"jawOpen": 0.7, "mouthShrugLower": 0.2}
    # Mid-front vowels
    elif phoneme in ['e', 'ee', 'ea', 'é', 'è']:
        return {"mouthSmile": 0.5, "jawOpen": 0.35}
    # High-front vowels
    elif phoneme in ['i', 'ij', 'ei', 'ie', 'ië', 'y']:
        return {"mouthSmile": 0.7, "jawOpen": 0.25}
    # Back rounded vowels
    elif phoneme in ['o', 'oo', 'ó', 'ö']:
        return {"mouthFunnel": 0.65, "jawOpen": 0.45}
    # High-back vowels
    elif phoneme in ['u', 'uu', 'ú', 'ü']:
        return {"mouthPucker": 0.7, "jawOpen": 0.2}
    # Dutch diphthongs
    elif phoneme in ['ui']:
        return {"mouthPucker": 0.6, "mouthFunnel": 0.4, "jawOpen": 0.3}
    elif phoneme in ['eu']:
        return {"mouthPucker": 0.5, "mouthSmile": 0.2, "jawOpen": 0.3}
    elif phoneme in ['ou', 'au']:
        return {"mouthFunnel": 0.5, "jawOpen": 0.5}
    # English diphthongs
    elif phoneme in ['ai', 'ay', 'ow']:
        return {"jawOpen": 0.5, "mouthSmile": 0.3}

    # === CONSONANTS ===
    # Bilabials (lips together)
    elif phoneme in ['m', 'p', 'b']:
        return {
            "mouthPucker": 0.5,
            "mouthPressLeft": 0.4,
            "mouthPressRight": 0.4,
            "jawOpen": 0.0
        }
    # Labiodentals (teeth on lip)
    elif phoneme in ['f', 'v']:
        return {
            "mouthPressLeft": 0.7,
            "mouthPressRight": 0.7,
            "mouthRollUpper": 0.4,
            "jawOpen": 0.15
        }
    # Dentals (tongue between teeth)
    elif phoneme in ['th']:
        return {
            "mouthRollLower": 0.5,
            "jawOpen": 0.25
        }
    # Approximants
    elif phoneme in ['w']:
        return {
            "mouthPucker": 0.7,
            "mouthFunnel": 0.4,
            "jawOpen": 0.25
        }
    elif phoneme in ['r']:
        return {
            "mouthSmile": 0.2,
            "jawOpen": 0.35,
            "mouthShrugUpper": 0.2
        }
    elif phoneme in ['l']:
        return {
            "jawOpen": 0.35,
            "mouthSmile": 0.25
        }
    # Postalveolar fricatives
    elif phoneme in ['sh', 'ch', 'sch']:
        return {
            "mouthPucker": 0.5,
            "mouthFunnel": 0.4,
            "jawOpen": 0.3
        }
    # Alveolars
    elif phoneme in ['s', 'z', 't', 'd', 'n']:
        return {
            "mouthSmile": 0.35,
            "jawOpen": 0.25
        }
    # Velars
    elif phoneme in ['k', 'g', 'ng', 'nk', 'x']:  # x for Dutch 'g'
        return {
            "jawOpen": 0.45,
            "mouthShrugLower": 0.2
        }
    # Palatals
    elif phoneme in ['j', 'y']:
        return {
            "mouthSmile": 0.5,
            "jawOpen": 0.3
        }
    # Default - slight mouth movement
    return {"jawOpen": 0.25}
def generate_visemes_enhanced(text: str, duration: float, language: str = "en") -> List[dict]:
    """
    ENHANCED fallback viseme generation with proper phoneme analysis.
    Used when Rhubarb is not available.
    """
    visemes = []
    words = text.split()
    if not words:
        return [{"t": 0.0, "blend": {}}]

    # Add silence at start
    visemes.append({"t": 0.0, "blend": {}})

    # Calculate timing
    time_per_word = duration / len(words)
    current_time = 0.05  # Small offset

    for word in words:
        word_lower = word.lower().strip('.,!?;:')

        # Detect phonemes with language rules
        phonemes = detect_phonemes(word_lower, language)
        if not phonemes:
            continue

        # Time for each phoneme
        phoneme_duration = time_per_word / len(phonemes)

        for i, phoneme in enumerate(phonemes):
            phoneme_start = current_time + (i * phoneme_duration)
            blend = phoneme_to_blend(phoneme)
            if blend:
                visemes.append({
                    "t": round(phoneme_start, 3),
                    "blend": blend
                })
        current_time += time_per_word

    # Add closing silence
    visemes.append({"t": round(duration - 0.05, 3), "blend": {}})

    # Ensure sorted by time
    visemes.sort(key=lambda v: v["t"])
    return visemes
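# Worked example (illustrative): "hello world" over a 1.0 s clip gives
# time_per_word = 0.5 s; "hello" -> ['h', 'e', 'l', 'l', 'o'] -> one keyframe
# every 0.1 s starting at t=0.05, "world" starts at t=0.55, and the closing
# silence keyframe lands at t=0.95.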
def generate_visemes_smart(audio_path: str, text: str, duration: float, language: str) -> List[dict]:
    """
    SMART viseme generation - tries Rhubarb first, falls back to enhanced.
    """
    # Try Rhubarb first (professional quality)
    if RHUBARB_AVAILABLE:
        visemes = generate_visemes_rhubarb(audio_path, text)
        if visemes and len(visemes) > 0:
            return visemes
        else:
            print("[Visemes] ⚠️ Rhubarb failed, using enhanced fallback")

    # Fall back to enhanced phoneme-based generation
    return generate_visemes_enhanced(text, duration, language)
# NOTE: the route path below is an assumption; the original decorator was lost
# in extraction. Adjust "/ws" to match your frontend's WebSocket URL.
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    active_connections.append(websocket)
    try:
        while True:
            await websocket.receive_text()
    except Exception:
        pass
    finally:
        if websocket in active_connections:
            active_connections.remove(websocket)

async def broadcast_to_avatars(data: dict):
    for connection in active_connections[:]:
        try:
            await connection.send_json(data)
        except Exception:
            if connection in active_connections:
                active_connections.remove(connection)
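# Minimal client sketch (illustrative, assuming the "/ws" path above and the
# third-party `websockets` package):
#
#   import asyncio, json, websockets
#
#   async def listen():
#       async with websockets.connect("ws://localhost:8765/ws") as ws:
#           while True:
#               event = json.loads(await ws.recv())
#               print(event["audio_url"], len(event["visemes"]))
#
#   asyncio.run(listen())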
# NOTE: the route path below is an assumption; the original decorator was lost
# in extraction.
@app.post("/speak")
async def speak(text: str = Form(...), voice: str = Form("female"), language: str = Form("en")):
    t_start = time.time()
    uid = uuid.uuid4().hex[:8]
    wav_path = os.path.join(OUT_DIR, f"{uid}.wav")
    mp3_path = os.path.join(OUT_DIR, f"{uid}.mp3")
    speaker_name = VOICE_MAP.get(voice, voice)

    print(f"\n{'='*60}")
    print(f"[Backend] TTS Generation")
    print(f"[Backend] Text: '{text[:60]}{'...' if len(text) > 60 else ''}'")
    print(f"[Backend] Lang: {language} | Speaker: {speaker_name}")
    print(f"[Backend] Lip Sync: {'Rhubarb' if RHUBARB_AVAILABLE else 'Enhanced Fallback'}")

    try:
        if tts is None:
            raise Exception("TTS model not initialized")

        # Generate audio
        tts.tts_to_file(
            text=text,
            file_path=wav_path,
            speaker=speaker_name,
            language=language,
            split_sentences=True
        )
        t2 = time.time()
        print(f"[Backend] ✅ Audio generated in {t2-t_start:.2f}s")

        # Convert to MP3 and get duration
        try:
            audio = AudioSegment.from_wav(wav_path)
            audio.export(mp3_path, format="mp3", bitrate="128k")
            duration_sec = len(audio) / 1000.0
            audio_file = mp3_path
            # Keep WAV for Rhubarb analysis
            wav_for_analysis = wav_path
        except Exception as e:
            print(f"[Backend] ⚠️ MP3 conversion failed: {e}")
            with wave.open(wav_path, 'rb') as wf:
                duration_sec = wf.getnframes() / float(wf.getframerate())
            audio_file = wav_path
            wav_for_analysis = wav_path

        t3 = time.time()
        print(f"[Backend] ✅ Audio ready ({duration_sec:.2f}s duration)")

        # Generate visemes with smart method selection
        visemes = generate_visemes_smart(wav_for_analysis, text, duration_sec, language)
        t4 = time.time()
        print(f"[Backend] ✅ Visemes generated in {t4-t3:.2f}s ({len(visemes)} keyframes)")

        # Clean up WAV if we converted to MP3
        if audio_file == mp3_path and os.path.exists(wav_path):
            os.remove(wav_path)

        response_data = {
            "audio_url": f"/static/{os.path.basename(audio_file)}",
            "visemes": visemes,
            "duration": duration_sec,
            "text": text,
            "method": "rhubarb" if RHUBARB_AVAILABLE else "enhanced_fallback"
        }
        await broadcast_to_avatars(response_data)

        total_time = time.time() - t_start
        print(f"[Backend] ✅ Total time: {total_time:.2f}s")
        print(f"{'='*60}\n")
        return response_data

    except Exception as e:
        error_msg = f"TTS failed: {str(e)}"
        print(f"[Backend] ❌ {error_msg}")
        return JSONResponse(status_code=500, content={"error": error_msg})
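# Usage sketch (illustrative, assuming the "/speak" path above):
#   curl -X POST http://localhost:8765/speak \
#        -F "text=Hello there" -F "voice=female" -F "language=en"
# Returns JSON with "audio_url", "visemes", "duration", and "method", and
# broadcasts the same payload to all connected WebSocket avatars.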
# NOTE: the route path below is an assumption; the original decorator was lost
# in extraction.
@app.get("/")
async def root():
    return {
        "status": "running",
        "tts_engine": "coqui-xtts-v2",
        "lip_sync": "rhubarb" if RHUBARB_AVAILABLE else "enhanced_fallback",
        "languages": ["en", "nl", "fr", "de", "it", "es", "ja", "zh", "pt", "pl", "tr", "ru", "cs", "ar", "hu", "ko"],
        "voices": VOICE_MAP
    }
if __name__ == "__main__":
    import uvicorn
    print("🚀 Avatar Server (XTTS v2 + RHUBARB)")
    print(f"🎬 Lip Sync: {'Rhubarb (Professional)' if RHUBARB_AVAILABLE else 'Enhanced Fallback'}")
    uvicorn.run(app, host="0.0.0.0", port=8765)