Spaces:
Running
Running
| import os | |
| import json | |
| import subprocess | |
| import pandas as pd | |
| # from sklearn.manifold import TSNE | |
| from generate import get_solution_file_path, all_models | |
| from openai import OpenAI | |
| import time | |
| import os | |
| import subprocess | |
| client = OpenAI() | |
| def evaluate_submission(day: int, model: str): | |
| """Evaluates the submission for the given day and model. Returns the result captured from stdout and the total time taken.""" | |
| # cd to the day directory | |
| os.chdir(f"day{day:02d}") | |
| # get the solution file path, check if it exists | |
| file_path = get_solution_file_path(model=model) | |
| if not os.path.exists(file_path): | |
| print(f"File {file_path} does not exist, skipping") | |
| return | |
| else: | |
| print(f"Evaluating {file_path} for day {day} with model {model}") | |
| # run the solution, and capture the output | |
| timeout = 60 * 5 | |
| start_time = time.time() | |
| try: | |
| result = subprocess.run(["python", file_path], capture_output=True, text=True, timeout=timeout) | |
| print(f"Result: {result.stdout}") | |
| except subprocess.TimeoutExpired: | |
| result = subprocess.CompletedProcess(args=["python", file_path], returncode=1, stdout="", stderr="Timeout") | |
| print(f"Timeout after {timeout} seconds") | |
| end_time = time.time() | |
| total_time = end_time - start_time | |
| result = result.stdout if result.returncode == 0 else f"Error: {result.stderr}" | |
| os.chdir("..") | |
| return { | |
| "result": result, | |
| "total_time": total_time, | |
| } | |
| def get_solution_code(day: int, model: str) -> str: | |
| """Returns the solution code (as a string) for the given day and model.""" | |
| file_path = get_solution_file_path(day=day, model=model) | |
| with open(file_path, "r") as file: | |
| return file.read() | |
| def extract_solutions(df, output_file = "solutions.json"): | |
| # TODO: better way of getting this? | |
| solutions = {} | |
| for day in range(1, 25): | |
| sub_df = df[(df.model == "jerpint") & (df.day == day)] | |
| part1, part2 = sub_df.result.to_list()[0].strip("\n").split("\n") | |
| solutions[day] = [part1, part2] | |
| with open(output_file, "w") as f: | |
| json.dump(solutions, f, indent=2) | |
| return solutions | |
| def evaluate_submissions(all_models, results_file = "results.csv", skip = True): | |
| """Runs the python code and collects their results""" | |
| if os.path.exists(results_file): | |
| df = pd.read_csv(results_file) | |
| else: | |
| df = pd.DataFrame(columns=["day", "model", "result", "total_time"]) | |
| # for day in range(1, 26): | |
| for day in range(1, 11): | |
| print("*" * 80) | |
| print(f"Evaluating day {day}") | |
| for provider in all_models: | |
| for model in all_models[provider]: | |
| print("-" * 80) | |
| if df.loc[(df["day"] == day) & (df["model"] == model)].shape[0] > 0 and skip: | |
| print(f"Skipping {provider} {model} for day {day} because it already exists") | |
| continue | |
| print(f"Evaluating day {day} with model {model}") | |
| result = evaluate_submission(day, model) | |
| df = pd.concat([df, pd.DataFrame({"day": [day], "model": [model], "result": [result["result"]], "total_time": [result["total_time"]]})], ignore_index=True) | |
| df.to_csv("results.csv", index=False) | |
| print("-" * 80) | |
| print("*" * 80) | |
| return df | |
| if __name__ == "__main__": | |
| all_models["human"] = ["jerpint"] | |
| df = evaluate_submissions(all_models, results_file="results.csv") | |
| # For now, only evaluate first 9 days | |
| # TODO: All days | |
| df = df[df.day < 10] | |
| # Run once to save results | |
| # solutions = extract_solutions(df) | |
| with open("solutions.json") as f: | |
| solutions = json.load(f) | |
| def score_submissions(row): | |
| result = row["result"] | |
| day = row["day"] | |
| solution = solutions[str(day)] | |
| score_1 = solution[0] in result | |
| score_2 = solution[1] in result | |
| return [score_1, score_2] | |
| df["scores"] = df.apply(score_submissions, axis=1) | |
| df["part_1"] = df["scores"].apply(lambda x: x[0]) | |
| df["part_2"] = df["scores"].apply(lambda x: x[1]) | |
| for model in df.model.unique(): | |
| df_model = df[df.model == model] | |
| silver_stars = df_model.part_1.sum() | |
| gold_stars = df_model.part_2.sum() | |
| total_stars = silver_stars + gold_stars | |
| print(model, total_stars) |