VibeThinker / app.py
VladBoyko's picture
Update app.py
8a46019 verified
import gradio as gr
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread, Event
import re
import time
import html
# --- Configuration ---
MODEL_ID = "WeiboAI/VibeThinker-1.5B"
class VibeThinkerModel:
def __init__(self):
self.model = None
self.tokenizer = None
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.stop_signal = Event()
def load_model(self):
if self.model is not None: return
print(f"🔄 Loading {MODEL_ID}...")
try:
self.tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)
self.model = AutoModelForCausalLM.from_pretrained(
MODEL_ID,
torch_dtype=torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16,
device_map="auto",
trust_remote_code=True,
low_cpu_mem_usage=True
)
print("✅ Model loaded.")
except Exception as e:
raise e
def stop_generation(self):
self.stop_signal.set()
def _detect_tail_loop(self, text, min_phrase_len=3, max_phrase_len=10, threshold=20):
"""
Detects if the generator has gotten stuck in a loop at the END of the text.
Criteria: A phrase of 3-10 words repeated at least 20 times consecutively.
"""
words = text.split()
total_words = len(words)
# We need at least (min_phrase * threshold) words to even check
if total_words < min_phrase_len * threshold:
return False
# Only check the end of the string (optimization)
# We look at the last (max_phrase * threshold) words
check_window = max_phrase_len * threshold
recent_words = words[-check_window:] if total_words > check_window else words
for phrase_len in range(min_phrase_len, max_phrase_len + 1):
# The candidate phrase is the very last 'phrase_len' words
candidate_phrase = recent_words[-phrase_len:]
# Construct what the tail SHOULD look like if it's looping
# e.g. if phrase is "and then", we expect "and then and then..."
# We check if the tail of the text matches (phrase * threshold)
required_len = phrase_len * threshold
if len(recent_words) < required_len:
continue
segment_to_check = recent_words[-required_len:]
# Efficient check: does the segment consist ONLY of the candidate phrase?
# We compare the segment against the candidate phrase repeated
expected_segment = candidate_phrase * threshold
if segment_to_check == expected_segment:
return True
return False
def generate_response_streaming(self, prompt, temperature=0.6, max_new_tokens=32000):
if not self.model: self.load_model()
self.stop_signal.clear()
try:
start_time = time.time()
# Optimized Prompt for VibeThinker
messages = [
{"role": "system", "content": "You are an expert algorithm engineer. Analyze the problem deeply, then provide a clean Python solution."},
{"role": "user", "content": prompt}
]
text_input = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
inputs = self.tokenizer(text_input, return_tensors="pt").to(self.device)
streamer = TextIteratorStreamer(self.tokenizer, skip_prompt=True, skip_special_tokens=True)
generation_kwargs = dict(
**inputs,
max_new_tokens=max_new_tokens,
temperature=temperature,
top_p=0.95,
top_k=50,
do_sample=True,
pad_token_id=self.tokenizer.eos_token_id,
streamer=streamer,
)
thread = Thread(target=self.model.generate, kwargs=generation_kwargs)
thread.start()
generated_text = ""
loop_detected = False
# Token counter for loop check frequency
token_count = 0
for new_text in streamer:
if self.stop_signal.is_set(): break
generated_text += new_text
token_count += 1
# Check for loops every 10 tokens to save CPU
if token_count % 10 == 0:
if self._detect_tail_loop(generated_text):
loop_detected = True
self.stop_signal.set() # Stop the model
# Optional: Truncate the repetitive garbage
# (Simple truncation for UI cleanliness)
generated_text = generated_text + "\n\n[⚠️ Generation stopped: Infinite loop detected]"
break
yield generated_text, {
"time": time.time() - start_time,
"tokens": len(self.tokenizer.encode(generated_text)),
"generating": True
}
if not self.stop_signal.is_set():
thread.join()
yield generated_text, {
"time": time.time() - start_time,
"tokens": len(self.tokenizer.encode(generated_text)),
"generating": False
}
except Exception as e:
yield f"Error: {str(e)}", None
vibe_model = VibeThinkerModel()
class ModernUIParser:
"""Parses text into a structured, modern UI"""
def format_code(self, code, lang="python"):
"""Applies basic HTML syntax highlighting regex"""
code = html.escape(code)
# Comments
code = re.sub(r'(#.*?)(?=\n|$)', r'<span class="c">\1</span>', code)
# Keywords
keywords = r'\b(def|class|return|if|else|elif|for|while|import|from|try|except|with|as|pass|None|True|False)\b'
code = re.sub(keywords, r'<span class="k">\1</span>', code)
# Builtins/Calls
code = re.sub(r'\b(print|len|range|enumerate|zip|super|__init__)\b', r'<span class="nf">\1</span>', code)
# Strings
code = re.sub(r'(&quot;.*?&quot;)', r'<span class="s">\1</span>', code)
code = re.sub(r"('.*?')", r'<span class="s">\1</span>', code)
return code
def parse_and_render(self, text, stats):
# 1. Separate Thinking from Content
# Heuristic: Content before the first code block or explicit "Solution" header is usually thinking
thinking = ""
solution = text
# Find split point
markers = ["```", "Here is the solution", "### Solution", "Implementation:"]
first_marker_idx = len(text)
for m in markers:
idx = text.find(m)
if idx != -1 and idx < first_marker_idx:
first_marker_idx = idx
if first_marker_idx < len(text) and first_marker_idx > 50:
thinking = text[:first_marker_idx].strip()
solution = text[first_marker_idx:].strip()
# 2. Process Solution Text (Markdown-ish to HTML)
# Handle Code Blocks
parts = re.split(r'(```\w*\n.*?```)', solution, flags=re.DOTALL)
solution_html = ""
for part in parts:
if part.startswith('```'):
# Extract lang and code
match = re.match(r'```(\w*)\n(.*?)```', part, re.DOTALL)
if match:
lang = match.group(1) or "text"
code_content = match.group(2)
highlighted = self.format_code(code_content, lang)
solution_html += f"""
<div class="code-block">
<div class="code-header">
<span class="lang-tag">{lang}</span>
<span class="copy-btn" onclick="navigator.clipboard.writeText(this.parentElement.nextElementSibling.innerText)">Copy</span>
</div>
<pre>{highlighted}</pre>
</div>"""
else:
solution_html += f"<pre>{html.escape(part)}</pre>"
else:
# Normal text processing
clean_text = html.escape(part)
# Headers
clean_text = re.sub(r'^### (.*?)$', r'<h3>\1</h3>', clean_text, flags=re.M)
clean_text = re.sub(r'^## (.*?)$', r'<h2>\1</h2>', clean_text, flags=re.M)
clean_text = re.sub(r'\*\*(.*?)\*\*', r'<strong>\1</strong>', clean_text)
# Line breaks
clean_text = clean_text.replace('\n', '<br>')
solution_html += f"<div class='text-content'>{clean_text}</div>"
# 3. Process Thinking
thinking_html = html.escape(thinking).replace('\n', '<br>')
# 4. Stats & Cursor
is_gen = stats['generating'] if stats else False
t_sec = stats['tokens'] / stats['time'] if stats and stats['time'] > 0 else 0
cursor = '<span class="cursor"></span>' if is_gen else ''
# CSS Styles (Modern Dark Theme)
css = """
<style>
:root { --bg: #0f1117; --card: #1e293b; --accent: #6366f1; --text: #e2e8f0; --dim: #94a3b8; }
.ui-container { font-family: 'Inter', system-ui, sans-serif; color: var(--text); line-height: 1.6; }
/* Stats Bar */
.stats-bar { display: flex; gap: 15px; margin-bottom: 20px; font-size: 12px; text-transform: uppercase; letter-spacing: 1px; }
.stat-pill { background: #334155; padding: 4px 10px; border-radius: 20px; color: #cbd5e1; display: flex; align-items: center; gap: 6px; }
.stat-active { border: 1px solid var(--accent); color: var(--accent); background: rgba(99, 102, 241, 0.1); }
/* Thinking Section */
details.thinking-box { margin-bottom: 20px; border: 1px solid #312e81; border-radius: 8px; background: rgba(49, 46, 129, 0.1); overflow: hidden; }
details.thinking-box summary { padding: 12px 16px; cursor: pointer; font-weight: 600; color: #818cf8; list-style: none; outline: none; user-select: none; }
details.thinking-box summary::marker { display: none; }
details.thinking-box summary:hover { background: rgba(49, 46, 129, 0.2); }
.thought-content { padding: 16px; font-family: 'JetBrains Mono', monospace; font-size: 13px; color: #a5b4fc; border-top: 1px solid #312e81; }
/* Solution Section */
.solution-box { background: var(--bg); padding: 10px 0; }
.text-content { margin-bottom: 10px; }
h2, h3 { color: white; margin-top: 20px; margin-bottom: 10px; font-weight: 600; }
strong { color: #fff; font-weight: 700; }
/* Code Blocks */
.code-block { background: #0d1117; border: 1px solid #30363d; border-radius: 8px; margin: 15px 0; overflow: hidden; }
.code-header { background: #161b22; padding: 6px 12px; display: flex; justify-content: space-between; align-items: center; border-bottom: 1px solid #30363d; }
.lang-tag { font-size: 11px; color: #8b949e; text-transform: uppercase; font-weight: bold; }
.copy-btn { font-size: 11px; cursor: pointer; color: #58a6ff; }
.copy-btn:hover { text-decoration: underline; }
pre { margin: 0; padding: 16px; overflow-x: auto; font-family: 'Fira Code', 'Consolas', monospace; font-size: 14px; color: #c9d1d9; }
/* Syntax Highlighting Colors */
.k { color: #ff7b72; } /* Keyword */
.s { color: #a5d6ff; } /* String */
.c { color: #8b949e; font-style: italic; } /* Comment */
.nf { color: #d2a8ff; } /* Function */
/* Cursor Animation */
.cursor { display: inline-block; width: 8px; height: 18px; background: var(--accent); vertical-align: text-bottom; animation: blink 1s step-end infinite; margin-left: 2px; }
@keyframes blink { 0%, 100% { opacity: 1; } 50% { opacity: 0; } }
</style>
"""
html_out = f"""{css}
<div class="ui-container">
<div class="stats-bar">
<div class="stat-pill {'stat-active' if is_gen else ''}">
{ '🟢 GENERATING' if is_gen else '⚪ COMPLETE' }
</div>
<div class="stat-pill">⏱️ {stats['time']:.1f}s</div>
<div class="stat-pill">⚡ {t_sec:.1f} T/s</div>
<div class="stat-pill">📝 {stats['tokens']} Tok</div>
</div>
"""
if thinking:
# Open by default if generating, closed if done
is_open = "open" if is_gen else ""
html_out += f"""
<details class="thinking-box" {is_open}>
<summary>🧠 Chain of Thought (Process)</summary>
<div class="thought-content">
{thinking_html} {cursor if not solution else ''}
</div>
</details>
"""
html_out += f"""
<div class="solution-box">
{solution_html} {cursor if solution or not thinking else ''}
</div>
</div>
"""
return html_out
parser = ModernUIParser()
def run_gen(prompt, temp, max_tokens):
if not prompt: return "Please enter a prompt."
gen = vibe_model.generate_response_streaming(prompt, temp, max_tokens)
for text, stats in gen:
if stats:
yield parser.parse_and_render(text, stats)
else:
yield f"<div style='color:red'>Error: {text}</div>"
def stop_action():
vibe_model.stop_generation()
# --- GRADIO INTERFACE ---
with gr.Blocks(
title="VibeThinker IDE",
theme=gr.themes.Base(
primary_hue="indigo",
neutral_hue="slate",
font=("Inter", "sans-serif")
),
css=".gradio-container { background-color: #0f1117 !important; border: none; }"
) as demo:
gr.Markdown("""
<div style="text-align: center; margin-bottom: 20px;">
<h1 style="color: white; font-size: 2rem;">⚡ VibeThinker IDE</h1>
<p style="color: #94a3b8;">Specialized 1.5B Model for Algorithms & Competitive Coding</p>
</div>
""")
with gr.Row():
# Left Column: Inputs
with gr.Column(scale=1, variant="panel"):
input_text = gr.Textbox(
label="Problem Statement",
lines=8,
placeholder="Paste a LeetCode problem or ask for a specific algorithm...",
elem_id="input-box"
)
with gr.Accordion("Settings", open=False):
temp = gr.Slider(0.1, 1.0, value=0.6, label="Temperature")
tokens = gr.Slider(1024, 32000, value=8192, label="Max Tokens")
with gr.Row():
btn_run = gr.Button("▶ Run", variant="primary", scale=2)
btn_stop = gr.Button("⏹ Stop", variant="stop", scale=1)
# Right Column: Output
with gr.Column(scale=2):
out_html = gr.HTML(label="Result Console")
btn_run.click(run_gen, inputs=[input_text, temp, tokens], outputs=out_html)
btn_stop.click(stop_action, None, None)
gr.Examples(
examples=[
["Determine if a Sudoku board is valid. Provide a Python solution with O(1) space complexity if possible."],
["Explain the Knuth-Morris-Pratt (KMP) algorithm and implement it in Python."],
["Solve the 'Trapping Rain Water' problem using the two-pointer approach."],
],
inputs=input_text
)
if __name__ == "__main__":
demo.launch()