"""Towerpy: an open-source toolbox for processing polarimetric radar data."""
import numpy as np
import time
# import copy
from datetime import datetime
from ..base import TowerpyError
from ..datavis import rad_display
from ..utils.radutilities import find_nearest
# TODO: Add KDP to the building process.
# TODO: Add printing message warning of KDP and Vel? instead of raising error.
[docs]
class PolarimetricProfiles:
"""
A class to generate profiles of polarimetric variables.
Attributes
----------
elev_angle : float or list
Elevation angle at which the scan was taken, in deg.
file_name : str or list
Name of the file containing radar data.
scandatetime : datetime or list
Date and time of scan.
site_name : str
Name of the radar site.
georef : dict, optional
Descriptor of the computed profiles height.
vps : dict, optional
Profiles generated from a birdbath scan.
vps_stats : dict, optional
Statistics of the VPs generation.
qvps : dict, optional
Quasi-Vertical Profiles generated from the PPI scan.
qvps_stats : dict, optional
Statistics of the QVPs generation.
rd_qvps : dict, optional
Range-defined Quasi-Vertical Profiles generated from PPI scans
taken at different elevation angles.
qvps_itp : dict, optional
QVPs generated from each elevation angle.
"""
def __init__(self, radobj=None):
[docs]
self.elev_angle = getattr(radobj, 'elev_angle',
None) if radobj else None
[docs]
self.file_name = getattr(radobj, 'file_name',
None) if radobj else None
[docs]
self.scandatetime = getattr(radobj, 'scandatetime',
None) if radobj else None
[docs]
self.site_name = getattr(radobj, 'site_name',
None) if radobj else None
[docs]
self.profs_type = getattr(radobj, 'profs_type',
None) if radobj else None
[docs]
def pol_vps(self, rad_georef, rad_params, rad_vars, thlds=None,
valid_gates=0, stats=False):
"""
Generate profiles of polarimetric variables from a birdbath scan.
Parameters
----------
rad_georef : dict
Georeferenced data containing descriptors of the azimuth, gates
and beam height, amongst others.
rad_params : dict
Radar technical details.
rad_vars : dict
Radar variables used to generate the VPs.
thlds : dict containing key and 2-element tuple or list, optional
Thresholds [min, max] of radar variables used to discard gates
in the azimuthal averaging. The default is None.
valid_gates : int, optional
Number of valid gates (or azimuths) along the radial.
The default is 0.
stats : Bool, optional
Statistics of the VPs generation:
'std_dev': Standard Deviation
'min': Min values
'max': Max values
'sem': Standard Error of the Mean
"""
ri, rf = 0, rad_params['ngates']
if thlds is not None:
thlds_vps = {'ZH [dBZ]': None, 'ZDR [dB]': None, 'rhoHV [-]': None,
'PhiDP [deg]': None, 'V [m/s]': None,
'KDP [deg/km]': None}
thlds_vps.update(thlds)
rvars_idx = {k: np.where((kv >= thlds_vps[k][0])
& (kv <= thlds_vps[k][1]),
True, False)
for k, kv in rad_vars.items()
if thlds_vps[k] is not None}
valid_idx = True
for i in rvars_idx:
valid_idx = valid_idx*rvars_idx[i]
rad_vars = {k: np.where(valid_idx, kv, np.nan)
for k, kv in rad_vars.items()}
if self.elev_angle < 89:
raise TowerpyError('The elevation angle must be around 90 deg')
if self.elev_angle > 89:
vpdata = {key: values for key, values in rad_vars.items()}
validgates = 0
vppol = {
key:
np.array([np.nanmean(values[0:rad_params['nrays'], i:i+1])
if np.count_nonzero(~np.isnan(values[0:rad_params['nrays'], i:i+1]))>validgates
else np.nan for i in range(ri, rf)])
for key, values in vpdata.items()}
if stats:
vpsstd = {key: np.array([np.nanstd(values[0:rad_params['nrays'], i:i+1])
if np.count_nonzero(~np.isnan(values[0:rad_params['nrays'], i:i+1]))>validgates
else np.nan for i in range(ri, rf)])
for key, values in vpdata.items()}
vpsmin = {key: np.array([np.nanmin(values[0:rad_params['nrays'], i:i+1])
if np.count_nonzero(~np.isnan(values[0:rad_params['nrays'], i:i+1]))>validgates
else np.nan for i in range(ri, rf)])
for key, values in vpdata.items()}
vpsmax = {key: np.array([np.nanmax(values[0:rad_params['nrays'], i:i+1])
if np.count_nonzero(~np.isnan(values[0:rad_params['nrays'], i:i+1]))>validgates
else np.nan for i in range(ri, rf)])
for key, values in vpdata.items()}
vpssem = {key: np.array([np.nanstd(values[0:rad_params['nrays'], i:i+1])/np.sqrt(np.count_nonzero(~np.isnan(values[0:rad_params['nrays'], i:i+1])))
if np.count_nonzero(~np.isnan(values[0:rad_params['nrays'], i:i+1]))>validgates
else np.nan for i in range(ri, rf)])
for key, values in vpdata.items()}
self.vps_stats = {'std_dev': vpsstd,
'min': vpsmin,
'max': vpsmax,
'sem': vpssem}
if 'V [m/s]' in rad_vars.keys() and isinstance(vppol['V [m/s]'],
np.ndarray):
vppol['gradV [dV/dh]'] = np.array(np.gradient(vppol['V [m/s]'])).T
if 'gradV [dV/dh]' in vppol.keys() and stats:
self.vps_stats['std_dev']['gradV [dV/dh]'] = np.empty_like(self.vps_stats['std_dev']['V [m/s]'])
self.vps_stats['min']['gradV [dV/dh]'] = np.empty_like(self.vps_stats['min']['V [m/s]'])
self.vps_stats['max']['gradV [dV/dh]'] = np.empty_like(self.vps_stats['max']['V [m/s]'])
self.vps_stats['sem']['gradV [dV/dh]'] = np.empty_like(self.vps_stats['sem']['V [m/s]'])
self.vps_stats['std_dev']['gradV [dV/dh]'][:] = np.nan
self.vps_stats['min']['gradV [dV/dh]'][:] = np.nan
self.vps_stats['max']['gradV [dV/dh]'][:] = np.nan
self.vps_stats['sem']['gradV [dV/dh]'][:] = np.nan
self.vps = vppol
self.profs_type = 'VPs'
self.georef = {}
profh = np.array([np.mean(rays)
for rays in rad_georef['beam_height [km]'].T])
self.georef['profiles_height [km]'] = profh
[docs]
def pol_qvps(self, rad_georef, rad_params, rad_vars, thlds='default',
valid_gates=30, stats=False, exclude_vars=['V [m/s]'],
qvps_height_method='bh'):
r"""
Generate QVPs of polarimetric variables.
Parameters
----------
rad_georef : dict
Georeferenced data containing descriptors of the azimuth, gates
and beam height, amongst others.
rad_params : dict
Radar technical details.
rad_vars : dict
Radar variables used to generate the QVPs.
thlds : dict containing 2-element tuple, optional
Thresholds [min, max] of radar variables used to discard gates
in the azimuthal averaging. The default are: ZH [dBZ] > -10 and
rhoHV > 0.6, according to [1]_.
valid_gates : int, optional
Number of valid gates (or azimuths) along the radial.
The default is 30, according to [1]_.
stats : Bool, optional
Statistics of the QVPs generation:
'std_dev': Standard Deviation
'min': Min values
'max': Max values
'sem': Standard Error of the Mean
exclude_vars : list, optional
Name of the variables that will not be used to compute the QVPs.
The default is ['V [m/s]'].
Notes
-----
1. It is recommended to follow the routine described in [2]_ to
preprocess :math:`\Phi_{DP}` and compute :math:`K_{DP}`.
References
----------
.. [1] Ryzhkov, A. V. et al. (2016) "Quasi-vertical profiles-A new way
to look at polarimetric radar data"", Journal of Atmospheric and
Oceanic Technology, 33(3), pp. 551–562.
https://doi.org/10.1175/JTECH-D-15-0020.1
.. [2] Griffin, E. M., Schuur, T. J., & Ryzhkov, A. V. (2018).
"A Polarimetric Analysis of Ice Microphysical Processes in Snow,
Using Quasi-Vertical Profiles", Journal of Applied Meteorology and
Climatology, 57(1), 31-50. https://doi.org/10.1175/JAMC-D-17-0033.1
"""
ri, rf = 0, rad_params['ngates']
dh1 = (rad_georef['beam_height [km]'][0]
+ np.diff(rad_georef['range [m]']/1000, prepend=0)
* np.sin(np.deg2rad(rad_params['elev_ang [deg]'])))
dh2 = (rad_georef['beam_height [km]'][0]
+ rad_georef['beam_height [km]'][0]
* np.deg2rad(rad_params['beamwidth [deg]'])
* (1/np.tan(np.deg2rad(rad_params['elev_ang [deg]']))))
dh = np.array([dhe if dhe > dh2[c1] else dh2[c1]
for c1, dhe in enumerate(dh1)])
if qvps_height_method == 'vr':
qvps_h = dh
elif qvps_height_method == 'bh':
qvps_h = rad_georef['beam_height [km]'][0]
else:
raise TowerpyError('Choose a method to compute the height of the'
'Quasi-Vertical Profiles.')
thlds_qvps = {'ZH [dBZ]': [-10, np.inf], 'ZDR [dB]': None,
'rhoHV [-]': [0.6, np.inf], 'PhiDP [deg]': None,
'V [m/s]': None}
thlds_qvps = {k: thlds_qvps[k] if k in thlds_qvps.keys()
else None for k in rad_vars.keys()}
if thlds != 'default':
thlds_qvps.update(thlds)
rvars_idx = {k: np.where((kv >= thlds_qvps[k][0])
& (kv <= thlds_qvps[k][1]), True, False)
for k, kv in rad_vars.items()
if thlds_qvps[k] is not None}
valid_idx = True
for i in rvars_idx:
valid_idx = valid_idx*rvars_idx[i]
rad_vars = {k: np.where(valid_idx, kv, np.nan)
for k, kv in rad_vars.items()}
validgates = valid_gates
# vars_nu = ['V [m/s]'] # Modify to compute QVPs of V.
qvpvar = sorted(list(set([k for k in rad_vars.keys()
if k not in exclude_vars])),
reverse=True)
qvpdata = {key: values
for key, values in rad_vars.items() if key in qvpvar}
qvppol = {key: np.array([np.nanmean(values[0:rad_params['nrays'],
i:i+1])
if np.count_nonzero(~np.isnan(values[0:rad_params['nrays'], i: i+1])) > validgates
else np.nan
for i in range(ri, rf)])
for key, values in qvpdata.items()}
if stats:
qvpsstd = {key: np.array([np.nanstd(values[0:rad_params['nrays'], i:i+1])
if np.count_nonzero(~np.isnan(values[0:rad_params['nrays'], i:i+1]))>validgates
else np.nan for i in range(ri, rf)])
for key, values in qvpdata.items()}
qvpsmin = {key: np.array([np.nanmin(values[0:rad_params['nrays'], i:i+1])
if np.count_nonzero(~np.isnan(values[0:rad_params['nrays'], i:i+1]))>validgates
else np.nan for i in range(ri, rf)])
for key, values in qvpdata.items()}
qvpsmax = {key: np.array([np.nanmax(values[0:rad_params['nrays'], i:i+1])
if np.count_nonzero(~np.isnan(values[0:rad_params['nrays'], i:i+1]))>validgates
else np.nan for i in range(ri, rf)])
for key, values in qvpdata.items()}
qvpssem = {key: np.array([np.nanstd(values[0:rad_params['nrays'], i:i+1])/np.sqrt(np.count_nonzero(~np.isnan(values[0:rad_params['nrays'], i:i+1])))
if np.count_nonzero(~np.isnan(values[0:rad_params['nrays'], i:i+1]))>validgates
else np.nan for i in range(ri, rf)])
for key, values in qvpdata.items()}
self.qvps_stats = {'std_dev': qvpsstd,
'min': qvpsmin,
'max': qvpsmax,
'sem': qvpssem}
# qvppol['V [m/s]'] = qvppol['ZH [dBZ]']*np.nan
# qvppol['gradV [dV/dh]'] = qvppol['ZH [dBZ]']*np.nan
self.qvps = qvppol
self.profs_type = 'QVPs'
self.georef = {}
self.georef['profiles_height [km]'] = qvps_h
[docs]
def pol_rdqvps(self, rscans_georef, rscans_params, rscans_vars, r0=None,
valid_gates=30, thlds='default', power_param1=0, vert_res=2,
power_param2=2, spec_range=50, all_desc=True,
exclude_vars=['V [m/s]'], qvps_height_method='bh',
plot_method=False):
r"""
Generate RD-QVPs of polarimetric variables.
Parameters
----------
rscans_georef : list
List of dicts containing the georeference of the PPI scans.
rscans_params : list
List of dicts containing Radar technical details.
rscans_vars : list
List of dicts containing radar variables used to generate the
RD-QVPs.
r0 : float or list of floats, optional
Initial range within the PPI scans to build the QVPS, in km.
The default is None.
valid_gates : int, optional
Number of valid gates (or azimuths) along the radial.
The default is 30, according to [1]_.
thlds : dict containing 2-element tuple, optional
Thresholds [min, max] of radar variables used to discard gates
in the azimuthal averaging. The default are: ZH [dBZ] > -10 and
rhoHV > 0.6, according to [1]_.
power_param1 : float, optional
Power parameter for :math:`r_i \leq d-1`. The default is 0,
according to [2]_.
vert_res : float, optional
Resolution of the common vertical axis, in m. The default is 2.
power_param2 : float, optional
Power parameter for :math:`r_i > d-1`. The default is 2,
according to [2]_.
spec_range : int, optional
Range from the radar within which the data will be used.
The default is 50.
all_desc : bool, optional
If False, the function provides descriptors using an average of
datetime and elevations and will not give the initial QVPs used
to compute the RD-QPVs. The default is True.
exclude_vars : list, optional
Name of the variables that will not be used to compute the QVPs.
The default is ['V [m/s]'].
qvps_height_method : str, optional
'bh' or 'vr'
plot_method : bool, optional
Plot the RD-QVPS. The default is False.
Returns
-------
None.
References
----------
.. [1] Ryzhkov, A. V. et al. (2016)
‘Quasi-vertical profiles-A new way to look at polarimetric
radar data’,
Journal of Atmospheric and Oceanic Technology, 33(3), pp. 551–562.
https://doi.org/10.1175/JTECH-D-15-0020.1
.. [2] Tobin, D. M., & Kumjian, M. R. (2017). Polarimetric Radar and
Surface-Based Precipitation-Type Observations of Ice Pellet to
Freezing Rain Transitions, Weather and Forecasting, 32(6),
2065-2082. https://doi.org/10.1175/WAF-D-17-0054.1
.. [3] Griffin, E. M., Schuur, T. J., & Ryzhkov, A. V. (2018).
A Polarimetric Analysis of Ice Microphysical Processes in Snow,
Using Quasi-Vertical Profiles, Journal of Applied Meteorology and
Climatology, 57(1), 31-50. https://doi.org/10.1175/JAMC-D-17-0033.1
"""
if r0 is None:
r0 = [0 for i in rscans_params]
else:
if isinstance(r0, (int, float)):
r0 = [find_nearest(i['range [m]'], r0*1000)
for i in rscans_georef]
elif isinstance(r0, list):
if len(r0) == len(rscans_vars):
r0 = [find_nearest(i['range [m]'], r0[c]*1000)
for c, i in enumerate(rscans_georef)]
else:
raise TowerpyError('Length of values r0 does not match'
' length of elevation index'
' (rscans_vars)')
# if rf is None:
# rf = [i['ngates'] for i in rscans_params]
# else:
# if isinstance(rf, (int, float)):
# rf = [find_nearest(i['range [m]'], rf*1000)
# for i in rscans_georef]
# elif isinstance(r0, list):
# if len(rf) == len(rscans_vars):
# rf = [find_nearest(i['range [m]'], rf[c]*1000)
# for c, i in enumerate(rscans_georef)]
# else:
# raise TowerpyError('Length of values rf does not match'
# ' length of elevation index'
# ' (rscans_vars)')
# rf = [find_nearest(i['range [m]'], spec_range*1000)
# for i in rscans_georef]
rf = [i['ngates'] for i in rscans_params]
dh1 = [v['beam_height [km]'][0]+np.diff(v['range [m]']/1000, prepend=0)
* np.sin(np.deg2rad(rscans_params[c]['elev_ang [deg]']))
for c, v in enumerate(rscans_georef)]
dh2 = [v['beam_height [km]'][0]+v['beam_height [km]'][0]
* np.deg2rad(rscans_params[c]['beamwidth [deg]'])
* (1/np.tan(np.deg2rad(rscans_params[c]['elev_ang [deg]'])))
for c, v in enumerate(rscans_georef)]
dh = [np.array([dhi if dhi > dh2[c1][c2] else dh2[c1][c2]
for c2, dhi in enumerate(dhe)])
for c1, dhe in enumerate(dh1)]
if qvps_height_method == 'vr':
qvps_h = [dh[c] for c, v in enumerate(rscans_georef)]
elif qvps_height_method == 'bh':
qvps_h = [v['beam_height [km]'][0]
for n, v in enumerate(rscans_georef)]
else:
raise TowerpyError('Choose a method to compute the height of the'
'Quasi-Vertical Profiles.')
qvps_hr = [hb[r0[c]:rf[c]] for c, hb in enumerate(qvps_h)]
vg = valid_gates
thlds_qvps = {'ZH [dBZ]': [-10, 100], 'ZDR [dB]': None,
'rhoHV [-]': [0.6, 10], 'PhiDP [deg]': None,
'V [m/s]': None, 'KDP [deg/km]': None}
if thlds != 'default':
thlds_qvps.update(thlds)
rvars_idx = [{k: np.where((kv >= thlds_qvps[k][0])
& (kv <= thlds_qvps[k][1]), True, False)
for k, kv in rad_vars.items()
if thlds_qvps[k] is not None} for rad_vars in rscans_vars]
valid_idx = []
for elevsc in rvars_idx:
validxs = True
for i in elevsc:
validxs = validxs * elevsc[i]
valid_idx.append(validxs)
# vars_nu = ['V [m/s]'] # Modify to compute QVPs of V.
qvpvar = sorted(list(set([k for robj in rscans_vars
for k in robj.keys()
if k not in exclude_vars])),
reverse=True)
rscans_vc = [{k: np.where(valid_idx[c], kv, np.nan)
for k, kv in rad_vars.items() if k in qvpvar}
for c, rad_vars in enumerate(rscans_vars)]
qvppol = [{key: np.array([np.nanmean(values[0:
rscans_params[c]['nrays'],
i:i+1])
if np.count_nonzero(~np.isnan(values[0: rscans_params[c]['nrays'],
i: i+1])) > vg
else np.nan
for i in range(r0[c], rf[c])])
for key, values in qvpdata.items()}
for c, qvpdata in enumerate(rscans_vc)]
# qvppol['V [m/s]'] = qvppol['ZH [dBZ]']*np.nan
# qvppol['gradV [dV/dh]'] = qvppol['ZH [dBZ]']*np.nan
yaxis = np.arange(0, np.ceil(max([np.nanmax(hb) for hb in qvps_hr])),
vert_res/1000)
qvps_itp = [{nv: np.interp(yaxis, qvps_hr[c], pvars,
left=np.nan, right=np.nan
)
for nv, pvars in qvps.items()}
for c, qvps in enumerate(qvppol)]
rng_d = [rngs['range [m]']/1000
for c, rngs in enumerate(rscans_georef)]
rng_itp = [np.linspace(rng[0], rng[-1], len(yaxis))
for c, rng in enumerate(rng_d)]
w_func = np.array([np.array([1 if spec_range-1 < rngi <= spec_range
else
1/(abs(rngi-(spec_range-1))**power_param1)
if rngi <= spec_range-1 else
1/(abs(rngi-(spec_range-1))**power_param2)
if rngi > spec_range-1 else
np.nan
for rngi in rngelv])
for rngelv in rng_itp]).T
rdqvps_vidx = {pvar: np.array([~np.isnan([e[pvar][i]
for e in qvps_itp])
for i in range(len(yaxis))])
for pvar in qvpvar}
rdqvps_val = {pvar: np.array([[e[pvar][i] for e in qvps_itp]
for i in range(len(yaxis))])
for pvar in qvpvar}
rdqvps = {pvar: np.array([np.nansum(rdqvps_val[pvar][row]
* (w_func[row]
* rdqvps_vidx[pvar][row]))
/ np.nansum((w_func[row]
* rdqvps_vidx[pvar][row]))
if
np.count_nonzero(rdqvps_vidx[pvar][row]) >= 1
else np.nan for row in range(len(yaxis))])
for pvar in qvpvar}
self.rd_qvps = rdqvps
self.profs_type = 'RD-QVPs'
self.georef = {}
self.georef['profiles_height [km]'] = yaxis
if all_desc:
self.qvps_itp = qvps_itp
self.elev_angle = np.array([i['elev_ang [deg]']
for i in rscans_params])
self.scandatetime = [i['datetime'] for i in rscans_params]
else:
self.elev_angle = np.average(np.array([i['elev_ang [deg]']
for i in rscans_params]))
dmmydt = [i['datetime'] for i in rscans_params]
self.scandatetime = datetime.fromtimestamp(sum(d.timestamp()
for d in dmmydt)
/ len(dmmydt))
self.file_name = 'RD-QVPs'
# snames_list = [i['site_name'] for i in rscans_params]
# if snames_list.count(snames_list[0]) == len(snames_list):
# self.site_name = snames_list[0]
# else:
# self.site_name = [i['site_name'] for i in rscans_params]
if plot_method:
rad_display.plot_rdqvps(rscans_georef, rscans_params, self,
spec_range=spec_range, all_desc=all_desc)