prithivMLmods's picture
update app
e8a4265 verified
import os
import sys
import cv2
import time
import copy
import random
import torch
import spaces
import requests
import subprocess
import gradio as gr
from PIL import Image
import importlib.util
from threading import Thread
from typing import Iterable, Optional, Tuple, List
def check_and_install_package(package_name, import_name=None, pip_name=None):
"""Check if a package is installed, and if not, install it."""
if import_name is None:
import_name = package_name
if pip_name is None:
pip_name = package_name
spec = importlib.util.find_spec(import_name)
if spec is None:
print(f"Installing {package_name}...")
subprocess.check_call([sys.executable, "-m", "pip", "install", pip_name, "-q"])
print(f"✓ {package_name} installed successfully")
return True
print("Checking and installing transformers==4.57.3 ...")
check_and_install_package("transformers", "transformers", "transformers==4.57.3")
print("Done!")
from transformers import (
Qwen2_5_VLForConditionalGeneration,
AutoModelForImageTextToText,
AutoModelForCausalLM,
AutoProcessor,
TextIteratorStreamer,
)
try:
from vibevoice.modular.modeling_vibevoice_streaming_inference import (
VibeVoiceStreamingForConditionalGenerationInference,
)
from vibevoice.processor.vibevoice_streaming_processor import (
VibeVoiceStreamingProcessor,
)
except ImportError:
print("CRITICAL WARNING: 'vibevoice' modules not found. Ensure the vibevoice repository structure is present.")
VibeVoiceStreamingForConditionalGenerationInference = None
VibeVoiceStreamingProcessor = None
from gradio.themes import Soft
from gradio.themes.utils import colors, fonts, sizes
colors.orange_red = colors.Color(
name="orange_red",
c50="#FFF0E5",
c100="#FFE0CC",
c200="#FFC299",
c300="#FFA366",
c400="#FF8533",
c500="#FF4500",
c600="#E63E00",
c700="#CC3700",
c800="#B33000",
c900="#992900",
c950="#802200",
)
class OrangeRedTheme(Soft):
def __init__(
self,
*,
primary_hue: colors.Color | str = colors.gray,
secondary_hue: colors.Color | str = colors.orange_red,
neutral_hue: colors.Color | str = colors.slate,
text_size: sizes.Size | str = sizes.text_lg,
font: fonts.Font | str | Iterable[fonts.Font | str] = (
fonts.GoogleFont("Outfit"), "Arial", "sans-serif",
),
font_mono: fonts.Font | str | Iterable[fonts.Font | str] = (
fonts.GoogleFont("IBM Plex Mono"), "ui-monospace", "monospace",
),
):
super().__init__(
primary_hue=primary_hue,
secondary_hue=secondary_hue,
neutral_hue=neutral_hue,
text_size=text_size,
font=font,
font_mono=font_mono,
)
super().set(
background_fill_primary="*primary_50",
background_fill_primary_dark="*primary_900",
body_background_fill="linear-gradient(135deg, *primary_200, *primary_100)",
body_background_fill_dark="linear-gradient(135deg, *primary_900, *primary_800)",
button_primary_text_color="white",
button_primary_text_color_hover="white",
button_primary_background_fill="linear-gradient(90deg, *secondary_500, *secondary_600)",
button_primary_background_fill_hover="linear-gradient(90deg, *secondary_600, *secondary_700)",
button_primary_background_fill_dark="linear-gradient(90deg, *secondary_600, *secondary_700)",
button_primary_background_fill_hover_dark="linear-gradient(90deg, *secondary_500, *secondary_600)",
button_secondary_text_color="black",
button_secondary_text_color_hover="white",
button_secondary_background_fill="linear-gradient(90deg, *primary_300, *primary_300)",
button_secondary_background_fill_hover="linear-gradient(90deg, *primary_400, *primary_400)",
button_secondary_background_fill_dark="linear-gradient(90deg, *primary_500, *primary_600)",
button_secondary_background_fill_hover_dark="linear-gradient(90deg, *primary_500, *primary_500)",
slider_color="*secondary_500",
slider_color_dark="*secondary_600",
block_title_text_weight="600",
block_border_width="3px",
block_shadow="*shadow_drop_lg",
button_primary_shadow="*shadow_drop_lg",
button_large_padding="11px",
color_accent_soft="*primary_100",
block_label_background_fill="*primary_200",
)
orange_red_theme = OrangeRedTheme()
css = """
#main-title h1 {
font-size: 2.3em !important;
}
#output-title h2 {
font-size: 2.1em !important;
}
.generating {
border: 2px solid #4682B4;
}
"""
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Using Main Device: {device}")
QWEN_VL_MODEL_ID = "Qwen/Qwen2.5-VL-7B-Instruct"
print(f"Loading OCR Model: {QWEN_VL_MODEL_ID}...")
qwen_processor = AutoProcessor.from_pretrained(QWEN_VL_MODEL_ID, trust_remote_code=True)
qwen_model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
QWEN_VL_MODEL_ID,
#attn_implementation="flash_attention_2",
trust_remote_code=True,
torch_dtype=torch.float16
).to(device).eval()
print("Model loaded successfully.")
TTS_MODEL_PATH = "microsoft/VibeVoice-Realtime-0.5B"
print(f"Loading TTS Model: {TTS_MODEL_PATH}...")
print("VibeVoice Model loaded successfully.")
tts_processor = VibeVoiceStreamingProcessor.from_pretrained(TTS_MODEL_PATH)
tts_model = VibeVoiceStreamingForConditionalGenerationInference.from_pretrained(
TTS_MODEL_PATH,
torch_dtype=torch.float16,
device_map="cuda",
attn_implementation="sdpa",
)
tts_model.eval()
tts_model.set_ddpm_inference_steps(num_steps=5)
class VoiceMapper:
"""Maps speaker names to voice file paths"""
def __init__(self):
self.setup_voice_presets()
new_dict = {}
for name, path in self.voice_presets.items():
if "_" in name: name = name.split("_")[0]
if "-" in name: name = name.split("-")[-1]
new_dict[name] = path
self.voice_presets.update(new_dict)
def setup_voice_presets(self):
voices_dir = os.path.join(os.path.dirname(__file__), "demo/voices/streaming_model")
if not os.path.exists(voices_dir):
print(f"Warning: Voices directory not found at {voices_dir}")
self.voice_presets = {}
self.available_voices = {}
return
self.voice_presets = {}
pt_files = [f for f in os.listdir(voices_dir) if f.lower().endswith(".pt") and os.path.isfile(os.path.join(voices_dir, f))]
for pt_file in pt_files:
name = os.path.splitext(pt_file)[0]
full_path = os.path.join(voices_dir, pt_file)
self.voice_presets[name] = full_path
self.voice_presets = dict(sorted(self.voice_presets.items()))
self.available_voices = {name: path for name, path in self.voice_presets.items() if os.path.exists(path)}
print(f"Found {len(self.available_voices)} voice files.")
def get_voice_path(self, speaker_name: str) -> str:
if speaker_name in self.voice_presets:
return self.voice_presets[speaker_name]
speaker_lower = speaker_name.lower()
for preset_name, path in self.voice_presets.items():
if preset_name.lower() in speaker_lower or speaker_lower in preset_name.lower():
return path
if self.voice_presets:
return list(self.voice_presets.values())[0]
return ""
VOICE_MAPPER = VoiceMapper()
print("TTS Model loaded successfully.")
@spaces.GPU
def process_pipeline(
image: Image.Image,
query: str,
speaker_name: str,
cfg_scale: float,
ocr_max_tokens: int,
ocr_temp: float,
progress=gr.Progress()
):
"""
Combined pipeline: Image - Text -> TTS - Audio
"""
if image is None:
return "Please upload an image.", None, "Error: No image provided."
progress(0.2, desc="Analyzing Image ()...")
if not query:
query = "Analyze the content perfectly."
messages = [{
"role": "user",
"content": [
{"type": "image"},
{"type": "text", "text": query},
]
}]
prompt_full = qwen_processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
inputs = qwen_processor(
text=[prompt_full],
images=[image],
return_tensors="pt",
padding=True
).to(device)
generated_ids = qwen_model.generate(
**inputs,
max_new_tokens=ocr_max_tokens,
do_sample=True,
temperature=ocr_temp,
top_p=0.9,
)
generated_ids_trimmed = [
out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
]
extracted_text = qwen_processor.batch_decode(
generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
)[0]
extracted_text = extracted_text.replace("<|im_end|>", "").strip()
progress(0.5, desc=f"Analyzing completed. Converting to speech ({len(extracted_text)} chars)...")
if not extracted_text:
return extracted_text, None, "produced no text."
try:
full_script = extracted_text.replace("'", "'").replace('"', '"').replace('"', '"')
voice_path = VOICE_MAPPER.get_voice_path(speaker_name)
if not voice_path:
return extracted_text, None, "Error: Voice file not found."
all_prefilled_outputs = torch.load(voice_path, map_location="cuda", weights_only=False)
tts_inputs = tts_processor.process_input_with_cached_prompt(
text=full_script,
cached_prompt=all_prefilled_outputs,
padding=True,
return_tensors="pt",
return_attention_mask=True,
)
tts_model.to("cuda")
for k, v in tts_inputs.items():
if torch.is_tensor(v):
tts_inputs[k] = v.to("cuda")
with torch.cuda.amp.autocast():
outputs = tts_model.generate(
**tts_inputs,
max_new_tokens=None,
cfg_scale=cfg_scale,
tokenizer=tts_processor.tokenizer,
generation_config={"do_sample": False},
verbose=False,
all_prefilled_outputs=copy.deepcopy(all_prefilled_outputs)
)
tts_model.to("cpu")
torch.cuda.empty_cache()
if outputs.speech_outputs and outputs.speech_outputs[0] is not None:
sample_rate = 24000
output_dir = "./outputs"
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, f"generated_{int(time.time())}.wav")
tts_processor.save_audio(
outputs.speech_outputs[0].cpu(),
output_path=output_path,
)
status = f"✅ Success! Text Length: {len(extracted_text)} chars."
return extracted_text, output_path, status
else:
return extracted_text, None, "TTS Generation failed (no output)."
except Exception as e:
tts_model.to("cpu")
torch.cuda.empty_cache()
import traceback
return extracted_text, None, f"Error during TTS: {str(e)}"
url = "https://huggingface.co/datasets/strangervisionhf/image-examples/resolve/main/2.jpg?download=true"
example_image = Image.open(requests.get(url, stream=True).raw).convert("RGB")
with gr.Blocks() as demo:
gr.Markdown("# **Vision-to-VibeVoice-en**", elem_id="main-title")
gr.Markdown("Perform vision-to-audio inference with [Qwen2.5VL](https://huggingface.co/Qwen/Qwen2.5-VL-7B-Instruct) + [VibeVoice-Realtime-0.5B](https://huggingface.co/microsoft/VibeVoice-Realtime-0.5B).")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### 1. Vision Input")
image_upload = gr.Image(type="pil", label="Upload Image", value=example_image, height=300)
image_query = gr.Textbox(label="Enter the prompt", value="Give a short description indicating whether the image is safe or unsafe.", placeholder="E.g., Read this page...")
gr.Markdown("### 2. Voice Settings")
voice_choices = list(VOICE_MAPPER.available_voices.keys())
if not voice_choices: voice_choices = ["Default"]
speaker_dropdown = gr.Dropdown(
choices=voice_choices,
value=voice_choices[0],
label="Speaker Voice"
)
cfg_slider = gr.Slider(minimum=1.0, maximum=3.0, value=1.5, step=0.1, label="CFG Scale (Speech Fidelity)")
with gr.Accordion("Advanced Options", open=False):
max_new_tokens = gr.Slider(label="Max Tokens", minimum=128, maximum=4096, step=128, value=1024)
temperature = gr.Slider(label="Temperature", minimum=0.1, maximum=2.0, step=0.1, value=0.1)
submit_btn = gr.Button("Generate Speech", variant="primary", size="lg")
with gr.Column(scale=1):
gr.Markdown("### 3. Results", elem_id="output-title")
text_output = gr.Textbox(
label="Extracted Text (Editable)",
interactive=True,
lines=11,
)
audio_output = gr.Audio(
label="Generated Speech",
type="filepath",
interactive=False
)
status_output = gr.Textbox(label="Status Log", lines=2)
gr.Examples(
examples=[["Caption the image...", "https://huggingface.co/datasets/merve/vlm_test_images/resolve/main/venice.jpg"]],
inputs=[image_query, image_upload],
label="Example"
)
submit_btn.click(
fn=process_pipeline,
inputs=[
image_upload,
image_query,
speaker_dropdown,
cfg_slider,
max_new_tokens,
temperature
],
outputs=[text_output, audio_output, status_output]
)
if __name__ == "__main__":
demo.queue(max_size=40).launch(css=css, theme=orange_red_theme, mcp_server=True, ssr_mode=False, show_error=True)