abhaypratapsingh111's picture
Upload app.py with huggingface_hub
7697331 verified
"""
Main Dash application for Chronos 2 Time Series Forecasting
"""
import base64
import io
import logging
from pathlib import Path
from dash import Dash, html, dcc, Input, Output, State, callback_context
import dash_bootstrap_components as dbc
import pandas as pd
# Import components
from components.upload import (
create_upload_component,
create_column_selector,
create_sample_data_loader,
format_upload_status,
create_data_preview_table,
create_quality_report
)
from components.chart import (
create_forecast_chart,
create_empty_chart,
create_metrics_display,
create_backtest_metrics_display,
decimate_data
)
from components.controls import (
create_forecast_controls,
create_model_status_bar,
create_results_section,
create_app_header,
create_footer
)
# Import services
from services.model_service import model_service
from services.data_processor import data_processor
from services.cache_manager import cache_manager
# Import utilities
from utils.validators import (
validate_file_upload,
validate_column_selection,
validate_forecast_parameters
)
from utils.metrics import calculate_metrics
# Import configuration
from config.settings import CONFIG, APP_METADATA, LOG_LEVEL, LOG_FORMAT, LOG_FILE, setup_directories
from config.constants import MAX_CHART_POINTS
# Setup logging with both file and console handlers
def setup_logging():
"""Configure logging to write to both file and console"""
# Create logs directory first
Path(LOG_FILE).parent.mkdir(parents=True, exist_ok=True)
# Get root logger
root_logger = logging.getLogger()
root_logger.setLevel(LOG_LEVEL)
# Remove any existing handlers
root_logger.handlers = []
# Create formatters
formatter = logging.Formatter(LOG_FORMAT)
# File handler - writes all logs to file
file_handler = logging.FileHandler(LOG_FILE, mode='a', encoding='utf-8')
file_handler.setLevel(LOG_LEVEL)
file_handler.setFormatter(formatter)
root_logger.addHandler(file_handler)
# Console handler - writes to stderr
console_handler = logging.StreamHandler()
console_handler.setLevel(LOG_LEVEL)
console_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
logger = logging.getLogger(__name__)
logger.info(f"Logging configured - writing to {LOG_FILE}")
return logger
logger = setup_logging()
# Initialize Dash app
app = Dash(
__name__,
external_stylesheets=[
dbc.themes.BOOTSTRAP,
'https://use.fontawesome.com/releases/v5.15.4/css/all.css'
],
suppress_callback_exceptions=True,
title=APP_METADATA['title']
)
# App layout
app.layout = dbc.Container([
# Header
create_app_header(),
# Model status
html.Div(id='model-status-bar'),
# Stores for data
dcc.Store(id='uploaded-data-store'),
dcc.Store(id='processed-data-store'),
dcc.Store(id='forecast-results-store'),
# Sample data loader
create_sample_data_loader(),
# Upload section
create_upload_component(),
# Column selector (hidden initially)
create_column_selector(),
# Forecast controls
create_forecast_controls(),
# Results section (hidden initially)
create_results_section(),
# Footer
create_footer()
], fluid=True, className="py-4")
# Callback: Load model on startup
@app.callback(
Output('model-status-bar', 'children'),
Input('model-status-bar', 'id')
)
def load_model_on_startup(_):
"""Load the model when the app starts"""
logger.info("=" * 80)
logger.info("CALLBACK: load_model_on_startup - ENTRY")
logger.info("=" * 80)
try:
logger.info("Attempting to load Chronos-2 model...")
result = model_service.load_model()
logger.info(f"Model loading result: {result}")
if result['status'] == 'success':
logger.info("βœ“ Model loaded successfully - returning 'ready' status bar")
status_bar = create_model_status_bar('ready')
logger.info(f"Status bar created: {type(status_bar)}")
return status_bar
else:
logger.error(f"βœ— Model loading failed: {result.get('error')}")
return create_model_status_bar('error')
except Exception as e:
logger.error(f"βœ— EXCEPTION in load_model_on_startup: {str(e)}", exc_info=True)
return create_model_status_bar('error')
finally:
logger.info("CALLBACK: load_model_on_startup - EXIT")
logger.info("=" * 80)
# Callback: Handle file upload
@app.callback(
[Output('uploaded-data-store', 'data'),
Output('upload-status', 'children'),
Output('column-selector-card', 'style'),
Output('date-column-dropdown', 'options'),
Output('target-column-dropdown', 'options'),
Output('id-column-dropdown', 'options'),
Output('covariate-columns-dropdown', 'options')],
Input('upload-data', 'contents'),
State('upload-data', 'filename')
)
def handle_file_upload(contents, filename):
"""Handle file upload and extract column information"""
logger.info("=" * 80)
logger.info("CALLBACK: handle_file_upload - ENTRY")
logger.info(f"Filename: {filename}")
logger.info(f"Contents received: {contents is not None}")
logger.info("=" * 80)
if contents is None:
logger.warning("No contents provided - returning empty response")
return None, '', {'display': 'none'}, [], [], [], []
try:
# Parse uploaded file
content_type, content_string = contents.split(',')
decoded = base64.b64decode(content_string)
# Server-side validation
validation = validate_file_upload(filename, len(decoded))
if not validation['valid']:
error_msg = ' '.join(validation['issues'])
logger.warning(f"File upload validation failed: {error_msg}")
return None, format_upload_status('error', error_msg, True), {'display': 'none'}, [], [], [], []
# Additional security: Sanitize filename
import re
safe_filename = re.sub(r'[^\w\-\.]', '_', filename)
if safe_filename != filename:
logger.info(f"Sanitized filename from '{filename}' to '{safe_filename}'")
# Load file
logger.info(f"Loading file with data_processor: {len(decoded)} bytes")
result = data_processor.load_file(decoded, filename)
logger.info(f"Load result status: {result['status']}")
if result['status'] == 'error':
logger.error(f"βœ— File loading error: {result['error']}")
return None, format_upload_status('error', result['error'], True), {'display': 'none'}, [], [], [], []
# Get column information
logger.info("Getting column information from data_processor")
col_info = data_processor.get_column_info()
logger.info(f"Column info: date_cols={col_info['date_columns']}, numeric_cols={col_info['numeric_columns'][:5]}...")
# Create dropdown options
date_options = [{'label': col, 'value': col} for col in col_info['date_columns']]
target_options = [{'label': col, 'value': col} for col in col_info['numeric_columns']]
id_options = [{'label': col, 'value': col} for col in col_info['all_columns']]
# Covariates can be any numeric column
covariate_options = [{'label': col, 'value': col} for col in col_info['numeric_columns']]
logger.info(f"Created dropdown options: {len(date_options)} date, {len(target_options)} target, {len(id_options)} id, {len(covariate_options)} covariate")
success_msg = f"Successfully loaded {filename} ({len(result['data'])} rows, {len(result['data'].columns)} columns)"
logger.info(f"βœ“ {success_msg}")
logger.info("CALLBACK: handle_file_upload - EXIT (success)")
logger.info("=" * 80)
return (
result['metadata'],
format_upload_status('success', success_msg),
{'display': 'block'},
date_options,
target_options,
id_options,
covariate_options
)
except Exception as e:
logger.error(f"βœ— EXCEPTION in handle_file_upload: {str(e)}", exc_info=True)
logger.info("CALLBACK: handle_file_upload - EXIT (exception)")
logger.info("=" * 80)
return None, format_upload_status('error', f"Error: {str(e)}", True), {'display': 'none'}, [], [], [], []
# Callback: Load sample data
@app.callback(
[Output('uploaded-data-store', 'data', allow_duplicate=True),
Output('upload-status', 'children', allow_duplicate=True),
Output('column-selector-card', 'style', allow_duplicate=True),
Output('date-column-dropdown', 'options', allow_duplicate=True),
Output('target-column-dropdown', 'options', allow_duplicate=True),
Output('id-column-dropdown', 'options', allow_duplicate=True),
Output('covariate-columns-dropdown', 'options', allow_duplicate=True)],
[Input('load-weather', 'n_clicks'),
Input('load-airquality', 'n_clicks'),
Input('load-bitcoin', 'n_clicks'),
Input('load-stock', 'n_clicks'),
Input('load-traffic', 'n_clicks'),
Input('load-electricity', 'n_clicks')],
prevent_initial_call=True
)
def load_sample_data(weather_clicks, airquality_clicks, bitcoin_clicks, stock_clicks, traffic_clicks, electricity_clicks):
"""Load sample datasets"""
ctx = callback_context
if not ctx.triggered:
return None, '', {'display': 'none'}, [], [], [], []
button_id = ctx.triggered[0]['prop_id'].split('.')[0]
# Map button to filename
sample_files = {
'load-weather': 'weather_stations.csv',
'load-airquality': 'air_quality_uci.csv',
'load-bitcoin': 'bitcoin_price.csv',
'load-stock': 'stock_sp500.csv',
'load-traffic': 'traffic_speeds.csv',
'load-electricity': 'electricity_consumption.csv'
}
filename = sample_files.get(button_id)
if not filename:
return None, '', {'display': 'none'}, [], [], [], []
try:
# Load sample file
filepath = f"{CONFIG['datasets_folder']}/{filename}"
with open(filepath, 'rb') as f:
contents = f.read()
result = data_processor.load_file(contents, filename)
if result['status'] == 'error':
return None, format_upload_status('error', result['error'], True), {'display': 'none'}, [], [], [], []
# Get column information
col_info = data_processor.get_column_info()
date_options = [{'label': col, 'value': col} for col in col_info['date_columns']]
target_options = [{'label': col, 'value': col} for col in col_info['numeric_columns']]
id_options = [{'label': col, 'value': col} for col in col_info['all_columns']]
covariate_options = [{'label': col, 'value': col} for col in col_info['numeric_columns']]
success_msg = f"Loaded sample dataset: {filename}"
return (
result['metadata'],
format_upload_status('success', success_msg),
{'display': 'block'},
date_options,
target_options,
id_options,
covariate_options
)
except Exception as e:
logger.error(f"Error loading sample data: {str(e)}", exc_info=True)
error_msg = f"Sample data not found. Please ensure datasets folder exists: {CONFIG['datasets_folder']}"
return None, format_upload_status('warning', error_msg), {'display': 'none'}, [], [], [], []
# Callback: Handle forecasting mode changes
@app.callback(
[Output('covariate-section', 'style'),
Output('target-help-text', 'children')],
Input('forecasting-mode', 'value')
)
def update_forecasting_mode(mode):
"""Update UI based on selected forecasting mode"""
if mode == 'univariate':
return (
{'display': 'none'},
'Select ONE target variable (multi-select available, but use only one for univariate)'
)
elif mode == 'multivariate':
return (
{'display': 'none'},
'Select MULTIPLE target variables to forecast together'
)
else: # covariate-informed
return (
{'display': 'block'},
'Select target variable(s) to forecast (can select multiple)'
)
# Callback: Handle backtest enable/disable
@app.callback(
Output('backtest-controls', 'style'),
Input('backtest-enable', 'value')
)
def toggle_backtest_controls(backtest_enabled):
"""Show/hide backtest controls based on checkbox"""
if 'enabled' in backtest_enabled:
return {'display': 'block'}
return {'display': 'none'}
# Callback: Update data preview and quality report
@app.callback(
[Output('data-preview-container', 'children'),
Output('data-quality-report', 'children'),
Output('processed-data-store', 'data'),
Output('generate-forecast-btn', 'disabled')],
[Input('date-column-dropdown', 'value'),
Input('target-column-dropdown', 'value'),
Input('forecasting-mode', 'value'),
Input('covariate-columns-dropdown', 'value')],
State('id-column-dropdown', 'value')
)
def update_preview_and_process(date_col, target_col, mode, covariate_cols, id_col):
"""Update data preview and process data when columns are selected"""
logger.info("=" * 80)
logger.info("CALLBACK: update_preview_and_process - ENTRY")
logger.info(f"date_col: {date_col}")
logger.info(f"target_col: {target_col}")
logger.info(f"mode: {mode}")
logger.info(f"covariate_cols: {covariate_cols}")
logger.info(f"id_col: {id_col}")
logger.info("=" * 80)
if not date_col or not target_col:
logger.warning(f"Missing required columns - date_col: {date_col}, target_col: {target_col}")
return '', '', None, True
try:
# Ensure target_col is a list for consistency
if not isinstance(target_col, list):
target_col = [target_col] if target_col else []
# Ensure covariate_cols is a list
if covariate_cols and not isinstance(covariate_cols, list):
covariate_cols = [covariate_cols]
# Validate column selection
# For multivariate, validate each target column
for t_col in target_col:
validation = validate_column_selection(data_processor.data, date_col, t_col)
if not validation['valid']:
error_msg = ' '.join(validation['issues'])
return format_upload_status('error', error_msg, True), '', None, True
# Show preview
preview = create_data_preview_table(data_processor.data)
# Process data - pass target columns based on mode
# For univariate: single target, for multivariate: list of targets
if mode == 'univariate':
target_to_process = target_col[0] # Single target string
else:
target_to_process = target_col # List of targets for multivariate
result = data_processor.preprocess(
date_column=date_col,
target_column=target_to_process,
id_column=id_col,
forecast_horizon=30
)
if result['status'] == 'error':
return preview, format_upload_status('error', result['error'], True), None, True
# Show quality report
quality_report = create_quality_report(result['quality_report'])
# Store processed data with forecasting mode and columns
processed_data = {
'data': result['data'].to_json(date_format='iso'),
'quality_report': result['quality_report'],
'forecasting_mode': mode,
'target_columns': target_col,
'covariate_columns': covariate_cols if covariate_cols else [],
'date_column': date_col,
'id_column': id_col
}
return preview, quality_report, processed_data, False
except Exception as e:
logger.error(f"Error in preview/process: {str(e)}", exc_info=True)
return '', format_upload_status('error', f"Error: {str(e)}", True), None, True
# Callback: Generate forecast
@app.callback(
[Output('forecast-chart', 'figure'),
Output('metrics-display', 'children'),
Output('results-card', 'style'),
Output('loading-output', 'children')],
Input('generate-forecast-btn', 'n_clicks'),
[State('processed-data-store', 'data'),
State('horizon-slider', 'value'),
State('confidence-checklist', 'value'),
State('backtest-enable', 'value'),
State('backtest-size-slider', 'value')],
prevent_initial_call=True
)
def generate_forecast(n_clicks, processed_data, horizon, confidence_levels, backtest_enabled, backtest_size):
"""Generate forecast using the Chronos model, optionally with backtesting"""
logger.info("=" * 80)
logger.info("CALLBACK: generate_forecast - ENTRY")
logger.info(f"n_clicks: {n_clicks}")
logger.info(f"horizon: {horizon}")
logger.info(f"confidence_levels: {confidence_levels}")
logger.info(f"processed_data is None: {processed_data is None}")
logger.info("=" * 80)
if not processed_data or not n_clicks:
logger.warning(f"Early return - processed_data exists: {processed_data is not None}, n_clicks: {n_clicks}")
return create_empty_chart(), '', {'display': 'none'}, ''
try:
# Load processed data
logger.info("Loading processed data from JSON...")
df = pd.read_json(processed_data['data'])
logger.info(f"Loaded DataFrame: shape={df.shape}, columns={df.columns.tolist()}")
# Get forecasting mode and metadata
mode = processed_data.get('forecasting_mode', 'univariate')
target_columns = processed_data.get('target_columns', [])
covariate_columns = processed_data.get('covariate_columns', [])
logger.info(f"Forecasting mode: {mode}")
logger.info(f"Target columns: {target_columns}")
logger.info(f"Covariate columns: {covariate_columns}")
# Validate parameters
logger.info("Validating forecast parameters...")
validation = validate_forecast_parameters(horizon, confidence_levels, len(df))
logger.info(f"Validation result: {validation}")
if not validation['valid']:
error_msg = ' '.join(validation['issues'])
logger.error(f"βœ— Validation failed: {error_msg}")
return create_empty_chart(error_msg), '', {'display': 'none'}, ''
# Perform backtesting if enabled
backtest_df = None
backtest_metrics = None
if backtest_enabled and 'enabled' in backtest_enabled:
logger.info(f"Backtesting enabled with test_size={backtest_size}")
backtest_result = model_service.backtest(
data=df,
test_size=min(backtest_size, len(df) // 3), # Ensure we have enough training data
forecast_horizon=horizon,
confidence_levels=confidence_levels
)
if backtest_result['status'] == 'success':
backtest_df = backtest_result['backtest_data']
backtest_metrics = backtest_result['metrics']
logger.info(f"βœ“ Backtest completed: {backtest_metrics}")
else:
logger.warning(f"Backtest failed: {backtest_result.get('error', 'Unknown error')}")
# Generate forecast
logger.info(f"Calling model_service.predict() - horizon={horizon}, confidence={confidence_levels}, mode={mode}")
logger.info(f"Model service state: is_loaded={model_service.is_loaded}, variant={model_service.model_variant}")
forecast_result = model_service.predict(
data=df,
horizon=horizon,
confidence_levels=confidence_levels
)
logger.info(f"Forecast result status: {forecast_result['status']}")
if forecast_result['status'] == 'error':
logger.error(f"βœ— Forecast generation failed: {forecast_result['error']}")
return create_empty_chart(f"Forecast failed: {forecast_result['error']}"), '', {'display': 'none'}, ''
# Get forecast data
forecast_df = forecast_result['forecast']
logger.info(f"Forecast DataFrame shape: {forecast_df.shape}, columns: {forecast_df.columns.tolist()}")
# Decimate data if too large
logger.info("Decimating data for chart...")
historical_decimated = decimate_data(df, MAX_CHART_POINTS // 2)
forecast_decimated = decimate_data(forecast_df, MAX_CHART_POINTS // 2)
logger.info(f"Decimated - historical: {len(historical_decimated)}, forecast: {len(forecast_decimated)}")
# Prepare data for chart (rename Chronos 2 columns to chart format)
logger.info("Renaming columns for chart...")
historical_for_chart = historical_decimated.rename(columns={
'timestamp': 'ds',
'target': 'y'
})
logger.info(f"Historical chart data columns: {historical_for_chart.columns.tolist()}")
# Create chart title and labels based on target columns
logger.info("Creating forecast chart...")
primary_target = target_columns[0] if target_columns else 'Target'
if mode == 'multivariate' and len(target_columns) > 1:
chart_title = f"Forecast: {primary_target} (with {', '.join(target_columns[1:])} as covariates)"
y_label = primary_target
elif covariate_columns:
chart_title = f"Forecast: {primary_target} (with covariates)"
y_label = primary_target
else:
chart_title = f"Forecast: {primary_target}"
y_label = primary_target
fig = create_forecast_chart(
historical_data=historical_for_chart,
forecast_data=forecast_decimated,
confidence_levels=confidence_levels,
title=chart_title,
y_axis_label=y_label,
backtest_data=backtest_df
)
logger.info(f"Chart created: {type(fig)}")
# Create metrics display
metrics = {
'inference_time': forecast_result['inference_time'],
'data_points': len(df),
'horizon': horizon
}
logger.info(f"Creating metrics display: {metrics}")
# Add backtest metrics if available
if backtest_metrics:
metrics_components = dbc.Row([
dbc.Col(create_metrics_display(metrics, forecast_result['inference_time']), md=6),
dbc.Col(create_backtest_metrics_display(backtest_metrics), md=6)
])
else:
metrics_components = dbc.Row(create_metrics_display(
metrics,
forecast_result['inference_time']
))
logger.info("βœ“ Forecast generation successful - returning chart and metrics")
logger.info("CALLBACK: generate_forecast - EXIT (success)")
logger.info("=" * 80)
return fig, metrics_components, {'display': 'block'}, ''
except Exception as e:
logger.error(f"βœ— EXCEPTION in generate_forecast: {str(e)}", exc_info=True)
logger.info("CALLBACK: generate_forecast - EXIT (exception)")
logger.info("=" * 80)
return create_empty_chart(f"Error: {str(e)}"), '', {'display': 'none'}, ''
# Health check endpoint
@app.server.route('/health')
def health_check():
"""Health check endpoint for deployment monitoring"""
status = {
'status': 'healthy' if model_service.is_loaded else 'degraded',
'model_loaded': model_service.is_loaded,
'model_variant': model_service.model_variant,
'device': model_service.device
}
return status
# Run the app
if __name__ == '__main__':
# Setup directories
setup_directories()
logger.info(f"Starting Chronos 2 Forecasting App")
logger.info(f"Configuration: {CONFIG}")
# Get host and port from environment variables (for HuggingFace Spaces, Render, etc.)
import os
host = os.getenv('HOST', '127.0.0.1')
port = int(os.getenv('PORT', '7860')) # 7860 is HuggingFace Spaces default
debug = os.getenv('DEBUG', 'True').lower() == 'true'
# Run the app
app.run_server(
host=host,
port=port,
debug=debug
)