"""Towerpy: an open-source toolbox for processing polarimetric radar data."""
import warnings
import numpy as np
import copy
from ..datavis import rad_display
from ..utils.radutilities import find_nearest
from ..utils.radutilities import rolling_window
[docs]
class PhiDP_Calibration:
r"""
A class for calibrating the differential phase :math:`(\Phi_{DP})`.
Attributes
----------
elev_angle : float
Elevation angle at where the scan was taken, in degrees.
file_name : str
Name of the file containing radar data.
scandatetime : datetime
Date and time of scan.
site_name : str
Name of the radar site.
phidp_offset : dict
Computed :math:`\Phi_{DP}` offset
phidp_offset_stats : dict
Stats calculated during the computation of the :math:`\Phi_{DP}`
offset.
vars : dict
Offset-corrected :math:`(\Phi_{DP})` and user-defined radar variables.
"""
def __init__(self, radobj=None):
[docs]
self.elev_angle = getattr(radobj, 'elev_angle',
None) if radobj else None
[docs]
self.file_name = getattr(radobj, 'file_name',
None) if radobj else None
[docs]
self.scandatetime = getattr(radobj, 'scandatetime',
None) if radobj else None
[docs]
self.site_name = getattr(radobj, 'site_name',
None) if radobj else None
[docs]
def offsetdetection_vps(self, pol_profs, mlyr=None, min_h=1.1, max_h=None,
zhmin=5, zhmax=30, rhvmin=0.98, minbins=2,
stats=False, plot_method=False, rad_georef=None,
rad_vars=None):
r"""
Calculate the offset on :math:`\Phi_{DP}` using vertical profiles.
Parameters
----------
pol_profs : dict
Profiles of polarimetric variables.
mlyr : class, optional
Melting layer class containing the top and bottom boundaries of
the ML.
min_h : float, optional
Minimum height of usable data within the polarimetric profiles.
The default is 1.1.
max_h : float, optional
Maximum height of usable data within the polarimetric profiles.
Use only if ML boundaries are not available.
The default is 3.
zhmin : float, optional
Threshold on :math:`Z_{H}` (in dBZ) related to light rain.
The default is 5.
zhmax : float, optional
Threshold on :math:`Z_{H}` (in dBZ) related to light rain.
The default is 30.
rhvmin : float, optional
Threshold on :math:`\rho_{HV}` (unitless) related to light rain.
The default is 0.98.
minbins : float, optional
Consecutive bins of :math:`\Phi_{DP}` related to light rain.
The default is 2.
stats : dict, optional
If True, the function returns stats related to the computation of
the :math:`\Phi_{DP}` offset. The default is False.
plot_method : Bool, optional
Plot the offset detection method. The default is False.
rad_georef : dict, optional
Georeferenced data containing descriptors of the azimuth, gates
and beam height, amongst others. The default is None.
rad_vars : dict, optional
Radar variables used for plotting the offset correction method.
The default is None.
"""
if mlyr is None:
mlvl = 5
mlyr_thickness = 0.5
mlyr_bottom = mlvl - mlyr_thickness
else:
mlvl = mlyr.ml_top
mlyr_thickness = mlyr.ml_thickness
mlyr_bottom = mlyr.ml_bottom
if np.isnan(mlyr_bottom):
boundaries_idx = [
find_nearest(pol_profs.georef['profiles_height [km]'], min_h),
find_nearest(pol_profs.georef['profiles_height [km]'],
mlvl-mlyr_thickness)]
else:
boundaries_idx = [
find_nearest(pol_profs.georef['profiles_height [km]'], min_h),
find_nearest(pol_profs.georef['profiles_height [km]'],
mlyr_bottom)]
if boundaries_idx[1] <= boundaries_idx[0]:
boundaries_idx = [np.nan]
# boundaries_idx *= np.nan
if np.isnan(mlvl) and np.isnan(mlyr_bottom):
boundaries_idx = [np.nan]
if max_h:
maxheight = find_nearest(
pol_profs.georef['profiles_height [km]'], max_h)
if any(np.isnan(boundaries_idx)):
self.phidp_offset = 0
else:
profs = copy.deepcopy(pol_profs.vps)
calphidp_vps = {k: v[boundaries_idx[0]:boundaries_idx[1]]
for k, v in profs.items()}
calphidp_vps['PhiDP [deg]'][calphidp_vps['ZH [dBZ]'] < zhmin] = np.nan
calphidp_vps['PhiDP [deg]'][calphidp_vps['ZH [dBZ]'] > zhmax] = np.nan
calphidp_vps['PhiDP [deg]'][calphidp_vps['rhoHV [-]'] < rhvmin] = np.nan
if np.count_nonzero(~np.isnan(calphidp_vps['PhiDP [deg]'])) <= minbins:
calphidp_vps['PhiDP [deg]'] *= np.nan
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
calphidpvps_mean = np.nanmean(calphidp_vps['PhiDP [deg]'])
calphidpvps_max = np.nanmax(calphidp_vps['PhiDP [deg]'])
calphidpvps_min = np.nanmin(calphidp_vps['PhiDP [deg]'])
calphidpvps_std = np.nanstd(calphidp_vps['PhiDP [deg]'])
calphidpvps_sem = np.nanstd(
calphidp_vps['PhiDP [deg]']) / np.sqrt(
len(calphidp_vps['PhiDP [deg]']))
if not np.isnan(calphidpvps_mean):
self.phidp_offset = calphidpvps_mean
else:
self.phidp_offset = 0
if stats:
self.phidp_offset_stats = {'offset_max': calphidpvps_max,
'offset_min': calphidpvps_min,
'offset_std': calphidpvps_std,
'offset_sem': calphidpvps_sem,
}
if plot_method:
rad_params = {}
if self.elev_angle:
rad_params['elev_ang [deg]'] = self.elev_angle
else:
rad_params['elev_ang [deg]'] = 'surveillance scan'
if self.scandatetime:
rad_params['datetime'] = self.scandatetime
else:
rad_params['datetime'] = None
var = 'PhiDP [deg]'
rad_var = np.array([i[boundaries_idx[0]:boundaries_idx[1]]
for i in rad_vars[var]], dtype=np.float64)
rad_display.plot_offsetcorrection(
rad_georef, rad_params, rad_var,
var_offset=self.phidp_offset, var_name=var)
[docs]
def offsetdetection_qvps(self, pol_profs, mlyr=None, min_h=0., max_h=3.,
zhmin=0, zhmax=20, rhvmin=0.985, minbins=4,
stats=False):
r"""
Calculate the offset on :math:`\Phi_{DP}` using QVPs.
Parameters
----------
pol_profs : dict
Profiles of polarimetric variables.
mlyr : class
Melting layer class containing the top and bottom
boundaries of the ML.
min_h : float, optional
Minimum height of usable data within the polarimetric
profiles. The default is 0.
max_h : float, optional
Maximum height of usable data within the polarimetric
profiles. The default is 3.
zhmin : float, optional
Threshold on :math:`Z_{H}` (in dBZ) related to light rain.
The default is 0.
zhmax : float, optional
Threshold on :math:`Z_{H}` (in dBZ) related to light rain.
The default is 20.
rhvmin : float, optional
Threshold on :math:`\rho_{HV}` (unitless) related to
light rain. The default is 0.985.
minbins : float, optional
Consecutive bins of :math:`\Phi_{DP}` related to light
rain. The default is 3.
stats : dict, optional
If True, the function returns stats related to the
computation of the :math:`\Phi_{DP}` offset.
The default is False.
Notes
-----
1. Adapted from the method described in [1]
References
----------
.. [1] Sanchez-Rivas, D. and Rico-Ramirez, M. A. (2022):
"Calibration of radar differential reflectivity using
quasi-vertical profiles", Atmos. Meas. Tech., 15, 503–520,
https://doi.org/10.5194/amt-15-503-2022
"""
if mlyr is None:
mlvl = 5
mlyr_thickness = 0.5
mlyr_bottom = mlvl - mlyr_thickness
else:
mlvl = mlyr.ml_top
mlyr_thickness = mlyr.ml_thickness
mlyr_bottom = mlyr.ml_bottom
if np.isnan(mlyr_bottom):
boundaries_idx = [find_nearest(
pol_profs.georef['profiles_height [km]'], min_h),
find_nearest(pol_profs.georef['profiles_height [km]'],
mlvl-mlyr_thickness)]
else:
boundaries_idx = [find_nearest(
pol_profs.georef['profiles_height [km]'], min_h),
find_nearest(pol_profs.georef['profiles_height [km]'],
mlyr_bottom)]
if boundaries_idx[1] <= boundaries_idx[0]:
boundaries_idx = [np.nan]
if np.isnan(mlvl) and np.isnan(mlyr_bottom):
boundaries_idx = [np.nan]
maxheight = find_nearest(pol_profs.georef['profiles_height [km]'],
max_h)
if any(np.isnan(boundaries_idx)):
self.phidp_offset = 0
else:
profs = copy.deepcopy(pol_profs.qvps)
calpdp_qvps = {k: v[boundaries_idx[0]:boundaries_idx[1]]
for k, v in profs.items()}
calpdp_qvps['PhiDP [deg]'][calpdp_qvps['ZH [dBZ]']
< zhmin] = np.nan
calpdp_qvps['PhiDP [deg]'][calpdp_qvps['ZH [dBZ]']
> zhmax] = np.nan
calpdp_qvps['PhiDP [deg]'][calpdp_qvps['rhoHV [-]']
< rhvmin] = np.nan
if np.count_nonzero(
~np.isnan(calpdp_qvps['PhiDP [deg]'])) <= minbins:
calpdp_qvps['PhiDP [deg]'] *= np.nan
calpdp_qvps['PhiDP [deg]'][calpdp_qvps['rhoHV [-]']
> maxheight] = np.nan
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
calpdpqvps_mean = np.nanmean(calpdp_qvps['PhiDP [deg]'])
calpdpqvps_max = np.nanmax(calpdp_qvps['PhiDP [deg]'])
calpdpqvps_min = np.nanmin(calpdp_qvps['PhiDP [deg]'])
calpdpqvps_std = np.nanstd(calpdp_qvps['PhiDP [deg]'])
calpdpqvps_sem = (np.nanstd(
calpdp_qvps['PhiDP [deg]'])
/ np.sqrt(len(calpdp_qvps['PhiDP [deg]'])))
if not np.isnan(calpdpqvps_mean):
self.phidp_offset = calpdpqvps_mean
else:
self.phidp_offset = 0
if stats:
self.phidp_offset_stats = {'offset_max': calpdpqvps_max,
'offset_min': calpdpqvps_min,
'offset_std': calpdpqvps_std,
'offset_sem': calpdpqvps_sem}
[docs]
def offsetdetection_ppi(self, rad_vars, mov_avrgf_len=(1, 3), thr_spdp=10,
rhohv_min=0.9, zh_min=5., max_off=360, preset=None,
preset_tol=5, mode='median', plot_method=False):
r"""
Compute the :math:`\Phi_{DP}` offset using PPIs`.
Parameters
----------
rad_vars : dict
Dict containing radar variables to plot. :math:`\Phi_{DP}`,
:math:`\rho_{HV}` and :math:`Z_H` must be in the dict.
mov_avrgf_len : 2-element tuple or list, optional
Window size used to smooth :math:`\Phi_{DP}` by applying a
moving average window. The default is (1, 3). It is
recommended to average :math:`\Phi_{DP}` along the range,
i.e. keep the window size in a (1, n) size.
thr_spdp : int or float, optional
Threshold used to discard bins with standard deviations of
:math:`\Phi_{DP}` greater than the selected value.
The default is 10 deg.
rhohv_min : float, optional
Threshold in :math:`\rho_{HV}` used to discard bins
related to nonmeteorological signals. The default is 0.90
zh_min : float, optional
Threshold in :math:`Z_H` used to discard bins related to
nonmeteorological signals. The default is 5.
max_off : float or int, optional
Maximum value allowed for :math:`\Phi_{DP}(0)`.
The default is 360.
preset : float or int, optional
Preset :math:`\Phi_{DP}(0)`. The default is None.
preset_tol : float or int, optional
Maximum difference allowed between the preset and
computed :math:`\Phi_{DP}(0)` values. Offset values that
exceed this tolerance value are replaced with the preset
value. The default tolerance is 5 deg.
mode : str, optional
Resulting :math:`\Phi_{DP}` offset. The string has to be
one of 'median' or 'multiple'. If median, :math:`\Phi_{DP}`
offset is computed as a single value
(the median of all rays). Otherwise,
the :math:`\Phi_{DP}` offset is calculated ray-wise.
The default is 'median'.
"""
rad_vars = copy.copy(rad_vars)
if (mov_avrgf_len[1] % 2) == 0:
print('Choose an odd number to apply the '
+ 'moving average filter')
phidp_O = {k: np.ones_like(rad_vars[k]) * rad_vars[k]
for k in list(rad_vars) if k.startswith('Phi')}
# phidp_O['PhiDP [deg]'][:, 0] = np.nan
# Filter isolated values
phidp_pad = np.pad(phidp_O['PhiDP [deg]'],
((0, 0), (mov_avrgf_len[1]//2,
mov_avrgf_len[1]//2)),
mode='constant', constant_values=(np.nan))
phidp_dspk = np.array(
[[np.nan if ~np.isnan(vbin)
and (np.isnan(phidp_pad[nr][nbin-1])
and np.isnan(phidp_pad[nr][nbin+1]))
else 1 for nbin, vbin in enumerate(phidp_pad[nr])
if nbin != 0
and nbin < phidp_pad.shape[1] - mov_avrgf_len[1] + 2]
for nr in range(phidp_pad.shape[0])], dtype=np.float64)
# Filter using ZH
phidp_dspk[rad_vars['ZH [dBZ]'] < zh_min] = np.nan
# Filter using rhoHV
phidp_dspk[rad_vars['rhoHV [-]'] < rhohv_min] = np.nan
# Computes sPhiDP for each ray
phidp_dspk_rhv = phidp_O['PhiDP [deg]'] * phidp_dspk
phidp_s = np.nanstd(rolling_window(
phidp_dspk_rhv, mov_avrgf_len), axis=-1, ddof=1)
phidp_pad = np.pad(phidp_s, ((0, 0), (mov_avrgf_len[1]//2,
mov_avrgf_len[1]//2)),
mode='constant', constant_values=(np.nan))
# Filter values with std values greater than std threshold
phidp_sfnv = np.array(
[[np.nan if vbin >= thr_spdp
and (phidp_pad[nr][nbin-1] >= thr_spdp
or phidp_pad[nr][nbin+1] >= thr_spdp)
else 1 for nbin, vbin in enumerate(phidp_pad[nr])
if nbin != 0
and nbin < phidp_pad.shape[1] - mov_avrgf_len[1] + 2]
for nr in range(phidp_pad.shape[0])], dtype=np.float64)
# Filter isolated values
phidp_sfnv2 = np.array(
[[np.nan if ~np.isnan(vbin)
and (np.isnan(phidp_pad[nr][nbin-1])
or np.isnan(phidp_pad[nr][nbin+1]))
else 1 for nbin, vbin in enumerate(phidp_pad[nr])
if nbin != 0
and nbin < phidp_pad.shape[1] - mov_avrgf_len[1] + 2]
for nr in range(phidp_pad.shape[0])], dtype=np.float64)
phidp_sfnv = phidp_sfnv*phidp_sfnv2
phidp_f = phidp_dspk_rhv * phidp_sfnv
# Filter isolated values
phidp_pad = np.pad(phidp_f, ((0, 0), (mov_avrgf_len[1]//2,
mov_avrgf_len[1]//2)),
mode='constant', constant_values=(np.nan))
phidp_f2 = np.array(
[[np.nan if ~np.isnan(vbin)
and (np.isnan(phidp_pad[nr][nbin-1])
or np.isnan(phidp_pad[nr][nbin+1]))
else 1 for nbin, vbin in enumerate(phidp_pad[nr])
if nbin != 0
and nbin < phidp_pad.shape[1] - mov_avrgf_len[1] + 2]
for nr in range(phidp_pad.shape[0])], dtype=np.float64)
phidp_f = phidp_f * phidp_f2
# Computes and initial PhiDP(0)
phidp0 = np.array([[
nr[np.isfinite(nr)][0] if ~np.isnan(nr).all() else 0
for nr in phidp_f]], dtype=np.float64).transpose()
phidp0[phidp0 == 0] = np.nanmedian(phidp0[phidp0 != 0])
phidp0[abs(phidp0) > max_off] = 0
phidp0 = np.nan_to_num(phidp0)
if preset:
phidp0[abs(phidp0 - preset) > preset_tol] = preset
if mode == 'median':
phidp_offset = np.nanmedian(phidp0)
if abs(phidp_offset) > max_off or np.isnan(phidp_offset):
phidp_offset = 0
if preset and abs(phidp_offset - preset) > preset_tol:
phidp_offset = preset
elif mode == 'multiple':
phidp_offset = phidp0.flatten()
self.phidp_offset = phidp_offset
if plot_method:
rad_params = {}
if self.elev_angle:
rad_params['elev_ang [deg]'] = self.elev_angle
else:
rad_params['elev_ang [deg]'] = 'surveillance scan'
if self.scandatetime:
rad_params['datetime'] = self.scandatetime
else:
rad_params['datetime'] = None
var = 'PhiDP [deg]'
phidp02plot = np.nanmedian(phidp0)
rad_var = phidp0.flatten()
azim = np.deg2rad(np.arange(len(rad_var)))
rad_georef = {}
rad_georef['azim [rad]'] = azim
rad_display.plot_offsetcorrection(
rad_georef, rad_params, rad_vars['PhiDP [deg]'], var_m=rad_var,
var_offset=phidp02plot, var_name=var, mode='other')
[docs]
def offset_correction(self, phidp2calib, phidp_offset=0,
data2correct=None):
r"""
Correct the PhiDP offset using a given value.
Parameters
----------
phidp2calib : array of float
Offset-affected differential phase :math:`(\Phi_{DP})`
in deg.
phidp_offset : float
Differential phase offset in deg. The default is 0.
data2correct : dict, optional
Dictionary to update the offset-corrected
:math:`(\Phi_{DP})`. The default is None.
"""
if isinstance(phidp_offset, (int, float)):
if np.isnan(phidp_offset):
phidp_offset = 0
phidp_oc = copy.deepcopy(phidp2calib) - phidp_offset
elif isinstance(phidp_offset, (np.ndarray, list, tuple)):
# TODO: raise error if lens are different
phidp_oc = copy.deepcopy(phidp2calib)
phidp_oc[:] = [ray - phidp_offset[cnt]
for cnt, ray in enumerate(phidp_oc)]
if data2correct is None:
self.vars = {'PhiDP [deg]': phidp_oc}
else:
data2cc = copy.deepcopy(data2correct)
data2cc.update({'PhiDP [deg]': phidp_oc})
self.vars = data2cc