#!/usr/bin/python
# -*- coding: utf-8 -*-
[docs]
def generate_full_report(
data_folder,
output_dir = None,
check_only = False,
generate_pdf = True,
verbose = False,
variable = None,
unit = None,
open_report = False
):
"""
Generate a comprehensive report from spatial and timeseries data inputs.
This function processes input datasets (spatial and time series) along with a mask,
validates and aligns data, computes statistical analyses, and generates plots and
reports summarizing the model vs observed comparisons. It supports saving outputs
to a specified directory, optionally generating a PDF report, and can run in a
check-only mode to validate inputs without producing outputs.
Parameters
----------
data_folder : dict or str
Dictionary of file paths or a folder path containing required input files:
observed spatial data, simulated spatial data, observed timeseries, simulated timeseries,
and mask file. If a folder path, the function will locate files within it.
output_dir : str or None, optional
Directory where the report, plots, and data outputs will be saved.
Defaults to None, which prompts the user or uses a default folder.
check_only : bool, optional
If True, only validate inputs and exit without generating any report or plots.
generate_pdf : bool, optional
Whether to generate a PDF report including plots and summary tables.
verbose : bool, optional
Enable verbose logging and console output for progress tracking.
variable : str or None, optional
Name of the variable for labeling plots and outputs (e.g., 'Chlorophyll-a').
If None, the user may be prompted to provide it during execution.
unit : str or None, optional
Unit string for the variable (e.g., 'mg/L', 'm3/s') shown in plot labels and legends.
If None, the user may be prompted to provide it during execution.
open_report : bool, optional
If True, automatically open the generated PDF report after creation.
Returns
-------
None
Saves outputs (pdf, plots and dataframes) to disk and modifies
internal state. The files are saved to `output_dir` if specified.
Raises
------
ValueError
If required inputs are missing or incompatible.
TypeError
If input types are invalid.
Example
-------
>>> generate_full_report(
... data_folder="input_data",
... output_dir="reports/2023-06",
... generate_pdf=True,
... variable="Temperature",
... unit="°C"
... )
"""
# ===== LIBRARIES =====
# General utility libraries
import pandas as pd
import numpy as np
import xarray as xr
import rasterio
from pathlib import Path
import os
from datetime import datetime
import calendar
import re
from reportlab.lib.units import inch
# Module specific libraries
from Hydrological_model_validator.Processing.utils import convert_dataarrays_in_df
from Hydrological_model_validator.Processing.file_io import (mask_reader,
find_file_with_keywords,
select_3d_variable)
from Hydrological_model_validator.Processing.Data_saver import save_variable_to_json
from Hydrological_model_validator.Processing.time_utils import (split_to_monthly,
split_to_yearly,
is_invalid_time_index,
prompt_for_datetime_index,
ensure_datetime_index)
from Hydrological_model_validator.Plotting.Plots import (timeseries,
seasonal_scatter_plot,
whiskerbox,
violinplot,
efficiency_plot,
error_components_timeseries,
plot_spectral,
plot_spatial_efficiency)
from Hydrological_model_validator.Plotting.Taylor_diagrams import (comprehensive_taylor_diagram,
monthly_taylor_diagram)
from Hydrological_model_validator.Plotting.Target_plots import (comprehensive_target_diagram,
target_diagram_by_month)
from Hydrological_model_validator.Processing.Efficiency_metrics import (r_squared,
weighted_r_squared,
nse,
index_of_agreement,
ln_nse,
nse_j,
index_of_agreement_j,
relative_nse,
relative_index_of_agreement,
monthly_r_squared,
monthly_weighted_r_squared,
monthly_nse,
monthly_index_of_agreement,
monthly_ln_nse,
monthly_nse_j,
monthly_index_of_agreement_j,
monthly_relative_nse,
monthly_relative_index_of_agreement,
compute_spatial_efficiency,
compute_error_timeseries)
from Hydrological_model_validator.Processing.stats_math_utils import (compute_coverage_stats,
corr_no_nan,
compute_fft,
detrend_dim)
from Hydrological_model_validator.Plotting.formatting import compute_geolocalized_coords
from Hydrological_model_validator.Report.report_utils import (PDFReportBuilder,
add_rotated_image_page,
add_seasonal_scatter_page,
add_multiple_rotated_images_grid,
add_multiple_images_grid,
add_efficiency_pages,
add_tables_page,
add_plot_to_pdf)
def vprint(*args, verbose=False, **kwargs):
if verbose:
print(*args, **kwargs)
# Border for terminal prints
border = "#" * 60
vprint("Starting to retrieve the data to make the report...", verbose=verbose)
# ===== FILE RETRIEVAL =====
# Detect if input is a folder or dict of paths
if isinstance(data_folder, str):
# Convert string path to a pathlib.Path object for convenient filesystem operations
data_folder = Path(data_folder)
if not data_folder.is_dir():
raise ValueError(f"❌ Provided path '{data_folder}' is not a valid directory. ❌")
vprint(f"Reading input files from folder: {data_folder}", verbose=verbose)
# Get a list of all files in the provided folder
files_in_folder = list(data_folder.iterdir())
# Attempt to find the spatial data file based on keyword hints
obs_spatial_path = find_file_with_keywords(files_in_folder, ['obs', 'observed', 'sat', 'satellite'], "observed spatial data")
sim_spatial_path = find_file_with_keywords(files_in_folder, ['sim', 'simulated', 'model', 'mod'], "simulated spatial data")
# Attempt to find the timeseries data files based on hints
obs_ts_path = find_file_with_keywords(files_in_folder, ['obs', 'observed', 'sat', 'satellite'], "observed timeseries data")
sim_ts_path = find_file_with_keywords(files_in_folder, ['sim', 'simulated', 'model', 'mod'], "simulated timeseries data")
# Attempt to find the mask
mask_path = find_file_with_keywords(files_in_folder, ['mask'], "mask data")
elif isinstance(data_folder, dict):
required_keys = ['obs_spatial', 'sim_spatial', 'obs_ts', 'sim_ts', 'mask']
missing_keys = [k for k in required_keys if k not in data_folder]
if missing_keys:
raise ValueError(f"❌ Missing keys in data dictionary: {missing_keys}")
vprint("Using explicit file paths provided via dictionary.", verbose=verbose)
# If user provides a dictionary of paths explicitly, extract and convert each one to a Path object
obs_spatial_path = Path(data_folder['obs_spatial'])
sim_spatial_path = Path(data_folder['sim_spatial'])
obs_ts_path = Path(data_folder['obs_ts'])
sim_ts_path = Path(data_folder['sim_ts'])
mask_path = Path(data_folder['mask'])
else:
raise ValueError("❌ Input must be a folder path string or a dict of file paths ❌")
vprint("Inputs paths validated!", verbose=verbose)
if check_only:
return
vprint("\n" + border + "\n", verbose=verbose)
vprint("Beginning to read the files...", verbose=verbose)
# ===== MASK RETRIEVAL =====
vprint("Getting the mask...", verbose=verbose)
# Extract the necessary mask elements
Mmask, Mfsm, _, _, _ = mask_reader(mask_path)
# Assign it to the ocean mask to be used later
ocean_mask=Mmask
vprint("Mask obtained!", verbose=verbose)
vprint("\n" + border + "\n", verbose=verbose)
# ===== LOAD THE SPATIAL DATASETS =====
for label, path in [('obs_spatial', obs_spatial_path), ('sim_spatial', sim_spatial_path)]:
suffix = Path(path).suffix.lower() # Get the file extension, lowecase for consistency
# Handle NetCDF/HDF5 file types
if suffix in ['.nc', '.netcdf', '.h5', '.hdf5']:
ds = xr.open_dataset(path) # Use Xarray to open
dataarray = select_3d_variable(ds, label) # Extract the main variable in the dataset
# Transpose dims if needed to ensure (time, lat, lon)
expected_dims = ('time', 'lat', 'lon')
if dataarray.dims != expected_dims:
vprint("⚠️ Dataarray dimensions do not match the expected one!", verbose=verbose)
vprint("Attempting to transpose the data...", verbose=verbose)
try:
dataarray = dataarray.transpose(*expected_dims)
vprint("Data transposed, new dimensions are (time, lat, lon)", verbose=verbose)
except ValueError as e:
raise ValueError(f"❌ Cannot transpose {dataarray.dims} to {expected_dims} in {label}: {e} ❌")
# Handle time index validity
if 'time' in dataarray.coords:
time_vals = dataarray['time'].values
if is_invalid_time_index(time_vals): # Handle cases like constant or dummy time values
vprint(f"⚠️ Detected invalid trivial time index in {label}, asking for manual input. ⚠️", verbose=verbose)
length = dataarray.sizes['time']
# Ask for the starting time/frquency and apply it
time_index = prompt_for_datetime_index(length)
dataarray = dataarray.assign_coords(time=time_index)
else:
# Try to decode time values if they are encoded as CF-compliant attributes
try:
dataarray = xr.decode_cf(dataarray)
except Exception:
pass
else:
# If no time coordinate is present, ask for it
vprint(f"⚠️ No 'time' coordinate found in {label}. Asking for manual input. ⚠️", verbose=verbose)
length = dataarray.sizes['time']
time_index = prompt_for_datetime_index(length)
dataarray = dataarray.assign_coords(time=time_index)
# Handle geospatial raster images (e.g., GeoTIFFs)
elif suffix in ['.tif', '.tiff']:
with rasterio.open(path) as src:
arr = src.read() # shape: (bands, y, x)
if arr.ndim == 2:
arr = arr[np.newaxis, :, :] # Convert to (1, y, x) by expanding
elif arr.ndim != 3:
raise ValueError(f"❌ Unexpected TIFF shape {arr.shape} in {Path(path).name} ❌")
# Convert numpy array to xarray DataArray (no time dimension in TIFF)
import xarray as xr
dataarray = xr.DataArray(arr, dims=('band', 'y', 'x'))
else:
raise ValueError(f"❌ Unsupported spatial data extension for {label}: {suffix} ❌")
# Store the datasets
if label == 'obs_spatial':
obs_spatial = dataarray
else:
sim_spatial = dataarray
# ===== LOAD THE TIMESERIES =====
for label, path in [('obs_ts', obs_ts_path), ('sim_ts', sim_ts_path)]:
suffix = path.suffix.lower()
if suffix in ['.csv', '.txt']:
# Read assuming 1st column is the datetime
df = pd.read_csv(path, index_col=0, parse_dates=True)
elif suffix in ['.xls', '.xlsx']:
# Read assuming the 1st column is the datetime
df = pd.read_excel(path, index_col=0, parse_dates=True)
elif suffix in ['.nc', '.netcdf']:
ds = xr.open_dataset(path)
# Attempt to auto-detect 1D time series variable
time_dim = [v for v in ds.data_vars if 'time' in ds[v].dims]
if len(time_dim) != 1:
raise ValueError(f"❌ Unable to determine unique timeseries variable in {path.name} ❌")
var = time_dim[0] # Extract the variable
df = ds[[var]].to_dataframe()
else:
raise ValueError(f"❌ Unsupported timeseries data extension for {label}: {suffix} ❌")
# Store the dataframes
if label == 'obs_ts':
obs_ts_df = df
else:
sim_ts_df = df
# Convert the dataframes into series
obs_ts = obs_ts_df.squeeze()
sim_ts = sim_ts_df.squeeze()
# Ensure that the series have a datetime index and it is valid
obs_ts = ensure_datetime_index(obs_ts, "Observed timeseries")
sim_ts = ensure_datetime_index(sim_ts, "Simulated timeseries")
if not isinstance(obs_ts.index, pd.DatetimeIndex) or not isinstance(sim_ts.index, pd.DatetimeIndex):
raise TypeError("❌ Timeseries must have a DatetimeIndex after processing ❌")
# Align the timeseries
common_index = obs_ts.index.intersection(sim_ts.index)
if len(common_index) == 0:
raise ValueError("❌ No overlapping dates between observed and simulated timeseries ❌")
obs_aligned = obs_ts.loc[common_index]
sim_aligned = sim_ts.loc[common_index]
# Ask for a couple of info, used for plots and reporto
# If both are provided, just optionally print and return
if variable is not None and unit is not None:
vprint("Variable and unit provided via CLI:", verbose=verbose)
vprint(f"Variable: {variable}", verbose=verbose)
vprint(f"Unit: {unit}", verbose=verbose)
vprint("\n" + border + "\n", verbose=verbose)
else:
vprint("""What kind of variable/units are going to be displayed on the axis/title?
(NOTE: The units provided will be converted in latex format)""", verbose=verbose)
if variable is None:
variable = input("What kind of variable are we using? ").strip()
if unit is None:
unit = input("What is the units of measurements? ").strip()
vprint("\n" + border + "\n", verbose=verbose)
# ===== SETUP OF THE DICTIONARIES =====
Basin_Average_Timeseries = {
'observed': obs_aligned,
'model': sim_aligned,
}
# Get the year range to split the dataframe
Ybeg, Yend = common_index[0].year, common_index[-1].year
years = list(range(Ybeg, Yend + 1))
vprint("Creating the yearly datasets...", verbose=verbose)
Basin_Average_Yearly = {}
for key in Basin_Average_Timeseries:
Basin_Average_Yearly[key] = split_to_yearly(Basin_Average_Timeseries[key], years)
vprint("Yearly dataset created!")
vprint("Creating the monthly datasets...", verbose=verbose)
Basin_Average_Monthly = {}
for key in Basin_Average_Yearly:
Basin_Average_Monthly[key] = split_to_monthly(Basin_Average_Yearly[key])
vprint("Monthly dataset created!", verbose=verbose)
vprint("\n" + border + "\n", verbose=verbose)
# ===== SETUP OF THE OUTPUT FOLDERS =====
if output_dir is None:
if isinstance(data_folder, str):
default_outdir = os.path.join(str(data_folder), "REPORT")
else:
default_outdir = os.path.join(str(obs_spatial_path.parent), "REPORT")
use_default = input(
f"Save output in default REPORT folder?\n {default_outdir}\nEnter Y for yes, N to specify another path (Y/n): "
).strip().lower()
if use_default in ('', 'y', 'yes'):
outdir = default_outdir
else:
outdir = input("Enter base output directory path: ").strip()
else:
outdir = Path(output_dir)
timestamp = datetime.now().strftime("run_%Y-%m-%d")
output_path = os.path.join(outdir, timestamp)
os.makedirs(output_path, exist_ok=True)
vprint(f"Output folder created at: {output_path}", verbose=verbose)
vprint("\n" + border + "\n", verbose=verbose)
plots_path = Path(output_path) / "plots"
plots_path.mkdir(exist_ok=True)
dataframe_path = Path(output_path) / "dataframes"
dataframe_path.mkdir(exist_ok=True)
pdf_path = Path(output_path) / f"Report_{timestamp}.pdf"
if 'variable' in locals():
pdf_path = Path(output_path) / f"Report_{variable}_{timestamp}.pdf"
# ===== INITIALIZE THE PDF =====
if generate_pdf:
vprint("Generating the PDF report...", verbose=verbose)
pdf = PDFReportBuilder(str(pdf_path))
pdf.build_title_page()
pdf.build_toc()
else:
vprint("PDF generation skipped, saving only the plots and the dataframes", verbose=verbose)
# ===== BEGIN THE PLOTTING =====
BIAS = Basin_Average_Timeseries['model'] - Basin_Average_Timeseries['observed']
# ===== TIMESERIES =====
vprint("Plotting the time-series...", verbose=verbose)
timeseries(
Basin_Average_Timeseries,
BIAS,
variable=variable,
unit=unit,
BA=False,
output_path=plots_path
)
vprint("\033[92m✅ Time-series plotted succesfully!\033[0m", verbose=verbose)
# Add the page
ts_img = Path(plots_path) / f'{variable}_timeseries.png'
if generate_pdf:
add_rotated_image_page(pdf, ts_img, section_title="Time Series Plot")
vprint("\n" + border + "\n", verbose=verbose)
# ===== SCATTERPLOT =====
vprint("Plotting the seasonal data as scatterplots...", verbose=verbose)
seasonal_scatter_plot(Basin_Average_Timeseries, variable=variable, unit=unit, BA=False, output_path=plots_path)
vprint("\033[92m✅ Seasonal scatterplots plotted succesfully!\033[0m", verbose=verbose)
seasonal_img = Path(plots_path) / f"{variable}_all_seasons_scatterplot.png"
DJF_img = Path(plots_path) / f"{variable}_DJF_scatterplot.png"
MAM_img = Path(plots_path) / f"{variable}_MAM_scatterplot.png"
JJA_img = Path(plots_path) / f"{variable}_JJA_scatterplot.png"
SON_img = Path(plots_path) / f"{variable}_SON_scatterplot.png"
if generate_pdf:
# Add the page
add_seasonal_scatter_page(
pdf,
seasonal_img, # main plot path
[DJF_img, MAM_img, JJA_img, SON_img], # list of 4 small seasonal plots
"Seasonal Scatterplots"
)
vprint("\n" + border + "\n", verbose=verbose)
# ===== VIOLIN AND BOX =====
vprint("Plotting the whisker-box plots...", verbose=verbose)
whiskerbox(Basin_Average_Monthly, variable=variable, unit=unit, output_path=plots_path)
vprint("\033[92m✅ Whisker-box plotted succesfully!\033[0m", verbose=verbose)
whisker_img = Path(plots_path) / f'{variable}_boxplot.png'
vprint("\n" + border + "\n", verbose=verbose)
vprint("Plotting the violinplots...", verbose=verbose)
violinplot(Basin_Average_Monthly, variable=variable, unit=unit, output_path=plots_path)
vprint("\033[92m✅ Violinplots plotted succesfully!\033[0m", verbose=verbose)
violin_img = Path(plots_path) / f'{variable}_violinplot.png'
if generate_pdf:
# Add the page
add_multiple_rotated_images_grid(
pdf,
img_paths=[whisker_img, violin_img],
cols=2,
section_title="Box and Violin Plots"
)
vprint("\n" + border + "\n", verbose=verbose)
# ===== TAYLOR DIAGRAMS =====
vprint("Plotting the SST Taylor diagram for yearly data...", verbose=verbose)
comprehensive_taylor_diagram(Basin_Average_Yearly, output_path=plots_path, variable=variable, unit=unit)
vprint("\033[92m✅ Yearly data Taylor diagram has been plotted!\033[0m", verbose=verbose)
vprint("-"*45, verbose=verbose)
vprint("Plotting the monthly data diagrams...", verbose=verbose)
monthly_taylor_diagram(Basin_Average_Monthly, output_path=plots_path, variable=variable, unit=unit)
vprint("\033[92m✅ Monthly Taylor diagrams have been plotted!\033[0m", verbose=verbose)
yearly_taylor_img = Path(plots_path) / 'Taylor_diagram_summary.png'
monthly_taylor_img = Path(plots_path) / "Unified_Taylor_Diagram.png"
if generate_pdf:
# Add the page
add_multiple_images_grid(
pdf,
img_paths=[yearly_taylor_img, monthly_taylor_img],
section_title="Taylor Diagrams",
columns=1,
rows=2,
max_width=4.85 * inch,
spacing=0
)
vprint("\n" + border + "\n", verbose=verbose)
# ===== TARGET PLOTS =====
vprint("Plotting the Target plot for the yearly data...", verbose=verbose)
comprehensive_target_diagram(Basin_Average_Yearly, output_path=plots_path, variable=variable, unit=unit)
vprint("\033[92m✅ Yearly data Target plot has been plotted!\033[0m", verbose=verbose)
vprint("-"*45, verbose=verbose)
vprint("Plotting the monthly data plots...", verbose=verbose)
target_diagram_by_month(Basin_Average_Monthly, output_path=plots_path, variable=variable, unit=unit)
vprint("\033[92m✅ All of the Target plots has been plotted!\033[0m", verbose=verbose)
yearly_target_img = Path(plots_path) / "Unified_Target_Diagram.png"
monthly_target_img = Path(plots_path) / "Monthly_Target_Diagram.png"
if generate_pdf:
# Add to page
add_multiple_images_grid(
pdf,
img_paths=[yearly_target_img, monthly_target_img],
section_title="Target Plots",
columns=1,
rows=2,
max_width=4.55 * inch,
spacing=0
)
vprint("\n" + border + "\n", verbose=verbose)
# ===== EFFICINECY METRICS =====
vprint("Computing the Efficiency Metrics...", verbose=verbose)
vprint("-" * 45, verbose=verbose)
# Initialize the DataFrame
months = list(calendar.month_name)[1:] # ['January', ..., 'December']
metrics = ['r²', 'wr²', 'NSE', 'd', 'ln NSE', 'E_1', 'd_1', 'E_{rel}', 'd_{rel}']
efficiency_df = pd.DataFrame(index=metrics, columns=['Total'] + months)
# List of metric functions and display names
metric_functions = [
('r²', r_squared, monthly_r_squared),
('wr²', weighted_r_squared, monthly_weighted_r_squared),
('NSE', nse, monthly_nse),
('d', index_of_agreement, monthly_index_of_agreement),
('ln NSE', ln_nse, monthly_ln_nse),
('E_1', lambda x, y: nse_j(x, y, j=1), lambda m: monthly_nse_j(m, j=1)),
('d_1', lambda x, y: index_of_agreement_j(x, y, j=1), lambda m: monthly_index_of_agreement_j(m, j=1)),
('E_{rel}', relative_nse, monthly_relative_nse),
('d_{rel}', relative_index_of_agreement, monthly_relative_index_of_agreement),
]
# Handle log-transformed and relative metrics (which need filtering)
for name, func, monthly_func in metric_functions:
vprint(f"\033[93mComputing {name}...\033[0m", verbose=verbose)
if name in ['ln NSE', 'E_rel', 'd_rel']:
mask = ~np.isnan(Basin_Average_Timeseries['observed']) & ~np.isnan(Basin_Average_Timeseries['model'])
if name == 'ln NSE':
mask &= (Basin_Average_Timeseries['observed'] > 0) & (Basin_Average_Timeseries['model'] > 0)
if name in ['E_rel', 'd_rel']:
mask &= Basin_Average_Timeseries['observed'] != 0
x = Basin_Average_Timeseries['observed'][mask]
y = Basin_Average_Timeseries['model'][mask]
else:
x = Basin_Average_Timeseries['observed']
y = Basin_Average_Timeseries['model']
total_val = func(x, y)
monthly_vals = monthly_func(Basin_Average_Monthly)
# Store in DataFrame
efficiency_df.loc[name, 'Total'] = total_val
efficiency_df.loc[name, months] = monthly_vals
# Print values
vprint(f"{name} (Total) = {total_val:.4f}", verbose=verbose)
for month, val in zip(months, monthly_vals):
vprint(f"{month}: {name} = {val:.4f}", verbose=verbose)
vprint("-" * 45, verbose=verbose)
vprint("\033[92m✅ All of the metrics have been computed!\033[0m", verbose=verbose)
vprint("Saving the dataframe...")
# Save the dataframe
save_variable_to_json(efficiency_df, Path(dataframe_path) / "efficiency_df.json")
vprint("Dataframe saved!", verbose=verbose)
vprint("\n" + border + "\n", verbose=verbose)
vprint("Plotting the efficiency metrics results...", verbose=verbose)
# Mapping of display names for plots
plot_titles = {
'r²': 'Coefficient of Determination (r²)',
'wr²': 'Weighted Coefficient of Determination (wr²)',
'NSE': 'Nash-Sutcliffe Efficiency',
'd': 'Index of Agreement (d)',
'ln NSE': 'Nash-Sutcliffe Efficiency (Logarithmic)',
'E_1': 'Modified NSE ($E_1$, j=1)',
'd_1': 'Modified Index of Agreement ($d_1$, j=1)',
'E_{rel}': r'Relative NSE ($E_{rel}$)',
'd_{rel}': r'Relative Index of Agreement ($d_{rel}$)',
}
# Plotting all metrics
for metric_key, title in plot_titles.items():
total_value = efficiency_df.loc[metric_key, 'Total']
monthly_values = efficiency_df.loc[metric_key, efficiency_df.columns[1:]].values.astype(float)
efficiency_plot(total_value, monthly_values,
title=f'{title}',
y_label=f'{metric_key}',
metric_name=metric_key,
output_path=plots_path)
# Remove any parentheses and their contents for the print message
clean_title = re.sub(r'\s*\([^)]*\)', '', title)
vprint(f"\033[92m✅ {clean_title} plotted!\033[0m", verbose=verbose)
vprint("\033[92m✅ All efficiency metric plots have been successfully created!\033[0m", verbose=verbose)
vprint("\n" + border + "\n", verbose=verbose)
if generate_pdf:
# Add page
add_efficiency_pages(pdf, efficiency_df, plot_titles, plots_path)
# ===== ERROR COMPONENTS + TIMESERIES =====
vprint("Computing the error components timeseries...", verbose=verbose)
error_comp_stats_df = compute_error_timeseries(sim_spatial,
obs_spatial,
ocean_mask)
vprint("Error components timeseries obtaines!", verbose=verbose)
vprint("Computing the cloud cover...", verbose=verbose)
_, cloud_cover = compute_coverage_stats(obs_spatial.values, Mmask)
cloud_cover = pd.Series(cloud_cover) # Convert numpy array to Series
cloud_cover = ensure_datetime_index(cloud_cover, "Cloud Cover") # Ensure datetime index
vprint("Cloud cover timeseries obtained!", verbose=verbose)
vprint("Plotting the results...", verbose=verbose)
# Use cloud cover index for the error components df
correct_index = cloud_cover.index
error_comp_stats_df.index = correct_index
# Make the plot
error_components_timeseries(error_comp_stats_df, plots_path,
cloud_cover, variable=variable)
err_ts_img = Path(plots_path) / 'Error_Decomposition_Timeseries_.png'
if generate_pdf:
# Add the page
add_rotated_image_page(pdf, err_ts_img,
section_title="Error Components Timeseries",
custom_offset_y=200)
vprint("Results plotted!", verbose=verbose)
vprint('-'*45, verbose=verbose)
# Compute the correlations
vprint("Computing the correlation statistics", verbose=verbose)
cloud_cover_30d = cloud_cover.rolling(window=30, center=True).mean()
correlations = {
'raw_cloud_cover': {},
'smoothed_cloud_cover_30d': {}
}
for metric in error_comp_stats_df.columns:
corr_raw = corr_no_nan(error_comp_stats_df[metric], cloud_cover)
corr_smooth = corr_no_nan(error_comp_stats_df[metric], cloud_cover_30d)
correlations['raw_cloud_cover'][metric] = corr_raw
correlations['smoothed_cloud_cover_30d'][metric] = corr_smooth
vprint("The correlation between cloud cover and error components are...", verbose=verbose)
vprint("Correlation between stats metrics and RAW cloud cover:", verbose=verbose)
for metric, corr_val in correlations['raw_cloud_cover'].items():
vprint(f" {metric}: {corr_val:.4f}", verbose=verbose)
vprint("\nCorrelation between stats metrics and SMOOTHED (30d) cloud cover:", verbose=verbose)
for metric, corr_val in correlations['smoothed_cloud_cover_30d'].items():
vprint(f" {metric}: {corr_val:.4f}", verbose=verbose)
# Prepare tables for PDF
correlation_tables = {
"Raw Cloud Cover vs. Error Components": {
metric: corr_val for metric, corr_val in correlations['raw_cloud_cover'].items()
},
"Smoothed (30d) Cloud Cover vs. Error Components": {
metric: corr_val for metric, corr_val in correlations['smoothed_cloud_cover_30d'].items()
}
}
if generate_pdf:
# Add a page with both tables to the PDF
add_tables_page(
pdf,
tables_dict=correlation_tables,
section_title="Correlation Between Cloud Cover and Error Components",
columns=1,
rows=2
)
vprint("\n" + border + "\n", verbose=verbose)
# ===== SPECTRAL =====
vprint("Making the spectral analysis...", verbose=verbose)
# Add the cloud cover to the dataframe
combined_df = error_comp_stats_df.copy()
combined_df['cloud_cover'] = cloud_cover
combined_df['cloud_cover_30d'] = cloud_cover_30d
combined_df = combined_df.dropna()
vprint("Saving the dataframe...", verbose=verbose)
# Save the dataframe
save_variable_to_json(combined_df, Path(dataframe_path) / "error_ts_df.json")
vprint("Dataframe saved!", verbose=verbose)
# Separate cleaned data
error_comp_clean = combined_df[error_comp_stats_df.columns]
cloud_cover_clean = combined_df['cloud_cover']
cloud_cover_30d_clean = combined_df['cloud_cover_30d']
# Make a quick detrend
error_comp_detrended = error_comp_clean - error_comp_clean.mean()
cloud_cover_detrended = cloud_cover_clean - cloud_cover_clean.mean()
cloud_cover_30d_detrended = cloud_cover_30d_clean - cloud_cover_30d_clean.mean()
vprint("Data cleaned and deterended, proceeding to plot...", verbose=verbose)
vprint("Computing the fft components...", verbose=verbose)
# Compute the fft components for all of the data in the dataframe
freqs, fft_components = compute_fft({
k: v.values for k, v in error_comp_detrended.items()
})
# For cloud cover series
_, fft_cloud = compute_fft(cloud_cover_detrended.values)
# For 30-day cloud cover
_, fft_cloud_30d = compute_fft(cloud_cover_30d_detrended.values)
vprint("Plotting the PSD...", verbose=verbose)
plot_type='PSD'
plot_spectral(plot_type=plot_type,
freqs=freqs,
fft_components=fft_components,
output_path=plots_path,
variable_name=variable)
vprint("PSD plotted!", verbose=verbose)
psd_img = Path(plots_path) / f"Spectral_Plot_{plot_type}_{variable}.png"
vprint("Plotting the CSD...", verbose=verbose)
plot_type='CSD'
plot_spectral(
plot_type=plot_type,
error_comp=error_comp_clean,
output_path=plots_path,
variable_name=variable,
cloud_covers=[
(cloud_cover_clean, 'cloud_cover'),
(cloud_cover_30d_clean, 'cloud_cover_30d')
])
vprint("CSD plotted!", verbose=verbose)
csd_img = Path(plots_path) / f"Spectral_Plot_{plot_type}_{variable}.png"
if generate_pdf:
# Add the plot's pages
add_multiple_images_grid(
pdf,
img_paths=[psd_img, csd_img],
section_title="Spectral Analysis",
columns=1,
rows=2,
max_width=8 * inch,
spacing=0
)
vprint("\n" + border + "\n", verbose=verbose)
vprint("Beginning with the spatial analysis...", verbose=verbose)
# ===== RESAMPLING =====
vprint("""The resampling can be done with either:
1. The built-in resampler (might take a while)
2. The CDO (requires previous installation)""", verbose=verbose)
method = input("Enter the method of your choice for the resampling (1 or 2): ")
if method == '1':
from Hydrological_model_validator.Processing.time_utils import resample_and_compute
# Chunk to better handle the data
obs_spatial_chunked = obs_spatial.chunk({'time': 100})
sim_spatial_chunked = sim_spatial.chunk({'time': 100})
vprint("Resampling...", verbose=verbose)
obs_monthly, sim_monthly = resample_and_compute(obs_spatial_chunked, sim_spatial_chunked)
vprint("Data resampled!", verbose=verbose)
# Mask
obs_monthly = obs_monthly.where(ocean_mask)
sim_monthly = sim_monthly.where(ocean_mask)
vprint("Finishing setting up the data...", verbose=verbose)
# Add month and year dimensions
sim_monthly = sim_monthly.assign_coords(
month=sim_monthly.time.dt.month,
year=sim_monthly.time.dt.year,
)
obs_monthly = obs_monthly.assign_coords(
month=obs_monthly.time.dt.month,
year=obs_monthly.time.dt.year,
)
# Mask using an expanded mask, the regular one does not work for the 2D plotting
mask_da = xr.DataArray(Mfsm)
mask_expanded = mask_da.expand_dims(time=sim_monthly.time)
obs_monthly = obs_monthly.where(mask_expanded)
sim_monthly = sim_monthly.where(mask_expanded)
vprint("Data ready!", verbose=verbose)
vprint("\n" + border + "\n", verbose=verbose)
elif method == '2':
import subprocess
# Save CDO-ready datasets if not already saved
obs_output_path = Path(data_folder) / f"{obs_spatial_path.stem}_CDO.nc"
sim_output_path = Path(data_folder) / f"{sim_spatial_path.stem}_CDO.nc"
vprint("Checking if CDO-ready datasets exist...", verbose=verbose)
if not obs_output_path.exists():
vprint("Saving observed spatial dataset...", verbose=verbose)
obs_spatial.to_netcdf(obs_output_path, mode="w")
else:
vprint("Observed spatial dataset already exists, skipping save.", verbose=verbose)
if not sim_output_path.exists():
vprint("Saving simulated spatial dataset...")
sim_spatial.to_netcdf(sim_output_path, mode="w")
else:
vprint("Simulated spatial dataset already exists, skipping save.", verbose=verbose)
vprint("CDO-ready datasets prepared!", verbose=verbose)
# Run CDO on observed spatial dataset
vprint("Running CDO on observed data...", verbose=verbose)
obs_spatial_path = obs_output_path # update path
obs_monthly_path = obs_spatial_path.with_name("obs_spatial_monthly.nc")
if not obs_monthly_path.exists():
vprint("Running CDO on observed data...", verbose=verbose)
subprocess.run(["cdo", "-v", "monmean", str(obs_spatial_path), str(obs_monthly_path)], check=True)
vprint("The observed data has been resampled!", verbose=verbose)
else:
vprint("Monthly observed file already exists, skipping CDO.", verbose=verbose)
# Run CDO on simulated spatial dataset
vprint("Onto the satellite data...", verbose=verbose)
sim_spatial_path = sim_output_path # update path
sim_monthly_path = sim_spatial_path.with_name("sim_spatial_monthly.nc")
if not sim_monthly_path.exists():
vprint("Running CDO on satellite data...", verbose=verbose)
subprocess.run(["cdo", "-v", "monmean", str(sim_spatial_path), str(sim_monthly_path)], check=True)
vprint("The satellite data has been resampled!", verbose=verbose)
else:
vprint("Monthly satellite file already exists, skipping CDO.", verbose=verbose)
# Open the new datasets
obs_monthly = xr.open_dataset(Path(data_folder) / "obs_spatial_monthly.nc")
sim_monthly = xr.open_dataset(Path(data_folder) / "sim_spatial_monthly.nc")
vprint("Finishing setting up the data...", verbose=verbose)
# Extract the data
obs_monthly_da = select_3d_variable(obs_monthly, "obs_monthly")
sim_monthly_da = select_3d_variable(sim_monthly, "sim_monthly")
# Drop the bounds added by the CDO
obs_monthly = obs_monthly_da.drop_vars("time_bnds", errors="ignore")
sim_monthly = sim_monthly_da.drop_vars("time_bnds", errors="ignore")
# Mask using an expanded mask, the regular one does not work for the 2D plotting
mask_da = xr.DataArray(Mfsm)
mask_expanded = mask_da.expand_dims(time=sim_monthly.time)
obs_monthly = obs_monthly.where(mask_expanded)
sim_monthly = sim_monthly.where(mask_expanded)
vprint("Data ready!", verbose=verbose)
vprint("\n" + border + "\n", verbose=verbose)
else:
raise ValueError(f"Invalid resampling method selected: {method}")
# ===== SPATIAL ERROR / EFFICIENCY =====
vprint("Computing the metrics from the raw data...", verbose=verbose)
mb_raw, sde_raw, cc_raw, rm_raw, ro_raw, urmse_raw = compute_spatial_efficiency(sim_monthly, obs_monthly, time_group="month")
# Create a dict with metrics as keys, and each value is a list of 12 2D arrays (one per month)
metrics_dict_raw = {
'MB_raw': [mb_raw.isel(month=month) for month in range(12)],
'SDE_raw': [sde_raw.isel(month=month) for month in range(12)],
'CC_raw': [cc_raw.isel(month=month) for month in range(12)],
'RM_raw': [rm_raw.isel(month=month) for month in range(12)],
'RO_raw': [ro_raw.isel(month=month) for month in range(12)],
'URMSE_raw': [urmse_raw.isel(month=month) for month in range(12)],
}
# Convert to DataFrame: rows=months, columns=metrics, cells=2D DataArrays
metrics_df_raw = pd.DataFrame(metrics_dict_raw)
# Set index to months
metrics_df_raw.index = [f'Month_{i+1}' for i in range(12)]
vprint("Metrics computed!", verbose=verbose)
# Use before saving:
metrics_df_raw = convert_dataarrays_in_df(metrics_df_raw)
vprint("Saving the dataframe...", verbose=verbose)
save_variable_to_json(metrics_df_raw, Path(dataframe_path) / "metrics_df_raw_month.json")
vprint("Dataframe saved!", verbose=verbose)
vprint('-'*45, verbose=verbose)
# Detrend the monthly data
vprint("Detrending the data...", verbose=verbose)
sim_monthly_detrend = detrend_dim(sim_monthly, dim='time', mask=mask_expanded)
obs_monthly_detrend = detrend_dim(obs_monthly, dim='time', mask=mask_expanded)
vprint("Data detrended!", verbose=verbose)
# Repeat the computations for the detrended data
vprint("Computing the metrics from the detrended data...", verbose=verbose)
mb_detr, sde_detr, cc_detr, rm_detr, ro_detr, urmse_detr = compute_spatial_efficiency(sim_monthly_detrend, obs_monthly_detrend, time_group="month")
# Create a dict with metrics as keys, and each value is a list of 12 2D arrays (one per month)
metrics_dict_detr = {
'MB_detr': [mb_detr.isel(month=month) for month in range(12)],
'SDE_detr': [sde_detr.isel(month=month) for month in range(12)],
'CC_detr': [cc_detr.isel(month=month) for month in range(12)],
'RM_detr': [rm_detr.isel(month=month) for month in range(12)],
'RO_detr': [ro_detr.isel(month=month) for month in range(12)],
'URMSE_detr': [urmse_detr.isel(month=month) for month in range(12)],
}
# Convert to DataFrame: rows=months, columns=metrics, cells=2D DataArrays
metrics_df_detr = pd.DataFrame(metrics_dict_detr)
# Set index to months
metrics_df_detr.index = [f'Month_{i+1}' for i in range(12)]
vprint("Detrended data metrcis computed!", verbose=verbose)
vprint('-'*45, verbose=verbose)
vprint("Saving the dataframe...", verbose=verbose)
metrics_df_detr = convert_dataarrays_in_df(metrics_df_detr)
save_variable_to_json(metrics_df_detr, Path(dataframe_path) / "metrics_df_detrended_month.json")
vprint("Dataframe saved!", verbose=verbose)
vprint("\n" + border + "\n", verbose=verbose)
vprint("The geolocalization specifics are needed to plot the data, please provide them:", verbose=verbose)
# Known values from the dataset, need to be changed if the area of analysis is changed
import ast
grid_shape_str = input("Provide the grid shape (lat, lon): ")
grid_shape = ast.literal_eval(grid_shape_str)
epsilon_str = input("Provide a correction factor if needed: ")
epsilon = float(epsilon_str)
x_start_str = input("Provide the starting longitude coordinate: ")
x_start = float(x_start_str)
x_step_str = input("Provide the x-step discretization: ")
x_step = float(x_step_str)
y_start_str = input("Provide the starting latitude coordinate: ")
y_start = float(y_start_str)
y_step_str = input("Provide the y-step discretization: ")
y_step = float(y_step_str)
geo_coords = compute_geolocalized_coords(grid_shape, epsilon, x_start, x_step, y_start, y_step)
vprint("\033[92m✅ Geolocalization complete! \033[0m", verbose=verbose)
vprint("\n" + border + "\n", verbose=verbose)
# Plot the results
vprint("Plotting the results...", verbose=verbose)
# Variable used to store all of the paths to append the pages
plot_paths = {}
# Used to dentify the saved plots
period = "Monthly"
# Mapping of titles to short keys for your variables
key_map = {
"Mean Bias": "mb",
"Standard Deviation Error": "sde",
"Cross Correlation": "cc",
"Model Std Dev": "rm",
"Satellite Std Dev": "ro",
"Unbiased RMSE": "urmse",
}
# All of the possibilities for the plots are collected in a dictionary
plots_info = [
{
"raw": mb_raw, "detr": mb_detr, "title": "Mean Bias", "unit": "°C",
"cmap": "RdBu_r", "vmin": -2, "vmax": 2, "suffix": "(Model-Satellite)",
},
{
"raw": sde_raw, "detr": sde_detr, "title": "Standard Deviation Error", "unit": "°C",
"cmap": "viridis", "vmin": 0, "vmax": 3, "suffix": "(Model-Satellite)",
},
{
"raw": cc_raw, "detr": cc_detr, "title": "Cross Correlation",
"cmap": "OrangeGreen", "vmin": -1, "vmax": 1, "suffix": "(Model-Satellite)",
},
{
"raw": rm_raw, "detr": rm_detr, "title": "Model Std Dev", "unit": "°C",
"cmap": "plasma", "vmin": 0, "vmax": 8, "suffix": "(Model)",
},
{
"raw": ro_raw, "detr": ro_detr, "title": "Satellite Std Dev", "unit": "°C",
"cmap": "plasma", "vmin": 0, "vmax": 8, "suffix": "(Satellite)",
},
{
"raw": urmse_raw, "detr": urmse_detr, "title": "Unbiased RMSE", "unit": "°C",
"cmap": "inferno", "vmin": 0, "vmax": 3, "suffix": "(Model-Satellite)",
},
]
# Begin the loop to plot the data
for info in plots_info:
key_prefix = key_map.get(info["title"], info["title"].lower().replace(" ", "_"))
# Starting with raw data
vprint(f"Plotting the {info['title'].lower()}...", verbose=verbose)
plot_spatial_efficiency(info["raw"], geo_coords, plots_path,
info["title"],
unit=info.get("unit"),
cmap=info["cmap"],
vmin=info["vmin"],
vmax=info["vmax"],
suffix=info.get("suffix"))
suffix = info.get("suffix", "")
safe_title = info["title"].replace("/", "_").replace("\\", "_")
raw_filename = f"Monthly {safe_title} (Raw) {suffix}".strip() + ".png" # Hardcoded monthly/yearly
raw_path = Path(plots_path) / raw_filename
# Append the path to be retrieved later
plot_paths[f"{key_prefix}_img_{period}"] = raw_path
# Plot the detrended results
plot_spatial_efficiency(info["detr"], geo_coords, plots_path,
info["title"],
unit=info.get("unit"),
cmap=info["cmap"],
vmin=info["vmin"],
vmax=info["vmax"],
detrended=True,
suffix=info.get("suffix"))
detrended_filename = f"Monthly {safe_title} (Detrended) {suffix}".strip() + ".png"
detrended_path = Path(plots_path) / detrended_filename
# Append the path to be retrieved later
plot_paths[f"{key_prefix}_img_detr_{period}"] = detrended_path
vprint(f"\033[92m✅ {info['title']} plotted! \033[0m", verbose=verbose)
vprint("Monthly data done!", verbose=verbose)
vprint("\n" + border + "\n", verbose=verbose)
# Repeat the computations for the yearly data
vprint("Moving onto the yearly data...", verbose=verbose)
vprint("Computing the metrics from the raw data...", verbose=verbose)
mb_raw, sde_raw, cc_raw, rm_raw, ro_raw, urmse_raw = compute_spatial_efficiency(sim_monthly_detrend, obs_monthly_detrend, time_group="year")
# Create a dict with metrics as keys, and each value is a list of 2D arrays, one per year
metrics_dict_raw = {
'MB_raw': [mb_raw.isel(year=i) for i in range(len(mb_raw.year))],
'SDE_raw': [sde_raw.isel(year=i) for i in range(len(sde_raw.year))],
'CC_raw': [cc_raw.isel(year=i) for i in range(len(cc_raw.year))],
'RM_raw': [rm_raw.isel(year=i) for i in range(len(rm_raw.year))],
'RO_raw': [ro_raw.isel(year=i) for i in range(len(ro_raw.year))],
'URMSE_raw': [urmse_raw.isel(year=i) for i in range(len(urmse_raw.year))],
}
# Convert to DataFrame: rows=years, columns=metrics, cells=2D DataArrays
metrics_df_raw = pd.DataFrame(metrics_dict_raw)
# Set index to years (as strings)
metrics_df_raw.index = [f'Year_{year}' for year in mb_raw.year.values]
vprint("Metrics computed!", verbose=verbose)
vprint("Saving the dataframe...", verbose=verbose)
metrics_df_raw = convert_dataarrays_in_df(metrics_df_raw)
save_variable_to_json(metrics_df_raw, Path(dataframe_path) / "metrics_df_raw_year.json")
vprint("Dataframe saved!", verbose=verbose)
vprint('-'*45, verbose=verbose)
# Detrending the data
vprint("Detrending the data...", verbose=verbose)
sim_monthly_detrend = detrend_dim(sim_monthly, dim='time', mask=mask_expanded)
obs_monthly_detrend = detrend_dim(obs_monthly, dim='time', mask=mask_expanded)
vprint("Data detrended!", verbose=verbose)
# Remake the computations for the detrended data
vprint("Computing the metrics from the detrended data...", verbose=verbose)
mb_detr, sde_detr, cc_detr, rm_detr, ro_detr, urmse_detr = compute_spatial_efficiency(sim_monthly_detrend, obs_monthly_detrend, time_group="year")
# Create a dict with metrics as keys, and each value is a list of 2D arrays, one per year
metrics_dict_detr = {
'MB_detr': [mb_detr.isel(year=i) for i in range(len(mb_detr.year))],
'SDE_detr': [sde_detr.isel(year=i) for i in range(len(sde_detr.year))],
'CC_detr': [cc_detr.isel(year=i) for i in range(len(cc_detr.year))],
'RM_detr': [rm_detr.isel(year=i) for i in range(len(rm_detr.year))],
'RO_detr': [ro_detr.isel(year=i) for i in range(len(ro_detr.year))],
'URMSE_detr': [urmse_detr.isel(year=i) for i in range(len(urmse_detr.year))],
}
# Convert to DataFrame: rows=years, columns=metrics, cells=2D DataArrays
metrics_df_detr = pd.DataFrame(metrics_dict_detr)
# Set index to years (as strings)
metrics_df_detr.index = [f'Year_{year}' for year in mb_detr.year.values]
vprint("Detrended data metrics computed!", verbose=verbose)
vprint("Saving the dataframe...", verbose=verbose)
metrics_df_detr = convert_dataarrays_in_df(metrics_df_detr)
save_variable_to_json(metrics_df_detr, Path(dataframe_path) / "metrics_df_detrended_year.json")
vprint("Dataframe saved!", verbose=verbose)
vprint('-'*45, verbose=verbose)
# Start with the plotting
vprint("Plotting the results...", verbose=verbose)
# New period definition for the Yearly data
period = "Yearly"
plots_info = [
{
"raw": mb_raw, "detr": mb_detr, "title": "Mean Bias", "unit": "°C",
"cmap": "RdBu_r", "vmin": -2, "vmax": 2, "suffix": "(Model-Satellite)",
},
{
"raw": sde_raw, "detr": sde_detr, "title": "Standard Deviation Error", "unit": "°C",
"cmap": "viridis", "vmin": 0, "vmax": 3, "suffix": "(Model-Satellite)",
},
{
"raw": cc_raw, "detr": cc_detr, "title": "Cross Correlation",
"cmap": "OrangeGreen", "vmin": -1, "vmax": 1, "suffix": "(Model-Satellite)",
},
{
"raw": rm_raw, "detr": rm_detr, "title": "Model Std Dev", "unit": "°C",
"cmap": "plasma", "vmin": 0, "vmax": 8, "suffix": "(Model)",
},
{
"raw": ro_raw, "detr": ro_detr, "title": "Satellite Std Dev", "unit": "°C",
"cmap": "plasma", "vmin": 0, "vmax": 8, "suffix": "(Satellite)",
},
{
"raw": urmse_raw, "detr": urmse_detr, "title": "Unbiased RMSE", "unit": "°C",
"cmap": "inferno", "vmin": 0, "vmax": 3, "suffix": "(Model-Satellite)",
},
]
# Begin the loop to plot and append the paths
for info in plots_info:
key_prefix = key_map.get(info["title"], info["title"].lower().replace(" ", "_"))
# Plotting of the raw data
vprint(f"Plotting the {info['title'].lower()}...", verbose=verbose)
plot_spatial_efficiency(
info["raw"], geo_coords, plots_path,
info["title"],
unit=info.get("unit"),
cmap=info["cmap"],
vmin=info["vmin"],
vmax=info["vmax"],
suffix=info.get("suffix")
)
suffix = info.get("suffix", "")
safe_title = info["title"].replace("/", "_").replace("\\", "_")
raw_filename = f"Yearly {safe_title} (Raw) {suffix}".strip() + ".png"
raw_path = Path(plots_path) / raw_filename
# Save path and append it
plot_paths[f"{key_prefix}_img_{period}"] = raw_path
# Plots and save the detrended results
plot_spatial_efficiency(
info["detr"], geo_coords, plots_path,
info["title"],
unit=info.get("unit"),
cmap=info["cmap"],
vmin=info["vmin"],
vmax=info["vmax"],
detrended=True,
suffix=info.get("suffix")
)
detrended_filename = f"Yearly {safe_title} (Detrended) {suffix}".strip() + ".png"
detrended_path = Path(plots_path) / detrended_filename
# Save the paths and append them
plot_paths[f"{key_prefix}_img_detr_{period}"] = detrended_path
vprint(f"\033[92m✅ {info['title']} plotted!\033[0m", verbose=verbose)
vprint("Yearly data done!", verbose=verbose)
reverse_key_map = {v: k for k, v in key_map.items()} # Reverse lookup to retrieve the paths
if generate_pdf:
# Begin the loop to append the pages
for key, img_path in plot_paths.items():
# Path keys to retrieve the correct file
parts = key.split('_') # e.g. ['mb', 'img', 'detr', 'Monthly']
plot_type = parts[0].lower()
detrended = 'detr' in parts
time_type = parts[-1].capitalize() # e.g. 'Monthly' or 'Yearly'
# Get full title
title_base = reverse_key_map.get(plot_type, plot_type.upper())
# Use the keys to build the section title
section_title = f"{title_base} - {'Detrended' if detrended else 'Raw'} - {time_type}"
# Append the page
add_plot_to_pdf(pdf, img_path, section_title, width=6 * inch)
vprint("\n" + border + "\n", verbose=verbose)
# ==== SAVE =====
if generate_pdf:
vprint("Saving the report...", verbose=verbose)
pdf.save()
vprint(f"\033[92m✅ PDF report saved at {output_path}\033[0m", verbose=verbose)
if open_report and generate_pdf:
import subprocess
import platform
pdf_path_str = str(pdf_path)
system = platform.system()
release = platform.release().lower()
try:
if system == "Darwin": # macOS
subprocess.run(["open", pdf_path_str], check=True)
elif system == "Windows": # Windows native
os.startfile(pdf_path_str)
elif system == "Linux":
try:
subprocess.run(["xdg-open", pdf_path_str], check=True)
except Exception as e1:
try:
subprocess.run(["evince", pdf_path_str], check=True)
except Exception as e2:
if "microsoft" in release:
try:
result = subprocess.run(
["wslpath", "-w", pdf_path_str],
capture_output=True,
text=True,
check=True
)
win_path = result.stdout.strip()
subprocess.run(["explorer.exe", win_path], check=True)
except Exception as e3:
print("❌ Failed WSL fallback with explorer.exe")
print("explorer.exe error:", e3)
else:
print("❌ Both xdg-open and evince failed.")
print("xdg-open error:", e1)
print("evince error:", e2)
except Exception as final_error:
print("❌ Failed to open the PDF report.")
print("Final error:", final_error)
else:
vprint(f"\033[92m✅ The data has been saved at {output_path}\033[0m", verbose=verbose)