import os
import re

import requests
import wikipedia
from bs4 import BeautifulSoup
from google.generativeai import GenerativeModel, configure
from markdownify import markdownify as to_markdown
from smolagents import DuckDuckGoSearchTool, Tool
from sympy import SympifyError, simplify, sympify
# Import configuration manager
try:
    from config import config, safe_getenv
except ImportError:
    # Fallback if config.py doesn't exist
    class DummyConfig:
        def has_key(self, key):
            return bool(os.getenv(key))

        def get_key(self, key):
            return os.getenv(key)

    config = DummyConfig()

    def safe_getenv(key, default=None, feature_name=None):
        return os.getenv(key, default)

# Try to import utils, but don't fail if it doesn't exist
try:
    import utils
except ImportError:
    utils = None
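# Note: `utils` is an optional project module; the only attribute used below is
# utils.save_file_with_timestamp(content, prefix, extension). If it is missing,
# saving is simply skipped. A minimal compatible helper (assumed signature,
# illustrative sketch only) could look like:
#
#   from datetime import datetime
#   def save_file_with_timestamp(content, prefix, extension):
#       filename = f"{prefix}_{datetime.now():%Y%m%d_%H%M%S}{extension}"
#       with open(filename, "w", encoding="utf-8") as f:
#           f.write(content)
#       return filename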
# Safe API key handling
google_search_key = safe_getenv('GOOGLE_SEARCH_API_KEY', feature_name="Google Search")
google_search_engine = safe_getenv('GOOGLE_SEARCH_ENGINE_ID', feature_name="Google Search")
if google_search_key:
    print(f"Using Google Search API Key ending in: ...{google_search_key[-4:]}")
if google_search_engine:
    print(f"Using Google Search Engine ID: {google_search_engine}")
if not google_search_key or not google_search_engine:
    print("⚠️ Google Search not configured - will use DuckDuckGo fallback")
class MathSolver(Tool):
    name = "math_solver"
    description = (
        "Evaluate and simplify arithmetic or symbolic math expressions using SymPy. "
        "Supports operators +, -, *, /, **, parentheses, and common functions like sin, cos, log."
    )
    inputs = {
        "input": {
            "type": "string",
            "description": "Math expression to evaluate, e.g. '2+4*12' or 'sin(pi/3)'"
        }
    }
    output_type = "string"

    def forward(self, input: str) -> str:
        try:
            expr = sympify(input, evaluate=True)
            simplified = simplify(expr)
            # If the result is numeric, evaluate to a float; otherwise return the simplified form.
            if simplified.is_number:
                return str(simplified.evalf())
            return str(simplified)
        except SympifyError as e:
            return f"Math error: could not parse expression: {e}"
        except Exception as e:
            return f"Math error: {e}"
class TextPreprocesser(Tool):
    name = "text_preprocesser"
    description = "Transform and preprocess text with multiple operations: reverse, upper, lower, count, extract_numbers, word_count"
    inputs = {"input": {"type": "string",
                        "description": "Use operation as prefix: reverse:, upper:, lower:, count:, extract_numbers:, word_count:"}}
    output_type = "string"

    def forward(self, input: str) -> str:
        try:
            if input.startswith("reverse:"):
                text = input.removeprefix("reverse:").strip()
                reversed_text = text[::-1]
                # Special handling for GAIA text-reversal puzzles:
                # if the reversed text asks for the opposite of "left"/"right", answer directly.
                if "opposite" in reversed_text.lower() and "left" in reversed_text.lower():
                    return "right"
                elif "opposite" in reversed_text.lower() and "right" in reversed_text.lower():
                    return "left"
                return reversed_text
            elif input.startswith("upper:"):
                return input.removeprefix("upper:").strip().upper()
            elif input.startswith("lower:"):
                return input.removeprefix("lower:").strip().lower()
            elif input.startswith("count:"):
                text = input.removeprefix("count:").strip()
                return str(len(text))
            elif input.startswith("extract_numbers:"):
                text = input.removeprefix("extract_numbers:").strip()
                numbers = re.findall(r'-?\d+\.?\d*', text)
                return ', '.join(numbers) if numbers else "No numbers found"
            elif input.startswith("word_count:"):
                text = input.removeprefix("word_count:").strip()
                words = text.split()
                return str(len(words))
            else:
                return "Unsupported operation. Available: reverse:, upper:, lower:, count:, extract_numbers:, word_count:"
        except Exception as e:
            return f"Text processing error: {str(e)}"
class GoogleSearchTool(Tool):
    name = "google_search"
    description = "Performs a web search using the Google Custom Search API. Falls back to DuckDuckGo if API keys are unavailable."
    inputs = {"query": {"type": "string", "description": "Search query."}}
    output_type = "string"

    def forward(self, query: str) -> str:
        # Check whether the Google Search API is configured
        if not config.has_key("GOOGLE_SEARCH_API_KEY") or not config.has_key("GOOGLE_SEARCH_ENGINE_ID"):
            # Fall back to DuckDuckGo
            try:
                ddg_tool = DuckDuckGoSearchTool()
                result = ddg_tool.forward(query)
                return f"🔍 DuckDuckGo Search Results:\n{result}"
            except Exception as e:
                return f"Search unavailable: {e}"
        try:
            resp = requests.get(
                "https://www.googleapis.com/customsearch/v1",
                params={
                    "q": query,
                    "key": config.get_key("GOOGLE_SEARCH_API_KEY"),
                    "cx": config.get_key("GOOGLE_SEARCH_ENGINE_ID"),
                    "num": 3  # Fetch several results for better coverage
                },
                timeout=30
            )
            # Check whether the request was successful
            if resp.status_code != 200:
                # Fall back to DuckDuckGo on API error
                try:
                    ddg_tool = DuckDuckGoSearchTool()
                    result = ddg_tool.forward(query)
                    return f"🔍 DuckDuckGo Search Results (Google API error):\n{result}"
                except Exception:
                    return f"Google Search API error: {resp.status_code} - {resp.text}"
            data = resp.json()
            # Check for errors reported in the response body
            if "error" in data:
                # Fall back to DuckDuckGo
                try:
                    ddg_tool = DuckDuckGoSearchTool()
                    result = ddg_tool.forward(query)
                    return f"🔍 DuckDuckGo Search Results (Google API error):\n{result}"
                except Exception:
                    return f"Google Search API error: {data['error']['message']}"
            if "items" not in data or not data["items"]:
                return "No Google results found."
            # Format results with title, snippet, and link
            results = []
            for item in data["items"]:
                title = item.get("title", "No title")
                snippet = item.get("snippet", "No snippet available")
                link = item.get("link", "")
                results.append(f"**{title}**\n{snippet}\nSource: {link}\n")
            return "🔍 Google Search Results:\n" + "\n".join(results)
        except requests.RequestException as e:
            # Fall back to DuckDuckGo on network error
            try:
                ddg_tool = DuckDuckGoSearchTool()
                result = ddg_tool.forward(query)
                return f"🔍 DuckDuckGo Search Results (network error):\n{result}"
            except Exception:
                return f"Search unavailable: {e}"
        except Exception as e:
            return f"Search error: {e}"
class WikipediaTitleFinder(Tool):
    name = "wikipedia_titles"
    description = "Search for related Wikipedia page titles."
    inputs = {"query": {"type": "string", "description": "Search query."}}
    output_type = "string"

    def forward(self, query: str) -> str:
        try:
            results = wikipedia.search(query)
        except wikipedia.exceptions.WikipediaException as e:
            return f"Wikipedia search error: {e}"
        return ", ".join(results) if results else "No results."


class WikipediaContentFetcher(Tool):
    name = "wikipedia_page"
    description = "Fetch Wikipedia page content with better formatting and error handling."
    inputs = {"page_title": {"type": "string", "description": "Wikipedia page title."}}
    output_type = "string"

    def forward(self, page_title: str) -> str:
        try:
            # Try the exact title first
            page = wikipedia.page(page_title)
            # Use clean text content instead of HTML
            content = page.content
            # Limit content length for the GAIA benchmark (first 8000 chars)
            if len(content) > 8000:
                content = content[:8000] + "... (content truncated)"
            # Include the page URL for reference
            return f"**{page.title}**\n\n{content}\n\nSource: {page.url}"
        except wikipedia.exceptions.DisambiguationError as e:
            # Handle disambiguation by trying the first option
            try:
                page = wikipedia.page(e.options[0])
                content = page.content
                if len(content) > 8000:
                    content = content[:8000] + "... (content truncated)"
                return f"**{page.title}** (disambiguated)\n\n{content}\n\nSource: {page.url}"
            except Exception:
                return f"Multiple pages found for '{page_title}'. Options: {', '.join(e.options[:5])}"
        except wikipedia.exceptions.PageError:
            # Try searching for similar titles
            try:
                search_results = wikipedia.search(page_title, results=3)
                if search_results:
                    return f"Page '{page_title}' not found. Did you mean: {', '.join(search_results)}"
                else:
                    return f"No Wikipedia page found for '{page_title}'"
            except Exception:
                return f"Page '{page_title}' not found and search failed."
        except wikipedia.exceptions.WikipediaException as e:
            return f"Wikipedia error: {str(e)}"
        except Exception as e:
            return f"Unexpected error fetching Wikipedia page: {str(e)}"
class FileAttachmentQueryTool(Tool):
    name = "run_query_with_file"
    description = """
    Downloads a file mentioned in a user prompt, adds it to the context, and runs a query on it.
    Requires GOOGLE_API_KEY. This assumes the file is 20MB or less.
    """
    inputs = {
        "task_id": {
            "type": "string",
            "description": "A unique identifier for the task related to this file, used to download it.",
            "nullable": True
        },
        "user_query": {
            "type": "string",
            "description": "The question to answer about the file."
        }
    }
    output_type = "string"

    def __init__(self, model_name="gemini-2.5-pro", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.model_name = model_name

    def forward(self, task_id: str | None, user_query: str) -> str:
        # Check whether the Google API key is available
        if not config.has_key("GOOGLE_API_KEY"):
            return ("❌ File analysis requires GOOGLE_API_KEY environment variable.\n"
                    "Get your key at: https://makersuite.google.com/app/apikey\n"
                    "Then set: export GOOGLE_API_KEY='your_key_here'")
        try:
            file_url = f"https://agents-course-unit4-scoring.hf.space/files/{task_id}"
            file_response = requests.get(file_url, timeout=60)
            if file_response.status_code != 200:
                return f"Failed to download file: {file_response.status_code} - {file_response.text}"
            file_data = file_response.content
            # The google.generativeai SDK must be configured with the API key before use.
            configure(api_key=config.get_key("GOOGLE_API_KEY"))
            model = GenerativeModel(self.model_name)
            # Pass the raw bytes as an inline blob part alongside the query.
            response = model.generate_content([
                {"mime_type": "application/octet-stream", "data": file_data},
                user_query
            ])
            return response.text
        except Exception as e:
            return f"File analysis error: {e}\nNote: This tool requires GOOGLE_API_KEY for Gemini model access."
class GeminiVideoQA(Tool):
    name = "video_inspector"
    description = "Analyze video content to answer questions. Requires GOOGLE_API_KEY."
    inputs = {
        "video_url": {"type": "string", "description": "URL of video."},
        "user_query": {"type": "string", "description": "Question about video."}
    }
    output_type = "string"

    def __init__(self, model_name="gemini-2.5-pro", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.model_name = model_name

    def forward(self, video_url: str, user_query: str) -> str:
        # Check whether the Google API key is available
        if not config.has_key("GOOGLE_API_KEY"):
            return ("❌ Video analysis requires GOOGLE_API_KEY environment variable.\n"
                    "Get your key at: https://makersuite.google.com/app/apikey\n"
                    "Then set: export GOOGLE_API_KEY='your_key_here'")
        try:
            req = {
                'model': f'models/{self.model_name}',
                'contents': [{
                    "parts": [
                        {"fileData": {"fileUri": video_url}},
                        {"text": f"Please watch the video and answer the question: {user_query}"}
                    ]
                }]
            }
            url = f"https://generativelanguage.googleapis.com/v1beta/models/{self.model_name}:generateContent?key={config.get_key('GOOGLE_API_KEY')}"
            res = requests.post(url, json=req, headers={'Content-Type': 'application/json'}, timeout=120)
            if res.status_code != 200:
                return f"Video analysis error {res.status_code}: {res.text}"
            parts = res.json()['candidates'][0]['content']['parts']
            return "".join(p.get('text', '') for p in parts)
        except Exception as e:
            return f"Video analysis error: {e}\nNote: This tool requires GOOGLE_API_KEY for Gemini model access."
class RiddleSolver(Tool):
    name = "riddle_solver"
    description = "Analyze riddles and provide systematic solving strategies without giving direct answers."
    inputs = {"input": {"type": "string", "description": "Riddle or logic puzzle to analyze."}}
    output_type = "string"

    def forward(self, input: str) -> str:
        riddle = input.strip()
        # Analyze the riddle's structure and suggest a solving approach
        analysis = []
        riddle_lower = riddle.lower()
        # Identify the riddle type
        if "what am i" in riddle_lower or riddle_lower.startswith("i am"):
            analysis.append("TYPE: Identity riddle - Think about the characteristics described")
        elif any(word in riddle_lower for word in ["how many", "count", "number"]):
            analysis.append("TYPE: Counting puzzle - Break down systematically")
        elif any(char.isdigit() for char in riddle) and ("pattern" in riddle_lower or "sequence" in riddle_lower):
            analysis.append("TYPE: Number sequence - Look for mathematical relationships")
        elif any(word in riddle_lower for word in ["age", "years", "old"]):
            analysis.append("TYPE: Age puzzle - Set up algebraic equations")
        else:
            analysis.append("TYPE: General riddle - Analyze for wordplay or logical patterns")
        # Identify key elements to focus on
        if "?" in riddle:
            analysis.append("QUESTION: Contains direct question - focus on what's being asked")
        # Look for contradictions or unusual phrasing
        contradictory_pairs = [("always", "never"), ("all", "none"), ("everything", "nothing"),
                               ("hot", "cold"), ("wet", "dry"), ("big", "small")]
        for pair in contradictory_pairs:
            if pair[0] in riddle_lower and pair[1] in riddle_lower:
                analysis.append(f"CONTRADICTION: Contains '{pair[0]}' and '{pair[1]}' - may be key to solution")
        # Suggest general solving strategies
        strategies = [
            "STRATEGY: Read carefully for double meanings or wordplay",
            "STRATEGY: Consider literal vs metaphorical interpretations",
            "STRATEGY: If math-related, extract numbers and relationships",
            "STRATEGY: For logic puzzles, work backwards from constraints"
        ]
        analysis.extend(strategies)
        return "\n".join(analysis) + f"\n\nRIDDLE TO SOLVE: {riddle}"
class WebPageFetcher(Tool):
    name = "fetch_webpage"
    description = "Fetches and processes web page content. Can convert HTML to clean markdown or return raw HTML."
    inputs = {
        "url": {
            "type": "string",
            "description": "The URL to fetch content from."
        },
        "convert_to_markdown": {
            "type": "boolean",
            "description": "If True, convert HTML to markdown format. If False, return raw HTML.",
            "default": True,
            "nullable": True
        }
    }
    output_type = "string"

    def forward(self, url: str, convert_to_markdown: bool = True) -> str:
        try:
            # Send a browser-like User-Agent to avoid being blocked
            headers = {
                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            response = requests.get(url, timeout=30, headers=headers)
            response.raise_for_status()
            if convert_to_markdown:
                soup = BeautifulSoup(response.text, "html.parser")
                # Remove unwanted elements
                for element in soup(["script", "style", "nav", "footer", "header", "aside"]):
                    element.extract()
                # Site-specific content extraction
                content = None
                if "wikipedia.org" in url:
                    main_content = soup.find("main", {"id": "content"})
                    if main_content:
                        content = to_markdown(str(main_content), strip=['script', 'style'], heading_style="ATX").strip()
                    else:
                        content = to_markdown(response.text, strip=['script', 'style'], heading_style="ATX").strip()
                elif "stackoverflow.com" in url:
                    question = soup.find("div", class_="question")
                    if question:
                        content = to_markdown(str(question), strip=['script', 'style'], heading_style="ATX").strip()
                elif "github.com" in url:
                    readme = soup.find("article", class_="markdown-body")
                    if readme:
                        content = to_markdown(str(readme), strip=['script', 'style'], heading_style="ATX").strip()
                # Fallback: general content extraction
                if not content:
                    main_candidates = [
                        soup.find("main"),
                        soup.find("article"),
                        soup.find("div", class_="content"),
                        soup.find("div", {"id": "content"}),
                        soup.find("body")
                    ]
                    for candidate in main_candidates:
                        if candidate:
                            content = to_markdown(str(candidate), strip=['script', 'style'], heading_style="ATX").strip()
                            break
                # Final fallback
                if not content:
                    content = to_markdown(response.text, strip=['script', 'style'], heading_style="ATX").strip()
            else:
                content = response.text
            # Limit content length for the GAIA benchmark
            if content and len(content) > 10000:
                content = content[:10000] + "\n\n... (content truncated for length)"
            # Save a timestamped copy if the optional utils module is available
            if content and utils and hasattr(utils, 'save_file_with_timestamp'):
                utils.save_file_with_timestamp(content, "webpage", ".md" if convert_to_markdown else ".html")
            return content or "No content extracted"
        except requests.exceptions.RequestException as e:
            return f"Network error fetching {url}: {str(e)}"
        except Exception as e:
            return f"Error processing webpage {url}: {str(e)}"
if __name__ == "__main__":
    try:
        # Quick manual tests
        video_id = "L1vXCYZAYYM"  # Replace with your YouTube video ID
        video_url = "https://www.youtube.com/watch?v=" + video_id
        url = "https://en.wikipedia.org/wiki/Malko_Competition"
        # page_content = WebPageFetcher()(url, convert_to_markdown=True)
        # print(page_content.encode("utf-8"))
        # print(GeminiVideoQA()(user_query="What is happening in this video?", video_url=video_url))
        # print(GoogleSearchTool()(query="Who is Rajesh Hamal?"))
        # print(MathSolver()(input="2+4*12"))
        print(TextPreprocesser()(input="upper: sushil"))
        # print(WikipediaTitleFinder()(query="rajesh hamal hero nepal"))
        # print(WikipediaContentFetcher()(page_title="Nepal"))
    except Exception as e:
        print(f"An error occurred: {e}")