Spaces:
Runtime error
Runtime error
| import os | |
| import asyncio | |
| import logging | |
| from datetime import datetime, timedelta | |
| from telegram import Update | |
| from telegram.ext import ApplicationBuilder, CommandHandler, MessageHandler, filters, ContextTypes | |
| from transformers import AutoTokenizer, AutoModelForSeq2SeqLM | |
| import re | |
| import nltk | |
| from nltk.tokenize import sent_tokenize | |
| import torch | |
| # تنظیم مسیر cache برای Transformers | |
| #cache_dir = '/tmp/transformers_cache' | |
| #os.environ['TRANSFORMERS_CACHE'] = cache_dir | |
| #os.environ['HF_HOME'] = cache_dir | |
| #os.makedirs(cache_dir, exist_ok=True) | |
| # تنظیم مسیر nltk | |
| try: | |
| nltk.download('punkt', download_dir='./nltk_data', quiet=True) | |
| nltk.data.path.append('./nltk_data') | |
| except: | |
| pass | |
| # تنظیمات لاگ | |
| logging.basicConfig( | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
| level=logging.INFO | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # اطلاعات مدل | |
| MODEL_NAME = "nafisehNik/mt5-persian-summary" | |
| model = None | |
| tokenizer = None | |
| # ذخیره پیامها برای هر چت | |
| MAX_MESSAGES_PER_CHAT = 1000 | |
| class MessageStore: | |
| def __init__(self): | |
| self.messages = {} | |
| def add_message(self, chat_id, user_id, username, text, timestamp): | |
| if chat_id not in self.messages: | |
| self.messages[chat_id] = [] | |
| if len(self.messages[chat_id]) >= MAX_MESSAGES_PER_CHAT: | |
| self.messages[chat_id] = self.messages[chat_id][-MAX_MESSAGES_PER_CHAT // 2:] | |
| self.messages[chat_id].append({ | |
| "user_id": user_id, | |
| "username": username, | |
| "text": text, | |
| "timestamp": timestamp | |
| }) | |
| def get_messages(self, chat_id, count=50, hours_back=None): | |
| if chat_id not in self.messages: | |
| return [] | |
| messages = self.messages[chat_id] | |
| if hours_back: | |
| cutoff = datetime.now() - timedelta(hours=hours_back) | |
| messages = [m for m in messages if m["timestamp"] >= cutoff] | |
| return messages[-count:] if count else messages | |
| message_store = MessageStore() | |
| def load_persian_model(): | |
| global model, tokenizer | |
| try: | |
| logger.info(f"Loading Persian model: {MODEL_NAME}") | |
| tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME) | |
| model = AutoModelForSeq2SeqLM.from_pretrained( | |
| MODEL_NAME, | |
| torch_dtype=torch.float32 | |
| ) | |
| model.eval() | |
| logger.info("Model loaded successfully") | |
| except Exception as e: | |
| logger.error(f"Error loading Persian model: {e}") | |
| model, tokenizer = None, None | |
| def preprocess_persian_text(text): | |
| text = re.sub(r'\s+', ' ', text) | |
| text = re.sub(r'\n+', '\n', text) | |
| text = re.sub(r'\d{2}:\d{2}', '', text) | |
| text = re.sub(r'@\w+', '', text) | |
| text = re.sub(r'http\S+', '', text) | |
| text = re.sub(r'[^\w\s\u0600-\u06FF]', ' ', text) | |
| return text.strip() | |
| def chunk_text_smart(text, max_length=300): | |
| try: | |
| sentences = sent_tokenize(text) | |
| except: | |
| sentences = re.split(r'[.!?؟]+', text) | |
| chunks = [] | |
| current = "" | |
| for sentence in sentences: | |
| if len(current + sentence) < max_length: | |
| current += sentence + " " | |
| else: | |
| if current: | |
| chunks.append(current.strip()) | |
| current = sentence + " " | |
| if current: | |
| chunks.append(current.strip()) | |
| return chunks | |
| def summarize_messages(messages_data): | |
| global model, tokenizer | |
| if not model or not tokenizer: | |
| return "❌ مدل خلاصهسازی در دسترس نیست" | |
| if not messages_data: | |
| return "❌ پیامی برای خلاصهسازی یافت نشد" | |
| try: | |
| text = "" | |
| for msg in messages_data: | |
| username = msg['username'] or "کاربر" | |
| text += f"{username}: {msg['text']}\n" | |
| text = preprocess_persian_text(text) | |
| if len(text) < 100: | |
| return "❌ متن برای خلاصهسازی بسیار کوتاه است" | |
| chunks = chunk_text_smart(text, max_length=400) | |
| summaries = [] | |
| for chunk in chunks[:2]: | |
| inputs = tokenizer.encode(f"خلاصه: {chunk}", return_tensors="pt", max_length=512, truncation=True) | |
| output = model.generate( | |
| inputs, | |
| max_length=100, | |
| min_length=30, | |
| length_penalty=1.2, | |
| num_beams=3, | |
| early_stopping=True, | |
| no_repeat_ngram_size=3 | |
| ) | |
| summary = tokenizer.decode(output[0], skip_special_tokens=True) | |
| summaries.append(summary.replace("خلاصه:", "").strip()) | |
| if not summaries: | |
| return "❌ خطا در خلاصهسازی" | |
| stats = f"\n\n📊 آمار: {len(messages_data)} پیام، {len(text)} کاراکتر" | |
| return f"📝 خلاصه گفتگو:\n\n" + "\n\n".join(summaries) + stats | |
| except Exception as e: | |
| logger.error(f"Summarization error: {e}") | |
| return "❌ خطا در خلاصهسازی" | |
| def parse_summary_request(text): | |
| text = text.lower() | |
| count = 50 | |
| hours = None | |
| match = re.search(r'(\d+)\s*(پیام|تا|عدد)', text) | |
| if match: | |
| count = min(int(match.group(1)), 200) | |
| match = re.search(r'(\d+)\s*(ساعت|روز)', text) | |
| if match: | |
| hours = int(match.group(1)) | |
| if "روز" in match.group(2): | |
| hours *= 24 | |
| hours = min(hours, 72) | |
| return count, hours | |
| async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
| await update.message.reply_text("🤖 سلام! برای خلاصهسازی، عبارت «خلاصه» به همراه تعداد پیام یا مدت زمان را بفرست.") | |
| async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
| message = update.message | |
| if not message or not message.text: | |
| return | |
| chat_id = message.chat_id | |
| user_id = message.from_user.id | |
| username = message.from_user.username | |
| text = message.text.strip() | |
| timestamp = message.date or datetime.utcnow() | |
| message_store.add_message(chat_id, user_id, username, text, timestamp) | |
| if "خلاصه" in text: | |
| count, hours = parse_summary_request(text) | |
| msgs = message_store.get_messages(chat_id, count, hours) | |
| summary = summarize_messages(msgs) | |
| await update.message.reply_text(summary) | |
| if __name__ == "__main__": | |
| load_persian_model() | |
| TOKEN = os.getenv("BOT_TOKEN") # یا مستقیم وارد کن: 'your_token_here' | |
| if not TOKEN: | |
| raise ValueError("❌ توکن تلگرام تعریف نشده.") | |
| app = ApplicationBuilder().token(TOKEN).build() | |
| app.add_handler(CommandHandler("start", start)) | |
| app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)) | |
| logger.info("Starting bot...") | |
| app.run_polling() | |