"""Train a small causal language model (ReLM) on a line-per-sample text corpus.

The script downloads a corpus and a SentencePiece model, builds a
``tf.data`` pipeline of (input, next-token target) pairs, defines the model
(local-window causal attention + SwiGLU feed-forward blocks with tied output
embeddings), and trains it for one epoch.
"""

import json
import os
import subprocess
import sys

import numpy as np
import requests
import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow import keras
from tensorflow.keras import Model, layers, mixed_precision
from tensorflow.keras.layers import Dense

try:
    import sentencepiece as spm
except ImportError:
    # Plain-Python equivalent of the notebook magic `!pip install sentencepiece`,
    # so the file also runs outside IPython/Colab.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "sentencepiece"])
    import sentencepiece as spm

print('1')
tf.get_logger().setLevel("ERROR")

SEED = 42
tf.random.set_seed(SEED)
np.random.seed(SEED)

# TPU initialisation. Any failure falls back to the default (GPU/CPU)
# strategy; the broad except is a deliberate best-effort probe.
try:
    resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu="local")
    tf.tpu.experimental.initialize_tpu_system(resolver)
    strategy = tf.distribute.TPUStrategy(resolver)
    print("✅ TPU 초기화 완료:", resolver.cluster_spec().as_dict())
    on_tpu = True
except Exception as e:
    print("⚠️ TPU 미사용, GPU/CPU로 진행:", e)
    strategy = tf.distribute.get_strategy()
    on_tpu = False

# Mixed precision: bfloat16 compute on TPU, plain float32 elsewhere.
policy = mixed_precision.Policy("mixed_bfloat16" if on_tpu else "float32")
mixed_precision.set_global_policy(policy)
print("✅ Mixed precision:", policy)


# =======================
# 1) File download
# =======================
def download_file(url, save_path, timeout=60):
    """Stream ``url`` to ``save_path``.

    The payload is written to ``<save_path>.part`` and renamed only after the
    download completed, so an interrupted transfer never leaves a truncated
    file that a later ``os.path.exists(save_path)`` check would accept.

    Args:
        url: Address to fetch.
        save_path: Destination file path.
        timeout: Seconds passed to ``requests.get`` as its timeout.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    tmp_path = f"{save_path}.part"
    with requests.get(url, stream=True, timeout=timeout) as r:
        r.raise_for_status()
        with open(tmp_path, "wb") as f:
            for chunk in r.iter_content(8192 * 2):
                f.write(chunk)
    os.replace(tmp_path, save_path)
    print(f"✅ {save_path} 저장됨")


DATA_PATH = "corpus.txt"
TOKENIZER_PATH = "ko_unigram.model"

if not os.path.exists(DATA_PATH):
    download_file(
        "https://huggingface.co/datasets/Yuchan5386/Prototype/resolve/main/corpus_ko.txt?download=true",
        DATA_PATH,
    )

if not os.path.exists(TOKENIZER_PATH):
    download_file(
        "https://huggingface.co/Yuchan5386/inlam-100m/resolve/main/ko_unigram.model?download=true",
        TOKENIZER_PATH,
    )

sp = spm.SentencePieceProcessor(TOKENIZER_PATH)

# NOTE(review): every lookup previously used the empty string "", which made
# all five ids resolve to the same piece. The angle-bracket piece names appear
# to have been stripped from the source; they are restored here using the
# names implied by the variable names. Confirm against the tokenizer vocab.
PAD_PIECE = "<pad>"
START_PIECE = "<start>"
SEP_PIECE = "<sep>"
END_PIECE = "<end>"
UNK_PIECE = "<unk>"

_pad_lookup = sp.piece_to_id(PAD_PIECE)
pad_id = _pad_lookup if _pad_lookup != -1 else 0
start_id = sp.piece_to_id(START_PIECE)
sep_id = sp.piece_to_id(SEP_PIECE)
end_id = sp.piece_to_id(END_PIECE)
unk_id = sp.piece_to_id(UNK_PIECE)
vocab_size = sp.get_piece_size()
print(f"✅ Vocabulary size: {vocab_size}")

max_len = 512
batch_size = 32


def text_to_ids(text):
    """Encode ``text`` into a list of SentencePiece ids."""
    return sp.encode(text, out_type=int)


def ids_to_text(ids):
    """Decode a sequence of SentencePiece ids back into text."""
    return sp.decode(ids)


def txt_stream(file_path, num_lines=None):
    """Yield fixed-length (input, target) id arrays, one pair per non-empty line.

    Each line is tokenised, truncated to ``max_len - 1`` ids, terminated with
    ``end_id`` and right-padded with ``pad_id`` to ``max_len``. The target is
    the input shifted left by one position with a trailing ``pad_id``.

    Args:
        file_path: UTF-8 text file with one sample per line.
        num_lines: If given, stop once this many raw lines (blank ones
            included) have been read.

    Yields:
        Tuple of two int32 arrays of shape ``(max_len,)``.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        for i, line in enumerate(f):
            if num_lines is not None and i >= num_lines:
                break  # read only up to the requested line
            text = line.strip()
            if not text:
                continue
            # Leave one slot free for the end token.
            ids = text_to_ids(text)[: max_len - 1]
            full_input = ids + [end_id]
            full_input += [pad_id] * (max_len - len(full_input))
            # target = next-token shifted sequence
            target = full_input[1:] + [pad_id]
            yield (
                np.asarray(full_input, dtype=np.int32),
                np.asarray(target, dtype=np.int32),
            )


# Build the dataset (only the first 100,000 lines of the corpus are read).
dataset = tf.data.Dataset.from_generator(
    lambda: txt_stream(DATA_PATH, num_lines=100000),
    output_signature=(
        tf.TensorSpec(shape=(max_len,), dtype=tf.int32),
        tf.TensorSpec(shape=(max_len,), dtype=tf.int32),
    ),
)
dataset = (
    dataset.shuffle(2000, seed=SEED)
    .batch(batch_size, drop_remainder=True)
    .prefetch(tf.data.AUTOTUNE)
)

with strategy.scope():
    dist_dataset = strategy.experimental_distribute_dataset(dataset)


class SwiGLU(layers.Layer):
    """Gated feed-forward layer computed in float32.

    Projects to 3500 units, splits them into two equal halves ``a`` and ``b``,
    and projects ``silu(a) * b`` back to ``d_model``.
    """

    def __init__(self, d_model):
        super().__init__()
        self.W = layers.Dense(3500, dtype='float32')
        self.W1 = layers.Dense(d_model, dtype='float32')

    def call(self, x):
        # Remember the caller's dtype *before* up-casting; the original
        # overwrote x first, which turned the final cast into a no-op.
        input_dtype = x.dtype
        h = self.W(tf.cast(x, tf.float32))
        a, b = tf.split(h, 2, axis=-1)
        out = self.W1(tf.nn.silu(a) * b)
        return tf.cast(out, input_dtype)


class SparseCausalAttention(tf.keras.layers.Layer):
    """Multi-head causal self-attention restricted to a local window.

    Query position ``i`` attends to key positions
    ``max(i - window_size, 0) .. i``.
    """

    def __init__(self, num_heads, head_dim, window_size=8, **kwargs):
        super().__init__(**kwargs)
        self.num_heads = num_heads
        self.head_dim = head_dim
        self.window_size = window_size  # local window size

    def build(self, input_shape):
        self.q_dense = Dense(self.num_heads * self.head_dim)
        self.k_dense = Dense(self.num_heads * self.head_dim)
        self.v_dense = Dense(self.num_heads * self.head_dim)
        # Project back to the input feature size.
        self.out_dense = Dense(input_shape[-1])

    def _split_heads(self, t, batch_size, seq_len):
        """Reshape (batch, seq, heads*head_dim) to (batch, heads, seq, head_dim)."""
        t = tf.reshape(t, (batch_size, seq_len, self.num_heads, self.head_dim))
        return tf.transpose(t, perm=[0, 2, 1, 3])

    def call(self, x):
        batch_size, seq_len = tf.shape(x)[0], tf.shape(x)[1]

        # Q, K, V as (batch, heads, seq, head_dim)
        q = self._split_heads(self.q_dense(x), batch_size, seq_len)
        k = self._split_heads(self.k_dense(x), batch_size, seq_len)
        v = self._split_heads(self.v_dense(x), batch_size, seq_len)

        # Scale by sqrt(head_dim); cast to q's dtype so the division also
        # works when the layer computes in bfloat16.
        scale = tf.cast(tf.math.sqrt(tf.cast(self.head_dim, tf.float32)), q.dtype)
        q = q / scale

        attn_scores = tf.matmul(q, k, transpose_b=True)  # (batch, heads, seq, seq)

        # band_part(ones, window_size, 0) keeps the diagonal plus window_size
        # sub-diagonals and nothing above the diagonal, i.e. it is already the
        # causal mask intersected with the local window.
        band = tf.linalg.band_part(tf.ones((seq_len, seq_len)), self.window_size, 0)
        allowed = tf.reshape(band > 0, (1, 1, seq_len, seq_len))  # broadcastable
        masked_value = tf.cast(-1e9, attn_scores.dtype)
        attn_scores = tf.where(allowed, attn_scores, masked_value)

        attn_probs = tf.nn.softmax(attn_scores, axis=-1)
        attn_output = tf.matmul(attn_probs, v)  # (batch, heads, seq, head_dim)

        # Merge heads back to (batch, seq, heads*head_dim).
        attn_output = tf.transpose(attn_output, perm=[0, 2, 1, 3])
        attn_output = tf.reshape(
            attn_output, (batch_size, seq_len, self.num_heads * self.head_dim)
        )
        return self.out_dense(attn_output)


class Lo(layers.Layer):
    """Bottleneck layer: Dense(64, silu) -> Dense(d_model) -> LayerNorm, plus residual."""

    def __init__(self, d_model):
        super().__init__()
        self.d = layers.Dense(64, activation='silu')
        self.w = layers.Dense(d_model)
        self.norm = layers.LayerNormalization(epsilon=1e-5, dtype='float32')

    def call(self, x):
        p = self.w(self.d(x))
        # The norm runs in float32; cast back so the residual add has matching
        # dtypes under a mixed-precision policy (no-op in float32).
        return tf.cast(self.norm(p), x.dtype) + x


class Block(layers.Layer):
    """One model block: local attention, normalised SwiGLU with residual, then ``Lo``.

    The attention output replaces ``x``; no residual is added around it.
    """

    def __init__(self, d_model):
        super().__init__()
        self.lou = SparseCausalAttention(num_heads=2, head_dim=64)
        self.glu = SwiGLU(d_model)
        self.norm = layers.LayerNormalization(epsilon=1e-5, dtype='float32')
        self.lo = Lo(d_model)

    def call(self, x):
        x = self.lou(x)
        # Cast the float32 norm output back to x's dtype before the residual
        # add (no-op in float32, required under mixed_bfloat16).
        x = tf.cast(self.norm(self.glu(x)), x.dtype) + x
        return self.lo(x)


class ReLM(tf.keras.Model):
    """Language model with learned position embeddings and tied output embeddings.

    Args:
        vocab_size: Number of token ids.
        max_seq_len: Number of rows in the position-embedding table.
        d_model: Embedding / hidden width.
        n_layers: Number of ``Block`` layers.
        dropout_rate: Accepted for interface compatibility; not used by the
            layers defined here.
    """

    def __init__(self, vocab_size, max_seq_len, d_model, n_layers, dropout_rate=0.1):
        super().__init__()
        self.token_embedding = layers.Embedding(vocab_size, d_model)
        self.pos_embedding = layers.Embedding(max_seq_len, d_model)
        self.blocks = [Block(d_model) for _ in range(n_layers)]
        self.ln_f = layers.LayerNormalization(epsilon=1e-5, dtype="float32")

    def call(self, x, training=False):
        """Map token ids of shape (batch, seq) to float32 logits (batch, seq, vocab)."""
        seq_len = tf.shape(x)[1]
        positions = tf.range(seq_len)[tf.newaxis, :]
        x = self.token_embedding(x) + self.pos_embedding(positions)
        for block in self.blocks:
            x = block(x)
        x = self.ln_f(x)
        # Weight tying: reuse the token-embedding matrix as output projection.
        embedding_matrix = tf.cast(self.token_embedding.embeddings, x.dtype)
        logits = tf.matmul(x, embedding_matrix, transpose_b=True)
        return tf.cast(logits, tf.float32)


loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')


def masked_loss(y_true, y_pred):
    """Mean cross-entropy over positions whose target is not ``pad_id``.

    Returns 0 instead of NaN when every target position is padding.
    """
    per_token = loss_fn(y_true, y_pred)
    mask = tf.cast(tf.not_equal(y_true, pad_id), tf.float32)
    return tf.math.divide_no_nan(tf.reduce_sum(per_token * mask), tf.reduce_sum(mask))


def masked_perplexity(y_true, y_pred):
    """Perplexity, i.e. exp of the masked loss, with the loss clamped at 10."""
    avg_loss = masked_loss(y_true, y_pred)
    return tf.exp(tf.minimum(avg_loss, 10.0))  # clamp for numerical stability


def create_lr_schedule(initial_lr=5e-5, decay_steps=10000, decay_rate=0.9):
    """Return a smooth (non-staircase) exponential-decay learning-rate schedule."""
    return tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=initial_lr,
        decay_steps=decay_steps,
        decay_rate=decay_rate,
        staircase=False,
    )


# Variables (model weights and optimizer slots) must be created inside the
# strategy scope to be placed by the distribution strategy; with the default
# strategy the scope is a no-op.
with strategy.scope():
    # Create the model
    model = ReLM(
        vocab_size=vocab_size,
        max_seq_len=max_len,
        d_model=128,
        n_layers=2,
    )

    # Configure the optimizer
    optimizer = tf.keras.optimizers.Adam(
        learning_rate=create_lr_schedule(),
        beta_1=0.9,
        beta_2=0.95,
        epsilon=1e-8,
        clipnorm=1.0,
    )

    # Compile the model
    model.compile(
        optimizer=optimizer,
        loss=masked_loss,
        metrics=[masked_perplexity],
    )

    # Build the weights with a dummy input (layers create variables lazily,
    # so the first call also has to happen inside the scope).
    dummy_input = np.zeros((1, max_len), dtype=np.int32)
    model(dummy_input)

model.summary()

history = model.fit(dataset, epochs=1, verbose=1)

# Save the trained weights
model.save_weights("model.weights.h5")
print("모델 가중치 저장 완료!")


def generate_text_topp(model, prompt, max_len=150, max_gen=150, p=0.9, temperature=0.8, min_len=20):
    """Generate a continuation of ``prompt`` with nucleus (top-p) sampling.

    Args:
        model: Callable returning logits indexable as ``[batch, position]``
            for an id tensor of shape ``(1, max_len)``.
        prompt: Text to continue; encoded with ``text_to_ids``.
        max_len: Context length; the prompt is truncated to it and only the
            last ``max_len`` ids are fed to the model at each step.
        max_gen: Maximum number of sampling steps.
        p: Cumulative-probability threshold of the nucleus.
        temperature: Divisor applied to the logits before the softmax.
        min_len: Total length (prompt ids included) below which the end
            token cannot be sampled.

    Returns:
        The decoded text of the prompt ids followed by the generated ids.
    """
    generated = list(text_to_ids(f" {prompt}")[:max_len])

    for _ in range(max_gen):
        # Feed at most the last max_len ids, right-padded to a fixed length.
        input_seq = generated[-max_len:]
        input_padded = np.full((1, max_len), pad_id, dtype=np.int32)
        input_padded[0, :len(input_seq)] = input_seq

        logits = model(tf.convert_to_tensor(input_padded), training=False)
        # Logits at the last real (non-padded) position predict the next token.
        next_token_logits = logits[0, len(input_seq) - 1].numpy()

        if len(generated) < min_len:
            # Forbid the end token while the sequence is too short. The
            # original appended an early end token to the output and kept
            # going, leaving an end marker in the middle of the text.
            next_token_logits[end_id] = -np.inf
        else:
            next_token_logits[end_id] -= 5.0  # discourage early stopping
        next_token_logits[pad_id] -= 10.0  # padding should practically never be emitted

        probs = tf.nn.softmax(next_token_logits / temperature).numpy()

        # Nucleus: the smallest set of most-likely tokens whose cumulative
        # probability reaches p.
        sorted_indices = np.argsort(probs)[::-1]
        sorted_probs = probs[sorted_indices]
        cutoff = np.searchsorted(np.cumsum(sorted_probs), p)
        top_indices = sorted_indices[:cutoff + 1]
        top_probs = sorted_probs[:cutoff + 1]
        top_probs = top_probs / np.sum(top_probs)

        next_token_id = int(np.random.choice(top_indices, p=top_probs))
        if next_token_id == end_id:
            break
        generated.append(next_token_id)

    return ids_to_text(generated)


print("\n\n===== 생성 결과 =====")
print(generate_text_topp(model, "지난 2년 동안 출연연이 국가가 필요한 연구를", p=0.9))