from fastapi import APIRouter
from datetime import datetime
from datasets import load_dataset
from sklearn.metrics import accuracy_score
import random  # used by the commented-out random baseline below
import os
import tensorflow as tf
import numpy as np

from .utils.evaluation import AudioEvaluationRequest
from .utils.emissions import tracker, clean_emissions_data, get_space_info

from dotenv import load_dotenv

load_dotenv()

router = APIRouter()

DESCRIPTION = "Keras spectrogram classifier"
ROUTE = "/audio"


@router.post(ROUTE, tags=["Audio Task"], description=DESCRIPTION)
async def evaluate_audio(request: AudioEvaluationRequest):
    """
    Evaluate audio classification for rainforest sound detection
    (chainsaw vs. environment).

    Current model: a Keras classifier loaded from ./model that predicts from
    STFT spectrograms of the audio clips. The original random baseline is
    kept below, commented out, for comparison.
    """
    # Get space info
    username, space_url = get_space_info()

    # Mapping between class names and the integer labels used in the dataset
    LABEL_MAPPING = {
        "chainsaw": 0,
        "environment": 1,
    }

    # Load the dataset. It is gated, so authenticate with the HF_TOKEN
    # environment variable.
    dataset = load_dataset(request.dataset_name, token=os.getenv("HF_TOKEN"))

    # Split the dataset and keep only the held-out test portion
    train_test = dataset["train"].train_test_split(
        test_size=request.test_size, seed=request.test_seed
    )
    test_dataset = train_test["test"]

    def compute_spectrogram(audio_array, frame_length=256, frame_step=128):
        """Compute an STFT magnitude spectrogram with a trailing channel axis."""
        spectrogram = tf.signal.stft(
            audio_array, frame_length=frame_length, frame_step=frame_step
        )
        spectrogram = tf.abs(spectrogram)
        return tf.expand_dims(spectrogram, axis=-1)

    def preprocess(item, max_length=16000):
        """Pad or truncate a clip to max_length samples and return its spectrogram."""
        audio_array = item["audio"]["array"]
        audio_array = tf.convert_to_tensor(audio_array, dtype=tf.float32)
        if len(audio_array) < max_length:
            pad_size = max_length - len(audio_array)
            audio_array = tf.concat([audio_array, tf.zeros(pad_size)], axis=0)
        else:
            audio_array = audio_array[:max_length]
        return compute_spectrogram(audio_array)

    # Start tracking emissions
    tracker.start()
    tracker.start_task("inference")

    # --------------------------------------------------------------------------------------------
    # YOUR MODEL INFERENCE CODE HERE
    # Replace the random baseline below with your model's inference, and keep all
    # of it inside this block so that its energy consumption and emissions are tracked.
    # --------------------------------------------------------------------------------------------
    MODEL_PATH = "./model"
    model = tf.keras.models.load_model(MODEL_PATH)

    true_labels = test_dataset["label"]
    predictions = []
    for item in test_dataset:
        spectrogram = preprocess(item)
        spectrogram = tf.expand_dims(spectrogram, axis=0)  # add batch dimension
        pred_probs = model.predict(spectrogram, verbose=0)  # one score per class
        predictions.append(int(np.argmax(pred_probs)))

    # Random baseline, kept for comparison:
    # true_labels = test_dataset["label"]
    # predictions = [random.randint(0, 1) for _ in range(len(true_labels))]
    # --------------------------------------------------------------------------------------------
    # YOUR MODEL INFERENCE STOPS HERE
    # --------------------------------------------------------------------------------------------

    # Stop tracking emissions
    emissions_data = tracker.stop_task()

    # Calculate accuracy
    accuracy = accuracy_score(true_labels, predictions)

    # Prepare results dictionary
    results = {
        "username": username,
        "space_url": space_url,
        "submission_timestamp": datetime.now().isoformat(),
        "model_description": DESCRIPTION,
        "accuracy": float(accuracy),
        # convert tracked kWh to Wh and kgCO2eq to gCO2eq
        "energy_consumed_wh": emissions_data.energy_consumed * 1000,
        "emissions_gco2eq": emissions_data.emissions * 1000,
        "emissions_data": clean_emissions_data(emissions_data),
        "api_route": ROUTE,
        "dataset_config": {
            "dataset_name": request.dataset_name,
            "test_size": request.test_size,
            "test_seed": request.test_seed,
        },
    }

    return results
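

# --------------------------------------------------------------------------------------------
# Optional sketch, not part of the original template: batched inference. Calling
# model.predict() once per clip runs a separate forward pass per example, which is slow and
# wastes tracked energy. Because preprocess() pads or truncates every clip to the same
# length, the spectrograms all share one shape and can be stacked into a single batch. The
# helper below illustrates this under two assumptions: same-shape inputs, and a model that
# emits one score per class. It is not wired into the route above.
# --------------------------------------------------------------------------------------------
def predict_in_batches(model, spectrograms, batch_size=32):
    """Classify a stacked tensor of same-shape spectrograms in batched forward passes.

    Example use inside the tracked inference block, replacing the per-item loop:
        spectrograms = tf.stack([preprocess(item) for item in test_dataset])
        predictions = predict_in_batches(model, spectrograms)
    """
    # One predict() call over the whole stack; Keras handles the batch splitting
    pred_probs = model.predict(spectrograms, batch_size=batch_size, verbose=0)
    # argmax over the class axis yields plain Python ints, ready for accuracy_score
    return np.argmax(pred_probs, axis=1).tolist()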