mrrrme-emotion-ai / avatar /speak_server.py
michon's picture
better lip syncing
85e0d2b
"""Avatar Backend - Coqui XTTS v2 with RHUBARB LIP SYNC (Production Quality)"""
import os
import uuid
import time
import wave
import subprocess
import json as json_lib
from fastapi import FastAPI, Form, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import JSONResponse
from pydub import AudioSegment
from typing import List, Optional
from dotenv import load_dotenv
import torch
from TTS.api import TTS
load_dotenv()
OUT_DIR = "/tmp/avatar_static"
os.makedirs(OUT_DIR, exist_ok=True)
# Check if Rhubarb is available
RHUBARB_AVAILABLE = False
RHUBARB_PATH = "rhubarb" # Change this if Rhubarb is in a specific location
try:
result = subprocess.run([RHUBARB_PATH, "--version"], capture_output=True, timeout=2)
if result.returncode == 0:
RHUBARB_AVAILABLE = True
print(f"[TTS] βœ… Rhubarb Lip Sync found: {result.stdout.decode().strip()}")
except:
print("[TTS] ⚠️ Rhubarb not found - using enhanced fallback")
print("[TTS] πŸ’‘ Install from: https://github.com/DanielSWolf/rhubarb-lip-sync/releases")
# XTTS v2 Standard Speakers
VOICE_MAP = {
"female": "Ana Florence",
"male": "Damien Black"
}
app = FastAPI()
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
app.mount("/static", StaticFiles(directory=OUT_DIR), name="static")
active_connections: List[WebSocket] = []
# Initialize Coqui XTTS v2
print("[TTS] πŸš€ Initializing Coqui XTTS v2...")
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"[TTS] πŸ–₯️ Device: {device}")
try:
tts = TTS("tts_models/multilingual/multi-dataset/xtts_v2").to(device)
print("[TTS] βœ… XTTS v2 model loaded and ready")
except Exception as e:
print(f"[TTS] ❌ FATAL: Could not load XTTS model: {e}")
tts = None
# ============ RHUBARB LIP SYNC INTEGRATION ============
def rhubarb_to_arkit(mouth_shape: str) -> dict:
"""
Convert Rhubarb mouth shapes (A-H, X) to ARKit blend shapes
Rhubarb documentation: https://github.com/DanielSWolf/rhubarb-lip-sync
Based on Preston Blair's mouth shapes for animation
"""
mappings = {
'X': {}, # Silence - mouth closed
'A': { # Closed mouth (m, b, p)
"mouthPucker": 0.4,
"mouthPressLeft": 0.2,
"mouthPressRight": 0.2
},
'B': { # Slightly open (k, s, t, d, n)
"jawOpen": 0.3,
"mouthSmile": 0.2
},
'C': { # Open (e as in bed, ae as in cat)
"jawOpen": 0.6,
"mouthSmile": 0.3
},
'D': { # Wide (aa as in father)
"jawOpen": 0.8,
"mouthShrugLower": 0.2
},
'E': { # Slight rounded (eh as in meh, uh)
"jawOpen": 0.4,
"mouthFunnel": 0.3
},
'F': { # Puckered (oo as in boot, w)
"mouthPucker": 0.7,
"mouthFunnel": 0.5,
"jawOpen": 0.2
},
'G': { # Teeth visible (f, v)
"mouthPressLeft": 0.6,
"mouthPressRight": 0.6,
"mouthRollUpper": 0.4,
"jawOpen": 0.2
},
'H': { # Very wide (ee as in see)
"mouthSmile": 0.7,
"jawOpen": 0.4
}
}
return mappings.get(mouth_shape, {})
def generate_visemes_rhubarb(audio_path: str, text: str = None) -> Optional[List[dict]]:
"""
Generate visemes using Rhubarb Lip Sync (PROFESSIONAL QUALITY)
Returns:
List of viseme keyframes with precise timing, or None if failed
"""
if not RHUBARB_AVAILABLE:
return None
try:
# Build Rhubarb command
cmd = [RHUBARB_PATH, "-f", "json", audio_path]
# Optional: provide dialog text for better recognition
dialog_input = None
if text:
# Create temporary dialog file
dialog_path = audio_path.replace('.wav', '.txt').replace('.mp3', '.txt')
with open(dialog_path, 'w', encoding='utf-8') as f:
f.write(text)
cmd.extend(["--dialogFile", dialog_path])
print(f"[Rhubarb] 🎬 Analyzing audio: {os.path.basename(audio_path)}")
start = time.time()
# Run Rhubarb
result = subprocess.run(
cmd,
capture_output=True,
timeout=30,
text=True
)
if result.returncode != 0:
print(f"[Rhubarb] ❌ Failed: {result.stderr}")
return None
# Parse Rhubarb JSON output
rhubarb_data = json_lib.loads(result.stdout)
# Convert to our viseme format
visemes = []
for cue in rhubarb_data.get("mouthCues", []):
start_time = cue["start"]
mouth_shape = cue["value"]
blend = rhubarb_to_arkit(mouth_shape)
visemes.append({"t": round(start_time, 3), "blend": blend})
elapsed = time.time() - start
print(f"[Rhubarb] βœ… Generated {len(visemes)} visemes in {elapsed:.2f}s")
# Clean up temp file
if text and os.path.exists(dialog_path):
os.remove(dialog_path)
return visemes
except subprocess.TimeoutExpired:
print("[Rhubarb] ⚠️ Timeout")
return None
except Exception as e:
print(f"[Rhubarb] ⚠️ Error: {e}")
return None
# ============ ENHANCED FALLBACK VISEME GENERATION ============
def detect_phonemes(word: str, language: str) -> list:
"""Detect phonemes in a word with language-specific rules"""
word = word.lower()
phonemes = []
i = 0
# Language-specific digraphs/trigraphs
if language == "nl":
special = {
'sch': 'sch', 'ch': 'ch', 'ng': 'ng', 'nk': 'nk',
'ij': 'ij', 'ei': 'ei', 'ui': 'ui', 'eu': 'eu',
'ou': 'ou', 'au': 'au', 'aa': 'aa', 'ee': 'ee',
'oo': 'oo', 'uu': 'uu'
}
else: # English
special = {
'th': 'th', 'sh': 'sh', 'ch': 'ch', 'ph': 'ph',
'wh': 'wh', 'ng': 'ng', 'oo': 'oo', 'ee': 'ee',
'ea': 'ea', 'ou': 'ou', 'ow': 'ow', 'ai': 'ai',
'ay': 'ay'
}
while i < len(word):
matched = False
# Check 3-char, then 2-char patterns
for length in [3, 2]:
if i + length <= len(word):
substr = word[i:i+length]
if substr in special:
phonemes.append(special[substr])
i += length
matched = True
break
if not matched:
phonemes.append(word[i])
i += 1
return phonemes
def phoneme_to_blend(phoneme: str) -> dict:
"""
COMPREHENSIVE phoneme to ARKit blend shape mapping
Supports English and Dutch phonemes
"""
# === VOWELS ===
# Open vowels
if phoneme in ['a', 'aa', 'ah', 'Γ€']:
return {"jawOpen": 0.7, "mouthShrugLower": 0.2}
# Mid-front vowels
elif phoneme in ['e', 'ee', 'ea', 'Γ©', 'Γ¨']:
return {"mouthSmile": 0.5, "jawOpen": 0.35}
# High-front vowels
elif phoneme in ['i', 'ij', 'ei', 'ie', 'iΓ­', 'y']:
return {"mouthSmile": 0.7, "jawOpen": 0.25}
# Back rounded vowels
elif phoneme in ['o', 'oo', 'Γ³', 'ΓΆ']:
return {"mouthFunnel": 0.65, "jawOpen": 0.45}
# High-back vowels
elif phoneme in ['u', 'uu', 'ΓΊ', 'ΓΌ']:
return {"mouthPucker": 0.7, "jawOpen": 0.2}
# Dutch diphthongs
elif phoneme in ['ui']:
return {"mouthPucker": 0.6, "mouthFunnel": 0.4, "jawOpen": 0.3}
elif phoneme in ['eu']:
return {"mouthPucker": 0.5, "mouthSmile": 0.2, "jawOpen": 0.3}
elif phoneme in ['ou', 'au']:
return {"mouthFunnel": 0.5, "jawOpen": 0.5}
# English diphthongs
elif phoneme in ['ai', 'ay', 'ow']:
return {"jawOpen": 0.5, "mouthSmile": 0.3}
# === CONSONANTS ===
# Bilabials (lips together)
elif phoneme in ['m', 'p', 'b']:
return {
"mouthPucker": 0.5,
"mouthPressLeft": 0.4,
"mouthPressRight": 0.4,
"jawOpen": 0.0
}
# Labiodentals (teeth on lip)
elif phoneme in ['f', 'v']:
return {
"mouthPressLeft": 0.7,
"mouthPressRight": 0.7,
"mouthRollUpper": 0.4,
"jawOpen": 0.15
}
# Dentals (tongue between teeth)
elif phoneme in ['th']:
return {
"mouthRollLower": 0.5,
"jawOpen": 0.25
}
# Approximants
elif phoneme in ['w']:
return {
"mouthPucker": 0.7,
"mouthFunnel": 0.4,
"jawOpen": 0.25
}
elif phoneme in ['r']:
return {
"mouthSmile": 0.2,
"jawOpen": 0.35,
"mouthShrugUpper": 0.2
}
elif phoneme in ['l']:
return {
"jawOpen": 0.35,
"mouthSmile": 0.25
}
# Postalveolar fricatives
elif phoneme in ['sh', 'ch', 'sch']:
return {
"mouthPucker": 0.5,
"mouthFunnel": 0.4,
"jawOpen": 0.3
}
# Alveolar
elif phoneme in ['s', 'z', 't', 'd', 'n']:
return {
"mouthSmile": 0.35,
"jawOpen": 0.25
}
# Velars
elif phoneme in ['k', 'g', 'ng', 'nk', 'x']: # x for Dutch 'g'
return {
"jawOpen": 0.45,
"mouthShrugLower": 0.2
}
# Palatal
elif phoneme in ['j', 'y']:
return {
"mouthSmile": 0.5,
"jawOpen": 0.3
}
# Default - slight mouth movement
return {"jawOpen": 0.25}
def generate_visemes_enhanced(text: str, duration: float, language: str = "en") -> List[dict]:
"""
ENHANCED fallback viseme generation with proper phoneme analysis
Used when Rhubarb is not available
"""
visemes = []
words = text.split()
if not words:
return [{"t": 0.0, "blend": {}}]
# Add silence at start
visemes.append({"t": 0.0, "blend": {}})
# Calculate timing
time_per_word = duration / len(words)
current_time = 0.05 # Small offset
for word in words:
word_lower = word.lower().strip('.,!?;:')
# Detect phonemes with language rules
phonemes = detect_phonemes(word_lower, language)
if not phonemes:
continue
# Time for each phoneme
phoneme_duration = time_per_word / len(phonemes)
for i, phoneme in enumerate(phonemes):
phoneme_start = current_time + (i * phoneme_duration)
blend = phoneme_to_blend(phoneme)
if blend:
visemes.append({
"t": round(phoneme_start, 3),
"blend": blend
})
current_time += time_per_word
# Add closing silence
visemes.append({"t": round(duration - 0.05, 3), "blend": {}})
# Ensure sorted by time
visemes.sort(key=lambda v: v["t"])
return visemes
def generate_visemes_smart(audio_path: str, text: str, duration: float, language: str) -> List[dict]:
"""
SMART viseme generation - tries Rhubarb first, falls back to enhanced
"""
# Try Rhubarb first (professional quality)
if RHUBARB_AVAILABLE:
visemes = generate_visemes_rhubarb(audio_path, text)
if visemes and len(visemes) > 0:
return visemes
else:
print("[Visemes] ⚠️ Rhubarb failed, using enhanced fallback")
# Fallback to enhanced phoneme-based generation
return generate_visemes_enhanced(text, duration, language)
def generate_visemes_rhubarb(audio_path: str, text: str = None) -> Optional[List[dict]]:
"""
Generate visemes using Rhubarb Lip Sync analyzer
Rhubarb analyzes the ACTUAL audio waveform and phonemes,
not just text characters. Much more accurate!
"""
try:
# Create dialog file for better recognition
dialog_path = None
if text:
dialog_path = audio_path.replace('.wav', '.txt').replace('.mp3', '.txt')
with open(dialog_path, 'w', encoding='utf-8') as f:
f.write(text)
# Build command
cmd = [RHUBARB_PATH, "-f", "json", audio_path]
if dialog_path:
cmd.extend(["--dialogFile", dialog_path])
# Run Rhubarb
result = subprocess.run(
cmd,
capture_output=True,
timeout=30,
text=True
)
# Clean up dialog file
if dialog_path and os.path.exists(dialog_path):
os.remove(dialog_path)
if result.returncode != 0:
print(f"[Rhubarb] ❌ Error: {result.stderr}")
return None
# Parse JSON output
rhubarb_data = json_lib.loads(result.stdout)
# Convert to ARKit visemes
visemes = []
for cue in rhubarb_data.get("mouthCues", []):
start_time = cue["start"]
mouth_shape = cue["value"]
blend = rhubarb_to_arkit(mouth_shape)
visemes.append({"t": round(start_time, 3), "blend": blend})
return visemes
except subprocess.TimeoutExpired:
print("[Rhubarb] ⚠️ Timeout")
return None
except Exception as e:
print(f"[Rhubarb] ⚠️ Error: {e}")
return None
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
active_connections.append(websocket)
try:
while True:
await websocket.receive_text()
except:
pass
finally:
if websocket in active_connections:
active_connections.remove(websocket)
async def broadcast_to_avatars(data: dict):
for connection in active_connections[:]:
try:
await connection.send_json(data)
except:
if connection in active_connections:
active_connections.remove(connection)
@app.post("/speak")
async def speak(text: str = Form(...), voice: str = Form("female"), language: str = Form("en")):
t_start = time.time()
uid = uuid.uuid4().hex[:8]
wav_path = os.path.join(OUT_DIR, f"{uid}.wav")
mp3_path = os.path.join(OUT_DIR, f"{uid}.mp3")
speaker_name = VOICE_MAP.get(voice, voice)
print(f"\n{'='*60}")
print(f"[Backend] TTS Generation")
print(f"[Backend] Text: '{text[:60]}{'...' if len(text) > 60 else ''}'")
print(f"[Backend] Lang: {language} | Speaker: {speaker_name}")
print(f"[Backend] Lip Sync: {'Rhubarb' if RHUBARB_AVAILABLE else 'Enhanced Fallback'}")
try:
if tts is None:
raise Exception("TTS Model not initialized")
# Generate Audio
tts.tts_to_file(
text=text,
file_path=wav_path,
speaker=speaker_name,
language=language,
split_sentences=True
)
t2 = time.time()
print(f"[Backend] βœ… Audio generated in {t2-t_start:.2f}s")
# Convert to MP3 and get duration
try:
audio = AudioSegment.from_wav(wav_path)
audio.export(mp3_path, format="mp3", bitrate="128k")
duration_sec = len(audio) / 1000.0
audio_file = mp3_path
# Keep WAV for Rhubarb analysis
wav_for_analysis = wav_path
except Exception as e:
print(f"[Backend] ⚠️ MP3 conversion failed: {e}")
with wave.open(wav_path, 'rb') as wf:
duration_sec = wf.getnframes() / float(wf.getframerate())
audio_file = wav_path
wav_for_analysis = wav_path
t3 = time.time()
print(f"[Backend] βœ… Audio ready ({duration_sec:.2f}s duration)")
# Generate visemes with smart method selection
visemes = generate_visemes_smart(wav_for_analysis, text, duration_sec, language)
t4 = time.time()
print(f"[Backend] βœ… Visemes generated in {t4-t3:.2f}s ({len(visemes)} keyframes)")
# Clean up WAV if we converted to MP3
if audio_file == mp3_path and os.path.exists(wav_path):
os.remove(wav_path)
response_data = {
"audio_url": f"/static/{os.path.basename(audio_file)}",
"visemes": visemes,
"duration": duration_sec,
"text": text,
"method": "rhubarb" if RHUBARB_AVAILABLE else "enhanced_fallback"
}
await broadcast_to_avatars(response_data)
total_time = time.time() - t_start
print(f"[Backend] βœ… Total time: {total_time:.2f}s")
print(f"{'='*60}\n")
return response_data
except Exception as e:
error_msg = f"TTS failed: {str(e)}"
print(f"[Backend] ❌ {error_msg}")
return JSONResponse(status_code=500, content={"error": error_msg})
@app.get("/")
async def root():
return {
"status": "running",
"tts_engine": "coqui-xtts-v2",
"lip_sync": "rhubarb" if RHUBARB_AVAILABLE else "enhanced_fallback",
"languages": ["en", "nl", "fr", "de", "it", "es", "ja", "zh", "pt", "pl", "tr", "ru", "cs", "ar", "hu", "ko"],
"voices": VOICE_MAP
}
if __name__ == "__main__":
import uvicorn
print("πŸš€ Avatar Server (XTTS v2 + RHUBARB)")
print(f"🎬 Lip Sync: {'Rhubarb (Professional)' if RHUBARB_AVAILABLE else 'Enhanced Fallback'}")
uvicorn.run(app, host="0.0.0.0", port=8765)