Spaces:
Runtime error
Runtime error
| import torch | |
| import gradio as gr | |
| from transformers import AutoModelForCausalLM, AutoTokenizer, PreTrainedModel, PretrainedConfig | |
| from huggingface_hub import hf_hub_download | |
| import json | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| import math | |
| # Define the model architecture | |
| class SmolLM2Config(PretrainedConfig): | |
| model_type = "smollm2" | |
| def __init__( | |
| self, | |
| vocab_size=49152, | |
| hidden_size=576, | |
| intermediate_size=1536, | |
| num_hidden_layers=30, | |
| num_attention_heads=9, | |
| num_key_value_heads=3, | |
| hidden_act="silu", | |
| max_position_embeddings=2048, | |
| initializer_range=0.041666666666666664, | |
| rms_norm_eps=1e-5, | |
| use_cache=True, | |
| pad_token_id=None, | |
| bos_token_id=0, | |
| eos_token_id=0, | |
| tie_word_embeddings=True, | |
| rope_theta=10000.0, | |
| **kwargs | |
| ): | |
| self.vocab_size = vocab_size | |
| self.hidden_size = hidden_size | |
| self.intermediate_size = intermediate_size | |
| self.num_hidden_layers = num_hidden_layers | |
| self.num_attention_heads = num_attention_heads | |
| self.num_key_value_heads = num_key_value_heads | |
| self.hidden_act = hidden_act | |
| self.max_position_embeddings = max_position_embeddings | |
| self.initializer_range = initializer_range | |
| self.rms_norm_eps = rms_norm_eps | |
| self.use_cache = use_cache | |
| self.rope_theta = rope_theta | |
| super().__init__( | |
| pad_token_id=pad_token_id, | |
| bos_token_id=bos_token_id, | |
| eos_token_id=eos_token_id, | |
| tie_word_embeddings=tie_word_embeddings, | |
| **kwargs | |
| ) | |
| # Register the model architecture | |
| from transformers import AutoConfig | |
| AutoConfig.register("smollm2", SmolLM2Config) | |
| class RMSNorm(nn.Module): | |
| def __init__(self, hidden_size, eps=1e-5): | |
| super().__init__() | |
| self.weight = nn.Parameter(torch.ones(hidden_size)) | |
| self.eps = eps | |
| def forward(self, x): | |
| variance = x.pow(2).mean(-1, keepdim=True) | |
| x = x * torch.rsqrt(variance + self.eps) | |
| return self.weight * x | |
| def precompute_rope_frequencies(dim: int, max_position_embeddings: int, theta: float = 10000.0): | |
| position = torch.arange(max_position_embeddings).unsqueeze(1) # [seq_len, 1] | |
| div_term = theta ** (torch.arange(0, dim, 2).float() / dim) # [dim/2] | |
| freqs = position / div_term # [seq_len, dim/2] | |
| return freqs | |
| def apply_rotary_embeddings(x: torch.Tensor, freqs: torch.Tensor): | |
| # x shape: [batch, seq_len, heads, head_dim] | |
| # freqs shape: [seq_len, head_dim/2] | |
| x_rot = x.float() | |
| # Reshape freqs to match x's dimensions | |
| freqs = freqs.unsqueeze(0).unsqueeze(2) # [1, seq_len, 1, dim/2] | |
| # Split channels for rotation | |
| x1, x2 = x_rot[..., :x_rot.shape[-1]//2], x_rot[..., x_rot.shape[-1]//2:] | |
| # Apply rotary embeddings | |
| cos = torch.cos(freqs).to(x.device) | |
| sin = torch.sin(freqs).to(x.device) | |
| # Ensure broadcasting dimensions match | |
| cos = cos.expand_as(x1) | |
| sin = sin.expand_as(x1) | |
| # Rotate x1 and x2 | |
| x1_rot = x1 * cos - x2 * sin | |
| x2_rot = x2 * cos + x1 * sin | |
| # Concatenate back | |
| return torch.cat([x1_rot, x2_rot], dim=-1).to(x.dtype) | |
| class LlamaAttention(nn.Module): | |
| def __init__(self, config: SmolLM2Config): | |
| super().__init__() | |
| self.hidden_size = config.hidden_size | |
| self.num_heads = config.num_attention_heads | |
| self.num_kv_heads = config.num_key_value_heads | |
| self.head_dim = config.hidden_size // config.num_attention_heads | |
| # Adjust projections to match head dimensions | |
| self.q_proj = nn.Linear(config.hidden_size, self.num_heads * self.head_dim, bias=False) | |
| self.k_proj = nn.Linear(config.hidden_size, self.num_kv_heads * self.head_dim, bias=False) | |
| self.v_proj = nn.Linear(config.hidden_size, self.num_kv_heads * self.head_dim, bias=False) | |
| self.o_proj = nn.Linear(self.num_heads * self.head_dim, config.hidden_size, bias=False) | |
| # Initialize rotary embeddings | |
| self.register_buffer( | |
| "rope_freqs", | |
| precompute_rope_frequencies( | |
| self.head_dim, # Use full head_dim for frequencies | |
| config.max_position_embeddings, | |
| config.rope_theta | |
| ), | |
| persistent=False | |
| ) | |
| def forward(self, hidden_states, attention_mask=None): | |
| batch_size, seq_length, _ = hidden_states.size() | |
| # Project and reshape | |
| q = self.q_proj(hidden_states).view(batch_size, seq_length, self.num_heads, self.head_dim) | |
| k = self.k_proj(hidden_states).view(batch_size, seq_length, self.num_kv_heads, self.head_dim) | |
| v = self.v_proj(hidden_states).view(batch_size, seq_length, self.num_kv_heads, self.head_dim) | |
| # Apply rotary embeddings | |
| q = apply_rotary_embeddings(q, self.rope_freqs[:seq_length]) | |
| k = apply_rotary_embeddings(k, self.rope_freqs[:seq_length]) | |
| # Repeat k/v heads if num_kv_heads < num_heads | |
| if self.num_kv_heads < self.num_heads: | |
| k = k.repeat_interleave(self.num_heads // self.num_kv_heads, dim=2) | |
| v = v.repeat_interleave(self.num_heads // self.num_kv_heads, dim=2) | |
| # Scaled dot-product attention | |
| q = q.transpose(1, 2) # (batch, num_heads, seq_len, head_dim) | |
| k = k.transpose(1, 2) | |
| v = v.transpose(1, 2) | |
| attention_scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim) | |
| if attention_mask is not None: | |
| attention_scores = attention_scores + attention_mask | |
| attention_probs = F.softmax(attention_scores, dim=-1) | |
| context = torch.matmul(attention_probs, v) | |
| context = context.transpose(1, 2).contiguous() | |
| context = context.view(batch_size, seq_length, -1) | |
| return self.o_proj(context) | |
| class LlamaMLP(nn.Module): | |
| def __init__(self, config: SmolLM2Config): | |
| super().__init__() | |
| self.gate_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False) | |
| self.up_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False) | |
| self.down_proj = nn.Linear(config.intermediate_size, config.hidden_size, bias=False) | |
| self.act_fn = nn.SiLU() | |
| def forward(self, x): | |
| gate = self.act_fn(self.gate_proj(x)) | |
| up = self.up_proj(x) | |
| return self.down_proj(gate * up) | |
| class LlamaDecoderLayer(nn.Module): | |
| def __init__(self, config: SmolLM2Config): | |
| super().__init__() | |
| self.self_attn = LlamaAttention(config) | |
| self.mlp = LlamaMLP(config) | |
| self.input_layernorm = RMSNorm(config.hidden_size, config.rms_norm_eps) | |
| self.post_attention_layernorm = RMSNorm(config.hidden_size, config.rms_norm_eps) | |
| def forward(self, hidden_states, attention_mask=None): | |
| residual = hidden_states | |
| hidden_states = self.input_layernorm(hidden_states) | |
| hidden_states = self.self_attn(hidden_states, attention_mask) | |
| hidden_states = residual + hidden_states | |
| residual = hidden_states | |
| hidden_states = self.post_attention_layernorm(hidden_states) | |
| hidden_states = self.mlp(hidden_states) | |
| hidden_states = residual + hidden_states | |
| return hidden_states | |
| class SmolLM2ForCausalLM(PreTrainedModel): | |
| config_class = SmolLM2Config | |
| def __init__(self, config): | |
| super().__init__(config) | |
| self.config = config | |
| self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size) | |
| self.layers = nn.ModuleList([LlamaDecoderLayer(config) for _ in range(config.num_hidden_layers)]) | |
| self.norm = RMSNorm(config.hidden_size, config.rms_norm_eps) | |
| # Add lm_head before weight tying | |
| self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False) | |
| # Initialize weights | |
| self.apply(self._init_weights) | |
| # Tie weights if configured | |
| if config.tie_word_embeddings: | |
| self.lm_head.weight = self.embed_tokens.weight | |
| def _init_weights(self, module): | |
| if isinstance(module, nn.Linear): | |
| torch.nn.init.normal_(module.weight, mean=0.0, std=self.config.initializer_range) | |
| if module.bias is not None: | |
| torch.nn.init.zeros_(module.bias) | |
| elif isinstance(module, nn.Embedding): | |
| torch.nn.init.normal_(module.weight, mean=0.0, std=self.config.initializer_range) | |
| def forward(self, input_ids=None, attention_mask=None, labels=None, **kwargs): | |
| hidden_states = self.embed_tokens(input_ids) | |
| # Create causal attention mask if none provided | |
| if attention_mask is None: | |
| # Create causal mask | |
| seq_length = input_ids.size(1) | |
| # [batch_size, 1, seq_length, seq_length] | |
| causal_mask = torch.triu( | |
| torch.ones((seq_length, seq_length), dtype=torch.bool, device=input_ids.device), | |
| diagonal=1 | |
| ).unsqueeze(0).unsqueeze(0) | |
| attention_mask = torch.zeros( | |
| (1, 1, seq_length, seq_length), | |
| dtype=hidden_states.dtype, | |
| device=hidden_states.device | |
| ) | |
| attention_mask.masked_fill_(causal_mask, float("-inf")) | |
| for layer in self.layers: | |
| hidden_states = layer(hidden_states, attention_mask) | |
| hidden_states = self.norm(hidden_states) | |
| logits = self.lm_head(hidden_states) | |
| loss = None | |
| if labels is not None: | |
| loss = F.cross_entropy(logits.view(-1, logits.size(-1)), labels.view(-1)) | |
| return logits if loss is None else (loss, logits) | |
| def prepare_inputs_for_generation(self, input_ids, **kwargs): | |
| return { | |
| "input_ids": input_ids, | |
| "attention_mask": kwargs.get("attention_mask", None) | |
| } | |
| def generate( | |
| self, | |
| input_ids, | |
| max_length=100, | |
| temperature=0.7, | |
| top_k=50, | |
| do_sample=True, | |
| num_return_sequences=1, | |
| pad_token_id=None, | |
| eos_token_id=None, | |
| **kwargs | |
| ): | |
| cur_len = input_ids.shape[1] | |
| batch_size = input_ids.shape[0] | |
| if max_length < cur_len: | |
| max_length = cur_len | |
| unfinished_sequences = torch.ones(batch_size, dtype=torch.long, device=input_ids.device) | |
| while cur_len < max_length: | |
| # Prepare model inputs | |
| model_inputs = self.prepare_inputs_for_generation(input_ids) | |
| # Forward pass | |
| with torch.no_grad(): | |
| outputs = self(**model_inputs) | |
| next_token_logits = outputs[:, -1, :] | |
| # Temperature scaling | |
| if temperature != 1.0 and temperature > 0: | |
| next_token_logits = next_token_logits / temperature | |
| # Top-k filtering | |
| if top_k > 0: | |
| indices_to_remove = next_token_logits < torch.topk(next_token_logits, top_k)[0][..., -1, None] | |
| next_token_logits[indices_to_remove] = float('-inf') | |
| # Sample or greedy | |
| if do_sample: | |
| probs = F.softmax(next_token_logits, dim=-1) | |
| next_tokens = torch.multinomial(probs, num_samples=1) | |
| else: | |
| next_tokens = torch.argmax(next_token_logits, dim=-1) | |
| next_tokens = next_tokens.unsqueeze(-1) | |
| # Append next tokens | |
| input_ids = torch.cat([input_ids, next_tokens], dim=-1) | |
| cur_len = input_ids.shape[1] | |
| # Early stopping if all sequences have reached the EOS token | |
| if eos_token_id is not None: | |
| unfinished_sequences = unfinished_sequences.mul( | |
| next_tokens.squeeze(-1).ne(eos_token_id).long() | |
| ) | |
| if unfinished_sequences.max() == 0: | |
| break | |
| return input_ids | |
| # Register the model | |
| AutoModelForCausalLM.register(SmolLM2Config, SmolLM2ForCausalLM) | |
| # Cache for model and tokenizer | |
| MODEL = None | |
| TOKENIZER = None | |
| CONFIG = None | |
| def initialize(): | |
| global MODEL, TOKENIZER, CONFIG | |
| if MODEL is None: | |
| print("Loading model and tokenizer...") | |
| model_id = "jatingocodeo/SmolLM2" | |
| try: | |
| # Download and load config | |
| print("Loading config...") | |
| config_path = hf_hub_download(repo_id=model_id, filename="config.json") | |
| with open(config_path, 'r') as f: | |
| config_dict = json.load(f) | |
| CONFIG = SmolLM2Config(**config_dict) | |
| # Load tokenizer | |
| print("Loading tokenizer...") | |
| TOKENIZER = AutoTokenizer.from_pretrained( | |
| model_id, | |
| model_max_length=CONFIG.max_position_embeddings, | |
| padding_side="left", | |
| truncation_side="left", | |
| trust_remote_code=True | |
| ) | |
| # Make sure we're using the correct special tokens | |
| special_tokens = { | |
| 'bos_token': '<|endoftext|>', | |
| 'eos_token': '<|endoftext|>', | |
| 'unk_token': '<|endoftext|>', | |
| 'pad_token': '<|endoftext|>' # Using endoftext as pad token since it's not specified | |
| } | |
| TOKENIZER.add_special_tokens(special_tokens) | |
| # Load model weights | |
| print("Loading model...") | |
| weights_path = hf_hub_download(repo_id=model_id, filename="pytorch_model.bin") | |
| # Initialize model | |
| MODEL = SmolLM2ForCausalLM(CONFIG) | |
| # Resize token embeddings to match tokenizer | |
| MODEL.resize_token_embeddings(len(TOKENIZER)) | |
| # Load state dict | |
| state_dict = torch.load(weights_path, map_location="cpu") | |
| MODEL.load_state_dict(state_dict) | |
| # Move model to device | |
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
| MODEL = MODEL.to(device) | |
| print(f"Model loaded successfully on {device}") | |
| except Exception as e: | |
| print(f"Error initializing: {str(e)}") | |
| raise | |
| def generate_text(prompt, max_length=100, temperature=0.7, top_k=50): | |
| # Initialize if not already done | |
| if MODEL is None: | |
| try: | |
| initialize() | |
| except Exception as e: | |
| return f"Failed to initialize model: {str(e)}" | |
| try: | |
| # Process prompt | |
| if not prompt.strip(): | |
| return "Please enter a prompt." | |
| # Add BOS token if needed | |
| if not prompt.startswith(TOKENIZER.bos_token): | |
| prompt = TOKENIZER.bos_token + prompt | |
| # Encode prompt | |
| encoded = TOKENIZER.encode_plus( | |
| prompt, | |
| add_special_tokens=True, | |
| return_tensors="pt", | |
| padding=True, | |
| truncation=True, | |
| max_length=CONFIG.max_position_embeddings | |
| ) | |
| input_ids = encoded["input_ids"].to(MODEL.device) | |
| attention_mask = encoded["attention_mask"].to(MODEL.device) | |
| # Generate | |
| with torch.no_grad(): | |
| outputs = MODEL.generate( | |
| input_ids, | |
| attention_mask=attention_mask, | |
| max_length=min(max_length + len(input_ids[0]), CONFIG.max_position_embeddings), | |
| temperature=max(0.1, min(temperature, 1.0)), # Clamp temperature | |
| top_k=max(1, min(top_k, 100)), # Clamp top_k | |
| do_sample=True if temperature > 0 else False, | |
| num_return_sequences=1, | |
| pad_token_id=TOKENIZER.pad_token_id, | |
| eos_token_id=TOKENIZER.eos_token_id, | |
| ) | |
| # Decode and return | |
| generated_text = TOKENIZER.decode(outputs[0], skip_special_tokens=True) | |
| return generated_text.strip() | |
| except Exception as e: | |
| import traceback | |
| traceback.print_exc() | |
| return f"Error during text generation: {str(e)}" | |
| # Create Gradio interface | |
| iface = gr.Interface( | |
| fn=generate_text, | |
| inputs=[ | |
| gr.Textbox(label="Prompt", placeholder="Enter your prompt here...", lines=2), | |
| gr.Slider(minimum=10, maximum=200, value=100, step=1, label="Max Length"), | |
| gr.Slider(minimum=0.1, maximum=1.0, value=0.7, step=0.1, label="Temperature"), | |
| gr.Slider(minimum=1, maximum=100, value=50, step=1, label="Top K"), | |
| ], | |
| outputs=gr.Textbox(label="Generated Text", lines=5), | |
| title="SmolLM2 Text Generator", | |
| description="Generate text using the fine-tuned SmolLM2 model. Adjust parameters to control the generation.", | |
| examples=[ | |
| ["Once upon a time", 100, 0.7, 50], | |
| ["The quick brown fox", 150, 0.8, 40], | |
| ], | |
| allow_flagging="never" | |
| ) | |
| # Initialize on startup | |
| try: | |
| initialize() | |
| except Exception as e: | |
| print(f"Warning: Model initialization failed: {str(e)}") | |
| print("Model will be initialized on first request") | |
| if __name__ == "__main__": | |
| iface.launch() |