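"""Flask service that lip-syncs an uploaded video to generated speech.

Pipeline: optionally translate the text prompt, synthesize speech with
ElevenLabs, extend/loop the input video to cover the audio duration, run the
LatentSync lipsync pipeline, and serve the resulting MP4.
"""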
import tempfile
import os
import shutil
import librosa
import json
import subprocess
import gc
import logging
import asyncio
from googletrans import Translator
from flask import Flask, request, jsonify, send_from_directory
from omegaconf import OmegaConf
import torch
from diffusers import AutoencoderKL, DDIMScheduler
from latentsync.models.unet import UNet3DConditionModel
from latentsync.pipelines.lipsync_pipeline import LipsyncPipeline
from diffusers.utils.import_utils import is_xformers_available
from accelerate.utils import set_seed
from latentsync.whisper.audio2feature import Audio2Feature
from elevenlabs import set_api_key, generate, Voice, VoiceSettings
from torch.cuda.amp import autocast

# Initialize the Flask app
app = Flask(__name__)

logger = logging.getLogger(__name__)

TEMP_DIR = None
VIDEO_DIRECTORY = os.path.abspath("videos")
os.makedirs(VIDEO_DIRECTORY, exist_ok=True)
def clear_cuda_memory():
    """Release cached CUDA memory and trigger garbage collection."""
    torch.cuda.empty_cache()
    gc.collect()
def run_inference(video_path, audio_path, video_out_path,
                  inference_ckpt_path, unet_config_path="configs/unet/second_stage.yaml",
                  inference_steps=20, guidance_scale=1.0, seed=1247):
    clear_cuda_memory()

    # Load configuration
    config = OmegaConf.load(unet_config_path)

    # Determine proper dtype based on GPU capabilities
    is_fp16_supported = torch.cuda.is_available() and torch.cuda.get_device_capability()[0] > 7
    dtype = torch.float16 if is_fp16_supported else torch.float32

    # Set up scheduler
    scheduler = DDIMScheduler.from_pretrained("configs")

    # Choose whisper model based on config settings
    if config.model.cross_attention_dim == 768:
        whisper_model_path = "checkpoints/whisper/small.pt"
    elif config.model.cross_attention_dim == 384:
        whisper_model_path = "checkpoints/whisper/tiny.pt"
    else:
        raise NotImplementedError("cross_attention_dim must be 768 or 384")

    # Initialize the audio encoder
    audio_encoder = Audio2Feature(model_path=whisper_model_path,
                                  device="cuda", num_frames=config.data.num_frames)

    # Load VAE
    vae = AutoencoderKL.from_pretrained("stabilityai/sd-vae-ft-mse", torch_dtype=dtype)
    vae.config.scaling_factor = 0.18215
    vae.config.shift_factor = 0

    # Load UNet model from the checkpoint
    unet, _ = UNet3DConditionModel.from_pretrained(
        OmegaConf.to_container(config.model),
        inference_ckpt_path,  # load checkpoint
        device="cpu",
    )
    unet = unet.to(dtype=dtype)

    # Optionally enable memory-efficient attention if available
    if is_xformers_available():
        unet.enable_xformers_memory_efficient_attention()

    # Initialize the pipeline and move to GPU
    pipeline = LipsyncPipeline(
        vae=vae,
        audio_encoder=audio_encoder,
        unet=unet,
        scheduler=scheduler,
    ).to("cuda")

    # Set seed
    if seed != -1:
        set_seed(seed)
    else:
        torch.seed()

    with autocast():
        try:
            pipeline(
                video_path=video_path,
                audio_path=audio_path,
                video_out_path=video_out_path,
                video_mask_path=video_out_path.replace(".mp4", "_mask.mp4"),
                num_frames=config.data.num_frames,
                num_inference_steps=inference_steps,
                guidance_scale=guidance_scale,
                weight_dtype=dtype,
                width=config.data.resolution,
                height=config.data.resolution,
            )
        finally:
            clear_cuda_memory()
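# Minimal standalone usage sketch for run_inference; the asset paths below are
# illustrative assumptions and must exist in your checkout:
#
#   run_inference(
#       video_path="assets/demo.mp4",
#       audio_path="assets/demo.wav",
#       video_out_path="out/result.mp4",
#       inference_ckpt_path="checkpoints/latentsync_unet.pt",
#   )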
def create_temp_dir():
    return tempfile.TemporaryDirectory()
def generate_audio(voice_cloning, text_prompt):
    # NOTE: hardcoded API key kept from the original; prefer loading it from
    # an environment variable in production.
    if voice_cloning == 'yes':
        print('Entering custom audio creation using ElevenLabs')
        set_api_key('92e149985ea2732b4359c74346c3daee')
        voice = Voice(
            voice_id="VJpttplXHolgV2leGe5V",
            name="Marc",
            settings=VoiceSettings(stability=0.71, similarity_boost=0.9,
                                   style=0.0, use_speaker_boost=True),
        )
        audio = generate(text=text_prompt, voice=voice,
                         model="eleven_multilingual_v2", stream=True, latency=4)
        with tempfile.NamedTemporaryFile(suffix=".mp3", prefix="cloned_audio_",
                                         dir=TEMP_DIR.name, delete=False) as temp_file:
            for chunk in audio:
                temp_file.write(chunk)
            driven_audio_path = temp_file.name
        print('driven_audio_path', driven_audio_path)
        return driven_audio_path
    elif voice_cloning == 'no':
        print('Entering default audio creation using ElevenLabs')
        set_api_key('92e149985ea2732b4359c74346c3daee')
        audio = generate(text=text_prompt, voice="Daniel",
                         model="eleven_multilingual_v2", stream=True, latency=4)
        with tempfile.NamedTemporaryFile(suffix=".mp3", prefix="default_audio_",
                                         dir=TEMP_DIR.name, delete=False) as temp_file:
            for chunk in audio:
                temp_file.write(chunk)
            driven_audio_path = temp_file.name
        print('driven_audio_path', driven_audio_path)
        return driven_audio_path
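# Usage sketch (assumes TEMP_DIR has been initialized, as in generate_video):
#
#   TEMP_DIR = create_temp_dir()
#   audio_path = generate_audio('no', "Hello from the demo")
#
# Any voice_cloning value other than 'yes'/'no' falls through and returns None.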
def get_video_duration(video_path):
    """Extracts video duration dynamically using ffprobe."""
    cmd = [
        "ffprobe", "-v", "error", "-show_entries", "format=duration",
        "-of", "json", video_path
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    duration = json.loads(result.stdout)["format"]["duration"]
    return float(duration)
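# Illustrative call (path and return value are hypothetical):
#
#   get_video_duration("videos/clip.mp4")  # -> e.g. 12.48 (seconds)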
def extend_video_simple(video_path, audio_path, output_path):
    """Extends video duration by appending a reversed version if audio is longer."""
    audio_duration = librosa.get_duration(path=audio_path)
    video_duration = get_video_duration(video_path)
    print(f"Video Duration: {video_duration:.2f} sec")
    print(f"Audio Duration: {audio_duration:.2f} sec")
    if audio_duration > video_duration:
        print("Extending video by adding reversed version.")
        # Create a reversed version of the full video
        reversed_clip = tempfile.NamedTemporaryFile(dir=TEMP_DIR.name, delete=False, suffix=".mp4").name
        subprocess.run(
            f'ffmpeg -y -i "{video_path}" -vf reverse -an "{reversed_clip}"', shell=True
        )
        # Merge original + reversed
        subprocess.run(
            f'ffmpeg -y -i "{video_path}" -i "{reversed_clip}" '
            f'-filter_complex "[0:v:0][1:v:0]concat=n=2:v=1[outv]" -map "[outv]" -an "{output_path}"',
            shell=True
        )
    else:
        print("Audio is not longer than video. No extension needed.")
        shutil.copy(video_path, output_path)
def extend_video_loop(video_path, audio_path, output_path):
    """Extends video duration by repeating original and reversed video until it meets/exceeds audio duration."""
    audio_duration = librosa.get_duration(path=audio_path)
    video_duration = get_video_duration(video_path)
    print(f"Video Duration: {video_duration:.2f} sec")
    print(f"Audio Duration: {audio_duration:.2f} sec")
    if audio_duration > video_duration:
        print("Extending video by repeating original and reversed versions.")
        # Create the reversed video once and reuse it
        reversed_clip = tempfile.NamedTemporaryFile(dir=TEMP_DIR.name, delete=False, suffix=".mp4").name
        subprocess.run(
            f'ffmpeg -y -i "{video_path}" -vf reverse -an "{reversed_clip}"', shell=True
        )
        # Generate a list of clips long enough to reach/exceed the audio duration
        video_clips = [video_path, reversed_clip]
        total_duration = video_duration * 2  # original + reversed
        while total_duration < audio_duration:
            video_clips.append(video_path)
            video_clips.append(reversed_clip)
            total_duration += video_duration * 2
        print(f"Total Clips: {len(video_clips)}")
        # Use FFmpeg filter_complex concat for seamless merging
        concat_filter = "".join(f"[{i}:v:0]" for i in range(len(video_clips))) + f"concat=n={len(video_clips)}:v=1[outv]"
        input_files = " ".join(f'-i "{clip}"' for clip in video_clips)
        subprocess.run(
            f'ffmpeg -y {input_files} -filter_complex "{concat_filter}" -map "[outv]" -an "{output_path}"',
            shell=True
        )
        print(f"Extended video saved to {output_path}")
    else:
        print("Audio is not longer than video. No extension needed.")
        shutil.copy(video_path, output_path)
def translate_text(text, target_language):
    if not text or text.strip() == "":
        return ""
    LANGUAGE_CODES = {"english": "en", "hindi": "hi"}
    try:
        # Convert language name to code; bail out if it is not supported
        target_language_code = LANGUAGE_CODES.get(target_language.lower())
        if target_language_code is None:
            logger.error(f"Unsupported target language: {target_language}")
            return text

        # googletrans exposes an async API, so run the coroutine to completion
        async def perform_translation():
            translator = Translator()
            result = await translator.translate(text, dest=target_language_code)
            return result.text if hasattr(result, 'text') else text

        return asyncio.run(perform_translation())
    except Exception as e:
        logger.error(f"Error translating text: {e}")
        # Return original text if translation fails
        return text
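# Example: translate_text("Hello", "hindi") requests a Hindi translation from
# Google Translate (network access required); unsupported language names or
# translation errors fall back to returning the input text unchanged.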
# The route path is an assumption; the original snippet omitted the decorator.
@app.route('/generate_video', methods=['POST'])
def generate_video():
    global TEMP_DIR
    TEMP_DIR = create_temp_dir()
    if 'video' not in request.files:
        return jsonify({'error': 'Video file is required.'}), 400
    video_file = request.files['video']
    text_prompt = request.form.get('text_prompt', '').strip()
    print('Input text prompt: ', text_prompt)
    if not text_prompt:
        return jsonify({'error': 'Input text prompt cannot be blank'}), 400
    voice_cloning = request.form.get('voice_cloning', 'no')
    target_language = request.form.get('target_language', 'original_text')
    if target_language != 'original_text':
        text_prompt = translate_text(text_prompt, target_language).strip()
        print('Translated input text prompt: ', text_prompt)

    temp_audio_path = generate_audio(voice_cloning, text_prompt)

    with tempfile.NamedTemporaryFile(suffix=".mp4", prefix="input_",
                                     dir=TEMP_DIR.name, delete=False) as temp_file:
        temp_video_path = temp_file.name
        video_file.save(temp_video_path)
    print('temp_video_path', temp_video_path)

    # Additional parameters can be passed via form data (e.g., checkpoint path)
    inference_ckpt_path = request.form.get('inference_ckpt_path', 'checkpoints/latentsync_unet.pt')
    unet_config_path = request.form.get('unet_config_path', 'configs/unet/second_stage.yaml')

    # Extend the uploaded video to cover the audio, then lip-sync it
    output_video = tempfile.NamedTemporaryFile(dir=TEMP_DIR.name, delete=False, suffix=".mp4").name
    extend_video_loop(temp_video_path, temp_audio_path, output_video)
    final_output_video = tempfile.NamedTemporaryFile(dir=TEMP_DIR.name, delete=False,
                                                     suffix="_final_extended.mp4").name
    try:
        run_inference(
            video_path=output_video,
            audio_path=temp_audio_path,
            video_out_path=final_output_video,
            inference_ckpt_path=inference_ckpt_path,
            unet_config_path=unet_config_path,
            inference_steps=int(request.form.get('inference_steps', 20)),
            guidance_scale=float(request.form.get('guidance_scale', 1.0)),
            seed=int(request.form.get('seed', 1247))
        )
        # Copy the result into the public video directory and return its URL
        filename = os.path.basename(final_output_video)
        print("VIDEO_DIRECTORY: ", VIDEO_DIRECTORY)
        destination_path = os.path.join(VIDEO_DIRECTORY, filename)
        shutil.copy(final_output_video, destination_path)
        video_url = f"/videos/{filename}"
        return jsonify({"message": "Video processed and saved successfully.",
                        "output_video": video_url,
                        "status": "success"}), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
# Route inferred from the "/videos/{filename}" URL returned by generate_video.
@app.route('/videos/<path:filename>')
def serve_video(filename):
    return send_from_directory(VIDEO_DIRECTORY, filename, as_attachment=False)
# The route path is an assumption; the original snippet omitted the decorator.
@app.route('/health')
def health_status():
    response = {"online": "true"}
    return jsonify(response)
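# Example request against the generate_video endpoint above (the route path
# and port are local-testing assumptions):
#
#   curl -X POST http://localhost:5000/generate_video \
#        -F "video=@face.mp4" \
#        -F "text_prompt=Hello there" \
#        -F "voice_cloning=no" \
#        -F "target_language=original_text"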
if __name__ == '__main__':
    app.run(debug=True)