Source code for Hydrological_model_validator.Processing.Missing_data

###############################################################################
##                                                                           ##
##                               LIBRARIES                                   ##
##                                                                           ##
###############################################################################

# Data handling libraries
import numpy as np
from datetime import datetime
from typing import Tuple, List

# Logging and tracing
import logging
from eliot import start_action, log_message

# Module utilities
from Hydrological_model_validator.Processing.time_utils import Timer

###############################################################################
##                                                                           ##
##                               FUNCTIONS                                   ##
##                                                                           ##
###############################################################################


[docs] def check_missing_days( T_orig: np.ndarray, data_orig: np.ndarray, desired_start_date: datetime = datetime(2000, 1, 1) ) -> Tuple[np.ndarray, np.ndarray]: """ Fill missing days in a time series based on expected daily intervals. Automatically shifts the time series so the first timestamp corresponds to desired_start_date. Parameters ---------- T_orig : np.ndarray 1D array of timestamps (in seconds since epoch). data_orig : np.ndarray 3D array with shape (time, lat, lon), matching T_orig in the first dimension. desired_start_date : datetime, optional The desired start date to align the time series to (default is 2000-01-01). Returns ------- Ttrue : np.ndarray Complete array of timestamps with daily spacing, shifted to desired_start_date. data_complete : np.ndarray Data array with NaNs inserted where gaps were detected. Raises ------ TypeError If input types are incorrect. ValueError If input shapes are invalid or time step is non-positive. AssertionError If time series is out of order or length mismatch occurs. """ # === INPUT VALIDATION === # Ensure T_orig is a NumPy array if not isinstance(T_orig, np.ndarray): raise TypeError("❌ T_orig must be a NumPy array. ❌") # Ensure data_orig is a NumPy array if not isinstance(data_orig, np.ndarray): raise TypeError("❌ data_orig must be a NumPy array. ❌") # Ensure T_orig is 1-dimensional if T_orig.ndim != 1: raise ValueError("❌ T_orig must be 1-dimensional. ❌") # Ensure data_orig is 3-dimensional (time, lat, lon) if data_orig.ndim != 3: raise ValueError("❌ data_orig must be 3-dimensional. ❌") # Ensure the length of T_orig matches the first dimension of data_orig if len(T_orig) != data_orig.shape[0]: raise ValueError("❌ T_orig length must match first dimension of data_orig. ❌") with Timer("check_missing_days function"): with start_action( action_type="check_missing_days function", original_time_steps=len(T_orig), desired_start_date=desired_start_date.strftime("%Y-%m-%d") ): print(f"Original time steps: {len(T_orig)}") first_date = datetime.utcfromtimestamp(T_orig[0]) print("First timestamp (original):", T_orig[0], f"({first_date.strftime('%Y-%m-%d')})") # === SHIFT TIME SERIES TO DESIRED START DATE === # Calculate offset in seconds to shift the time series start date offset_seconds = (desired_start_date - first_date).total_seconds() print(f"Offset to add (seconds): {offset_seconds}") # Apply offset to all timestamps to align start date to desired_start_date T_shifted = T_orig + offset_seconds shifted_first_date = datetime.utcfromtimestamp(T_shifted[0]) print("First timestamp (shifted):", T_shifted[0], f"({shifted_first_date.strftime('%Y-%m-%d')})") log_message( "Shifted time series start", original_first_date=first_date.strftime("%Y-%m-%d"), shifted_first_date=shifted_first_date.strftime("%Y-%m-%d"), offset_seconds=offset_seconds ) logging.info(f"Shifted time series start from {first_date} to {shifted_first_date}") # === DETECT TIME STEP === # Calculate differences between consecutive timestamps to find sampling interval steps = np.diff(T_shifted) # Ensure timestamps strictly increase (no duplicates or out-of-order) if not np.all(steps > 0): raise ValueError("❌ Timestamps must be strictly increasing after shifting. ❌") # Identify the most frequent time step as expected sampling interval unique_steps, counts = np.unique(steps, return_counts=True) time_step = unique_steps[np.argmax(counts)] # Validate that the detected time step is positive if time_step <= 0: raise ValueError("❌ Detected non-positive time step. ❌") print(f"Detected time step: {time_step} seconds ({time_step / 86400:.2f} days)") logging.info(f"Detected time step: {time_step} seconds ({time_step / 86400:.2f} days)") log_message("Detected time step", time_step_seconds=time_step) # === GENERATE EXPECTED TIME RANGE === # Define full expected time range based on detected time step T_start, T_end = T_shifted[0], T_shifted[-1] Ttrue = np.arange(T_start, T_end + time_step, time_step) # Include last day by adding time_step # Calculate how many timestamps are missing compared to ideal timeline missing_count = len(Ttrue) - len(T_shifted) if missing_count == 0: print("\033[92m✅ The time series is complete!\033[0m") print('*' * 45) log_message("Time series complete", time_steps=len(Ttrue)) logging.info("✅ The time series is complete!") # Return the shifted original data since there are no missing entries return T_shifted.copy(), data_orig.copy() print("\033[91m⚠️ Time series is incomplete!\033[0m") print(f"\033[91mMissing {missing_count} time steps detected.\033[0m") print("Filling missing time steps...") print('-' * 45) log_message("Time series incomplete", missing_steps=missing_count) logging.warning(f"⚠️ Missing {missing_count} time steps detected. Filling missing data.") # === FILL DATA INTO COMPLETE TIME SERIES === # Create a map from timestamp to index for quick lookup time_index = {t: i for i, t in enumerate(T_shifted)} # Prepare an output array initialized with NaNs for missing entries data_shape = (len(Ttrue), *data_orig.shape[1:]) data_complete = np.full(data_shape, np.nan, dtype=data_orig.dtype) # Fill data where available, leave NaNs where timestamps are missing missing_days_logged = [] for i, t in enumerate(Ttrue): idx = time_index.get(t) if idx is not None: data_complete[i] = data_orig[idx] else: dt = datetime.utcfromtimestamp(t) missing_days_logged.append(dt.strftime('%Y-%m-%d')) for day in missing_days_logged: print(f"Missing day filled: {day}") # === FINAL SANITY CHECK === # Confirm that output data length matches the complete timeline length if data_complete.shape[0] != len(Ttrue): raise AssertionError("❌ Mismatch in expected time series length after filling. ❌") print("\033[92m✅ Time series gaps filled successfully.\033[0m") print('*' * 45) log_message("Time series gaps filled successfully", filled_length=len(Ttrue)) logging.info("✅ Time series gaps filled successfully.") return Ttrue, data_complete
############################################################################### ###############################################################################
[docs] def find_missing_observations(data_complete: np.ndarray) -> Tuple[int, List[int]]: """ Identify days with no satellite observations (all NaN or zero). Parameters ---------- data_complete : np.ndarray 3D array of shape (days, lat, lon) with satellite data. Returns ------- cnan : int Number of days with no satellite observations. satnan : List[int] Indices of days with no valid observations. Raises ------ TypeError If input is not a NumPy array. ValueError If input array does not have 3 dimensions. """ # ==== INPUT VALIDATION ==== # Ensure input is a NumPy array if not isinstance(data_complete, np.ndarray): raise TypeError("❌ data_complete must be a NumPy array. ❌") # Check that input has three dimensions (days, lat, lon) if data_complete.ndim != 3: raise ValueError("❌ data_complete must be a 3D array (days, lat, lon). ❌") with Timer("find_missing_observations function"): with start_action( action_type="find_missing_observations function", data_shape=data_complete.shape ): print("Checking for missing satellite observations...") # Sum over spatial dimensions ignoring NaNs to detect days with no data daily_sums = np.nansum(data_complete, axis=(1, 2)) # Days where sum is zero mean all values are either NaN or zero (no observations) mask_missing = daily_sums == 0 # Count how many days have missing data cnan = int(np.sum(mask_missing)) # Get list of indices for days with missing observations satnan = np.flatnonzero(mask_missing).tolist() if cnan > 0: print(f"\033[91m⚠️ {cnan} daily satellite fields have no observations ⚠️\033[0m") logging.warning(f"{cnan} daily satellite fields have no observations") log_message("Missing satellite observations found", count=cnan, indices=satnan) else: print("\033[92m✅ No missing satellite observation days found\033[0m") logging.info("No missing satellite observation days found") log_message("No missing satellite observations") print('*' * 45) return cnan, satnan
############################################################################### ###############################################################################
[docs] def eliminate_empty_fields(data_complete: np.ndarray) -> np.ndarray: """ Replace empty chlorophyll fields (days where all values are NaN or zero) with NaNs. Parameters ---------- data_complete : np.ndarray 3D array of chlorophyll data with shape (days, lat, lon). Returns ------- np.ndarray Modified array with empty daily fields set entirely to NaN. Raises ------ TypeError If input is not a NumPy array. ValueError If input array does not have 3 dimensions. Examples -------- >>> data = np.random.rand(10, 5, 5) >>> data[3] = 0 # simulate empty day >>> data[7] = np.nan >>> result = eliminate_empty_fields(data) """ # ===== INPUT VALIDATION ===== # Validate input is a NumPy array if not isinstance(data_complete, np.ndarray): raise TypeError("❌ data_complete must be a NumPy array. ❌") # Validate input is 3D (days, lat, lon) if data_complete.ndim != 3: raise ValueError("❌ data_complete should be a 3D array (days, lat, lon). ❌") with Timer("eliminate_empty_fields function"): with start_action( action_type="eliminate_empty_fields function", data_shape=data_complete.shape ): print("Checking and removing empty fields...") # Identify days where all values are NaN across spatial dimensions all_nan = np.isnan(data_complete).all(axis=(1, 2)) # Suppress warnings for empty slices when computing max import warnings with warnings.catch_warnings(): warnings.simplefilter("ignore", category=RuntimeWarning) # Get max value per day ignoring NaNs to detect days that might be all zero max_vals = np.nanmax(data_complete, axis=(1, 2)) # Identify days where max value is zero => all values are zero or NaN all_zero = (max_vals == 0) # Combine masks to find days that are either all NaN or all zero empty_mask = all_nan | all_zero # Count how many empty days found cempty = np.sum(empty_mask) if cempty > 0: # Replace entire daily fields flagged as empty with NaNs data_complete[empty_mask] = np.nan # Inform user about which days were corrected (1-based indexing for clarity) for day_idx in np.flatnonzero(empty_mask): print(f"\033[91m⚠️ Empty field found at day {day_idx + 1} — replaced with NaNs ⚠️\033[0m") print(f"\033[93m{cempty} empty fields were found and corrected\033[0m") logging.warning(f"{cempty} empty fields found and replaced with NaNs") log_message("Empty fields replaced", count=int(cempty), indices=np.flatnonzero(empty_mask).tolist()) else: print("\033[92m✅ No empty fields found in dataset\033[0m") logging.info("No empty fields found in dataset") log_message("No empty fields detected") print('*' * 45) return data_complete