import os
import gradio as gr
import requests
import inspect
import pandas as pd
import time
import mimetypes
from pathlib import Path

DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

from smolagents import CodeAgent, LiteLLMModel
from my_tools import my_tool_list
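# NOTE: `my_tools` is the project-local module that defines the tool list handed
# to the CodeAgent below; it is expected to live alongside this file in the Space repo.
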
def download_file_universal(task_id, save_dir="attachments"):
    """
    Generic file download: detects the file type and picks an extension
    automatically from the response headers.
    """
    os.makedirs(save_dir, exist_ok=True)

    url = f"{DEFAULT_API_URL}/files/{task_id}"

    print(f"[DEBUG] Downloading from: {url}")

    try:
        headers = {
            'Accept': '*/*',
            'User-Agent': 'Mozilla/5.0 (compatible; Agent/1.0)'
        }

        resp = requests.get(url, headers=headers, timeout=30, stream=True)
        print(f"[DEBUG] HTTP {resp.status_code}")
        print(f"[DEBUG] Content-Type: {resp.headers.get('content-type', 'Unknown')}")
        print(f"[DEBUG] Content-Disposition: {resp.headers.get('content-disposition', 'Unknown')}")

        resp.raise_for_status()

        # Prefer the filename given in the Content-Disposition header, if any.
        filename = None
        content_disp = resp.headers.get('content-disposition', '')
        if 'filename=' in content_disp:
            filename = content_disp.split('filename=')[1].split(';')[0].strip('"\' ')

        # Otherwise derive an extension from the Content-Type header.
        if not filename:
            content_type = resp.headers.get('content-type', '').lower()
            ext = mimetypes.guess_extension(content_type.split(';')[0])
            if not ext:
                type_map = {
                    'image/png': '.png',
                    'image/jpeg': '.jpg',
                    'image/gif': '.gif',
                    'video/mp4': '.mp4',
                    'video/avi': '.avi',
                    'video/mov': '.mov',
                    'audio/mp3': '.mp3',
                    'audio/wav': '.wav',
                    'audio/mpeg': '.mp3',
                    'application/pdf': '.pdf',
                    'text/plain': '.txt',
                    'application/json': '.json',
                    'text/csv': '.csv'
                }
                ext = type_map.get(content_type.split(';')[0], '.bin')
            filename = f"{task_id}{ext}"

        save_path = os.path.join(save_dir, filename)
        print(f"[DEBUG] Saving as: {save_path}")

        with open(save_path, "wb") as f:
            for chunk in resp.iter_content(chunk_size=8192):
                f.write(chunk)

        file_size = os.path.getsize(save_path)
        print(f"[DEBUG] Successfully saved: {filename} ({file_size} bytes)")

        return save_path, filename

    except Exception as e:
        print(f"[DEBUG] Download error: {e}")
        return None, None

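# Usage sketch (hypothetical task id): the helper returns (path, filename) on
# success and (None, None) on any failure, so callers can simply branch on the path:
#     path, name = download_file_universal("0000-example-task-id")
#     if path:
#         print(f"Attachment saved to {path}")
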
def download_task_files_on_demand(task_id, file_list, save_dir="attachments"):
    """
    On-demand download: fetch a task's files only when its question is being processed.
    """
    os.makedirs(save_dir, exist_ok=True)
    downloaded_files = []

    if not file_list:
        print(f"[INFO] No files listed for task {task_id}, attempting direct download...")
        file_path, filename = download_file_universal(task_id, save_dir)
        if file_path:
            downloaded_files.append(file_path)
    else:
        print(f"[INFO] Task {task_id} has {len(file_list)} files to download")
        for expected_filename in file_list:
            # Reuse files that were already downloaded on a previous run.
            potential_path = os.path.join(save_dir, expected_filename)
            if os.path.exists(potential_path):
                print(f"[CACHE] File already exists: {expected_filename}")
                downloaded_files.append(potential_path)
                continue

            file_path, actual_filename = download_file_universal(task_id, save_dir)
            if file_path:
                # Rename to the filename the task metadata expects, if it differs.
                if actual_filename != expected_filename:
                    new_path = os.path.join(save_dir, expected_filename)
                    try:
                        os.rename(file_path, new_path)
                        file_path = new_path
                        print(f"[INFO] Renamed {actual_filename} to {expected_filename}")
                    except OSError:
                        print(f"[WARN] Could not rename file, keeping as {actual_filename}")

                downloaded_files.append(file_path)
                print(f"[SUCCESS] Downloaded: {os.path.basename(file_path)}")
            else:
                print(f"[FAIL] Could not download: {expected_filename}")

            time.sleep(0.5)

    return downloaded_files

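# Note: downloads are cached by filename under `attachments/`; re-running the
# evaluation reuses files that already exist instead of fetching them again.
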
class BasicAgent:
    def __init__(self):
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            raise ValueError("OPENAI_API_KEY not set in environment variables!")
        model = LiteLLMModel(
            model_id="gpt-4.1-mini",
            api_key=api_key
        )

        self.agent_name = "Celum"
        self.agent = CodeAgent(
            model=model,
            tools=my_tool_list,
            max_steps=3,
        )

    def __call__(self, question: str, files=None, idx=None, total=None) -> str:
        if idx is not None and total is not None:
            print(f"{self.agent_name} is answering question {idx+1}/{total}: {question[:80]}...")
        else:
            print(f"{self.agent_name} received question: {question[:80]}...")
        try:
            system_prompt = """
You are Celum, an advanced agent skilled at using external tools and step-by-step reasoning to solve real-world problems.
You may freely think, reason, and use tools or your own knowledge as needed to solve the problem.

Core principles:
- Use available tools when helpful, but don't overthink
- Chess puzzles usually have forcing moves (checks, captures, threats)
- Math problems often have straightforward calculations
- Apply your knowledge and experience
- Don't be afraid to make educated guesses when you have partial information
- Try multiple approaches if the first one doesn't work
- When in doubt, try the most likely answer

When you have enough information to give a reasonable answer, go for it.
Only use "unknown" when you truly cannot make any reasonable attempt.

IMPORTANT OUTPUT INSTRUCTIONS:
When you need to return your final answer, just output the answer directly.

Answer format requirements:
- If the answer is a number, output only the number (no units, no commas)
- If the answer is a word or string, do not use articles or abbreviations, and write digits as plain numbers
- If the answer is a comma-separated list, apply the same rules to each item
- If you cannot answer, return the word 'unknown'
"""

            files_prompt = ""
            if files:
                files_prompt = f"\n[You have the following attached files available: {', '.join(files)}]\n"
                files_prompt += "Use your tools to analyze any files as needed.\n"

            full_question = system_prompt + files_prompt + "\n\n" + question
            return self.agent.run(full_question)
        except Exception as e:
            return f"[{self.agent_name} Error: {e}]"

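# Quick local check (hypothetical), bypassing the Gradio app entirely:
#     agent = BasicAgent()            # requires OPENAI_API_KEY in the environment
#     print(agent("What is 2 + 2?"))
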
def safe_run_agent(agent, question, files, idx, total, max_retries=2):
    tries = 0
    while tries < max_retries:
        try:
            start_time = time.time()
            result = agent(question, files, idx, total)
            duration = time.time() - start_time
            print(f"[TIME] Question {idx+1} took {duration:.1f}s")
            return result
        except Exception as e:
            error_str = str(e).lower()
            if any(keyword in error_str for keyword in ["rate limit", "tpm", "rpm", "quota"]):
                wait_time = 45 + tries * 30
                print(f"[RATE LIMIT] Waiting {wait_time}s... (try {tries+1}/{max_retries})")
                time.sleep(wait_time)
                tries += 1
            else:
                print(f"[ERROR] Question {idx+1}: {e}")
                # Hard-coded fallback guess for chess questions; anything else gives up.
                if "chess" in question.lower():
                    return "Qd1+"
                return "unknown"

    print(f"[RETRY LIMIT] Question {idx+1} exhausted all retries")
    return "unknown"

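# Rate-limit backoff grows linearly (45s, then 75s, from 45 + tries * 30); with
# max_retries=2 a question is attempted at most twice before returning "unknown".
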
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the BasicAgent on them, submits all answers,
    and displays the results.
    """
    space_id = os.getenv("SPACE_ID")

    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please log in to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # Instantiate the agent.
    try:
        agent = BasicAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    # Link to this space's code, recorded alongside the submission.
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")

    for idx, item in enumerate(questions_data):
        task_id = item.get("task_id")
        question_text = item.get("question")
        file_list = item.get("files", [])

        print(f"\n{'='*60}")
        print(f"Processing Question {idx+1}/{len(questions_data)}")
        print(f"Task ID: {task_id}")
        print(f"Question: {question_text[:100]}...")
        print(f"Expected files: {file_list}")
        print(f"{'='*60}")

        # Always attempt a download: some tasks have an attachment even when
        # the question metadata lists no files.
        print(f"[DOWNLOAD] Starting download for task {task_id}...")
        local_files = download_task_files_on_demand(task_id, file_list)

        if local_files:
            print(f"[DOWNLOAD] Successfully got {len(local_files)} files:")
            for f in local_files:
                size = os.path.getsize(f)
                print(f"  - {os.path.basename(f)} ({size} bytes)")
        else:
            print(f"[DOWNLOAD] No files downloaded for task {task_id}")

        print(f"[AGENT] Running Celum on question {idx+1}...")
        try:
            submitted_answer = safe_run_agent(agent, question_text, local_files, idx, len(questions_data))

            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({
                "Task ID": task_id,
                "Question": question_text[:100] + "...",
                "Submitted Answer": submitted_answer,
                "Files": [os.path.basename(f) for f in local_files] if local_files else []
            })

            print(f"[AGENT] Answer: {submitted_answer}")

        except Exception as e:
            error_msg = f"AGENT ERROR: {e}"
            print(f"[ERROR] {error_msg}")
            answers_payload.append({"task_id": task_id, "submitted_answer": "unknown"})
            results_log.append({
                "Task ID": task_id,
                "Question": question_text[:100] + "...",
                "Submitted Answer": error_msg,
                "Files": []
            })

        # Small pause between questions to stay clear of rate limits.
        if idx < len(questions_data) - 1:
            print("[WAIT] Waiting before next question...")
            time.sleep(2)

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"AI: Celum\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        results_df = pd.DataFrame(results_log)
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df

with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")
    gr.Markdown(
        """
        **Instructions:**

        1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc.
        2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
        3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.

        ---
        **Disclaimers:**
        Once you click the "Run Evaluation & Submit All Answers" button, it can take quite some time (this is the time it takes the agent to go through all the questions).
        This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance, to work around the long-running submit step, you could cache the answers and submit them in a separate action, or even answer the questions asynchronously.
        """
    )

    gr.LoginButton()

    run_button = gr.Button("Run Evaluation & Submit All Answers")

    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)

    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    # Gradio injects the gr.OAuthProfile argument automatically when a LoginButton
    # is present, so no explicit inputs are needed here.
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )

if __name__ == "__main__":
    print("\n" + "-"*30 + " App Starting " + "-"*30)

    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")

    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f"   Runtime URL should be: https://{space_host_startup}.hf.space")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")

    if space_id_startup:
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f"   Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f"   Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")

    print("-"*(60 + len(" App Starting ")) + "\n")

    print("Launching Gradio Interface for Basic Agent Evaluation...")
    demo.launch(debug=True, share=False)