Source code for Hydrological_model_validator.Processing.Efficiency_metrics

###############################################################################
##                                                                           ##
##                               LIBRARIES                                   ##
##                                                                           ##
###############################################################################

# Data handling libraries
import numpy as np
import xarray as xr
import pandas as pd
from typing import Literal, Tuple, Dict, Union, List, Sequence

# Logging and tracing
import logging
from eliot import start_action, log_message

# Module utilities and stats functions
from Hydrological_model_validator.Processing.time_utils import Timer
from Hydrological_model_validator.Processing.stats_math_utils import (
    mean_bias,
    standard_deviation_error,
    std_dev,
    cross_correlation,
    unbiased_rmse,
)

###############################################################################
##                                                                           ##
##                               FUNCTIONS                                   ##
##                                                                           ##
###############################################################################


[docs] def r_squared(obs: Union[np.ndarray, Sequence[float]], pred: Union[np.ndarray, Sequence[float]]) -> float: """ Calculate the coefficient of determination (r²) between observed and predicted data. Parameters ---------- obs : np.ndarray Array of observed values. pred : np.ndarray Array of predicted values. Returns ------- float The coefficient of determination (r²), which quantifies how well predictions approximate the observed values. Returns np.nan if fewer than 2 valid (non-NaN) pairs. Notes ----- - NaN values in either input array are ignored. - r² is the square of the Pearson correlation coefficient between obs and pred. - r² ranges from 0 (no correlation) to 1 (perfect linear correlation). Examples -------- >>> import numpy as np >>> obs = np.array([3.0, 4.5, 5.2, np.nan, 6.1]) >>> pred = np.array([2.8, 4.7, 5.0, 5.9, np.nan]) >>> r_squared(obs, pred) 0.9911... # Very high correlation with missing data ignored >>> obs = np.array([1.0, 2.0, 3.0]) >>> pred = np.array([1.1, 1.9, 3.1]) >>> r_squared(obs, pred) 0.9983... >>> obs = np.array([np.nan, np.nan]) >>> pred = np.array([1.0, 2.0]) >>> r_squared(obs, pred) nan """ # ===== INPUT VALIDATION ===== if obs is None or pred is None: raise ValueError("❌ Input arrays 'obs' and 'pred' must not be None. ❌") obs = np.asarray(obs) pred = np.asarray(pred) if obs.shape != pred.shape: raise ValueError(f"❌ Shape mismatch: 'obs' has shape {obs.shape}, but 'pred' has shape {pred.shape}. They must match.") if obs.ndim != 1: raise ValueError("❌ Inputs must be one-dimensional arrays.") # ===== COMPUTATIONS AND LOGGING ===== with Timer("r_squared function"): with start_action(action_type="r_squared") as action: log_message("Entered r_squared", obs_shape=np.shape(obs), pred_shape=np.shape(pred)) logging.info("[Start] r_squared calculation") # Create a mask to ignore any NaN values in either array mask = ~np.isnan(obs) & ~np.isnan(pred) if np.sum(mask) < 2: logging.info("[Info] Not enough valid data points, returning np.nan") log_message("Insufficient valid data for r_squared", valid_points=int(np.sum(mask))) return np.nan # ===== COMPUTATIONS ===== corr = np.corrcoef(obs[mask], pred[mask])[0, 1] r2 = corr ** 2 log_message("Computed r_squared", r_squared=r2) logging.info(f"[Done] r_squared computed: {r2}") return r2
############################################################################### ###############################################################################
[docs] def monthly_r_squared(data_dict: Dict[str, Dict[int, List[Union[np.ndarray, List[float]]]]]) -> List[float]: """ Compute monthly R² values between model and satellite datasets over multiple years. Parameters ---------- data_dict : dict Dictionary with structure: { 'BASSTmod': {year1: [12 arrays], year2: [...], ...}, 'BASSTsat': {year1: [12 arrays], year2: [...], ...} } Keys should contain 'mod' for model and 'sat' for satellite data. Each value is a dictionary mapping years to lists of 12 monthly 2D arrays. Returns ------- list of float List of 12 R² values (one for each month from January to December). Notes ----- - This function concatenates data across all years for each month and calculates the R² between the model and satellite data for that month. - NaN values are excluded from the computation. - If no valid data exists for a month, the R² for that month is set to np.nan. Examples -------- >>> mod_data = { ... 2000: [np.array([[1, 2], [3, 4]]) for _ in range(12)], ... 2001: [np.array([[2, 3], [4, 5]]) for _ in range(12)] ... } >>> sat_data = { ... 2000: [np.array([[1, 2.1], [3.1, 4]]) for _ in range(12)], ... 2001: [np.array([[2.2, 3], [4.1, 5.1]]) for _ in range(12)] ... } >>> data_dict = {'BASSTmod': mod_data, 'BASSTsat': sat_data} >>> monthly_r_squared(data_dict) [0.999..., 0.999..., ..., 0.999...] # 12 values """ # ===== INPUT VALIDATION ===== if not isinstance(data_dict, dict): raise TypeError("❌ Input must be a dictionary.") # Find keys for model and satellite data by searching for 'mod' and 'sat' substrings # Define keyword groups for matching model_keywords = ['sim', 'simulated', 'model', 'mod'] sat_keywords = ['obs', 'observed', 'sat', 'satellite'] # Try to find keys that match each group model_key = next((k for k in data_dict if any(kw in k.lower() for kw in model_keywords)), None) sat_key = next((k for k in data_dict if any(kw in k.lower() for kw in sat_keywords)), None) # Raise error if keys are not found to avoid silent failures if model_key is None or sat_key is None: raise KeyError("❌ Model or satellite key not found in the dictionary. Expected keys containing 'mod' and 'sat'.") mod_monthly = data_dict[model_key] sat_monthly = data_dict[sat_key] # Ensure model and satellite data are dictionaries if not isinstance(mod_monthly, dict) or not isinstance(sat_monthly, dict): raise TypeError("❌ Model and satellite values must be dictionaries mapping years to lists of arrays.") # Ensure both datasets have the same years mod_years = set(mod_monthly.keys()) sat_years = set(sat_monthly.keys()) if mod_years != sat_years: raise ValueError(f"❌ Mismatched years between model and satellite data: {mod_years ^ sat_years}") years = sorted(mod_years) # List all available years from model data # Determine number of months dynamically from the first year in model data first_year = years[0] n_months = len(mod_monthly[first_year]) # ===== COMPUTATION AND LOGGING ===== with Timer("monthly_r_squared function"): with start_action(action_type="monthly_r_squared") as action: log_message("Entered monthly_r_squared", years=years, n_months=n_months) logging.info("[Start] monthly_r_squared computation") r2_monthly = [] # Initialize list to store R² for each month # ===== LOOPING THE COMPUTATIONS ===== for month in range(n_months): # Extract monthly data arrays from all years and flatten to 1D for comparison mod_arrays = [] sat_arrays = [] for year in years: mod_list = mod_monthly[year] sat_list = sat_monthly[year] mod_arrays.append(np.asarray(mod_list[month]).ravel()) sat_arrays.append(np.asarray(sat_list[month]).ravel()) # Concatenate monthly arrays from all years into single long arrays mod_concat = np.concatenate(mod_arrays) sat_concat = np.concatenate(sat_arrays) # Create mask to ignore NaNs in either dataset valid_mask = ~np.isnan(mod_concat) & ~np.isnan(sat_concat) if np.any(valid_mask): # Compute R² using valid data points r2 = r_squared(mod_concat[valid_mask], sat_concat[valid_mask]) else: # If no valid data, assign NaN to indicate missing correlation r2 = np.nan log_message("Computed monthly r_squared", month=month + 1, r_squared=r2) logging.info(f"Month {month + 1}: R² = {r2}") r2_monthly.append(r2) # Append monthly R² to list logging.info("[Done] monthly_r_squared computation completed") log_message("Completed monthly_r_squared", total_months=len(r2_monthly)) return r2_monthly
############################################################################### ###############################################################################
[docs] def weighted_r_squared(obs: Union[np.ndarray, list], pred: Union[np.ndarray, list]) -> float: """ Compute weighted coefficient of determination (weighted R²) between observed and predicted data. The weighting accounts for the slope of the regression line between predicted and observed values, emphasizing cases where the slope is close to 1 (ideal 1:1 relation). Parameters ---------- obs : array-like Observed values. pred : array-like Predicted values. Returns ------- float Weighted R² value or np.nan if insufficient data. Notes ----- - The function first computes the standard R² between obs and pred. - It then fits a linear regression line pred = slope * obs + intercept. - The absolute value of the slope is used to weight the R²: - If slope is close to 1, weight ≈ 1 (no change). - If slope deviates far from 1, weight is reduced, clipped between 0.1 and 1. - This penalizes cases where prediction trends differ substantially from observations, even if correlation is high. - A small floor of 0.1 prevents the weighted R² from becoming zero or negligible when the slope is near zero. Examples -------- >>> obs = np.array([1, 2, 3, 4, 5]) >>> pred = np.array([1.1, 2.1, 2.9, 4.2, 4.8]) >>> weighted_r_squared(obs, pred) 0.98 # example output (actual value depends on data) """ # ===== INPUT VALIDATION ===== if obs is None or pred is None: raise ValueError("❌ Input arrays 'obs' and 'pred' must not be None. ❌") obs = np.asarray(obs) pred = np.asarray(pred) if obs.shape != pred.shape: raise ValueError(f"❌ Input arrays must have the same shape, got {obs.shape} and {pred.shape}. ❌") # Create mask to ignore NaNs in either obs or pred mask = ~np.isnan(obs) & ~np.isnan(pred) # Return NaN if fewer than 2 valid data points (insufficient for regression) if np.sum(mask) < 2: return np.nan x = obs[mask] y = pred[mask] # ===== COMPUTATION AND LOGGING ===== with Timer("weighted_r_squared function"): with start_action(action_type="weighted_r_squared") as action: log_message("Entered weighted_r_squared", valid_points=len(x)) logging.info("[Start] weighted_r_squared computation") # Compute standard R² between observed and predicted for valid data r2 = r_squared(x, y) # Fit a linear regression line: pred = slope * obs + intercept slope, intercept = np.polyfit(x, y, 1) # Calculate weight based on how close slope is to 1: # If slope > 1, invert it to keep weight <= 1, # otherwise use slope directly (absolute value) slope_abs = abs(slope) weight = slope_abs if slope_abs <= 1 else 1 / slope_abs # Set minimum weight to 0.1 to avoid zero or near-zero weights weight = max(weight, 0.1) weighted_r2 = weight * r2 log_message( "Computed weighted_r_squared", slope=slope, weight=weight, r_squared=r2, weighted_r_squared=weighted_r2, ) logging.info( f"Weighted R²: {weighted_r2:.4f} (Slope: {slope:.4f}, Weight: {weight:.4f})" ) logging.info("[Done] weighted_r_squared computation completed") log_message("Completed weighted_r_squared") return weighted_r2
############################################################################### ###############################################################################
[docs] def monthly_weighted_r_squared(dictionary: Dict[str, Dict[int, List[Union[np.ndarray, List[float]]]]]) -> List[float]: """ Compute weighted coefficient of determination (weighted R²) for each calendar month across multiple years, using paired model and satellite datasets. The weighting adjusts the R² based on the slope of the linear regression between predicted (model) and observed (satellite) values, emphasizing months where the relationship is closer to a 1:1 correspondence. Parameters ---------- dictionary : dict Dictionary containing keys with substrings 'mod' and 'sat' representing model and satellite data, respectively. Each key maps to a dictionary of years, where each year corresponds to a list or array of 12 monthly arrays of data points. Returns ------- list of float List of 12 weighted R² values, each representing one calendar month (January to December). Raises ------ KeyError Raised if no keys containing 'mod' or 'sat' are found in the input dictionary. Notes ----- - For each month, data from all years are concatenated to form a single paired dataset of model and satellite values. - NaN values in either dataset are excluded from the calculations. - The weighted R² combines the classical coefficient of determination with a weighting factor derived from the slope of the regression line between satellite (observed) and model (predicted) data. - This method penalizes cases where the prediction slope deviates significantly from unity, even if correlation is high. Examples -------- >>> data = { ... 'model': { ... 2000: [np.array([1, 2]), np.array([3, 4])] * 6, ... 2001: [np.array([2, 3]), np.array([4, 5])] * 6, ... }, ... 'satellite': { ... 2000: [np.array([1.1, 2.1]), np.array([2.9, 4.1])] * 6, ... 2001: [np.array([2.2, 2.9]), np.array([3.8, 5.2])] * 6, ... } ... } >>> monthly_weighted_r_squared(data) [0.95, 0.92, ..., 0.90] """ from .Efficiency_metrics import weighted_r_squared # ===== INPUT VALIDATIONS ===== if not isinstance(dictionary, dict): raise ValueError("❌ Input must be a dictionary. ❌") # Define keyword groups for matching model_keywords = ['sim', 'simulated', 'model', 'mod'] sat_keywords = ['obs', 'observed', 'sat', 'satellite'] # Try to find keys that match each group model_key = next((k for k in dictionary if any(kw in k.lower() for kw in model_keywords)), None) sat_key = next((k for k in dictionary if any(kw in k.lower() for kw in sat_keywords)), None) if not model_key or not sat_key: raise KeyError("❌ Model or satellite key not found in the dictionary. ❌") mod_monthly = dictionary[model_key] sat_monthly = dictionary[sat_key] if not isinstance(mod_monthly, dict) or not isinstance(sat_monthly, dict): raise ValueError("❌ Model and satellite data must be dictionaries keyed by year. ❌") years = list(mod_monthly.keys()) if not years: raise ValueError("❌ No year data found in the model dataset. ❌") first_year = years[0] n_months = len(mod_monthly[first_year]) wr2_monthly = [] # ===== COMPUTATION AND LOGGING ===== with Timer("monthly_weighted_r_squared function"): with start_action(action_type="monthly_weighted_r_squared") as action: log_message("Entered monthly_weighted_r_squared", years=years, n_months=n_months) logging.info("[Start] monthly_weighted_r_squared computation") # ===== LOOPING THE COMPUTATIONS ===== for month in range(n_months): try: mod_all = np.concatenate([np.asarray(mod_monthly[year][month]).ravel() for year in years]) sat_all = np.concatenate([np.asarray(sat_monthly[year][month]).ravel() for year in years]) except IndexError: raise ValueError(f"❌ Month index {month} is out of bounds in one of the datasets. ❌") valid_mask = ~np.isnan(mod_all) & ~np.isnan(sat_all) if np.any(valid_mask): wr2 = weighted_r_squared(sat_all[valid_mask], mod_all[valid_mask]) else: wr2 = np.nan wr2_monthly.append(wr2) log_message("Computed monthly weighted_r_squared", month=month + 1, weighted_r_squared=wr2) logging.info(f"Month {month + 1}: Weighted R² = {wr2}") logging.info("[Done] monthly_weighted_r_squared computation completed") log_message("Completed monthly_weighted_r_squared", total_months=len(wr2_monthly)) return wr2_monthly
############################################################################### ###############################################################################
[docs] def nse(obs: Union[np.ndarray, Sequence[float]], pred: Union[np.ndarray, Sequence[float]]) -> float: """ Compute Nash–Sutcliffe Efficiency (NSE) between observed and predicted data. NSE is a normalized statistic that determines the relative magnitude of the residual variance ("noise") compared to the variance of the observed data ("signal"). It is widely used to assess the predictive skill of hydrological models. Parameters ---------- obs : array-like Observed values. pred : array-like Predicted values. Returns ------- float NSE value, ranging from -∞ to 1: - NSE = 1 indicates a perfect match between observed and predicted data. - NSE = 0 indicates that the model predictions are as accurate as the mean of the observations. - NSE < 0 indicates that the observed mean is a better predictor than the model. Returns np.nan if insufficient valid data or if variance of observed data is zero. Notes ----- - The function ignores pairs where either observation or prediction is NaN. - At least two valid data points are required to compute NSE. - NSE is sensitive to extreme values and assumes that observations are error-free. Examples -------- >>> obs = np.array([3, -0.5, 2, 7]) >>> pred = np.array([2.5, 0.0, 2, 8]) >>> nse(obs, pred) 0.8571428571428571 """ obs = np.asarray(obs) # Convert to numpy array for vectorized operations pred = np.asarray(pred) # ==== INPUT VALIDATION ===== if obs.shape != pred.shape: raise ValueError("❌ Input arrays must have the same shape. ❌") # Create a boolean mask to filter out any pairs where obs or pred is NaN, # since these invalid pairs would distort the NSE calculation mask = ~np.isnan(obs) & ~np.isnan(pred) # Require at least two valid pairs to compute a meaningful NSE; # otherwise return NaN since statistic can't be computed reliably if np.sum(mask) < 2: return np.nan obs_masked = obs[mask] pred_masked = pred[mask] # ===== COMPUTATION AND LOGGING ===== with Timer("nse function"): with start_action(action_type="nse") as action: log_message("Entered nse function", valid_data_points=len(obs_masked)) logging.info("[Start] NSE computation") # Calculate the sum of squared residuals (difference between observed and predicted), # representing the "noise" or error variance of the model predictions numerator = np.sum((obs_masked - pred_masked) ** 2) # Calculate the variance of the observed data relative to its mean, # representing the "signal" or natural variance in observations denominator = np.sum((obs_masked - np.mean(obs_masked)) ** 2) if denominator == 0: logging.warning("Observed variance is zero; NSE is undefined (NaN returned).") log_message("NSE undefined due to zero variance in observations") return np.nan nse_value = 1 - numerator / denominator log_message("Computed NSE", nse=nse_value) logging.info(f"[Done] NSE computation: {nse_value}") return nse_value
############################################################################### ###############################################################################
[docs] def monthly_nse(dictionary: Dict[str, Dict[int, List[Union[np.ndarray, List[float]]]]]) -> List[float]: """ Compute monthly Nash–Sutcliffe Efficiency (NSE) between model and satellite datasets. This function aggregates paired model and satellite data over multiple years, calculates the NSE for each calendar month, and returns a list of monthly NSE values. Parameters ---------- dictionary : dict Dictionary containing keys with 'mod' and 'sat' for model and satellite data. Each key maps to a dictionary of years, where each year is a list or array of 12 monthly data arrays. Returns ------- list of float NSE values for each month (length 12). Each value represents the NSE aggregated over all years for that month. Raises ------ KeyError If no model or satellite keys are found in the input dictionary. Notes ----- - The function concatenates monthly data across all years before computing NSE. - Pairs with NaN values in either dataset are excluded. - Returns np.nan for months where valid paired data is insufficient. Examples -------- >>> data = { ... 'model_data': { ... 2020: [np.array([1, 2]), np.array([2, 3]), ...], # 12 monthly arrays ... 2021: [np.array([1.1, 1.9]), np.array([2.1, 3.1]), ...] ... }, ... 'satellite_data': { ... 2020: [np.array([1, 2]), np.array([2, 2.9]), ...], ... 2021: [np.array([1.0, 2.0]), np.array([2.0, 3.0]), ...] ... } ... } >>> monthly_nse(data) [0.95, 0.89, ..., 0.92] # Example output, one value per month """ from .Efficiency_metrics import nse # ===== INPUT VALIDATION ===== if not isinstance(dictionary, dict): raise TypeError("❌ Input must be a dictionary containing model and satellite data keys. ❌") # Define keyword groups for matching model_keywords = ['sim', 'simulated', 'model', 'mod'] sat_keywords = ['obs', 'observed', 'sat', 'satellite'] # Try to find keys that match each group model_key = next((k for k in dictionary if any(kw in k.lower() for kw in model_keywords)), None) sat_key = next((k for k in dictionary if any(kw in k.lower() for kw in sat_keywords)), None) if not model_key or not sat_key: raise KeyError("❌ Model or satellite key not found in the dictionary. ❌") mod_monthly = dictionary[model_key] sat_monthly = dictionary[sat_key] # List all years present in the data (assumed same for model and satellite) years = list(mod_monthly.keys()) if not years: raise ValueError("❌ No year data found in the model dataset. ❌") nse_monthly = [] # ===== COMPUTATION AND LOGGING ===== with Timer("monthly_nse function"): with start_action(action_type="monthly_nse") as action: log_message("Entered monthly_nse", years=years, n_months=len(mod_monthly[years[0]])) logging.info("[Start] monthly_nse computation") for month in range(len(mod_monthly[years[0]])): try: # Concatenate monthly data from all years into one flat array for model mod_all = np.concatenate([np.asarray(mod_monthly[year][month]).ravel() for year in years]) sat_all = np.concatenate([np.asarray(sat_monthly[year][month]).ravel() for year in years]) except IndexError: raise ValueError(f"❌ Month index {month} is out of bounds in one of the datasets. ❌") # Create a mask that selects only pairs where both model and satellite data are valid (not NaN) valid_mask = ~np.isnan(mod_all) & ~np.isnan(sat_all) # If any valid pairs exist, compute NSE for that month; else assign NaN if np.any(valid_mask): nse_val = nse(sat_all[valid_mask], mod_all[valid_mask]) else: nse_val = np.nan # Store the monthly NSE result nse_monthly.append(nse_val) log_message("Computed monthly NSE", month=month + 1, nse=nse_val) logging.info(f"Month {month + 1}: NSE = {nse_val}") logging.info("[Done] monthly_nse computation completed") log_message("Completed monthly_nse", total_months=len(nse_monthly)) return nse_monthly
############################################################################### ###############################################################################
[docs] def index_of_agreement(obs: Union[np.ndarray, Sequence[float]], pred: Union[np.ndarray, Sequence[float]]) -> float: """ Calculate the Index of Agreement (d) between observed and predicted values. The Index of Agreement is a standardized measure of the degree of model prediction error, which varies between 0 (no agreement) and 1 (perfect agreement). Parameters ---------- obs : array-like Observed values. pred : array-like Predicted values. Returns ------- float Index of Agreement (d), or np.nan if insufficient valid data or denominator is zero. Notes ----- - Excludes any pairs where either observed or predicted values are NaN. - Requires at least two valid data points to compute. - Denominator involves sums of absolute deviations from the observed mean. - The metric penalizes both under- and over-prediction differently than simple correlation. Examples -------- >>> obs = np.array([1, 2, 3, 4, 5]) >>> pred = np.array([1.1, 2.1, 2.9, 4.2, 4.8]) >>> index_of_agreement(obs, pred) 0.97 # example output (actual value depends on data) """ # ===== INPUT VALIDATION ===== if not (hasattr(obs, "__iter__") and hasattr(pred, "__iter__")): raise TypeError("❌ Inputs obs and pred must be array-like sequences. ❌") obs = np.asarray(obs) pred = np.asarray(pred) if obs.shape != pred.shape: raise ValueError(f"❌ Input arrays must have the same shape, got {obs.shape} and {pred.shape}. ❌") # Create mask to ignore pairs where either value is NaN mask = ~np.isnan(obs) & ~np.isnan(pred) # Require at least two valid data points for meaningful calculation if np.sum(mask) < 2: return np.nan # Extract valid observed and predicted values obs_masked = obs[mask] pred_masked = pred[mask] # ===== COMPUTATION AND LOGGING ===== with Timer("index_of_agreement function"): with start_action(action_type="index_of_agreement") as action: log_message("Entered index_of_agreement function", valid_data_points=len(obs_masked)) logging.info("[Start] Index of Agreement computation") # Numerator: sum of squared differences between observed and predicted values (prediction error) numerator = np.sum((obs_masked - pred_masked) ** 2) # Denominator: sum of squared sums of absolute deviations from the observed mean # This measures the total potential error, considering both over- and under-predictions denominator = np.sum((np.abs(pred_masked - np.mean(obs_masked)) + np.abs(obs_masked - np.mean(obs_masked))) ** 2) # If denominator is zero (no variation in observations), index is undefined if denominator == 0: logging.warning("Observed variance is zero; Index of Agreement is undefined (NaN returned).") log_message("Index of Agreement undefined due to zero variance in observations") return np.nan # Calculate Index of Agreement as 1 minus the ratio of error to potential error index_value = 1 - numerator / denominator log_message("Computed Index of Agreement", index_of_agreement=index_value) logging.info(f"[Done] Index of Agreement computation: {index_value}") return index_value
############################################################################### ###############################################################################
[docs] def monthly_index_of_agreement( dictionary: Dict[str, Dict[int, List[Union[np.ndarray, List[float]]]]] ) -> List[float]: """ Compute the monthly Index of Agreement (d) between model and satellite datasets. This function aggregates paired model and satellite data across multiple years, then calculates the Index of Agreement for each month to evaluate the agreement between predicted and observed values. Parameters ---------- dictionary : dict Dictionary containing keys with 'mod' and 'sat' indicating model and satellite data. Each key maps to a dict where each year contains a list or array of 12 monthly data arrays. Returns ------- list of float List of 12 Index of Agreement values, one per month. Raises ------ KeyError If no keys containing 'mod' or 'sat' are found in the dictionary. Notes ----- - The function concatenates monthly data across all years before computing the metric. - Handles NaNs by excluding them pairwise in the calculation. - Requires at least two valid data points per month to return a numeric result. - Returns np.nan for months with insufficient data. Examples -------- >>> data = { ... 'mod_data': {2020: [np.array([1,2]), np.array([3,4]), ...], ...}, ... 'sat_data': {2020: [np.array([1.1,1.9]), np.array([2.9,4.1]), ...], ...} ... } >>> monthly_index_of_agreement(data) [0.98, 0.95, ..., 0.97] # 12 values for each month """ from .Efficiency_metrics import index_of_agreement # Input type validation if not isinstance(dictionary, dict): raise TypeError("❌ Input must be a dictionary. ❌") # Find keys that likely correspond to model and satellite data (case-insensitive) # Define keyword groups for matching model_keywords = ['sim', 'simulated', 'model', 'mod'] sat_keywords = ['obs', 'observed', 'sat', 'satellite'] # Try to find keys that match each group model_key = next((k for k in dictionary if any(kw in k.lower() for kw in model_keywords)), None) sat_key = next((k for k in dictionary if any(kw in k.lower() for kw in sat_keywords)), None) # ===== INPUT VALIDATION ===== # Raise error if either model or satellite keys are missing if not model_key or not sat_key: raise KeyError("❌ Model or satellite key not found in the dictionary ❌") # Extract dictionaries of monthly data by year for model and satellite mod_monthly = dictionary[model_key] sat_monthly = dictionary[sat_key] # Check that both have the same years years_mod = set(mod_monthly.keys()) years_sat = set(sat_monthly.keys()) if years_mod != years_sat: raise ValueError("❌ Mismatch in years between model and satellite data ❌") years = list(years_mod) if not years: raise ValueError("❌ No yearly data found in model and satellite datasets ❌") # Determine number of months dynamically from first available year/model data first_year = years[0] n_months = len(mod_monthly[first_year]) # Validate structure for all years (same number of months) for year in years: if len(mod_monthly[year]) != n_months or len(sat_monthly[year]) != n_months: raise ValueError(f"❌ Inconsistent number of months for year {year} in model or satellite data ❌") d_monthly = [] # ===== COMPUTATION AND LOGGING ===== with Timer("monthly_index_of_agreement function"): with start_action(action_type="monthly_index_of_agreement") as action: log_message("Entered monthly_index_of_agreement", years=years, n_months=n_months) logging.info("[Start] monthly_index_of_agreement computation") # Loop over months dynamically for month in range(n_months): # Concatenate monthly data across all years for model and satellite mod_all = np.concatenate([np.asarray(mod_monthly[year][month]).ravel() for year in years]) sat_all = np.concatenate([np.asarray(sat_monthly[year][month]).ravel() for year in years]) # Create a mask to keep only pairs without NaNs in either dataset valid_mask = ~np.isnan(mod_all) & ~np.isnan(sat_all) if np.any(valid_mask): # Compute the Index of Agreement on valid paired data d_val = index_of_agreement(sat_all[valid_mask], mod_all[valid_mask]) else: # Not enough valid data, assign NaN for this month d_val = np.nan d_monthly.append(d_val) log_message("Computed monthly Index of Agreement", month=month + 1, index_of_agreement=d_val) logging.info(f"Month {month + 1}: Index of Agreement = {d_val}") logging.info("[Done] monthly_index_of_agreement computation completed") log_message("Completed monthly_index_of_agreement", total_months=len(d_monthly)) return d_monthly
############################################################################### ###############################################################################
[docs] def ln_nse( obs: Union[Sequence[float], np.ndarray], pred: Union[Sequence[float], np.ndarray] ) -> float: """ Compute the Nash–Sutcliffe Efficiency (NSE) on the natural logarithms of observed and predicted data. This metric evaluates model performance emphasizing relative differences by transforming data with the natural logarithm. It is useful when data span several orders of magnitude or when multiplicative errors are more meaningful. Parameters ---------- obs : array-like Observed values (must be strictly positive). pred : array-like Predicted values (must be strictly positive). Returns ------- float Logarithmic NSE value, or np.nan if there is insufficient valid data or if input contains non-positive values. Notes ----- - Both obs and pred must contain strictly positive values; zeros or negatives will be excluded from the calculation. - The function computes NSE on ln(obs) and ln(pred). - Requires at least two valid paired observations. - Returns np.nan if denominator in NSE calculation is zero or data are insufficient. Examples -------- >>> obs = np.array([1.0, 10.0, 100.0, 1000.0]) >>> pred = np.array([1.1, 9.5, 110.0, 950.0]) >>> ln_nse(obs, pred) 0.95 # example output (actual value depends on data) """ # ===== INPUT VALIDATION ===== if not hasattr(obs, "__iter__") or not hasattr(pred, "__iter__"): raise TypeError("❌ Inputs obs and pred must be array-like. ❌") obs = np.asarray(obs) pred = np.asarray(pred) if obs.shape != pred.shape: raise ValueError("❌ Inputs obs and pred must have the same shape. ❌") if obs.size == 0: raise ValueError("❌ Input arrays must not be empty. ❌") # Create mask to filter out NaNs and non-positive values, # since log is undefined for <= 0, and we need paired valid data mask = (~np.isnan(obs)) & (~np.isnan(pred)) & (obs > 0) & (pred > 0) # Require at least two valid data points to compute meaningful NSE if np.sum(mask) < 2: return np.nan # ===== COMPUTATION AND LOGGING ===== with Timer("ln_nse function"): with start_action(action_type="ln_nse") as action: log_message("Entered ln_nse function", valid_data_points=np.sum(mask)) logging.info("[Start] Logarithmic NSE computation") # Apply natural logarithm transformation to emphasize relative errors log_obs = np.log(obs[mask]) log_pred = np.log(pred[mask]) # Calculate sum of squared differences (residual variance) numerator = np.sum((log_obs - log_pred) ** 2) # Calculate total variance of log-observed values (signal variance) denominator = np.sum((log_obs - np.mean(log_obs)) ** 2) # If denominator is zero, variance is zero and NSE is undefined if denominator == 0: logging.warning("Log-observed variance is zero; ln_NSE is undefined (NaN returned).") log_message("ln_NSE undefined due to zero variance in log-observations") return np.nan # NSE formula: 1 minus ratio of residual variance to signal variance ln_nse_value = 1 - numerator / denominator log_message("Computed ln_NSE", ln_nse=ln_nse_value) logging.info(f"[Done] Logarithmic NSE computation: {ln_nse_value}") return ln_nse_value
############################################################################### ###############################################################################
[docs] def monthly_ln_nse( dictionary: Dict[str, Dict[int, List[Union[np.ndarray, list]]]] ) -> List[float]: """ Compute monthly logarithmic Nash–Sutcliffe Efficiency (ln NSE) from paired model and satellite data. This metric evaluates model performance on the natural logarithm scale, emphasizing relative differences and multiplicative errors. Parameters ---------- dictionary : dict Dictionary with keys containing 'mod' and 'sat' for model and satellite data. Each key maps to a dict of years, each year a list/array of 12 monthly arrays. Returns ------- list of float Logarithmic NSE values for each month (length 12). Raises ------ KeyError If model or satellite keys are missing. Notes ----- - Only pairs of positive values (both observed and predicted) are considered for each month. - Months without sufficient valid data yield np.nan. - Relies on the ln_nse function to compute the metric. Examples -------- >>> data = { ... 'model': {2020: [np.array([1,2]), np.array([3,4]), ..., np.array([11,12])]}, ... 'satellite': {2020: [np.array([1.1,2.1]), np.array([2.9,3.8]), ..., np.array([10.9,11.8])]} ... } >>> monthly_ln_nse(data) [0.95, 0.87, ..., 0.93] # example output (actual values depend on data) """ from .Efficiency_metrics import ln_nse # Identify keys in the dictionary corresponding to model and satellite data (case-insensitive) # Define keyword groups for matching model_keywords = ['sim', 'simulated', 'model', 'mod'] sat_keywords = ['obs', 'observed', 'sat', 'satellite'] # Try to find keys that match each group model_key = next((k for k in dictionary if any(kw in k.lower() for kw in model_keywords)), None) sat_key = next((k for k in dictionary if any(kw in k.lower() for kw in sat_keywords)), None) if not model_key or not sat_key: raise KeyError("❌ Model or satellite key not found in the dictionary. ❌") # Extract dictionaries of monthly data by year for model and satellite mod_monthly = dictionary[model_key] sat_monthly = dictionary[sat_key] # Get years present in both model and satellite dictionaries years = list(set(mod_monthly.keys()) & set(sat_monthly.keys())) if not years: raise ValueError("❌ No overlapping years found between model and satellite data. ❌") # Determine the number of months from the first year and key (validate consistent month counts) first_year = years[0] n_months = len(mod_monthly[first_year]) for y in years: if len(mod_monthly[y]) != n_months or len(sat_monthly[y]) != n_months: raise ValueError("❌ Inconsistent number of months per year in data arrays. ❌") ln_nse_monthly = [] # ===== COMPUTATION AND LOGGING ===== with Timer("monthly_ln_nse function"): with start_action(action_type="monthly_ln_nse") as action: log_message("Entered monthly_ln_nse", years=years, n_months=n_months) logging.info("[Start] monthly_ln_nse computation") # Loop through all months dynamically (no hardcoded 12) for month in range(n_months): # Concatenate monthly data arrays across all years into single 1D arrays mod_all = np.concatenate([np.asarray(mod_monthly[year][month]).ravel() for year in years]) sat_all = np.concatenate([np.asarray(sat_monthly[year][month]).ravel() for year in years]) # Create a mask to select paired values that are not NaN and strictly positive, # because logarithms require positive inputs valid_mask = (~np.isnan(mod_all) & ~np.isnan(sat_all) & (mod_all > 0) & (sat_all > 0)) # Compute ln_nse only if there is at least two valid paired data points if np.sum(valid_mask) >= 2: ln_nse_val = ln_nse(sat_all[valid_mask], mod_all[valid_mask]) else: # If no or insufficient valid data, set result as NaN for this month ln_nse_val = np.nan # Append the monthly ln NSE value to the result list ln_nse_monthly.append(ln_nse_val) log_message("Computed monthly ln NSE", month=month + 1, ln_nse=ln_nse_val) logging.info(f"Month {month + 1}: ln NSE = {ln_nse_val}") logging.info("[Done] monthly_ln_nse computation completed") log_message("Completed monthly_ln_nse", total_months=len(ln_nse_monthly)) return ln_nse_monthly
############################################################################### ###############################################################################
[docs] def nse_j( obs: Union[Sequence[float], np.ndarray], pred: Union[Sequence[float], np.ndarray], j: float = 1 ) -> float: """ Compute modified Nash–Sutcliffe Efficiency (E_j) for an arbitrary exponent j. This generalizes the Nash–Sutcliffe Efficiency by raising the absolute differences between observed and predicted values to the power j, allowing flexible weighting of deviations. Parameters ---------- obs : array-like Observed values. pred : array-like Predicted values. j : float, optional Exponent applied to absolute differences (default is 1). Returns ------- float Modified NSE value (E_j), or np.nan if insufficient valid data or zero denominator. Notes ----- - The modified NSE is defined as: E_j = 1 - (sum(|obs - pred|^j) / sum(|obs - mean(obs)|^j)) - Requires at least two paired valid values. - If the denominator is zero (no variability in obs), returns np.nan. - Increasing j increases sensitivity to larger errors. Examples -------- >>> obs = np.array([1, 2, 3, 4]) >>> pred = np.array([1.1, 1.9, 2.8, 4.2]) >>> nse_j(obs, pred, j=2) 0.95 # Example value, actual depends on data """ obs = np.asarray(obs) pred = np.asarray(pred) # ===== INPUT VALIDATION ===== # Ensure obs and pred have the same shape if obs.shape != pred.shape: raise ValueError("❌ Observed and predicted arrays must have the same shape. ❌") # Mask to ignore pairs where either value is NaN mask = ~np.isnan(obs) & ~np.isnan(pred) # Require at least two valid paired values to compute metric if np.sum(mask) < 2: return np.nan obs_masked = obs[mask] pred_masked = pred[mask] # ===== COMPUTATION AND LOGGING ===== with Timer("nse_j function"): with start_action(action_type="nse_j") as action: log_message("Entered nse_j function", valid_data_points=len(obs_masked), exponent=j) logging.info("[Start] Modified NSE computation") numerator = np.sum(np.abs(obs_masked - pred_masked) ** j) denominator = np.sum(np.abs(obs_masked - np.mean(obs_masked)) ** j) # If denominator is zero, no variability in obs, metric undefined if denominator == 0: logging.warning("Observed variance is zero; modified NSE is undefined (NaN returned).") log_message("Modified NSE undefined due to zero variance in observations") return np.nan modified_nse = 1 - numerator / denominator log_message("Computed modified NSE", modified_nse=modified_nse, exponent=j) logging.info(f"[Done] Modified NSE computation: {modified_nse}") return modified_nse
############################################################################### ###############################################################################
[docs] def monthly_nse_j( dictionary: Dict[str, Dict[int, List[Union[np.ndarray, list]]]], j: float = 1 ) -> List[float]: """ Compute monthly modified Nash–Sutcliffe Efficiency (E_j) for arbitrary exponent j from paired model and satellite data. This generalizes the NSE by raising absolute differences to the power j, allowing flexible emphasis on deviations. Parameters ---------- dictionary : dict Dictionary with keys containing 'mod' and 'sat' for model and satellite data. Each key maps to a dict of years, each year a list/array of 12 monthly arrays. j : float, optional Exponent for the absolute difference (default is 1). Returns ------- list of float Modified NSE values for each month (length 12). Raises ------ KeyError If model or satellite keys are missing. Notes ----- - The function calculates E_j = 1 - sum(|obs - pred|^j) / sum(|obs - mean(obs)|^j) for each month. - Requires at least two valid paired values per month, else returns np.nan. - Higher values of j increase sensitivity to large errors. Examples -------- >>> data = { ... 'model': {2020: [np.array([1, 2]), ..., np.array([11, 12])]}, ... 'satellite': {2020: [np.array([1.1, 2.1]), ..., np.array([10.9, 11.8])]} ... } >>> monthly_nse_j(data, j=2) [0.90, 0.85, ..., 0.88] # example output (depends on data) """ from .Efficiency_metrics import nse_j # Find keys corresponding to model and satellite data (case-insensitive) # Define keyword groups for matching model_keywords = ['sim', 'simulated', 'model', 'mod'] sat_keywords = ['obs', 'observed', 'sat', 'satellite'] # Try to find keys that match each group model_key = next((k for k in dictionary if any(kw in k.lower() for kw in model_keywords)), None) sat_key = next((k for k in dictionary if any(kw in k.lower() for kw in sat_keywords)), None) if not model_key or not sat_key: raise KeyError("❌ Model or satellite key not found in the dictionary. ❌") # Extract dictionaries of monthly data by year for model and satellite mod_monthly = dictionary[model_key] sat_monthly = dictionary[sat_key] years = list(mod_monthly.keys()) # Dynamically determine number of months from the first year num_months = len(mod_monthly[years[0]]) nse_j_monthly = [] # ===== COMPUTATION AND LOGGING ===== with Timer("monthly_nse_j function"): with start_action(action_type="monthly_nse_j") as action: log_message("Entered monthly_nse_j", years=years, num_months=num_months, exponent=j) logging.info("[Start] monthly_nse_j computation") # ===== LOOPING ===== for month in range(num_months): mod_all = np.concatenate([np.asarray(mod_monthly[year][month]).ravel() for year in years]) sat_all = np.concatenate([np.asarray(sat_monthly[year][month]).ravel() for year in years]) valid_mask = ~np.isnan(mod_all) & ~np.isnan(sat_all) if np.sum(valid_mask) >= 2: val = nse_j(sat_all[valid_mask], mod_all[valid_mask], j=j) else: val = np.nan nse_j_monthly.append(val) log_message("Computed monthly nse_j", month=month + 1, nse_j=val) logging.info(f"Month {month + 1}: nse_j = {val}") logging.info("[Done] monthly_nse_j computation completed") log_message("Completed monthly_nse_j", total_months=len(nse_j_monthly)) return nse_j_monthly
############################################################################### ###############################################################################
[docs] def index_of_agreement_j( obs: Union[Sequence[float], np.ndarray], pred: Union[Sequence[float], np.ndarray], j: float = 1 ) -> float: """ Compute modified Index of Agreement (d_j) with arbitrary exponent j. This generalizes the Index of Agreement by raising absolute deviations to the power j, allowing flexible emphasis on prediction errors. Parameters ---------- obs : array-like Observed values. pred : array-like Predicted values. j : float, optional Exponent parameter applied to absolute deviations (default is 1). Returns ------- float Modified Index of Agreement (d_j), or np.nan if insufficient valid data or zero denominator. Notes ----- - The modified index is defined as: d_j = 1 - (sum(|obs - pred|^j) / sum((|pred - mean(obs)| + |obs - mean(obs)|)^j)) - Requires at least two paired valid values. - If denominator is zero (lack of variability), returns np.nan. - Larger j values penalize larger deviations more heavily. Examples -------- >>> obs = np.array([2, 3, 4]) >>> pred = np.array([2.1, 2.9, 3.8]) >>> index_of_agreement_j(obs, pred, j=2) 0.92 # example output, actual depends on data """ obs = np.asarray(obs) # Convert inputs to numpy arrays pred = np.asarray(pred) # ===== INPUT VALIDATION ===== # Validate shapes are equal to avoid silent errors if obs.shape != pred.shape: raise ValueError("❌ Observed and predicted arrays must have the same shape. ❌") mask = ~np.isnan(obs) & ~np.isnan(pred) # Ignore NaNs in obs or pred if np.sum(mask) < 2: # Need at least two valid pairs return np.nan obs_masked = obs[mask] pred_masked = pred[mask] # ===== COMPUTATION AND LOGGING ===== with Timer("index_of_agreement_j function"): with start_action(action_type="index_of_agreement_j") as action: log_message("Entered index_of_agreement_j function", valid_data_points=len(obs_masked), exponent=j) logging.info("[Start] Modified Index of Agreement computation") # Sum of powered absolute errors between observed and predicted numerator = np.sum(np.abs(obs_masked - pred_masked) ** j) # Sum of powered combined deviations from mean of observed data denominator = np.sum((np.abs(pred_masked - np.mean(obs_masked)) + np.abs(obs_masked - np.mean(obs_masked))) ** j) if denominator == 0: # Avoid division by zero if no variability in obs logging.warning("Observed variance is zero; modified Index of Agreement is undefined (NaN returned).") log_message("Modified Index of Agreement undefined due to zero variance in observations") return np.nan modified_index = 1 - numerator / denominator log_message("Computed modified Index of Agreement", modified_index=modified_index, exponent=j) logging.info(f"[Done] Modified Index of Agreement computation: {modified_index}") return modified_index
############################################################################### ###############################################################################
[docs] def monthly_index_of_agreement_j( dictionary: Dict[str, Dict[int, List[Union[np.ndarray, list]]]], j: float = 1 ) -> List[float]: """ Compute monthly modified Index of Agreement (d_j) with exponent j from paired model and satellite data. Calculates the modified Index of Agreement for each calendar month by aggregating all paired model and satellite data across years. The exponent j controls the sensitivity of the metric to deviations, with j=1 corresponding to the standard index. Parameters ---------- dictionary : dict Dictionary with keys containing 'mod' and 'sat' for model and satellite data. Each key maps to a dict of years, where each year is a list or array of 12 monthly arrays. j : float, optional Exponent parameter for the modified Index of Agreement (default is 1). Returns ------- list of float Modified Index of Agreement (d_j) values for each month (length 12). Raises ------ KeyError If model or satellite keys are missing from the dictionary. Notes ----- - Requires at least two paired valid observations per month. - Returns np.nan for months with insufficient data or zero variability. - The metric generalizes the traditional Index of Agreement by raising deviations to the power j, allowing emphasis on different scales of error. Examples -------- >>> dictionary = { ... 'mod_data': {2020: [np.array([1,2]), np.array([3,4]), ...], 2021: [...], ...}, ... 'sat_data': {2020: [np.array([1.1,2.1]), np.array([3.1,3.9]), ...], 2021: [...], ...} ... } >>> monthly_index_of_agreement_j(dictionary, j=2) [0.85, 0.88, ..., 0.90] # list of 12 floats, one per month """ from .Efficiency_metrics import index_of_agreement_j # Identify keys containing model and satellite data # Define keyword groups for matching model_keywords = ['sim', 'simulated', 'model', 'mod'] sat_keywords = ['obs', 'observed', 'sat', 'satellite'] # Try to find keys that match each group model_key = next((k for k in dictionary if any(kw in k.lower() for kw in model_keywords)), None) sat_key = next((k for k in dictionary if any(kw in k.lower() for kw in sat_keywords)), None) if not model_key or not sat_key: raise KeyError("❌ Model or satellite key not found in the dictionary. ❌") # Extract dictionaries of monthly data by year for model and satellite mod_monthly = dictionary[model_key] sat_monthly = dictionary[sat_key] # List all years available in the dataset years = list(mod_monthly.keys()) # Determine number of months from first year available first_year = years[0] n_months = len(mod_monthly[first_year]) d_j_monthly = [] # ===== COMPUTATION AND LOGGING ===== with Timer("monthly_index_of_agreement_j function"): with start_action(action_type="monthly_index_of_agreement_j") as action: log_message("Entered monthly_index_of_agreement_j", years=years, n_months=n_months, exponent=j) logging.info("[Start] monthly_index_of_agreement_j computation") # ===== LOOPING ===== for month in range(n_months): # Aggregate all model data for the current month across years mod_all = np.concatenate([np.asarray(mod_monthly[year][month]).ravel() for year in years]) # Aggregate all satellite data for the current month across years sat_all = np.concatenate([np.asarray(sat_monthly[year][month]).ravel() for year in years]) # Mask to select valid paired (non-NaN) observations mask = ~np.isnan(mod_all) & ~np.isnan(sat_all) if np.any(mask): # Compute the modified index of agreement for valid data only val = index_of_agreement_j(sat_all[mask], mod_all[mask], j=j) else: # Return NaN if no valid data points available val = np.nan d_j_monthly.append(val) log_message("Computed monthly index_of_agreement_j", month=month + 1, value=val) logging.info(f"Month {month + 1}: index_of_agreement_j = {val}") logging.info("[Done] monthly_index_of_agreement_j computation completed") log_message("Completed monthly_index_of_agreement_j", total_months=len(d_j_monthly)) return d_j_monthly
############################################################################### ###############################################################################
[docs] def relative_nse( obs: Union[Sequence[float], np.ndarray], pred: Union[Sequence[float], np.ndarray] ) -> float: """ Compute the Relative Nash–Sutcliffe Efficiency (Relative NSE) between observations and predictions. This metric evaluates model performance by comparing relative deviations (normalized by observations) rather than absolute deviations, making it sensitive to proportional errors. Parameters ---------- obs : array-like Observed values. pred : array-like Predicted values. Returns ------- float Relative NSE value, or np.nan if insufficient data, division by zero, or invalid calculation occurs. Notes ----- - Observations with zero values are excluded to avoid division by zero. - Requires at least two valid paired observations. - Relative NSE close to 1 indicates good model performance on relative scale. - A small denominator (zero variance in relative observations) returns np.nan. Examples -------- >>> obs = np.array([10, 20, 30, 40]) >>> pred = np.array([11, 18, 33, 39]) >>> relative_nse(obs, pred) 0.95 # example output """ obs = np.asarray(obs) pred = np.asarray(pred) # Mask to exclude NaNs and zeros in observations (to avoid division by zero) mask = ~np.isnan(obs) & ~np.isnan(pred) & (obs != 0) # Require at least two valid pairs if np.sum(mask) < 2: return np.nan obs_masked = obs[mask] pred_masked = pred[mask] obs_mean = np.mean(obs_masked) # ===== COMPUTATION AND LOGGING ===== with Timer("relative_nse function"): with start_action(action_type="relative_nse") as action: log_message("Entered relative_nse function", valid_data_points=len(obs_masked)) logging.info("[Start] Relative NSE computation") # Numerator: sum of squared relative errors numerator = np.sum(((obs_masked - pred_masked) / obs_masked) ** 2) # Denominator: sum of squared relative deviations from mean denominator = np.sum(((obs_masked - obs_mean) / obs_mean) ** 2) if denominator == 0: logging.warning("Relative variance denominator is zero; Relative NSE undefined (NaN returned).") log_message("Relative NSE undefined due to zero relative variance in observations") return np.nan rel_nse_value = 1 - numerator / denominator log_message("Computed Relative NSE", relative_nse=rel_nse_value) logging.info(f"[Done] Relative NSE computation: {rel_nse_value}") return rel_nse_value
############################################################################### ###############################################################################
[docs] def monthly_relative_nse( dictionary: Dict[str, Dict[int, List[Union[np.ndarray, list]]]] ) -> List[float]: """ Compute monthly Relative Nash–Sutcliffe Efficiency (Relative NSE) from paired model and satellite data. This function calculates the Relative NSE metric for each calendar month by aggregating data across all available years. It compares relative deviations normalized by observations, emphasizing proportional accuracy of model predictions compared to satellite observations. Parameters ---------- dictionary : dict Dictionary containing keys with 'mod' and 'sat' identifying model and satellite datasets. Each key maps to a dict of years, with each year containing a list or array of 12 monthly data arrays. Returns ------- list of float Relative NSE values computed for each month (length 12). Returns np.nan for months with insufficient or invalid data. Raises ------ KeyError If either model ('mod') or satellite ('sat') keys are missing in the dictionary. Notes ----- - Observations with zero values are excluded to avoid division by zero. - Requires at least two valid paired observations per month to compute the metric. - Relative NSE close to 1 indicates good proportional agreement between model and observations. - Months with zero variance in relative observations or insufficient data return np.nan. Examples -------- >>> dictionary = { ... 'mod': { ... 2020: [np.array([10, 15]), np.array([20, 25]), ..., np.array([30, 35])], # 12 arrays for each month ... 2021: [np.array([12, 16]), np.array([22, 26]), ..., np.array([32, 37])] ... }, ... 'sat': { ... 2020: [np.array([9, 14]), np.array([19, 24]), ..., np.array([29, 34])], ... 2021: [np.array([11, 15]), np.array([21, 25]), ..., np.array([31, 36])] ... } ... } >>> monthly_relative_nse(dictionary) [0.95, 0.97, ..., 0.93] # example output for each month """ from .Efficiency_metrics import relative_nse # ===== INPUT VALIDATION ===== # Input validation: dictionary type if not isinstance(dictionary, dict): raise TypeError("❌ Input must be a dictionary ❌") # Identify model and satellite keys (case-insensitive) # Define keyword groups for matching model_keywords = ['sim', 'simulated', 'model', 'mod'] sat_keywords = ['obs', 'observed', 'sat', 'satellite'] # Try to find keys that match each group model_key = next((k for k in dictionary if any(kw in k.lower() for kw in model_keywords)), None) sat_key = next((k for k in dictionary if any(kw in k.lower() for kw in sat_keywords)), None) if not model_key or not sat_key: raise KeyError("❌ Model or satellite key not found in the dictionary. ❌") # Extract dictionaries of monthly data by year for model and satellite mod_monthly = dictionary[model_key] sat_monthly = dictionary[sat_key] # Validate that mod_monthly and sat_monthly are dicts if not isinstance(mod_monthly, dict): raise ValueError("❌ Model data must be a dictionary of years mapping to monthly arrays ❌") if not isinstance(sat_monthly, dict): raise ValueError("❌ Satellite data must be a dictionary of years mapping to monthly arrays ❌") years = list(mod_monthly.keys()) if not years: raise ValueError("❌ No yearly data found in model dataset ❌") # Check first year has monthly data, and get number of months dynamically first_year = years[0] if first_year not in sat_monthly: raise ValueError(f"❌ Year {first_year} missing from satellite data ❌") n_months = len(mod_monthly[first_year]) # Validate the monthly data structures for both model and satellite for the first year if not isinstance(mod_monthly[first_year], (list, tuple)) or not isinstance(sat_monthly[first_year], (list, tuple)): raise ValueError("❌ Monthly data for the first year must be a list or tuple of monthly arrays ❌") for month_data in mod_monthly[first_year]: if not hasattr(month_data, "__iter__"): raise ValueError("❌ Each monthly entry in model data must be iterable (like a list or numpy array) ❌") for month_data in sat_monthly[first_year]: if not hasattr(month_data, "__iter__"): raise ValueError("❌ Each monthly entry in satellite data must be iterable (like a list or numpy array) ❌") e_rel_monthly = [] # ===== COMPUTATION AND LOGGING ===== with Timer("monthly_relative_nse function"): with start_action(action_type="monthly_relative_nse") as action: log_message("Entered monthly_relative_nse", years=years, n_months=n_months) logging.info("[Start] monthly_relative_nse computation") # ===== LOOPING ===== for month in range(n_months): try: # Aggregate monthly data across all years mod_all = np.concatenate([np.asarray(mod_monthly[year][month]).ravel() for year in years]) sat_all = np.concatenate([np.asarray(sat_monthly[year][month]).ravel() for year in years]) except Exception as e: raise ValueError(f"❌ Error concatenating monthly data for month {month}: {e} ❌") # Mask: exclude NaNs and zero satellite observations to avoid division by zero mask = (~np.isnan(mod_all)) & (~np.isnan(sat_all)) & (sat_all != 0) if np.sum(mask) >= 2: # Require at least two valid pairs val = relative_nse(sat_all[mask], mod_all[mask]) else: val = np.nan e_rel_monthly.append(val) log_message("Computed monthly_relative_nse", month=month + 1, value=val) logging.info(f"Month {month + 1}: relative_nse = {val}") logging.info("[Done] monthly_relative_nse computation completed") log_message("Completed monthly_relative_nse", total_months=len(e_rel_monthly)) return e_rel_monthly
############################################################################### ###############################################################################
[docs] def relative_index_of_agreement( obs: Union[Sequence[float], np.ndarray], pred: Union[Sequence[float], np.ndarray] ) -> float: """ Compute the Relative Index of Agreement (d_rel) between observed and predicted values. This metric assesses the agreement between predictions and observations by evaluating relative errors normalized by the observations, making it sensitive to proportional differences. Parameters ---------- obs : array-like Observed values. pred : array-like Predicted values. Returns ------- float Relative Index of Agreement value ranging typically between 0 and 1, where values closer to 1 indicate better agreement. Returns np.nan if the calculation is invalid due to insufficient data, zero variance, or division by zero. Notes ----- - Observations with zero values are excluded to avoid division by zero errors. - Requires at least two valid paired observations. - Returns np.nan if the observations have zero variance (all equal). - Sensitive to relative rather than absolute errors. Examples -------- >>> obs = np.array([10, 20, 30, 40]) >>> pred = np.array([11, 19, 28, 39]) >>> relative_index_of_agreement(obs, pred) 0.92 # example output """ # ===== INPUT VALIDATION ===== if obs is None or pred is None: raise ValueError("❌ Observed and predicted inputs must not be None ❌") obs = np.asarray(obs) pred = np.asarray(pred) if obs.shape != pred.shape: raise ValueError("❌ Observed and predicted arrays must have the same shape ❌") # Mask to exclude NaNs and zero observations (avoid division by zero) mask = ~np.isnan(obs) & ~np.isnan(pred) & (obs != 0) # Require at least two valid pairs if np.sum(mask) < 2: return np.nan # Insufficient data obs_masked = obs[mask] pred_masked = pred[mask] obs_mean = np.mean(obs_masked) # Check for zero variance in observations if np.allclose(obs_masked, obs_mean): return np.nan # ===== COMPUTATION AND LOGGING ===== with Timer("relative_index_of_agreement function"): with start_action(action_type="relative_index_of_agreement") as action: log_message("Entered relative_index_of_agreement function", valid_data_points=len(obs_masked)) logging.info("[Start] Relative Index of Agreement computation") numerator = np.sum(((obs_masked - pred_masked) / obs_masked) ** 2) denominator = np.sum( ((np.abs(pred_masked - obs_mean) + np.abs(obs_masked - obs_mean)) / obs_mean) ** 2 ) if denominator == 0: logging.warning("Relative Index of Agreement denominator is zero; undefined (NaN returned).") log_message("Relative Index of Agreement undefined due to zero denominator") return np.nan ria_value = 1 - numerator / denominator log_message("Computed Relative Index of Agreement", relative_index_of_agreement=ria_value) logging.info(f"[Done] Relative Index of Agreement computation: {ria_value}") return ria_value
############################################################################### ###############################################################################
[docs] def monthly_relative_index_of_agreement( dictionary: Dict[str, Dict[int, List[Union[np.ndarray, list]]]] ) -> List[float]: """ Compute the Relative Index of Agreement (d_rel) for each calendar month by aggregating paired observed (satellite) and predicted (model) data across multiple years. This metric assesses proportional agreement between observations and predictions on a monthly basis, by evaluating relative errors normalized by observations. It is sensitive to proportional differences rather than absolute errors. Parameters ---------- dictionary : dict Dictionary containing paired model and satellite monthly data with keys containing 'mod' and 'sat' respectively. Each key maps to a dictionary of years, where each year contains a list or array of 12 elements representing monthly data: { 'mod...': {year1: [month_0_data, ..., month_11_data], year2: [...], ...}, 'sat...': {year1: [month_0_data, ..., month_11_data], year2: [...], ...} } Returns ------- list of float List of 12 Relative Index of Agreement values, one for each month (January=0,...,December=11). Returns np.nan for months with insufficient or invalid data. Notes ----- - Observations (satellite data) with zero values are excluded to avoid division by zero errors. - Requires at least two valid paired observations per month. - Returns np.nan if the observations have zero variance (all equal) or denominator is zero. - The metric ranges typically between 0 and 1, with values closer to 1 indicating better agreement. Examples -------- >>> dictionary = { ... 'mod_data': { ... 2020: [np.array([1,2]), np.array([3,4]), ...], # 12 months of data per year ... 2021: [np.array([2,3]), np.array([4,5]), ...] ... }, ... 'sat_data': { ... 2020: [np.array([1.1,1.9]), np.array([2.9,4.1]), ...], ... 2021: [np.array([2.1,2.8]), np.array([3.8,5.2]), ...] ... } ... } >>> monthly_relative_index_of_agreement(dictionary) [0.95, 0.91, ..., 0.89] # example output list with 12 values """ from .Efficiency_metrics import relative_index_of_agreement # Find dictionary keys corresponding to model and satellite data # Define keyword groups for matching model_keywords = ['sim', 'simulated', 'model', 'mod'] sat_keywords = ['obs', 'observed', 'sat', 'satellite'] # Try to find keys that match each group model_key = next((k for k in dictionary if any(kw in k.lower() for kw in model_keywords)), None) sat_key = next((k for k in dictionary if any(kw in k.lower() for kw in sat_keywords)), None) if not model_key or not sat_key: raise KeyError("❌ Model or satellite key not found in the dictionary. ❌") # Extract dictionaries of monthly data by year for model and satellite mod_monthly = dictionary[model_key] sat_monthly = dictionary[sat_key] # Get all years available in the model data (assumed same in satellite) years = list(mod_monthly.keys()) if not years: return [] # Determine number of months from the first available year (avoid hardcoded 12) first_year = years[0] num_months = len(mod_monthly[first_year]) d_rel_monthly = [] # ===== COMPUTATION AND LOGGING ===== with Timer("monthly_relative_index_of_agreement function"): with start_action(action_type="monthly_relative_index_of_agreement") as action: log_message("Entered monthly_relative_index_of_agreement", years=years, num_months=num_months) logging.info("[Start] monthly_relative_index_of_agreement computation") # ===== LOOPING ===== for month in range(num_months): # Concatenate monthly data for all years into single arrays mod_all = np.concatenate([np.asarray(mod_monthly[year][month]).ravel() for year in years]) sat_all = np.concatenate([np.asarray(sat_monthly[year][month]).ravel() for year in years]) # Mask to exclude NaNs and satellite zeros (avoid division by zero) mask = (~np.isnan(mod_all)) & (~np.isnan(sat_all)) & (sat_all != 0) # Compute relative index of agreement if data is valid; else NaN if np.any(mask): d_rel = relative_index_of_agreement(sat_all[mask], mod_all[mask]) else: d_rel = np.nan d_rel_monthly.append(d_rel) log_message("Computed monthly_relative_index_of_agreement", month=month + 1, value=d_rel) logging.info(f"Month {month + 1}: relative_index_of_agreement = {d_rel}") logging.info("[Done] monthly_relative_index_of_agreement computation completed") log_message("Completed monthly_relative_index_of_agreement", total_months=len(d_rel_monthly)) return d_rel_monthly
############################################################################### ###############################################################################
[docs] def compute_spatial_efficiency( model_da: xr.DataArray, sat_da: xr.DataArray, time_group: Literal["month", "year"] = "month" ) -> Tuple[xr.DataArray, xr.DataArray, xr.DataArray, xr.DataArray, xr.DataArray, xr.DataArray]: """ Compute spatial efficiency metrics between model and satellite data aggregated over time groups. This function calculates multiple performance metrics spatially across the domain by aggregating the input datasets over calendar months or years. It returns time-resolved maps for each metric. Parameters ---------- model_da : xarray.DataArray Model data with a 'time' coordinate. sat_da : xarray.DataArray Satellite (observed) data with a 'time' coordinate, matching the model in space and time. time_group : {'month', 'year'}, optional Temporal aggregation level: - 'month': groups data by calendar month (1–12), - 'year': groups data by unique years in the time dimension. Returns ------- tuple of xarray.DataArray Six DataArrays with spatial metrics computed for each time group (month or year), with dimensions: (time_group, lat, lon). The returned metrics are: - mb_all : Mean Bias - sde_all : Standard Deviation of the Error - cc_all : Pearson Cross-Correlation - rm_all : Standard Deviation of the Model - ro_all : Standard Deviation of the Observation - urmse_all : Unbiased Root Mean Squared Error Raises ------ ValueError If `time_group` is not 'month' or 'year'. Notes ----- - Input DataArrays must have a 'time' coordinate with datetime-like values. - Each metric is computed for all available times in the group (month or year). - The function assumes spatial alignment between model and satellite datasets. Examples -------- >>> mb, sde, cc, rm, ro, urmse = compute_spatial_efficiency(model_da, sat_da, time_group="month") >>> mb.sel(month=1).plot() # Plot Mean Bias for January """ # ===== INPUT VALIDATION ===== if not isinstance(model_da, xr.DataArray): raise TypeError("❌ 'model_da' must be an xarray.DataArray ❌") if not isinstance(sat_da, xr.DataArray): raise TypeError("❌ 'sat_da' must be an xarray.DataArray ❌") if "time" not in model_da.coords: raise ValueError("❌ 'model_da' must have a 'time' coordinate ❌") if "time" not in sat_da.coords: raise ValueError("❌ 'sat_da' must have a 'time' coordinate ❌") if not np.array_equal(model_da['time'], sat_da['time']): raise ValueError("❌ 'model_da' and 'sat_da' must have the same 'time' coordinate values ❌") if time_group not in {"month", "year"}: raise ValueError(f"❌ Invalid time_group '{time_group}', must be 'month' or 'year' ❌") # ===== TIME CLASSIFICATION ===== # Determine grouping based on selected time group if time_group == "month": groups = range(1, 13) # Months 1 to 12 time_sel = 'month' elif time_group == "year": groups = sorted(np.unique(model_da['time.year'].values)) # Unique years in the data time_sel = 'year' # ==== INNER FUNCTION TO COMPUTE GROUP ===== # Define function to compute all metrics for a single time group def compute_metrics_for_group(group): # Select model and satellite data for the given group m_sel = model_da.sel(time=model_da['time.' + time_sel] == group) o_sel = sat_da.sel(time=sat_da['time.' + time_sel] == group) # Compute and return all six metrics for the selected group return ( mean_bias(m_sel, o_sel), standard_deviation_error(m_sel, o_sel), cross_correlation(m_sel, o_sel), std_dev(m_sel), std_dev(o_sel), unbiased_rmse(m_sel, o_sel), ) # ===== COMPUTATION AND LOGGING ===== with Timer("compute_spatial_efficiency function"): with start_action(action_type="compute_spatial_efficiency") as action: log_message("Started compute_spatial_efficiency", time_group=time_group, groups=list(groups)) logging.info(f"[Start] Computing spatial efficiency metrics grouped by {time_group}") # Apply the metrics computation for each time group results = list(map(compute_metrics_for_group, groups)) # Unpack the results into separate metric lists mb_maps, sde_maps, cc_maps, rm_maps, ro_maps, urmse_maps = zip(*results) # Set dimension name and coordinates based on the time grouping dim_name = time_group coord_vals = groups # Concatenate results into DataArrays with the correct dimension and coordinate assignment mb_all = xr.concat(mb_maps, dim=dim_name).assign_coords(**{dim_name: coord_vals}) sde_all = xr.concat(sde_maps, dim=dim_name).assign_coords(**{dim_name: coord_vals}) cc_all = xr.concat(cc_maps, dim=dim_name).assign_coords(**{dim_name: coord_vals}) rm_all = xr.concat(rm_maps, dim=dim_name).assign_coords(**{dim_name: coord_vals}) ro_all = xr.concat(ro_maps, dim=dim_name).assign_coords(**{dim_name: coord_vals}) urmse_all = xr.concat(urmse_maps, dim=dim_name).assign_coords(**{dim_name: coord_vals}) logging.info(f"[Done] Computed spatial efficiency metrics for {len(groups)} {time_group} groups") log_message("Completed compute_spatial_efficiency", groups_computed=len(groups)) # Return the full set of spatial efficiency metrics return mb_all, sde_all, cc_all, rm_all, ro_all, urmse_all
############################################################################### ###############################################################################
[docs] def compute_error_timeseries(model_sst_data: xr.DataArray, sat_sst_data: xr.DataArray, ocean_mask: xr.DataArray) -> pd.DataFrame: """ Compute daily error statistics between model and satellite SST data within a specified basin mask. For each time step, this function applies the spatial mask to both model and satellite data, computes a suite of statistical metrics on the valid values, and returns a time-indexed DataFrame containing these metrics. Parameters ---------- model_sst_data : xarray.DataArray Sea Surface Temperature (SST) data from the model, with dimensions including 'time', 'lat', and 'lon'. sat_sst_data : xarray.DataArray Observed satellite SST data, aligned in space and time with the model data. basin_mask : xarray.DataArray Boolean mask (with dimensions 'lat' and 'lon') indicating the spatial domain (e.g., a basin) over which statistics should be computed. True (or 1) values indicate inclusion. Returns ------- pandas.DataFrame DataFrame indexed by time (daily), where each row contains statistics computed between model and satellite SST for the corresponding day. Columns may include metrics such as: - Mean Bias - Standard Deviation of Error - Correlation Coefficient - RMSE - Relative metrics, etc. (as defined by `compute_stats_single_time`) Notes ----- - Assumes input DataArrays are spatially aligned and share a common time coordinate. - Invalid (NaN or masked) values are excluded before metric computation. - The function `compute_stats_single_time` must return a dictionary or similar structure convertible to a DataFrame row. Examples -------- >>> df = compute_error_timeseries(model_sst_data, sat_sst_data, basin_mask) >>> df.head() mean_bias rmse corr 2000-01-01 -0.12 0.45 0.88 2000-01-02 -0.15 0.51 0.85 ... """ # ===== INPUT VALIDATION ===== for da, name in zip([model_sst_data, sat_sst_data], ["model_sst_data", "sat_sst_data"]): if not isinstance(da, xr.DataArray): raise TypeError(f"❌ {name} must be an xarray.DataArray ❌") # Switching mask to DataArray basin_mask = xr.DataArray( ocean_mask, coords={"lat": sat_sst_data["lat"], "lon": sat_sst_data["lon"]}, dims=("lat", "lon") ) # Check spatial dimensions presence and alignment if not all(dim in model_sst_data.dims for dim in ('lat', 'lon')): raise ValueError("❌ model_sst_data must have 'lat' and 'lon' dimensions ❌") if not all(dim in sat_sst_data.dims for dim in ('lat', 'lon')): raise ValueError("❌ sat_sst_data must have 'lat' and 'lon' dimensions ❌") if not isinstance(basin_mask, xr.DataArray) or basin_mask.shape != (len(sat_sst_data.lat), len(sat_sst_data.lon)): raise ValueError("❌ basin_mask must match the shape of sat_sst_data (lat x lon) ❌") # Check spatial coordinates align if not (np.array_equal(model_sst_data['lat'], basin_mask['lat']) and np.array_equal(model_sst_data['lon'], basin_mask['lon']) and np.array_equal(sat_sst_data['lat'], basin_mask['lat']) and np.array_equal(sat_sst_data['lon'], basin_mask['lon'])): raise ValueError("❌ Spatial coordinates (lat, lon) of inputs do not align ❌") # Check time alignment if not np.array_equal(model_sst_data['time'], sat_sst_data['time']): raise ValueError("❌ Time coordinates of model_sst_data and sat_sst_data must be identical ❌") # Apply the basin mask to the model and satellite SST data (mask shape broadcasts over time) model_masked = model_sst_data.where(basin_mask) sat_masked = sat_sst_data.where(basin_mask) # ===== TIME SETUP ===== # Number of time steps n_time = model_masked.sizes['time'] stats_list = [] dates = model_sst_data['time'].values # ===== COMPUTATION AND LOGGING ===== with Timer("compute_error_timeseries function"): with start_action(action_type="compute_error_timeseries") as action: logging.info(f"[Start] Computing error timeseries for {n_time} time steps") log_message("Started compute_error_timeseries", n_time=n_time) # ===== LOOP THROUGH TIMESTEPS ===== for t in range(n_time): m = model_masked.isel(time=t).values.flatten() o = sat_masked.isel(time=t).values.flatten() # Remove pairs where either is nan valid_mask = ~np.isnan(m) & ~np.isnan(o) m_valid = m[valid_mask] o_valid = o[valid_mask] # If no valid data, fill stats with NaNs or defaults if len(m_valid) == 0: stats = {k: np.nan for k in compute_stats_single_time(np.array([0]), np.array([0])).keys()} else: stats = compute_stats_single_time(m_valid, o_valid) stats_list.append(stats) logging.info(f"[Done] Computed error timeseries for all {n_time} time steps") log_message("Completed compute_error_timeseries", n_time=n_time) # Construct DataFrame indexed by time stats_df = pd.DataFrame(stats_list, index=pd.to_datetime(dates)) return stats_df
############################################################################### ###############################################################################
[docs] def compute_stats_single_time(model_slice: np.ndarray, sat_slice: np.ndarray) -> dict: """ Compute error statistics between model and satellite data for a single time slice. This function evaluates a set of core statistical metrics comparing model output and satellite observations for one timestep, using only valid (non-NaN) paired values. Parameters ---------- model_slice : np.ndarray 1D array of model data values at a single timestep, typically flattened from 2D (lat/lon). sat_slice : np.ndarray 1D array of satellite observation values at the same timestep and spatial extent. Returns ------- dict Dictionary containing the following metrics: - 'mean_bias' : float Mean difference between model and satellite values. - 'unbiased_rmse' : float Root Mean Square Error after removing mean bias. - 'std_error' : float Standard deviation of the model-satellite difference. - 'cross_correlation' : float Pearson correlation coefficient between model and satellite. If no valid data pairs exist, all values are returned as np.nan. Notes ----- - Only pairs where both model and satellite values are finite (non-NaN) are used. - This function assumes input arrays are already aligned in space. Examples -------- >>> m = np.array([20.1, 19.5, np.nan, 21.0]) >>> o = np.array([19.8, 19.7, 20.0, 21.1]) >>> compute_stats_single_time(m, o) {'mean_bias': 0.0X, 'unbiased_rmse': 0.0Y, 'std_error': 0.0Z, 'correlation': 0.99} """ # ===== INPUT VALIDATION ===== if not (isinstance(model_slice, np.ndarray) and isinstance(sat_slice, np.ndarray)): raise TypeError("Both model_slice and sat_slice must be numpy arrays.") if model_slice.ndim != 1 or sat_slice.ndim != 1: raise ValueError("Both model_slice and sat_slice must be 1-dimensional arrays.") if model_slice.shape[0] != sat_slice.shape[0]: raise ValueError("model_slice and sat_slice must have the same length.") with Timer("compute_stats_single_time"): # Create a boolean mask where both model and satellite values are not NaN valid = ~np.isnan(model_slice) & ~np.isnan(sat_slice) # ===== CORNER CASE HANDLING ===== # If there are no valid paired values, return NaN for all statistics if valid.sum() == 0: return dict( mean_bias=np.nan, unbiased_rmse=np.nan, std_error=np.nan, cross_correlation=np.nan ) # Extract only the valid model and satellite values m_valid = model_slice[valid] o_valid = sat_slice[valid] # Compute and return the statistics using the valid data return dict( mean_bias=mean_bias(m_valid, o_valid), unbiased_rmse=unbiased_rmse(m_valid, o_valid), std_error=standard_deviation_error(m_valid, o_valid), cross_correlation=cross_correlation(m_valid, o_valid) )