aiavatarnew / app.py
Spanicin's picture
Update app.py
06968de verified
import tempfile
import os
import shutil
import librosa
import json
import subprocess
import gc
from googletrans import Translator
import asyncio
from flask import Flask, request, jsonify, send_from_directory
from omegaconf import OmegaConf
import torch
from diffusers import AutoencoderKL, DDIMScheduler
from latentsync.models.unet import UNet3DConditionModel
from latentsync.pipelines.lipsync_pipeline import LipsyncPipeline
from diffusers.utils.import_utils import is_xformers_available
from accelerate.utils import set_seed
from latentsync.whisper.audio2feature import Audio2Feature
from openai import OpenAI
from elevenlabs import set_api_key, generate, play, clone, Voice, VoiceSettings
from torch.cuda.amp import autocast
# Initialize the Flask app
app = Flask(__name__)
TEMP_DIR = None
VIDEO_DIRECTORY = os.path.abspath("videos")
os.makedirs(VIDEO_DIRECTORY, exist_ok=True)
def clear_cuda_memory():
torch.cuda.empty_cache()
gc.collect()
def run_inference(video_path, audio_path, video_out_path,
inference_ckpt_path, unet_config_path="configs/unet/second_stage.yaml",
inference_steps=20, guidance_scale=1.0, seed=1247):
clear_cuda_memory()
# Load configuration
config = OmegaConf.load(unet_config_path)
# Determine proper dtype based on GPU capabilities
is_fp16_supported = torch.cuda.is_available() and torch.cuda.get_device_capability()[0] > 7
dtype = torch.float16 if is_fp16_supported else torch.float32
# Setup scheduler
scheduler = DDIMScheduler.from_pretrained("configs")
# Choose whisper model based on config settings
if config.model.cross_attention_dim == 768:
whisper_model_path = "checkpoints/whisper/small.pt"
elif config.model.cross_attention_dim == 384:
whisper_model_path = "checkpoints/whisper/tiny.pt"
else:
raise NotImplementedError("cross_attention_dim must be 768 or 384")
# Initialize the audio encoder
audio_encoder = Audio2Feature(model_path=whisper_model_path,
device="cuda", num_frames=config.data.num_frames)
# Load VAE
vae = AutoencoderKL.from_pretrained("stabilityai/sd-vae-ft-mse", torch_dtype=dtype)
vae.config.scaling_factor = 0.18215
vae.config.shift_factor = 0
# Load UNet model from the checkpoint
unet, _ = UNet3DConditionModel.from_pretrained(
OmegaConf.to_container(config.model),
inference_ckpt_path, # load checkpoint
device="cpu",
)
unet = unet.to(dtype=dtype)
# Optionally enable memory-efficient attention if available
if is_xformers_available():
unet.enable_xformers_memory_efficient_attention()
# Initialize the pipeline and move to GPU
pipeline = LipsyncPipeline(
vae=vae,
audio_encoder=audio_encoder,
unet=unet,
scheduler=scheduler,
).to("cuda")
# Set seed
if seed != -1:
set_seed(seed)
else:
torch.seed()
with autocast():
try:
pipeline(
video_path=video_path,
audio_path=audio_path,
video_out_path=video_out_path,
video_mask_path=video_out_path.replace(".mp4", "_mask.mp4"),
num_frames=config.data.num_frames,
num_inference_steps=inference_steps,
guidance_scale=guidance_scale,
weight_dtype=dtype,
width=config.data.resolution,
height=config.data.resolution,
)
finally:
clear_cuda_memory()
def create_temp_dir():
return tempfile.TemporaryDirectory()
def generate_audio(voice_cloning, text_prompt):
if voice_cloning == 'yes':
print('Entering Custom Audio creation using elevenlabs')
set_api_key('92e149985ea2732b4359c74346c3daee')
voice = Voice(voice_id="VJpttplXHolgV2leGe5V",name="Marc",settings=VoiceSettings(
stability=0.71, similarity_boost=0.9, style=0.0, use_speaker_boost=True),)
audio = generate(text = text_prompt, voice = voice, model = "eleven_multilingual_v2",stream=True, latency=4)
with tempfile.NamedTemporaryFile(suffix=".mp3", prefix="cloned_audio_",dir=TEMP_DIR.name, delete=False) as temp_file:
for chunk in audio:
temp_file.write(chunk)
driven_audio_path = temp_file.name
print('driven_audio_path',driven_audio_path)
return driven_audio_path
elif voice_cloning == 'no':
voice = 'echo'
print('Entering Default Audio creation using elevenlabs')
set_api_key('92e149985ea2732b4359c74346c3daee')
audio = generate(text = text_prompt, voice = "Daniel", model = "eleven_multilingual_v2",stream=True, latency=4)
with tempfile.NamedTemporaryFile(suffix=".mp3", prefix="default_audio_",dir=TEMP_DIR.name, delete=False) as temp_file:
for chunk in audio:
temp_file.write(chunk)
driven_audio_path = temp_file.name
print('driven_audio_path',driven_audio_path)
return driven_audio_path
def get_video_duration(video_path):
"""Extracts video duration dynamically using ffprobe."""
cmd = [
"ffprobe", "-v", "error", "-show_entries", "format=duration",
"-of", "json", video_path
]
result = subprocess.run(cmd, capture_output=True, text=True)
duration = json.loads(result.stdout)["format"]["duration"]
return float(duration)
def extend_video_simple(video_path, audio_path, output_path):
"""Extends video duration by appending a reversed version if audio is longer."""
audio_duration = librosa.get_duration(path=audio_path)
video_duration = get_video_duration(video_path)
print(f"Video Duration: {video_duration:.2f} sec")
print(f"Audio Duration: {audio_duration:.2f} sec")
if audio_duration > video_duration:
print("Extending video by adding reversed version.")
# Create a reversed version of the full video
reversed_clip = tempfile.NamedTemporaryFile(dir=TEMP_DIR.name, delete=False, suffix=".mp4").name
subprocess.run(
f"ffmpeg -y -i {video_path} -vf reverse -an {reversed_clip}", shell=True
)
# Merge original + reversed
subprocess.run(
f"ffmpeg -y -i {video_path} -i {reversed_clip} -filter_complex \"[0:v:0][1:v:0]concat=n=2:v=1[outv]\" -map \"[outv]\" -an {output_path}",
shell=True
)
else:
print("Audio is not longer than video. No extension needed.")
subprocess.run(f"cp {video_path} {output_path}", shell=True)
def extend_video_loop(video_path, audio_path, output_path):
"""Extends video duration by repeating original and reversed video until it meets/exceeds audio duration."""
audio_duration = librosa.get_duration(path=audio_path)
video_duration = get_video_duration(video_path)
print(f"Video Duration: {video_duration:.2f} sec")
print(f"Audio Duration: {audio_duration:.2f} sec")
if audio_duration > video_duration:
print("Extending video by repeating original and reversed versions.")
# Create reversed video
reversed_clip = tempfile.NamedTemporaryFile(dir=TEMP_DIR.name, delete=False, suffix=".mp4").name
subprocess.run(
f"ffmpeg -y -i {video_path} -vf reverse -an {reversed_clip}", shell=True
)
# Generate a list of clips to reach/exceed audio duration
video_clips = [video_path, reversed_clip]
total_duration = video_duration * 2 # Original + reversed
while total_duration < audio_duration:
video_clips.append(video_path)
video_clips.append(reversed_clip)
total_duration += video_duration * 2
print(f"Total Clips: {len(video_clips)}")
# Use FFmpeg filter_complex concat for seamless merging
concat_filter = "".join(f"[{i}:v:0]" for i in range(len(video_clips))) + f"concat=n={len(video_clips)}:v=1[outv]"
input_files = " ".join(f"-i {clip}" for clip in video_clips)
subprocess.run(
f"ffmpeg -y {input_files} -filter_complex \"{concat_filter}\" -map \"[outv]\" -an {output_path}",
shell=True
)
print(f"Extended video saved to {output_path}")
else:
print("Audio is not longer than video. No extension needed.")
subprocess.run(f"cp {video_path} {output_path}", shell=True)
def translate_text(text, target_language):
if not text or text.strip() == "":
return ""
LANGUAGE_CODES = {"english": "en", "hindi": "hi"}
try:
# Convert language name to code
target_language_code = LANGUAGE_CODES.get(target_language.lower())
# Use Google Translate with proper coroutine handling
async def perform_translation():
translator = Translator()
result = await translator.translate(text, dest=target_language_code)
return result.text if hasattr(result, 'text') else text
# Run the async function in the event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(perform_translation())
loop.close()
return result
except Exception as e:
logger.error(f"Error translating text: {e}")
# Return original text if translation fails
return text
@app.route('/run', methods=['POST'])
def generate_video():
global TEMP_DIR
# global VIDEO_DIRECTORY
TEMP_DIR = create_temp_dir()
if 'video' not in request.files:
return jsonify({'error': 'Video file is required.'}), 400
video_file = request.files['video']
text_prompt = request.form['text_prompt']
print('Input text prompt: ',text_prompt)
text_prompt = text_prompt.strip()
if not text_prompt:
return jsonify({'error': 'Input text prompt cannot be blank'}), 400
voice_cloning = request.form.get('voice_cloning', 'no')
target_language = request.form.get('target_language', 'original_text')
if target_language != 'original_text':
response = translate_text(text_prompt, target_language)
text_prompt = response.strip()
print('Translated input text prompt: ',text_prompt)
temp_audio_path = generate_audio(voice_cloning, text_prompt)
with tempfile.NamedTemporaryFile(suffix=".mp4", prefix="input_",dir=TEMP_DIR.name, delete=False) as temp_file:
temp_video_path = temp_file.name
video_file.save(temp_video_path)
print('temp_video_path',temp_video_path)
# output_video = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name
# You can pass additional parameters via form data if needed (e.g., checkpoint path)
inference_ckpt_path = request.form.get('inference_ckpt_path', 'checkpoints/latentsync_unet.pt')
unet_config_path = request.form.get('unet_config_path', 'configs/unet/second_stage.yaml')
output_video = tempfile.NamedTemporaryFile(dir=TEMP_DIR.name, delete=False, suffix=".mp4").name
extend_video_loop(temp_video_path, temp_audio_path, output_video)
final_output_video = tempfile.NamedTemporaryFile(dir=TEMP_DIR.name, delete=False, suffix="_final_extended.mp4").name
try:
run_inference(
video_path=output_video,
audio_path=temp_audio_path,
video_out_path=final_output_video,
inference_ckpt_path=inference_ckpt_path,
unet_config_path=unet_config_path,
inference_steps=int(request.form.get('inference_steps', 20)),
guidance_scale=float(request.form.get('guidance_scale', 1.0)),
seed=int(request.form.get('seed', 1247))
)
# Return the output video path or further process the file for download
if final_output_video and final_output_video.endswith('.mp4'):
filename = os.path.basename(final_output_video)
# os.makedirs('videos', exist_ok=True)
# VIDEO_DIRECTORY = os.path.abspath('videos')
print("VIDEO_DIRECTORY: ",VIDEO_DIRECTORY)
destination_path = os.path.join(VIDEO_DIRECTORY, filename)
shutil.copy(final_output_video, destination_path)
video_url = f"/videos/{filename}"
return jsonify({"message": "Video processed and saved successfully.",
"output_video": video_url,
"status": "success"}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route("/videos/<string:filename>", methods=['GET'])
def serve_video(filename):
# global VIDEO_DIRECTORY
return send_from_directory(VIDEO_DIRECTORY, filename, as_attachment=False)
@app.route("/health", methods=["GET"])
def health_status():
response = {"online": "true"}
return jsonify(response)
if __name__ == '__main__':
app.run(debug=True)