Source code for towerpy.calib.calib_rhohv

"""Towerpy: an open-source toolbox for processing polarimetric radar data."""

import datetime as dt
import copy
import numpy as np
import xarray as xr
import xradar as xrd
from scipy.optimize import minimize_scalar
from sklearn.metrics import root_mean_squared_error as sklrmse
from ..datavis.rad_display import _plot_rhohvmethod_single, _plot_rhohvmethod_grid
from ..eclass.snr import signal2noiseratio
from ..utils.radutilities import xr_hist2d, _to_kilometers
from ..utils.radutilities import apply_correction_chain, record_provenance


[docs] class rhoHV_Calibration: r""" A class for calibrating the correlation coefficient :math:`\rho_{HV}`. Attributes ---------- elev_angle : float Elevation angle at where the scan was taken, in degrees. file_name : str Name of the file containing radar data. scandatetime : datetime Date and time of scan. site_name : str Name of the radar site. vars : dict Corrected :math:`\rho_{HV}` and user-defined radar variables. """ def __init__(self, radobj=None):
[docs] self.elev_angle = getattr(radobj, 'elev_angle', None) if radobj else None
[docs] self.file_name = getattr(radobj, 'file_name', None) if radobj else None
[docs] self.scandatetime = getattr(radobj, 'scandatetime', None) if radobj else None
[docs] self.site_name = getattr(radobj, 'site_name', None) if radobj else None
[docs] def rhohv_noise_correction(self, rad_georef, rad_params, rad_vars, mode="exp", exp_curvet=20.0, eps=0.005, rhohv_theo=(0.9, 1.0), noise_level=(0, 100), bins_rho=(0.8, 1.1, 0.005), bins_snr=(5, 30, 0.1), plot_method=False, data2correct=None): r""" Applies noise‑bias correction to :math:`\rho_{HV}`. Parameters ---------- rad_georef : dict Georeferenced data containing descriptors of the azimuth, gates and beam height, amongst others. rad_params : dict Radar technical details. rad_vars : dict Radar variables used for the correction method. The default is None. data2correct : dict, optional Dictionary to update the corrected :math:`\rho_{HV}`. The default is None. Notes ----- 1. Based on the method described in [1]_ 2. See the xarray implementation (rhohv_noisecorrection) for more details. References ---------- .. [1] Ryzhkov, A. V.; Zrnic, D. S. (2019). Radar Polarimetry for Weather Observations (1st ed.). Springer International Publishing. https://doi.org/10.1007/978-3-030-05093-1 """ # ============================================================================= # assign radar params # ============================================================================= prms_mod = {k1.replace(' ', ('_')).replace('[m/s]', '[ms-1]'): v1 for k1, v1 in rad_params.items() if not isinstance(v1, dict) and not isinstance(v1, dt.datetime) and not isinstance(v1, np.ndarray)} prms_mod2 = { f"{outer_key.replace('/', '_')}({inner_key})": value for outer_key, inner_dict in rad_params.items() if isinstance(inner_dict, dict) for inner_key, value in inner_dict.items()} prms_mod = prms_mod | prms_mod2 prms_mod['datetime'] = rad_params['datetime'].strftime( "%Y-%m-%dT%H:%M:%S.%f_%Z%z") prms_mod['timestamp'] = rad_params['datetime'].timestamp() prms_mod['file_name'] = self.file_name # ============================================================================= # assign rvars # ============================================================================= sweep_vars_mapping = {'DBZH': 'ZH [dBZ]', 'ZDR': 'ZDR [dB]', 'PHIDP': 'PhiDP [deg]', 'RHOHV': 'rhoHV [-]', 'VRADH': 'V [m/s]'} sweep_vars_attrs_f = xrd.model.sweep_vars_mapping azimuth = np.rad2deg(rad_georef['azim [rad]']) elevation = np.rad2deg(rad_georef['elev [rad]']) elevation_attrs = xrd.model.get_elevation_attrs() rng = rad_georef['range [m]'] azimuth_attrs = xrd.model.get_azimuth_attrs(azimuth) range_attrs = xrd.model.get_range_attrs(rng) sweep = xr.Dataset(coords=dict(azimuth=(["azimuth"], azimuth, azimuth_attrs), elevation=(["azimuth"], elevation, elevation_attrs), range=(["range"], rng, range_attrs))) sweep = sweep.assign_coords(sweep_mode="azimuth_surveillance", longitude=rad_params['longitude [dd]'], latitude=rad_params['latitude [dd]'], altitude=rad_params['altitude [m]'], time=rad_params['aztime']) for k1, v1 in sweep_vars_mapping.items(): if v1 in rad_vars.keys(): sweep = sweep.assign({k1: (['azimuth', 'range'], rad_vars[v1])}) else: print(f'{k1} not in data') if k1 in sweep_vars_attrs_f: sweep[k1].attrs = sweep_vars_attrs_f[k1] else: print(f'{k1} moment_attrs not assigned ') sweep.attrs = prms_mod for vv in sweep.data_vars: if sweep[vv].dtype == "float" or sweep[vv].dtype == "float32" or sweep[vv].dtype == "float64": sweep[vv].encoding = {'zlib': True, 'complevel': 6} rhohv_nc = rhohv_noisecorrection( sweep, inp_names={"Z": "DBZH", "rng": "range", "rhohv": "RHOHV"}, rhohv_theo=rhohv_theo, mode=mode, noise_level=noise_level, exp_curvet=exp_curvet, eps=eps, bins_rho=bins_rho, bins_snr=bins_snr, preserve_original=False, data2correct=None, plot_method=plot_method) if data2correct is None: self.vars = {'rhoHV [-]': rhohv_nc.RHOHV.values} else: data2cc = copy.deepcopy(data2correct) data2cc.update({'rhoHV [-]': rhohv_nc.RHOHV.values}) self.vars = data2cc self.noise_level_dB = rhohv_nc.attrs['noise_level_dB']
# ============================================================================= # %% xarray implementation # =============================================================================
[docs] def _build_theo_line(snr_centers, rhohv_theo, mode="linear", exp_curvet=20.0, eps=0.005): """Internal helper for rhohv_noisecorrection.""" rho0, rho_inf = rhohv_theo if mode == "linear": return np.linspace(rho0, rho_inf, len(snr_centers)) elif mode == "exp": s0 = snr_centers[0] k = np.log((rho_inf - rho0) / eps) / (exp_curvet - s0) return rho_inf - (rho_inf - rho0) * np.exp(-k * (snr_centers - s0)) else: raise ValueError(f"Unknown mode: {mode}")
[docs] def _rmse_objective(rc, Z, rng_km, rhohv_na, bins_snr, bins_rho, rhohv_theo, mode="linear", exp_curvet=20.0, eps=0.005): '''Internal helper for rhohv_noisecorrection.''' snr_db = signal2noiseratio(Z, rng_km, rc, scale="db").rename("snr_db") snr_lin = signal2noiseratio(Z, rng_km, rc, scale="lin").rename("snr_lin") rhohv_corr = (rhohv_na * (1 + 1 / snr_lin)).rename("rhohv_corr") snr_edges = np.arange(*bins_snr) rho_edges = np.arange(*bins_rho) hist = xr_hist2d(snr_db, rhohv_corr, snr_edges, rho_edges, dim=list(snr_db.dims)) rhohv_bin_dim = [d for d in hist.dims if d.endswith("_bin")][1] idx = hist.argmax(dim=rhohv_bin_dim) rhohv_centers = 0.5 * (rho_edges[:-1] + rho_edges[1:]) histmax = xr.apply_ufunc(lambda i: rhohv_centers[i], idx, vectorize=True, dask="parallelized", output_dtypes=[float]).values snr_centers = 0.5 * (snr_edges[:-1] + snr_edges[1:]) theo_line = _build_theo_line(snr_centers, rhohv_theo, mode=mode, exp_curvet=exp_curvet, eps=eps) return sklrmse(theo_line, histmax)
[docs] def _optimise_noise_level(Z, rng_km, rhohv_na, bins_rho=(0.8, 1.1, 0.005), bins_snr=(5, 30, 0.1), rhohv_theo=(0.90, 1.), noise_level=(0, 100), mode="linear", exp_curvet=20.0, eps=0.005): '''Internal helper for rhohv_noisecorrection.''' def objective(rc): return _rmse_objective(rc, Z, rng_km, rhohv_na, bins_snr, bins_rho, rhohv_theo, mode=mode, exp_curvet=exp_curvet, eps=eps) result = minimize_scalar(objective, bounds=noise_level, method="bounded") return result.x, result.fun
[docs] def rhohv_noisecorrection(ds, inp_names=None, rhohv_theo=(0.9, 1.0), mode="exp", exp_curvet=20.0, eps=0.005, noise_level=(0, 100), bins_rho=(0.8, 1.1, 0.005), bins_snr=(5, 30, 0.1), data2correct=None, preserve_original=True, plot_method=False): r""" Correct noise-bias in the radar correlation coefficient (rhoHV). Parameters ---------- ds : xarray.Dataset Input dataset containing at least: - reflectivity (e.g. "DBTH") - range (e.g. "range") - raw correlation coefficient rhoHV (e.g. "URHOHV") inp_names : dict, optional Mapping of variable names in `ds`. Keys: {"Z", "rng", "rhohv"}. Defaults: {"Z": "DBTH", "rng": "range", "rhohv": "URHOHV"}. bins_rho : tuple of float, optional rhoHV binning interval as (start, stop, step). Default is (0.8, 1.1, 0.005). bins_snr : tuple of float, optional SNR binning interval as (start, stop, step). Default is (5, 30, 0.1). rhohv_theo : tuple of float, optional Theoretical rhoHV range expected in rain (rhoHV_0, rhoHV_{inf}). Default is (0.90, 1.0). noise_level : tuple of float, optional Bounds for radar constant optimisation (min, max). Default is (0, 100). mode : {"linear", "exp", "piecewise"}, optional Functional form of the theoretical rhoHV–SNR curve. Default is "exp". exp_curvet : float, optional Transition point for "exp" mode. Default is 20.0. eps : float, optional Small tolerance for exponential decay. Default is 0.005. data2correct : xarray.Dataset, optional If provided, this dataset is updated with corrected rhoHV. If None, a new dataset is created containing `RHOHV_corr`. preserve_original : bool, optional Only applies when `data2correct` is provided: - True: keep raw rhoHV and add `RHOHV_corr` - False: overwrite raw rhoHV plot_method : bool, optional If True, plots both the optimised diagnostic plot and the calibration grid. Returns ------- xarray.Dataset Dataset with corrected rhoHV and diagnostic attributes: - `noise_level_dB` - `objective_rmse` - `rhohv_theo` - `mode`, `exp_curvet`, `eps` Notes ----- Based on the method described in [1]_. References ---------- .. [1] Ryzhkov, A. V.; Zrnic, D. S. (2019). *Radar Polarimetry for Weather Observations* (1st ed.). Springer International Publishing. https://doi.org/10.1007/978-3-030-05093-1 """ defaults = {"Z": "DBTH", "rng": "range", "rhohv": "URHOHV"} names = {**defaults, **(inp_names or {})} rng_km = _to_kilometers(ds[names["rng"]]).values Z = ds[names["Z"]] rhohv_na = ds[names["rhohv"]] # Optimisation opt_noise, opt_rmse = _optimise_noise_level( Z, rng_km, rhohv_na, bins_rho, bins_snr, rhohv_theo, noise_level, mode=mode, exp_curvet=exp_curvet, eps=eps) # Correction snr_db = signal2noiseratio(Z, rng_km, opt_noise, scale="db").rename("snr_db") snr_lin = signal2noiseratio(Z, rng_km, opt_noise, scale="lin").rename("snr_lin") rhohv_corr = (rhohv_na * (1 + 1 / snr_lin)).rename("rhohv_corr") rhohv_final = rhohv_corr.rename("rhohv_corr") rhohv_final.attrs = {'standard_name': 'radar_correlation_coefficient_hv', 'long_name': 'Correlation coefficient HV', 'short_name': 'RHOHV', 'units': 'unitless'} # Histogram snr_edges = np.arange(*bins_snr) rho_edges = np.arange(*bins_rho) hist = xr_hist2d(snr_db, rhohv_final, snr_edges, rho_edges, dim=list(snr_db.dims)) # Extract maxima per SNR bin (bin centers) rhohv_bin_dim = [d for d in hist.dims if d.endswith("_bin")][1] idx = hist.argmax(dim=rhohv_bin_dim) rhohv_centers = 0.5 * (rho_edges[:-1] + rho_edges[1:]) histmax = xr.apply_ufunc(lambda i: rhohv_centers[i], idx, vectorize=True, dask="parallelized", output_dtypes=[float]) # Define centers and theoretical line for plotting snr_centers = 0.5 * (snr_edges[:-1] + snr_edges[1:]) # Theoretical line according to chosen mode theo_line = _build_theo_line(snr_centers, rhohv_theo, mode=mode, exp_curvet=exp_curvet, eps=eps) # --- output dataset --- suffix = "_nsc" if preserve_original else "" if data2correct is not None: # Update existing dataset ds_out = data2correct.copy() if preserve_original: corrected_name = f"RHOHV{suffix}" ds_out[corrected_name] = rhohv_final varname = corrected_name else: ds_out = ds_out.drop_vars(names["rhohv"]) corrected_name = 'RHOHV' ds_out[corrected_name] = rhohv_final varname = corrected_name else: # Create new dataset with only corrected field ds_out = xr.Dataset() corrected_name = f"RHOHV{suffix}" ds_out[corrected_name] = rhohv_final varname = corrected_name # Copy attrs from original dataset ds_out.attrs = ds.attrs.copy() # --- attach diagnostics as attrs --- params = {"noise_level_dB": float(opt_noise), "objective_rmse": float(opt_rmse), "rhohv_theo": rhohv_theo, "mode": mode, "exp_curvet": exp_curvet, "eps": eps, "noise_level_bounds": noise_level} ds_out = apply_correction_chain(ds_out, varname=varname, params=params, step="noise_correction", suffix=suffix) ds_out.attrs.update(params) outputs = list(ds_out.keys()) ds_out = record_provenance(ds_out, function="rhohv_noisecorrection", inputs=ds.keys(), outputs=outputs, parameters=params) # Plot if plot_method: _plot_rhohvmethod_single(snr_edges, rho_edges, hist, snr_db, rhohv_na, snr_centers, theo_line, histmax, opt_noise) _plot_rhohvmethod_grid(Z, rng_km, rhohv_na, bins_snr=bins_snr, bins_rho=bins_rho, rhohv_theo=rhohv_theo, opt_noise=opt_noise, mode=mode, exp_curvet=exp_curvet, eps=eps) return ds_out