###############################################################################
## ##
## LIBRARIES ##
## ##
###############################################################################
# Standard library imports
import gzip
import io
import os
from pathlib import Path
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Union, Tuple, Optional
# Data handling libraries
import numpy as np
import xarray as xr
# Logging and tracing
import logging
from eliot import start_action, log_message
# Module utilities and modules
from Hydrological_model_validator.Processing.time_utils import Timer
from Hydrological_model_validator.Processing.utils import (infer_years_from_path,
build_bfm_filename,
temp_threshold,
hal_threshold)
from Hydrological_model_validator.Processing.file_io import (unzip_gz_to_file,
read_nc_variable_from_unzipped_file)
from Hydrological_model_validator.Processing.data_alignment import apply_3d_mask
###############################################################################
## ##
## FUNCTIONS ##
## ##
###############################################################################
###############################################################################
###############################################################################
[docs]
def extract_and_filter_benthic_data(data_4d: np.ndarray,
Bmost: np.ndarray,
dz: float = 2.0,
variable_key: Optional[str] = None) -> np.ndarray:
"""
Extract bottom layer data from a 4D array and apply depth-based threshold filtering.
This function extracts data from the bottom-most layer indicated by `Bmost` indices
from a 4D array `data_4d` with dimensions (time, depth, Y, X). It then applies filtering
based on depth-dependent thresholds for specific variables ('votemper' for temperature,
'vosaline' for salinity). Invalid data points outside the thresholds are set to NaN.
Parameters
----------
data_4d : np.ndarray
4D numpy array with shape (time, depth, Y, X), containing the variable data over
time, vertical layers, and spatial grid.
Bmost : np.ndarray
2D array with shape (Y, X) of 1-based indices indicating the bottom layer depth at
each spatial position.
dz : float, optional
Thickness of each vertical layer in meters. Default is 2.0.
variable_key : str, optional
Variable name to apply threshold filtering. Supported values are 'votemper' (temperature)
and 'vosaline' (salinity). If None or unsupported, no filtering is applied.
Returns
-------
np.ndarray
3D numpy array with shape (time, Y, X) containing the extracted bottom layer data,
filtered by the specified depth-dependent thresholds.
Example
-------
>>> import numpy as np
>>> data_4d = np.random.rand(12, 10, 5, 5) # 12 time steps, 10 depth layers, 5x5 spatial grid
>>> Bmost = np.array([[10, 9, 8, 10, 7],
... [10, 10, 9, 8, 7],
... [9, 10, 10, 9, 8],
... [8, 9, 10, 10, 9],
... [7, 8, 9, 10, 10]]) # bottom layer indices (1-based)
>>> benthic_filtered = extract_and_filter_benthic_data(data_4d, Bmost, dz=2.0, variable_key='votemper')
>>> print(benthic_filtered.shape)
(12, 5, 5)
"""
# ===== Input validation =====
if not isinstance(data_4d, np.ndarray):
raise TypeError(f"❌ data_4d must be a numpy.ndarray, got {type(data_4d)} ❌")
if not isinstance(Bmost, np.ndarray):
raise TypeError(f"❌ Bmost must be a numpy.ndarray, got {type(Bmost)} ❌")
if data_4d.ndim != 4:
raise ValueError(f"❌ data_4d must be 4D with shape (time, depth, Y, X), got {data_4d.shape} ❌")
if Bmost.ndim != 2:
raise ValueError(f"❌ Bmost must be 2D with shape (Y, X), got {Bmost.shape} ❌")
time_len, depth_len, Y, X = data_4d.shape
if Bmost.shape != (Y, X):
raise ValueError(f"❌ Bmost shape {Bmost.shape} must match spatial dims (Y, X) of data_4d {(Y, X)} ❌")
if not np.issubdtype(Bmost.dtype, np.integer):
raise ValueError("❌ Bmost must contain integer values (1-based indices) ❌")
if np.any(Bmost < 0) or np.any(Bmost >= depth_len):
raise ValueError(f"❌ Bmost indices must be in the range 0 to depth_len={depth_len} ❌")
if not isinstance(dz, (float, int)) or dz <= 0:
raise ValueError("❌ dz must be a positive number ❌")
if variable_key is not None and variable_key not in {'votemper', 'vosaline'}:
# Just warn, no filtering will be applied
print(f"⚠️ Warning: Unsupported variable_key '{variable_key}', no filtering will be applied ⚠️")
with Timer("extract_and_filter_benthic_data function"):
with start_action(action_type="extract_and_filter_benthic_data function"):
log_message("Input validation passed",
data_shape=data_4d.shape,
Bmost_shape=Bmost.shape,
dz=dz,
variable_key=variable_key)
logging.info(f"Input validation passed for data_4d shape {data_4d.shape} and Bmost shape {Bmost.shape}.")
# ===== EXTRACT THE DATA =====
# Extract dimensions for clarity and to guide indexing logic
time_len, depth_len, Y, X = data_4d.shape
# ===== CREATE THE MASK =====
# Create a mask to identify spatial points where bottom indices are valid
valid_mask = (Bmost > 0) & (Bmost <= depth_len)
log_message("Created valid_mask for bottom layer indices",
valid_points=np.sum(valid_mask))
logging.info(f"Valid bottom layer points count: {np.sum(valid_mask)}.")
# ===== GET BOTTOM DATA WITH MASK =====
# Convert 1-based bottom layer indices to zero-based for Python indexing
# For invalid indices, temporarily assign 0 to avoid indexing errors
Bmost_zero = np.where(valid_mask, Bmost - 1, 0)
# Generate coordinate grids to index time, y, and x dimensions simultaneously
y_idx, x_idx = np.indices((Y, X))
# Build a tuple of indices for advanced indexing into the 4D array
# This fetches data from the bottom layer depth (Bmost_zero) for all time steps and spatial points
flat_indices = (np.arange(time_len)[:, None, None], Bmost_zero[None, :, :], y_idx[None, :, :], x_idx[None, :, :])
# Initialize output array with NaNs to hold bottom layer data
# Using NaNs helps clearly identify missing or invalid data after extraction and filtering
benthic_data = np.full((time_len, Y, X), np.nan, dtype=data_4d.dtype)
# ===== EXTRACT THE BOTTOM DATA =====
# Extract bottom layer data only at valid locations; invalid locations remain NaN
benthic_data[:, valid_mask] = data_4d[flat_indices][:, valid_mask]
log_message("Extracted bottom layer data",
extracted_points=np.sum(~np.isnan(benthic_data)))
logging.info(f"Extracted bottom layer data for {np.sum(~np.isnan(benthic_data))} valid points.")
# Calculate actual depth (meters) of each bottom cell to apply depth-dependent filters
depths = Bmost * dz
log_message("Calculated depths from Bmost and dz",
depth_min=np.min(depths),
depth_max=np.max(depths))
logging.info(f"Depths calculated, range: {np.min(depths)} to {np.max(depths)} meters.")
# ===== FILTERING OF INVALID DATA =====
# Define masks for shallow and deep regions to apply variable-specific thresholds
# These ranges are based on known environmental zones with distinct physical/chemical properties
mask_shallow = (depths > 0) & (depths <= 50)
mask_deep = (depths >= 51) & (depths <= 200)
log_message("Defined shallow and deep masks",
shallow_count=np.sum(mask_shallow),
deep_count=np.sum(mask_deep))
logging.info(f"Shallow cells: {np.sum(mask_shallow)}, Deep cells: {np.sum(mask_deep)}.")
# Only apply filtering if a recognized variable key is provided
if variable_key in {'votemper', 'vosaline'}:
# Broadcast shallow and deep masks to 3D shape to match time dimension for masking invalid data
mask_shallow_3d = np.broadcast_to(mask_shallow, (time_len, Y, X))
mask_deep_3d = np.broadcast_to(mask_deep, (time_len, Y, X))
# Apply variable-specific thresholding functions that return masks of invalid data points
# This step removes physically unrealistic or erroneous measurements
if variable_key == 'votemper':
invalid_mask = temp_threshold(benthic_data, mask_shallow_3d, mask_deep_3d)
else:
invalid_mask = hal_threshold(benthic_data, mask_shallow_3d, mask_deep_3d)
# Log counts of invalid cells per time step to monitor filtering impact
invalid_counts = np.sum(invalid_mask, axis=(1, 2))
for t, count in enumerate(invalid_counts):
if count > 0:
print(f"⚠️ Month {t+1}: {count} cells outside valid {variable_key} range set to NaN ⚠️")
# Set invalid data points to NaN to exclude them from further analysis
benthic_data[invalid_mask] = np.nan
log_message("Applied filtering to benthic data",
total_invalid=np.sum(invalid_mask))
logging.info(f"Filtering applied: total invalid data points set to NaN = {np.sum(invalid_mask)}.")
return benthic_data
###############################################################################
###############################################################################
[docs]
def process_year(year: int,
IDIR: Union[str, Path],
mask3d: np.ndarray,
Bmost: np.ndarray,
filename_fragments: Dict[str, str],
variable_key: str) -> Tuple[int, np.ndarray]:
"""
Process benthic parameter data for a single year by reading, masking, extracting,
and filtering bottom layer data from model output files.
This function locates the compressed model data file for the specified year,
decompresses it, loads the specified variable's data, applies a 3D mask to
filter out invalid points, and extracts the bottom layer benthic parameter
values using depth indices (`Bmost`). It then applies variable-specific filtering
to ensure data quality.
Parameters
----------
year : int
The year of the dataset to process.
IDIR : str or Path
Base directory path where the model output files are stored.
mask3d : np.ndarray
A 3D mask array of shape (depth, Y, X) where zeros indicate invalid data points
to be masked out (set as NaN).
Bmost : np.ndarray
A 2D array of shape (Y, X) containing 1-based indices that indicate the bottom
vertical layer for each spatial point.
filename_fragments : dict
Dictionary containing filename fragments with keys 'ffrag1', 'ffrag2', and 'ffrag3',
used to construct the filename of the model output.
variable_key : str
The name of the variable to extract from the dataset (e.g., 'votemper', 'vosaline').
Returns
-------
Tuple[int, np.ndarray]
A tuple containing:
- The processed year (int).
- A 3D numpy array of shape (time, Y, X) with the extracted and filtered bottom
layer parameter data.
Raises
------
FileNotFoundError
If the compressed model output file for the given year does not exist.
KeyError
If the specified `variable_key` is not found in the dataset.
ValueError
If the spatial dimensions of the data do not match those of the provided mask.
Example
-------
>>> year = 2005
>>> base_dir = "/data/model_output"
>>> mask = np.ones((10, 20, 30)) # example mask with all valid points
>>> Bmost_indices = np.full((20, 30), 10) # bottom layer is the 10th depth for all
>>> fragments = {'ffrag1': 'model', 'ffrag2': 'output', 'ffrag3': 'nc'}
>>> variable = 'votemper'
>>> yr, benthic_arr = process_year(year, base_dir, mask, Bmost_indices, fragments, variable)
>>> print(yr)
2005
>>> print(benthic_arr.shape)
(time_steps, 20, 30)
"""
# ===== INPUT VALIDATION =====
if not isinstance(year, int):
raise TypeError(f"❌ year must be int, got {type(year)} ❌")
if not isinstance(mask3d, np.ndarray):
raise TypeError(f"❌ mask3d must be np.ndarray, got {type(mask3d)} ❌")
if mask3d.ndim != 3:
raise ValueError(f"❌ mask3d must be 3D (depth, Y, X), got shape {mask3d.shape} ❌")
if not isinstance(Bmost, np.ndarray):
raise TypeError(f"❌ Bmost must be np.ndarray, got {type(Bmost)} ❌")
if Bmost.ndim != 2:
raise ValueError(f"❌ Bmost must be 2D (Y, X), got shape {Bmost.shape} ❌")
if not isinstance(filename_fragments, dict):
raise TypeError(f"❌ filename_fragments must be dict, got {type(filename_fragments)} ❌")
required_keys = {'ffrag1', 'ffrag2', 'ffrag3'}
missing_keys = required_keys - filename_fragments.keys()
if missing_keys:
raise KeyError(f"❌ filename_fragments missing keys: {missing_keys} ❌")
if not isinstance(variable_key, str):
raise TypeError(f"❌ variable_key must be str, got {type(variable_key)} ❌")
log_message("Input validation passed",
year=year,
mask3d_shape=mask3d.shape,
Bmost_shape=Bmost.shape,
filename_fragments_keys=list(filename_fragments.keys()),
variable_key=variable_key)
logging.info(f"Input validation passed for year {year}.")
# ===== PATH CHECK =====
IDIR = Path(IDIR)
if not IDIR.exists() or not IDIR.is_dir():
raise ValueError(f"❌ IDIR path does not exist or is not a directory: {IDIR} ❌")
year_str = str(year)
log_message("Input directory verified",
IDIR=str(IDIR))
with Timer("process_year function"):
with start_action(action_type="process_year function", year=year):
# ===== BUILD FILENAME AND PATH =====
filename = build_bfm_filename(year, filename_fragments)
file_nc = IDIR / f"output{year_str}" / filename
file_gz = Path(str(file_nc) + ".gz")
if not file_gz.exists():
raise FileNotFoundError(f"❌ Compressed file not found: {file_gz} ❌")
log_message("Located compressed file",
filename=str(file_gz))
logging.info(f"Located compressed file: {file_gz}")
print(f"Handling file {filename}...")
# ===== DECOMPRESS AND READ DATA =====
with gzip.open(file_gz, 'rb') as f_in:
decompressed_bytes = f_in.read()
with xr.open_dataset(io.BytesIO(decompressed_bytes)) as ds:
if variable_key not in ds:
raise KeyError(f"❌ Variable '{variable_key}' not found in dataset. ❌")
data = ds[variable_key].values
log_message("Loaded data from dataset",
variable_key=variable_key,
data_shape=data.shape)
logging.info(f"Loaded variable '{variable_key}' data with shape {data.shape}.")
# ===== VERIFY SHAPE MATCH =====
if data.shape[2:] != mask3d.shape[1:]:
raise ValueError(f"❌ Shape mismatch between data spatial dims {data.shape[2:]} and mask3d {mask3d.shape[1:]} ❌")
log_message("Verified spatial dimensions match",
data_spatial=data.shape[2:],
mask3d_spatial=mask3d.shape[1:])
logging.info("Spatial dimensions match between data and mask3d.")
# ===== MASKING =====
data = np.where(mask3d[None, :, :, :] == 0, np.nan, data)
log_message("Applied 3D mask to data",
mask_shape=mask3d.shape)
# ===== EXTRACT AND FILTER BOTTOM DATA =====
benthic_data = extract_and_filter_benthic_data(
data_4d=data,
Bmost=Bmost,
dz=2.0,
variable_key=variable_key
)
log_message("Extracted and filtered bottom layer benthic data",
benthic_shape=benthic_data.shape)
print(f"\033[92m✅ Year {year} processed.\033[0m")
return year, benthic_data
###############################################################################
###############################################################################
[docs]
def read_benthic_parameter(IDIR: Union[str, Path],
mask3d: np.ndarray,
Bmost: np.ndarray,
filename_fragments: Dict[str, str],
variable_key: str) -> Dict[int, List[np.ndarray]]:
"""
Reads benthic parameter data (e.g., temperature, salinity) from monthly averaged
compressed NetCDF files over all available years in a specified directory.
The function scans the given directory for yearly folders matching a pattern,
then concurrently processes each year’s data by applying spatial and depth masks,
extracting bottom layer values, and filtering the data based on the variable's
quality thresholds. The results are collected as a dictionary mapping each year
to a list of 12 monthly 2D arrays representing the benthic parameter.
Parameters
----------
IDIR : str or Path
Base directory containing the MODEL output data folders.
mask3d : np.ndarray
3D mask array with shape (depth, Y, X), where 0 indicates invalid points that
should be masked out (set to NaN).
Bmost : np.ndarray
2D array with shape (Y, X) containing 1-based indices of the bottom vertical
layer at each spatial location.
filename_fragments : dict
Dictionary with keys 'ffrag1', 'ffrag2', 'ffrag3' containing parts of the filename
used to locate the NetCDF files.
variable_key : str
Key name of the variable to extract from the dataset, such as 'votemper' or 'vosaline'.
Returns
-------
Dict[int, List[np.ndarray]]
Dictionary mapping each processed year (int) to a list of 12 numpy 2D arrays
(Y, X) representing monthly bottom parameter values.
Raises
------
ValueError
If any required filename fragment is missing, or if inputs have incorrect types
or dimensions.
FileNotFoundError
If the specified data directory does not exist.
Example
-------
>>> base_dir = "/data/model_output"
>>> mask = np.ones((10, 50, 60)) # mask with all valid points
>>> Bmost_indices = np.full((50, 60), 10) # bottom layer index = 10 everywhere
>>> fragments = {'ffrag1': 'model', 'ffrag2': 'output', 'ffrag3': 'nc'}
>>> variable = 'votemper'
>>> data_by_year = read_benthic_parameter(base_dir, mask, Bmost_indices, fragments, variable)
>>> for year, monthly_data in data_by_year.items():
... print(f"Year {year} has {len(monthly_data)} months of data, shape {monthly_data[0].shape}")
"""
# ===== INPUT VALIDATIONS =====
# Convert input path to Path object for consistent filesystem operations
IDIR = Path(IDIR)
# Check existence early to avoid wasted computation on missing data
if not IDIR.exists():
raise FileNotFoundError(f"❌ Directory {IDIR} does not exist. ❌")
# Validate input types and dimensions to prevent downstream errors
if not isinstance(mask3d, np.ndarray) or mask3d.ndim != 3:
raise ValueError("❌ mask3d must be a 3D numpy array. ❌")
if not isinstance(Bmost, np.ndarray) or Bmost.ndim != 2:
raise ValueError("❌ Bmost must be a 2D numpy array. ❌")
if not filename_fragments:
raise ValueError("❌ filename_fragments must be provided and not None. ❌")
for key in ('ffrag1', 'ffrag2', 'ffrag3'):
# Ensure all required filename parts are available to correctly locate files
if key not in filename_fragments or filename_fragments[key] is None:
raise ValueError(f"❌ Missing filename fragment: '{key}' ❌")
with Timer("read_benthic_parameter function"):
with start_action(action_type="read_benthic_parameter function"):
log_message("Input validation passed",
IDIR=str(IDIR),
mask3d_shape=mask3d.shape,
Bmost_shape=Bmost.shape,
filename_fragments_keys=list(filename_fragments.keys()),
variable_key=variable_key)
logging.info("Input validation passed for read_benthic_parameter")
# ===== SCAN THE FOLDER FOR YEAR RANGE =====
# Identify the range of years available by scanning directory structure
print("Scanning directory to determine available years...")
Ybeg, Yend, ysec = infer_years_from_path(IDIR, target_type="folder", pattern=r'output\s*(\d{4})')
print(f"Found years from {Ybeg} to {Yend}: {ysec}")
print("-" * 45)
parameter_data: Dict[int, List[np.ndarray]] = {}
# ===== GET DATA ALL TOGETHER =====
# Use a thread pool to process multiple years concurrently
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {
executor.submit(process_year, y, IDIR, mask3d, Bmost, filename_fragments, variable_key): y for y in ysec
}
for future in concurrent.futures.as_completed(futures):
year = futures[future]
try:
# Collect processed benthic data for each year
yr, values = future.result()
parameter_data[yr] = values
log_message("Year processed successfully",
year=yr,
data_shape=values.shape if isinstance(values, np.ndarray) else "list")
logging.info(f"Year {yr} processed successfully in read_benthic_parameter.")
except Exception as exc:
# Log errors but continue processing other years
log_message("Error processing year",
year=year,
error=str(exc))
logging.error(f"Error processing year {year}: {exc}")
print(f"❌ Error processing year {year}: {exc} ❌")
# Sort results by year to provide ordered output
return dict(sorted(parameter_data.items()))
###############################################################################
###############################################################################
[docs]
def read_bfm_chemical(
IDIR: Union[str, Path],
mask3d: np.ndarray,
Bmost: np.ndarray,
filename_fragments: Dict[str, str],
variable_key: str
) -> Dict[int, List[np.ndarray]]:
"""
Reads BFM NetCDF model output for a specified chemical variable over multiple years,
applies a spatial mask, extracts bottom layer values, and returns the data organized by year.
This function scans the given base directory for yearly output folders matching a pattern,
constructs filenames from provided fragments, and processes each year’s compressed
NetCDF files. For each year, it unzips the file (if needed), reads the specified variable,
applies a 3D mask to invalidate certain points, extracts the bottom-most valid layer
based on Bmost indices, and collects the monthly or time-step data as 2D arrays.
Parameters
----------
IDIR : str or Path
Base directory path where BFM model output folders are located.
mask3d : np.ndarray
3D numpy array (depth, Y, X) used as a mask; elements with zero are masked (set to NaN).
Bmost : np.ndarray
2D numpy array (Y, X) containing 1-based indices indicating the bottom-most valid layer per grid cell.
filename_fragments : dict
Dictionary with keys such as 'ffrag1', 'ffrag2', 'ffrag3' used to construct filenames for the NetCDF files.
variable_key : str
Name of the chemical variable to extract from the NetCDF files (e.g., 'O2', 'NO3').
Returns
-------
dict[int, list[np.ndarray]]
Dictionary where each key is a year (int) and each value is a list of 2D numpy arrays
representing the extracted bottom layer chemical parameter for each time step (e.g., months).
Notes
-----
- The function will unzip compressed .gz files if uncompressed NetCDF files are not already present.
- Unzipped files are deleted after reading to conserve disk space.
- Processing is done concurrently across years using a thread pool for speed.
Example
-------
>>> base_dir = "/path/to/bfm/output"
>>> mask = np.ones((10, 50, 60))
>>> Bmost_indices = np.full((50, 60), 10)
>>> fragments = {'ffrag1': 'chem', 'ffrag2': 'monthly', 'ffrag3': 'nc'}
>>> chemical_var = 'O2'
>>> data_by_year = read_bfm_chemical(base_dir, mask, Bmost_indices, fragments, chemical_var)
>>> for year, monthly_layers in data_by_year.items():
... print(f"Year {year} has {len(monthly_layers)} time slices, shape {monthly_layers[0].shape}")
"""
# ===== INPUT VALIDATIONS =====
IDIR = Path(IDIR)
if not IDIR.exists():
raise FileNotFoundError(f"❌ Directory {IDIR} does not exist. ❌")
if not isinstance(mask3d, np.ndarray) or mask3d.ndim != 3:
raise ValueError("❌ mask3d must be a 3D numpy array. ❌")
if not isinstance(Bmost, np.ndarray) or Bmost.ndim != 2:
raise ValueError("❌ Bmost must be a 2D numpy array. ❌")
if not filename_fragments or not isinstance(filename_fragments, dict):
raise ValueError("❌ filename_fragments must be a non-empty dictionary. ❌")
for key in ('ffrag1', 'ffrag2', 'ffrag3'):
if key not in filename_fragments or filename_fragments[key] is None:
raise ValueError(f"❌ Missing filename fragment: '{key}' ❌")
if not isinstance(variable_key, str) or not variable_key:
raise ValueError("❌ variable_key must be a non-empty string. ❌")
with Timer("read_bfm_chemical function"):
with start_action(action_type="read_bfm_chemical function"):
log_message("Input validation passed",
IDIR=str(IDIR),
mask3d_shape=mask3d.shape,
Bmost_shape=Bmost.shape,
filename_fragments_keys=list(filename_fragments.keys()),
variable_key=variable_key)
logging.info("Input validation passed for read_bfm_chemical")
# ===== SCAN FOLDER TO OBTAIN YEAR INFO =====
# Identify available years by scanning output folders named like "outputYYYY"
# Using a regex pattern to capture four-digit year numbers dynamically
print("Scanning directory to determine available years...")
Ybeg, Yend, ysec = infer_years_from_path(IDIR, target_type="folder", pattern=r'output\s*(\d{4})')
print(f"Found years from {Ybeg} to {Yend}: {ysec}")
print("-" * 45)
results = {}
# ===== WORKERS =====
# Worker function to process one year at a time
def worker(year: int) -> None:
year_str = str(year)
print(f"Retrieving year: {year_str}")
with start_action(action_type="process_year", year=year):
# Construct full filename based on the year and filename fragments
filename = build_bfm_filename(year, filename_fragments)
file_nc = IDIR / f"output{year_str}" / filename
file_gz = Path(str(file_nc) + ".gz")
print(f"Currently handling {filename}")
# If the NetCDF file is not yet uncompressed, unzip it from the .gz archive
if not file_nc.exists():
unzip_gz_to_file(file_gz, file_nc)
print("\033[92m✅ File successfully unzipped\033[0m")
# Read the target variable data from the unzipped NetCDF file
P_orig = read_nc_variable_from_unzipped_file(file_nc, variable_key)
# Remove the uncompressed file to save disk space after reading
try:
os.remove(file_nc)
except OSError:
# If file removal fails (e.g., permission issues), ignore and continue
pass
# Apply the 3D mask to invalidate points by setting masked locations to NaN
P = apply_3d_mask(P_orig, mask3d)
# Extract the bottom-most valid layer using Bmost indices (1-based)
bottom_layers = extract_bottom_layer(P, Bmost)
# Store the extracted bottom layer data indexed by year
results[year] = bottom_layers
log_message("Year processed successfully",
year=year,
data_shape=bottom_layers.shape if isinstance(bottom_layers, np.ndarray) else "list")
logging.info(f"Year {year} processed successfully in read_bfm_chemical.")
print(f"\033[92m✅ Correct layer data extracted for year {year_str}\033[0m")
print("-" * 45)
# ===== READS ALL FILES =====
# Use a thread pool to process each year concurrently for faster I/O and CPU utilization
with ThreadPoolExecutor() as executor:
list(executor.map(worker, ysec))
# Return the results sorted by year for consistent ordering
return dict(sorted(results.items()))