###############################################################################
## ##
## LIBRARIES ##
## ##
###############################################################################
# Standard library imports
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Union
# Third-party libraries
import gsw
import numpy as np
# Logging and tracing
import logging
from eliot import start_action, log_message
# Module utilities and file I/O
from Hydrological_model_validator.Processing.time_utils import Timer
from Hydrological_model_validator.Processing.utils import infer_years_from_path, temp_threshold, hal_threshold, build_bfm_filename
from Hydrological_model_validator.Processing.file_io import read_nc_variable_from_gz_in_memory
###############################################################################
## ##
## FUNCTIONS ##
## ##
###############################################################################
[docs]
def compute_density_bottom(temperature_data: dict,
salinity_data: dict,
Bmost: np.ndarray,
method: str,
dz: float = 2.0) -> dict:
"""
Compute seawater density (kg/m³) at benthic depth using the specified method.
Parameters
----------
temperature_data : dict
Dictionary keyed by year (int or str), each containing a list of 12 arrays
representing bottom temperature for each month.
salinity_data : dict
Dictionary keyed by year, each containing a list of 12 arrays representing
bottom salinity for each month.
Bmost : np.ndarray
2D array (Y, X) containing the 1-based index of the deepest valid vertical level.
method : str
Method to compute density. Supported options:
- 'EOS': Linear equation of state approximation.
- 'EOS80': EOS-80 potential density at surface.
- 'TEOS10': TEOS-10 absolute density with pressure.
dz : float, optional
Vertical layer thickness in meters (default is 2.0 m).
Returns
-------
density_data : dict
Dictionary keyed by year, each containing a list of 12 arrays with computed
density fields corresponding to the monthly bottom data.
Raises
------
ValueError
If an unsupported method is passed or the density output shape is unexpected.
Example
-------
>>> # Assume temperature_data and salinity_data are loaded dictionaries as described
>>> # Bmost is a 2D numpy array indicating bottom layer indices
>>> method = 'EOS80'
>>> density = compute_density_bottom(temperature_data, salinity_data, Bmost, method)
>>> # density is a dict {year: [12 arrays]}, each array is density at benthic depth for that month
"""
# ===== INPUT VALIDATION =====
if not isinstance(temperature_data, dict):
raise TypeError("❌ temperature_data must be a dictionary keyed by year. ❌")
if not isinstance(salinity_data, dict):
raise TypeError("❌ salinity_data must be a dictionary keyed by year. ❌")
if not isinstance(Bmost, np.ndarray):
raise TypeError("❌ Bmost must be a numpy.ndarray. ❌")
if Bmost.ndim != 2:
raise ValueError("❌ Bmost must be a 2D array. ❌")
if np.any(Bmost < 0): # the 0 value is kept due to the 3D renders
raise ValueError("❌ Bmost indices must be greater or equal to 0 ❌")
if not isinstance(dz, (int, float)) or dz <= 0:
raise ValueError("❌ dz must be a positive float. ❌")
if set(temperature_data.keys()) != set(salinity_data.keys()):
raise ValueError("❌ temperature_data and salinity_data must have the same years as keys. ❌")
for year in temperature_data:
temp_list = temperature_data[year]
sal_list = salinity_data[year]
for month_idx in range(len(temp_list)):
temp_arr = temp_list[month_idx]
sal_arr = sal_list[month_idx]
if not isinstance(temp_arr, np.ndarray) or not isinstance(sal_arr, np.ndarray):
raise TypeError(f"❌ For year {year}, month {month_idx+1}, temperature and salinity must be numpy arrays. ❌")
if temp_arr.shape != sal_arr.shape:
raise ValueError(f"❌ For year {year}, month {month_idx+1}, temperature and salinity arrays must have the same shape. ❌")
# Check the 2D shape matches Bmost shape (spatial dimensions)
if temp_arr.shape != Bmost.shape:
raise ValueError(f"❌ For year {year}, month {month_idx+1}, temperature/salinity shape {temp_arr.shape} does not match Bmost shape {Bmost.shape}. ❌")
if method not in {"EOS", "EOS80", "TEOS10"}:
raise ValueError(f"❌ Unsupported method '{method}'. Choose from: 'EOS', 'EOS80', 'TEOS10'. ❌")
with Timer("compute_density_bottom function"):
with start_action(action_type="compute_density_bottom", method=method) as action:
log_message("Entered compute_density_bottom", method=method)
logging.info(f"[Start] compute_density_bottom with method: {method}")
log_message("Validated input dictionaries and parameters", dz=dz)
logging.info("[Validation] Input data and parameters validated")
# ===== GET DEPTH =====
# Convert bottom layer indices to depth by multiplying by vertical layer thickness dz
depth = Bmost * dz
# Initialize output dictionary to hold density results for each year and month
density_data = {}
log_message("Depth computed from Bmost", depth_shape=depth.shape)
logging.info("[Info] Depth array calculated")
# ===== LOOP TO GET ALL VALUES =====
# Loop over each year and its list of monthly temperature arrays
for year, temp_list in temperature_data.items():
density_data[year] = []
# Loop over each month (0-based index) and its corresponding temperature 2D array
for month_idx, temp_2d in enumerate(temp_list):
# Fetch matching monthly salinity 2D array for the same year and month
sal_2d = salinity_data[year][month_idx]
# Add a new axis to create a 3D array with a single vertical level (needed for gsw functions)
temp_3d = temp_2d[None, ...]
sal_3d = sal_2d[None, ...]
# ===== COMPUTE THE DENSITY =====
# Calculate density depending on the specified method:
if method == "EOS":
# Linear equation of state approximation parameters:
alpha = 0.0002 # thermal expansion coefficient (1/°C)
beta = 0.0008 # haline contraction coefficient (1/psu)
rho0 = 1025 # reference density (kg/m³)
# Compute density using linear approximation around T=10°C and S=35 psu
density = rho0 * (1 - alpha * (temp_3d - 10) + beta * (sal_3d - 35))
elif method == "EOS80":
# Compute potential density referenced to the surface using EOS-80 (from Gibbs SeaWater toolbox)
density = gsw.density.sigma0(sal_3d, temp_3d) + 1000 # sigma0 returns density anomaly (kg/m³ - 1000)
elif method == "TEOS10":
# Compute absolute density at pressure corresponding to bottom depth using TEOS-10
# Depth is converted to pressure internally by gsw.density.rho if needed
density = gsw.density.rho(sal_3d, temp_3d, depth)
# Ensure the density output has the expected shape: 3D with singleton vertical dimension
if density.ndim == 3 and density.shape[0] == 1:
# Extract the 2D slice for the single bottom layer
density_2d = density[0]
else:
# Raise error if output dimensions are unexpected to avoid silent bugs
raise ValueError(f"❌ Unexpected density shape: {density.shape} ❌")
# Append the 2D density array for this month to the output list for the year
density_data[year].append(density_2d)
logging.info(f"[Year Complete] Processed density for year {year}")
log_message("Completed density computation for one year", year=year)
# Return dictionary containing benthic density arrays by year and month
log_message("Completed compute_density_bottom", total_years=len(density_data))
logging.info("[Done] compute_density_bottom completed successfully")
return density_data
###############################################################################
###############################################################################
[docs]
def compute_Bmost(mask3d: np.ndarray) -> np.ndarray:
"""
Compute a 2D array by summing the 3D mask array along the depth axis.
Parameters
----------
mask3d : np.ndarray
3D binary mask array with shape (depth, rows, cols), where
valid data points are typically marked as 1, invalid as 0.
Returns
-------
np.ndarray
2D array (rows, cols) where each element is the count of valid
depth levels (sum of mask) at that spatial location.
Notes
-----
Summation is performed over the depth dimension (axis=0), equivalent
to counting valid levels for each (row, col).
Examples
--------
>>> mask3d = np.array([[[1, 0], [0, 1]],
[[0, 1], [1, 0]]])
>>> compute_Bmost(mask3d)
array([[1, 1],
[1, 1]])
"""
# ===== INPUT VALIDATION =====
if not isinstance(mask3d, np.ndarray):
raise TypeError("❌ mask3d must be a numpy.ndarray. ❌")
if mask3d.ndim != 3:
raise ValueError("❌ mask3d must be a 3D array with shape (depth, rows, cols). ❌")
with Timer("compute_Bmost function"):
with start_action(action_type="compute_Bmost") as action:
log_message("Entered compute_Bmost", mask_shape=str(mask3d.shape))
logging.info(f"[Start] compute_Bmost with input shape {mask3d.shape}")
log_message("Input validation passed", shape=str(mask3d.shape))
logging.info("[Validation] mask3d input is valid")
# Sum mask values along the depth axis (axis=0).
# Since mask is binary (1 for valid, 0 for invalid),
# the sum at each (row, col) counts how many depth layers are valid.
result = np.sum(mask3d, axis=0).squeeze()
log_message("Computed Bmost", result_shape=str(result.shape))
logging.info(f"[Done] compute_Bmost result shape {result.shape}")
return result
###############################################################################
###############################################################################
[docs]
def compute_Bleast(mask3d: np.ndarray) -> np.ndarray:
"""
Extract the first (top) layer from a 3D mask array along the depth axis.
Parameters
----------
mask3d : np.ndarray
3D array of shape (depth, rows, cols).
Returns
-------
np.ndarray
2D array of shape (rows, cols) corresponding to the first layer mask3d[0, :, :].
Examples
--------
>>> mask3d = np.array([[[1, 0], [0, 1]],
[[0, 1], [1, 0]]])
>>> compute_Bleast(mask3d)
array([[1, 0],
[0, 1]])
"""
# ===== INPUT VALIDATION =====
if not isinstance(mask3d, np.ndarray):
raise TypeError("❌ mask3d must be a numpy.ndarray. ❌")
if mask3d.ndim != 3:
raise ValueError("❌ mask3d must be a 3D array with shape (depth, rows, cols). ❌")
if mask3d.shape[0] == 0:
raise ValueError("❌ mask3d must have at least one depth layer. ❌")
with Timer("compute_Bleast function"):
with start_action(action_type="compute_Bleast") as action:
log_message("Entered compute_Bleast", mask_shape=str(mask3d.shape))
logging.info(f"[Start] compute_Bleast with input shape {mask3d.shape}")
log_message("Input validation passed", depth=mask3d.shape[0])
logging.info("[Validation] mask3d input is valid")
# Extract the first depth layer (index 0) from the 3D mask array.
# The squeeze removes any single-dimensional entries from the shape,
# but here it mainly ensures a 2D array output (rows x cols).
result = np.squeeze(mask3d[0, :, :])
log_message("Computed Bleast", result_shape=str(result.shape))
logging.info(f"[Done] compute_Bleast result shape {result.shape}")
return result
###############################################################################
###############################################################################
[docs]
def filter_dense_water_masses(
density_data: Dict[int, List[np.ndarray]],
threshold: float = 1029.2
) -> Dict[int, List[np.ndarray]]:
"""
Filter density data to retain only dense water masses with density values
greater than or equal to the specified threshold.
Parameters
----------
density_data : dict
Dictionary with keys as years (int) and values as lists of 12 2D numpy arrays,
each representing monthly seawater density fields.
threshold : float, optional
Density threshold in kg/m³ for defining dense water masses.
Values below this threshold will be masked out (default is 1029.2).
Returns
-------
filtered_data : dict
Dictionary with the same structure as input, where density values below the
threshold are replaced with np.nan, retaining only dense water masses.
Examples
--------
>>> import numpy as np
>>> density_data = {
... 2000: [np.array([[1029.3, 1028.9], [1029.5, 1027.0]]) for _ in range(12)],
... 2001: [np.array([[1029.1, 1029.0], [1028.0, 1030.0]]) for _ in range(12)]
... }
>>> filtered = filter_dense_water_masses(density_data, threshold=1029.2)
>>> print(filtered[2000][0])
[[1029.3 nan]
[1029.5 nan]]
>>> print(filtered[2001][0])
[[ nan nan]
[ nan 1030.0]]
"""
# ===== INPUT VALIDATION =====
if not isinstance(density_data, dict):
raise TypeError("❌ density_data must be a dictionary keyed by year (int). ❌")
for year, monthly_arrays in density_data.items():
if not isinstance(year, int):
raise TypeError(f"❌ Year keys must be integers, got {type(year)}. ❌")
if not isinstance(monthly_arrays, list):
raise TypeError(f"❌ For year {year}, density data must be a list of numpy arrays. ❌")
for idx, arr in enumerate(monthly_arrays):
if not isinstance(arr, np.ndarray):
raise TypeError(f"❌ For year {year}, month index {idx}, density data must be a numpy.ndarray. ❌")
if arr.ndim != 2:
raise ValueError(f"❌ For year {year}, month index {idx}, density array must be 2D. ❌")
if not isinstance(threshold, (int, float)):
raise TypeError("❌ threshold must be a numeric value (int or float). ❌")
with Timer("filter_dense_water_masses function"):
with start_action(action_type="filter_dense_water_masses", threshold=str(threshold)) as action:
log_message("Entered filter_dense_water_masses", num_years=len(density_data), threshold=str(threshold))
logging.info(f"[Start] Filtering dense water masses with threshold {threshold} for {len(density_data)} years")
logging.info("[Validation] Input validation passed")
log_message("Input validation complete")
# ===== FILTER THE DATA =====
filtered_data = {
year: [
# For each 2D monthly density array:
# Keep values >= threshold (dense water masses),
# mask out (set to np.nan) all values below threshold (less dense water)
np.where(density_2d >= threshold, density_2d, np.nan)
for density_2d in monthly_arrays
]
for year, monthly_arrays in density_data.items()
}
log_message("Filtering complete", years=list(filtered_data.keys()))
logging.info(f"[Done] Filtering complete for {len(filtered_data)} years")
return filtered_data
###############################################################################
###############################################################################
[docs]
def calc_density(
temp_3d: np.ndarray,
sal_3d: np.ndarray,
depths: np.ndarray,
valid_mask,
density_method: str,
) -> np.ndarray:
"""
Calculate seawater density based on temperature, salinity, and depth using
the specified density method.
Parameters
----------
temp_3d : np.ndarray
3D array of temperature values (depth x lat x lon).
sal_3d : np.ndarray
3D array of salinity values (depth x lat x lon).
depths : np.ndarray
1D array of depth levels in meters (length = depth dimension of temp_3d).
valid_mask : np.ndarray or None
Optional mask array; if None, valid values are where temp and sal are not NaN.
(In this function, valid_mask is redefined internally, so input is ignored.)
density_method : str
Method for density calculation. One of: "EOS", "EOS80", "TEOS10".
Returns
-------
np.ndarray
3D array of seawater density values (depth x lat x lon).
Raises
------
ValueError
If density_method is not one of the supported options.
Examples
--------
>>> import numpy as np
>>> import gsw
>>> depth_levels = np.array([0, 10, 20])
>>> temp = np.array([
... [[10, 11], [12, 13]],
... [[9, 10], [11, 12]],
... [[8, 9], [10, 11]]
... ])
>>> sal = np.array([
... [[35, 35], [35, 35]],
... [[34.5, 34.5], [34.5, 34.5]],
... [[34, 34], [34, 34]]
... ])
>>> density = calc_density(temp, sal, depth_levels, None, "EOS")
>>> print(density.shape)
(3, 2, 2)
>>> print(np.round(density, 2))
[[[1025. 1024.8 ]
[1025.2 1025.4 ]]
[[1024.6 1024.4 ]
[1024.8 1025. ]]
[[1024.2 1024. ]
[1024.4 1024.6 ]]]
"""
# ===== INPUT VALIDATION =====
if not isinstance(temp_3d, np.ndarray):
raise TypeError("❌ temp_3d must be a numpy.ndarray. ❌")
if not isinstance(sal_3d, np.ndarray):
raise TypeError("❌ sal_3d must be a numpy.ndarray. ❌")
if not isinstance(depths, np.ndarray):
raise TypeError("❌ depths must be a numpy.ndarray. ❌")
if temp_3d.shape != sal_3d.shape:
raise ValueError(f"❌ temp_3d and sal_3d must have the same shape. Got {temp_3d.shape} and {sal_3d.shape}. ❌")
if density_method not in {"EOS", "EOS80", "TEOS10"}:
raise ValueError(
f"❌ Unsupported density method: {density_method}. Choose from: 'EOS', 'EOS80', 'TEOS10'. ❌"
)
with Timer("calc_density function"):
with start_action(action_type="calc_density", method=density_method) as action:
log_message(
"Entered calc_density",
method=density_method,
temp_shape=str(temp_3d.shape),
sal_shape=str(sal_3d.shape),
)
logging.info(f"[Start] Calculating density with method '{density_method}'")
# Define valid data points where neither temperature nor salinity is NaN
valid_mask = ~np.isnan(temp_3d) & ~np.isnan(sal_3d)
# Initialize density array with NaNs to preserve invalid points
density = np.full(temp_3d.shape, np.nan, dtype=np.float64)
# ===== COMPUTE THE DENSITY =====
if density_method == "EOS":
# Linear equation of state parameters
alpha, beta, rho0 = 0.0002, 0.0008, 1025
# Calculate density only at valid points using linear EOS approximation
density[valid_mask] = rho0 * (
1 - alpha * (temp_3d[valid_mask] - 10) + beta * (sal_3d[valid_mask] - 35)
)
elif density_method == "EOS80":
# Use EOS-80 potential density at surface from Gibbs SeaWater (gsw) toolbox
# Add 1000 to convert sigma0 to absolute density in kg/m^3
density[valid_mask] = gsw.density.sigma0(sal_3d[valid_mask], temp_3d[valid_mask]) + 1000
elif density_method == "TEOS10":
# Convert depths (m) to pressure (dbar) assuming 1 dbar ≈ 10 m depth
pressure = depths[:, None, None] / 10.0
# Broadcast pressure to match 3D data shape for pointwise computation
pressure_3d = np.broadcast_to(pressure, temp_3d.shape)
# Compute absolute density with pressure using TEOS-10 standard from gsw
density[valid_mask] = gsw.density.rho(
sal_3d[valid_mask], temp_3d[valid_mask], pressure_3d[valid_mask]
)
log_message("Density calculation complete", method=density_method)
logging.info(f"[Done] Density calculation done with method '{density_method}'")
return density
###############################################################################
###############################################################################
[docs]
def compute_dense_water_volume(
IDIR: Union[str, Path],
mask3d: np.ndarray,
filename_fragments: dict,
density_method: str,
dz: float = 2.0,
dx: float = 800.0,
dy: float = 800.0,
dens_threshold: float = 1029.2,
) -> List[Dict]:
"""
Compute the volume of dense water masses (density >= dens_threshold kg/m³)
over time from oceanographic temperature and salinity data.
Parameters
----------
IDIR : Union[str, Path]
Directory path containing yearly subfolders with compressed NetCDF files.
Each subfolder named like 'outputYYYY' contains the data for year YYYY.
mask3d : np.ndarray
3D boolean mask array of shape (depth, Y, X), where True means the cell is masked/excluded.
filename_fragments : dict
Dictionary with keys 'ffrag1', 'ffrag2', 'ffrag3' used to construct filenames
for the data files.
density_method : str
Method to calculate seawater density. Must be one of "EOS", "EOS80", or "TEOS10".
dz : float, optional
Vertical thickness of each grid layer in meters (default 2.0 m).
dx : float, optional
Horizontal grid spacing in meters along the x-axis (default 800.0 m).
dy : float, optional
Horizontal grid spacing in meters along the y-axis (default 800.0 m).
dens_threshold : float, optional
Density threshold in kg/m³ to define dense water (default 1029.2 kg/m³).
Returns
-------
List[Dict]
List of dictionaries, each containing:
- 'date': datetime object for the first day of each month,
- 'volume_m3': volume of dense water (in cubic meters) for that month.
Notes
-----
- The function expects yearly subdirectories named as 'outputYYYY' inside IDIR.
- Temperature and salinity are read from compressed NetCDF files.
- Cells masked by mask3d are excluded from volume calculation.
- Density is computed per cell per time using the specified density method.
- The dense water volume is calculated by counting cells exceeding the density
threshold and multiplying by the cell volume (dx * dy * dz).
Examples
--------
>>> from pathlib import Path
>>> import numpy as np
>>> mask = np.zeros((50, 100, 100), dtype=bool) # no masked cells
>>> filename_fragments = {'ffrag1': 'data', 'ffrag2': 'temp', 'ffrag3': 'sal'}
>>> IDIR = Path('/path/to/ocean_data')
>>> dense_volumes = compute_dense_water_volume(
... IDIR=IDIR,
... mask3d=mask,
... filename_fragments=filename_fragments,
... density_method="EOS80",
... dz=2.0,
... dx=800.0,
... dy=800.0,
... dens_threshold=1029.2
... )
>>> print(dense_volumes[0])
{'date': datetime.datetime(2000, 1, 1, 0, 0), 'volume_m3': 1234567.89}
"""
# ==== INPUT VALIDATION =====
if not isinstance(IDIR, (str, Path)):
raise TypeError("❌ IDIR must be a string or Path object ❌")
if not isinstance(mask3d, np.ndarray):
raise TypeError("❌ mask3d must be a numpy ndarray ❌")
if mask3d.ndim != 3:
raise ValueError("❌ mask3d must be a 3D numpy array ❌")
if not isinstance(filename_fragments, dict):
raise TypeError("❌ filename_fragments must be a dict ❌")
required_keys = {'ffrag1', 'ffrag2', 'ffrag3'}
if not required_keys.issubset(filename_fragments.keys()):
missing = required_keys - filename_fragments.keys()
raise ValueError(f"❌ filename_fragments is missing required keys: {missing} ❌")
if density_method not in {"EOS", "EOS80", "TEOS10"}:
raise ValueError('❌ density_method must be one of "EOS", "EOS80", or "TEOS10" ❌')
for param_name, param_value in [('dz', dz), ('dx', dx), ('dy', dy), ('dens_threshold', dens_threshold)]:
if not isinstance(param_value, (int, float)):
raise TypeError(f"❌ {param_name} must be a number ❌")
if param_value <= 0 and param_name != 'dens_threshold':
raise ValueError(f"❌ {param_name} must be positive ❌")
if not isinstance(dens_threshold, (int, float)):
raise TypeError("❌ dens_threshold must be a number ❌")
with Timer("compute_dense_water_volume function"):
with start_action(action_type="compute_dense_water_volume", method=density_method) as action:
logging.info(f"[Start] compute_dense_water_volume with method '{density_method}'")
log_message("Starting compute_dense_water_volume",
IDIR=str(IDIR),
density_method=density_method,
dz=dz, dx=dx, dy=dy,
dens_threshold=dens_threshold)
IDIR = Path(IDIR)
# ===== GET THE YEARS =====
# Identify years available based on folder naming convention
print("Scanning directory to determine available years...")
Ybeg, Yend, ysec = infer_years_from_path(IDIR, target_type="folder", pattern=r'output\s*(\d{4})')
print(f"Found years from {Ybeg} to {Yend}: {ysec}")
print("-" * 45)
log_message("Found years", Ybeg=Ybeg, Yend=Yend, years_list=ysec)
# ===== CELL SPECIFICS =====
# Compute grid cell dimensions and volume
cell_area = dx * dy
cell_volume = cell_area * dz
log_message("Grid cell dimensions", dx=dx, dy=dy, dz=dz, cell_volume=cell_volume)
volume_time_series = [] # Stores output: list of dicts with volume per month
# ==== LOOP =====
for year in ysec:
# Construct expected filename path for current year
filename = build_bfm_filename(year, filename_fragments)
file_nc = IDIR / f"output{year}" / filename
file_gz = Path(str(file_nc) + ".gz")
print(f"Working on year {year}")
log_message("Processing year", year=year, filename=str(file_gz))
# ===== FILE OPENING =====
# Skip year if data file is missing
if not file_gz.exists():
print(f"File missing: {file_gz}, skipping year {year}")
logging.warning(f"Missing file for year {year}: {file_gz}")
continue
# Read temperature and salinity arrays from compressed NetCDF file
temp = read_nc_variable_from_gz_in_memory(file_gz, 'votemper')
sal = read_nc_variable_from_gz_in_memory(file_gz, 'vosaline')
# Validate array shapes
if temp.shape != sal.shape:
raise ValueError("Temperature and salinity data shape mismatch")
time_len, depth_len, Y, X = temp.shape
log_message("Data shapes", temp_shape=temp.shape, sal_shape=sal.shape)
# ===== MASKING AND VALIDATION =====
# Apply static 3D spatial mask (broadcast to 4D to match time dimension)
mask_4d = np.broadcast_to(mask3d == 0, temp.shape)
# Create 1D array of depths and define logical depth ranges
depths = np.arange(depth_len) * dz
mask_shallow = (depths > 0) & (depths <= 50)
mask_deep = (depths > 50) & (depths <= 200)
# Convert shallow/deep masks to full 4D masks for later filtering
mask_shallow_4d = np.broadcast_to(mask_shallow[:, None, None], temp.shape)
mask_deep_4d = np.broadcast_to(mask_deep[:, None, None], temp.shape)
# Apply mask to remove excluded spatial regions
temp = np.where(mask_4d, np.nan, temp)
sal = np.where(mask_4d, np.nan, sal)
# Identify and mask invalid temperature and salinity values based on depth
invalid_temp = temp_threshold(temp, mask_shallow_4d, mask_deep_4d)
invalid_sal = hal_threshold(sal, mask_shallow_4d, mask_deep_4d)
invalid_mask = invalid_temp | invalid_sal
temp = np.where(invalid_mask, np.nan, temp)
sal = np.where(invalid_mask, np.nan, sal)
# Define mask of valid values for density computation
valid_mask = ~np.isnan(temp) & ~np.isnan(sal)
# Calculate density using the specified method
density_4d = calc_density(temp, sal, depths, valid_mask, density_method)
# ===== THRESHOLD AND VOLUME =====
# Identify cells with density ≥ threshold (i.e., dense water)
dense_cells = density_4d >= dens_threshold
# Count number of dense cells for each time slice (month)
dense_counts = np.sum(dense_cells, axis=(1, 2, 3))
# Convert count of dense cells to volume in cubic meters
dense_volumes = dense_counts * cell_volume
# Build output record for each month in the year
for month_idx in range(time_len):
date = datetime(year, month_idx + 1, 1)
volume = dense_volumes[month_idx]
print(f"Dense water volume for {date.strftime('%Y-%m')}: {volume:.2f} m³")
print("-" * 45)
log_message("Monthly dense water volume", date=str(date), volume_m3=volume)
volume_time_series.append({'date': date, 'volume_m3': volume})
logging.info("[Done] Completed compute_dense_water_volume")
return volume_time_series