|
|
""" |
|
|
Data preprocessing pipeline for time series data |
|
|
""" |
|
|
|
|
|
import logging |
|
|
from typing import Any, Dict, List, Optional, Tuple, Union
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
from io import BytesIO |
|
|
|
|
|
from config.constants import ( |
|
|
DATE_FORMATS, |
|
|
MAX_MISSING_PERCENT, |
|
|
MIN_DATA_POINTS_MULTIPLIER, |
|
|
ALLOWED_EXTENSIONS |
|
|
) |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class DataProcessor: |
|
|
""" |
|
|
Handles all data preprocessing tasks for time series forecasting |
|
|
""" |
|
|
|
|
|
def __init__(self): |
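        # Working copy of the data, an untouched original, and file-level metadata
        # populated by load_file().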
|
|
self.data = None |
|
|
self.original_data = None |
|
|
self.metadata = {} |
|
|
|
|
|
def _timedelta_to_freq_string(self, td: pd.Timedelta) -> str: |
|
|
""" |
|
|
Convert a Timedelta to a pandas frequency string |
|
|
|
|
|
Args: |
|
|
td: Timedelta object |
|
|
|
|
|
Returns: |
|
|
            Frequency string (e.g., 'H', 'D', '5min')
|
|
""" |
|
|
total_seconds = td.total_seconds() |
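        # Pick the largest unit that divides the spacing evenly:
        # 604800 s = 1 week, 86400 s = 1 day, 3600 s = 1 hour, 60 s = 1 minute.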
|
|
|
|
|
|
|
|
if total_seconds == 0: |
|
|
return 'D' |
|
|
elif total_seconds % 604800 == 0: |
|
|
weeks = int(total_seconds / 604800) |
|
|
return f'{weeks}W' if weeks > 1 else 'W' |
|
|
elif total_seconds % 86400 == 0: |
|
|
days = int(total_seconds / 86400) |
|
|
return f'{days}D' if days > 1 else 'D' |
|
|
elif total_seconds % 3600 == 0: |
|
|
hours = int(total_seconds / 3600) |
|
|
return f'{hours}H' if hours > 1 else 'H' |
|
|
elif total_seconds % 60 == 0: |
|
|
minutes = int(total_seconds / 60) |
|
|
return f'{minutes}min' if minutes > 1 else 'min' |
|
|
elif total_seconds % 1 == 0: |
|
|
seconds = int(total_seconds) |
|
|
return f'{seconds}s' if seconds > 1 else 's' |
|
|
else: |
|
|
|
|
|
logger.warning(f"Irregular frequency detected ({td}), defaulting to Daily") |
|
|
return 'D' |
|
|
|
|
|
def load_file(self, contents: bytes, filename: str) -> Dict[str, Any]: |
|
|
""" |
|
|
Load data from uploaded file |
|
|
|
|
|
Args: |
|
|
contents: File contents as bytes |
|
|
filename: Original filename |
|
|
|
|
|
Returns: |
|
|
Dictionary with status and data/error |
|
|
""" |
|
|
try: |
|
|
|
|
|
extension = filename.split('.')[-1].lower() |
|
|
|
|
|
if extension not in ALLOWED_EXTENSIONS: |
|
|
return { |
|
|
'status': 'error', |
|
|
'error': f'Invalid file type. Allowed: {", ".join(ALLOWED_EXTENSIONS)}' |
|
|
} |
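            # The extension selects the reader; both paths parse directly from the in-memory buffer.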
|
|
|
|
|
|
|
|
if extension == 'csv': |
|
|
self.data = pd.read_csv(BytesIO(contents)) |
|
|
elif extension in ['xlsx', 'xls']: |
|
|
self.data = pd.read_excel(BytesIO(contents)) |
|
|
|
|
|
self.original_data = self.data.copy() |
|
|
|
|
|
logger.info(f"Loaded file {filename} with shape {self.data.shape}") |
|
|
|
|
|
|
|
|
self.metadata = { |
|
|
'filename': filename, |
|
|
'rows': len(self.data), |
|
|
'columns': list(self.data.columns), |
|
|
'dtypes': {col: str(dtype) for col, dtype in self.data.dtypes.items()} |
|
|
} |
|
|
|
|
|
return { |
|
|
'status': 'success', |
|
|
'data': self.data, |
|
|
'metadata': self.metadata |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Failed to load file {filename}: {str(e)}", exc_info=True) |
|
|
return { |
|
|
'status': 'error', |
|
|
'error': f'Failed to load file: {str(e)}' |
|
|
} |
|
|
|
|
|
def validate_data( |
|
|
self, |
|
|
date_column: str, |
|
|
target_column: str, |
|
|
id_column: Optional[str] = None |
|
|
) -> Dict[str, Any]: |
|
|
""" |
|
|
Validate the selected columns and data quality |
|
|
|
|
|
Args: |
|
|
date_column: Name of the date/time column |
|
|
target_column: Name of the target variable column |
|
|
id_column: Optional ID column for multivariate series |
|
|
|
|
|
Returns: |
|
|
Validation result dictionary |
|
|
""" |
|
|
try: |
|
|
            if self.data is None:
                return {'status': 'error', 'issues': ['No data loaded']}

            issues = []
|
|
warnings = [] |
|
|
|
|
|
|
|
|
if date_column not in self.data.columns: |
|
|
issues.append(f"Date column '{date_column}' not found") |
|
|
if target_column not in self.data.columns: |
|
|
issues.append(f"Target column '{target_column}' not found") |
|
|
if id_column and id_column not in self.data.columns: |
|
|
issues.append(f"ID column '{id_column}' not found") |
|
|
|
|
|
if issues: |
|
|
return {'status': 'error', 'issues': issues} |
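            # Data-quality checks: excessive missing values only raise a warning,
            # while a non-numeric target or an unparseable date column is a hard error.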
|
|
|
|
|
|
|
|
missing_pct = (self.data[target_column].isna().sum() / len(self.data)) * 100 |
|
|
if missing_pct > MAX_MISSING_PERCENT: |
|
|
warnings.append( |
|
|
f"Target column has {missing_pct:.1f}% missing values (>{MAX_MISSING_PERCENT}%)" |
|
|
) |
|
|
|
|
|
|
|
|
if not pd.api.types.is_numeric_dtype(self.data[target_column]): |
|
|
issues.append(f"Target column must be numeric, found {self.data[target_column].dtype}") |
|
|
|
|
|
|
|
|
try: |
|
|
_ = pd.to_datetime(self.data[date_column]) |
|
|
except Exception as e: |
|
|
issues.append(f"Cannot parse date column: {str(e)}") |
|
|
|
|
|
if issues: |
|
|
return {'status': 'error', 'issues': issues, 'warnings': warnings} |
|
|
|
|
|
return { |
|
|
'status': 'success', |
|
|
'warnings': warnings, |
|
|
'missing_pct': missing_pct |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Validation failed: {str(e)}", exc_info=True) |
|
|
return {'status': 'error', 'issues': [str(e)]} |
|
|
|
|
|
def preprocess( |
|
|
self, |
|
|
date_column: str, |
|
|
        target_column: Union[str, List[str]],
|
|
id_column: Optional[str] = None, |
|
|
forecast_horizon: int = 30, |
|
|
max_rows: int = 100000 |
|
|
) -> Dict[str, Any]: |
|
|
""" |
|
|
Complete preprocessing pipeline |
|
|
|
|
|
Args: |
|
|
date_column: Name of the date column |
|
|
target_column: Name of the target column (string) or list of target columns for multivariate |
|
|
id_column: Optional ID column |
|
|
            forecast_horizon: Number of periods to forecast
            max_rows: Maximum number of rows to keep; larger datasets are truncated to the most recent rows
|
|
|
|
|
Returns: |
|
|
Processed data and metadata |
|
|
""" |
|
|
try: |
|
|
logger.info("Starting preprocessing pipeline") |
|
|
|
|
|
|
|
|
original_row_count = len(self.data) |
|
|
if original_row_count > max_rows: |
|
|
logger.warning(f"Dataset has {original_row_count} rows, sampling to {max_rows} for performance") |
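                # Keep only the most recent rows; the tail of the series carries the most signal for forecasting.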
|
|
|
|
|
self.data = self.data.tail(max_rows).reset_index(drop=True) |
|
|
|
|
|
|
|
|
logger.info("Parsing dates...") |
|
|
self.data[date_column] = pd.to_datetime(self.data[date_column]) |
|
|
|
|
|
|
|
|
self.data = self.data.sort_values(date_column).reset_index(drop=True) |
|
|
|
|
|
|
|
|
duplicate_count = self.data[date_column].duplicated().sum() |
|
|
if duplicate_count > 0: |
|
|
logger.warning(f"Found {duplicate_count} duplicate timestamps, keeping first occurrence") |
|
|
self.data = self.data.drop_duplicates(subset=[date_column], keep='first').reset_index(drop=True) |
|
|
|
|
|
|
|
|
logger.info("Detecting frequency...") |
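            # pd.infer_freq needs a consistently spaced index. If it fails, fall back to the
            # most common spacing between consecutive timestamps, and finally to daily.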
|
|
freq = pd.infer_freq(self.data[date_column]) |
|
|
if freq is None: |
|
|
|
|
|
diffs = self.data[date_column].diff().dropna() |
|
|
if len(diffs) > 0: |
|
|
|
|
|
mode_diff = diffs.mode() |
|
|
                    if len(mode_diff) > 0 and mode_diff.iloc[0] != pd.Timedelta(0):
|
|
|
|
|
                        td = mode_diff.iloc[0]
|
|
freq = self._timedelta_to_freq_string(td) |
|
|
logger.warning(f"Could not auto-detect frequency, inferred from mode: {freq}") |
|
|
else: |
|
|
freq = 'D' |
|
|
logger.warning("Using default frequency: Daily") |
|
|
else: |
|
|
freq = 'D' |
|
|
logger.warning("Using default frequency: Daily") |
|
|
|
|
|
|
|
|
|
|
|
target_columns = [target_column] if isinstance(target_column, str) else target_column |
|
|
logger.info(f"Processing {len(target_columns)} target column(s): {target_columns}") |
|
|
|
|
|
logger.info("Handling missing values...") |
|
|
total_missing_count = 0 |
|
|
|
|
|
for tcol in target_columns: |
|
|
missing_count = self.data[tcol].isna().sum() |
|
|
total_missing_count += missing_count |
|
|
|
|
|
if missing_count > 0: |
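                    # Three-step fill: short gaps are forward-filled (up to 5 steps),
                    # longer gaps are linearly interpolated, and any remaining leading NaNs are back-filled.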
|
|
|
|
|
self.data[tcol] = self.data[tcol].ffill(limit=5) |
|
|
|
|
|
|
|
|
self.data[tcol] = self.data[tcol].interpolate(method='linear') |
|
|
|
|
|
|
|
|
self.data[tcol] = self.data[tcol].bfill() |
|
|
|
|
|
logger.info(f"Filled {missing_count} missing values in '{tcol}'") |
|
|
|
|
|
|
|
|
logger.info("Detecting outliers...") |
|
|
primary_target = target_columns[0] |
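            # Flag values outside a conservative 3x IQR fence (wider than the usual 1.5x).
            # Outliers are only counted for the quality report; they are not modified or removed.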
|
|
Q1 = self.data[primary_target].quantile(0.25) |
|
|
Q3 = self.data[primary_target].quantile(0.75) |
|
|
IQR = Q3 - Q1 |
|
|
outlier_mask = ( |
|
|
(self.data[primary_target] < (Q1 - 3 * IQR)) | |
|
|
(self.data[primary_target] > (Q3 + 3 * IQR)) |
|
|
) |
|
|
outlier_count = outlier_mask.sum() |
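            # Require a minimum amount of history relative to the requested horizon
            # (MIN_DATA_POINTS_MULTIPLIER x forecast_horizon) before proceeding.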
|
|
|
|
|
|
|
|
min_required = forecast_horizon * MIN_DATA_POINTS_MULTIPLIER |
|
|
if len(self.data) < min_required: |
|
|
return { |
|
|
'status': 'error', |
|
|
'error': f'Insufficient data. Need at least {min_required} points for {forecast_horizon}-period forecast.' |
|
|
} |
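            # Reshape into a long format with canonical 'id', 'timestamp' and 'target' columns;
            # a constant id ('series_1') is used when no id column is supplied.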
|
|
|
|
|
|
|
|
|
|
|
|
|
|
processed_df = pd.DataFrame({ |
|
|
'id': self.data[id_column] if id_column else 'series_1', |
|
|
'timestamp': self.data[date_column], |
|
|
'target': self.data[target_columns[0]].astype(float) |
|
|
}) |
|
|
|
|
|
|
|
|
if len(target_columns) > 1: |
|
|
logger.info(f"Adding {len(target_columns)-1} additional target column(s) as covariates") |
|
|
for tcol in target_columns[1:]: |
|
|
processed_df[tcol] = self.data[tcol].astype(float) |
|
|
|
|
|
|
|
|
quality_report = { |
|
|
'total_points': len(processed_df), |
|
|
'original_points': original_row_count, |
|
|
'sampled': original_row_count > max_rows, |
|
|
'date_range': { |
|
|
'start': processed_df['timestamp'].min().strftime('%Y-%m-%d'), |
|
|
'end': processed_df['timestamp'].max().strftime('%Y-%m-%d') |
|
|
}, |
|
|
'frequency': str(freq), |
|
|
                'missing_filled': int(total_missing_count),
|
|
                'outliers_detected': int(outlier_count),
|
|
                'duplicates_removed': int(duplicate_count),
|
|
'target_columns': target_columns, |
|
|
'statistics': { |
|
|
'mean': float(processed_df['target'].mean()), |
|
|
'std': float(processed_df['target'].std()), |
|
|
'min': float(processed_df['target'].min()), |
|
|
'max': float(processed_df['target'].max()) |
|
|
} |
|
|
} |
|
|
|
|
|
logger.info("Preprocessing completed successfully") |
|
|
|
|
|
return { |
|
|
'status': 'success', |
|
|
'data': processed_df, |
|
|
'quality_report': quality_report, |
|
|
'frequency': freq |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Preprocessing failed: {str(e)}", exc_info=True) |
|
|
return { |
|
|
'status': 'error', |
|
|
'error': str(e) |
|
|
} |
|
|
|
|
|
def get_column_info(self) -> Dict[str, List[str]]: |
|
|
""" |
|
|
Get information about columns for UI dropdowns |
|
|
|
|
|
Returns: |
|
|
Dictionary with potential date and numeric columns |
|
|
""" |
|
|
if self.data is None: |
|
|
return {'date_columns': [], 'numeric_columns': [], 'all_columns': []} |
|
|
|
|
|
date_columns = [] |
|
|
numeric_columns = [] |
|
|
|
|
|
for col in self.data.columns: |
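            # Probe object columns by parsing their first few values with pd.to_datetime
            # to identify candidate date columns.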
|
|
|
|
|
if self.data[col].dtype == 'object': |
|
|
|
|
|
try: |
|
|
pd.to_datetime(self.data[col].iloc[:5]) |
|
|
date_columns.append(col) |
|
|
                except Exception:
|
|
pass |
|
|
elif pd.api.types.is_datetime64_any_dtype(self.data[col]): |
|
|
date_columns.append(col) |
|
|
|
|
|
|
|
|
if pd.api.types.is_numeric_dtype(self.data[col]): |
|
|
numeric_columns.append(col) |
|
|
|
|
|
return { |
|
|
'date_columns': date_columns, |
|
|
'numeric_columns': numeric_columns, |
|
|
'all_columns': list(self.data.columns) |
|
|
} |
|
|
|
|
|
def get_preview(self, n_rows: int = 10) -> pd.DataFrame: |
|
|
""" |
|
|
Get a preview of the data |
|
|
|
|
|
Args: |
|
|
n_rows: Number of rows to return |
|
|
|
|
|
Returns: |
|
|
DataFrame preview |
|
|
""" |
|
|
if self.data is None: |
|
|
return pd.DataFrame() |
|
|
|
|
|
return self.data.head(n_rows) |
|
|
|
|
|
|
|
|
|
|
|
data_processor = DataProcessor() |
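

# Minimal usage sketch (illustrative only): "sales.csv", "date" and "sales" below are
# hypothetical placeholders for your own file and column names.
if __name__ == "__main__":
    with open("sales.csv", "rb") as f:
        result = data_processor.load_file(f.read(), "sales.csv")
    if result["status"] == "success":
        check = data_processor.validate_data(date_column="date", target_column="sales")
        if check["status"] == "success":
            processed = data_processor.preprocess(
                date_column="date", target_column="sales", forecast_horizon=30
            )
            if processed["status"] == "success":
                print(processed["quality_report"])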
|
|
|