"""Towerpy: an open-source toolbox for processing polarimetric radar data."""
from functools import reduce
from itertools import product
import numpy as np
from ..base import TowerpyError
from ..utils import radutilities as rut
from ..datavis import rad_display
from ..datavis import rad_interactive
[docs]
class MeltingLayer:
"""
A class to determine the melting layer using weather radar data.
Attributes
----------
elev_angle : float
Elevation angle at which the scan was taken, in deg.
file_name : str
Name of the file containing radar data.
scandatetime : datetime
Date and time of scan.
site_name : str
Name of the radar site.
ml_top : float
Top of the detected melting layer, in km.
ml_bottom : float
Bottom of the detected melting layer, in km.
ml_thickness : float
Thickness of the detected melting layer, in km.
"""
def __init__(self, radobj=None):
[docs]
self.elev_angle = getattr(radobj, 'elev_angle',
None) if radobj else None
[docs]
self.file_name = getattr(radobj, 'file_name',
None) if radobj else None
[docs]
self.scandatetime = getattr(radobj, 'scandatetime',
None) if radobj else None
[docs]
self.site_name = getattr(radobj, 'site_name',
None) if radobj else None
[docs]
self.ml_source = getattr(radobj, 'profs_type',
'user-defined') if radobj else 'user-defined'
[docs]
self.profs_type = getattr(radobj, 'profs_type',
'user-defined') if radobj else 'user-defined'
[docs]
def findpeaksboundaries(profile_v, profile_h, param_k=None):
"""
Find peaks inside a profile using signal processing.
Parameters
----------
profile : array
Radar profile built from high elevation scans.
pheight : array
Heights correspondig to the radar profiles, in km.
Returns
-------
peaks_idx : dict
Index values of the peaks found within the profiles.
"""
import scipy.signal as scs
peaks, peaks_props = scs.find_peaks(profile_v, height=(None, None),
threshold=(None, None),
prominence=(param_k*.7, 2),
width=(None, None),
plateau_size=(None, None),
# rel_height=0.985
rel_height=0.99
)
peaks_props['left_ips'] = np.interp(
peaks_props['left_ips'], np.arange(0, len(profile_v)),
profile_h)
peaks_props['right_ips'] = np.interp(
peaks_props['right_ips'], np.arange(0, len(profile_v)),
profile_h)
if peaks.size == 0 or np.all(np.isnan(peaks)):
peaks_idx = {'idxmax': np.nan, 'idxtop': np.nan, 'idxbot': np.nan,
'mltop': np.nan, 'mlbtm': np.nan, 'peakmaxvalue': np.nan,
'mlpeak': np.nan}
else:
bbpeak_naiveidx = np.nanargmax(peaks_props['peak_heights'])
peakmax_props = {
(k1 if k1.endswith('ips') else k1[:-1]): v1[bbpeak_naiveidx]
for k1, v1 in peaks_props.items()}
idx_top = rut.find_nearest(profile_h, peakmax_props['right_ips'],
mode='major')
idx_bot = rut.find_nearest(profile_h, peakmax_props['left_ips'],
mode='minor')
peaks_idx = {
'idxmax': peaks[bbpeak_naiveidx],
# 'idxtop': peakmax_props['right_base'],
# 'idxbot': peakmax_props['left_base'],
'idxtop': idx_top,
'idxbot': idx_bot,
'peakmaxvalue': peakmax_props['peak_height'],
'peakmaxits': peakmax_props['prominence'],
'mlpeak': profile_h[peaks[bbpeak_naiveidx]],
# 'mltop': profile_h[peakmax_props['right_base']],
# 'mlbtm': profile_h[peakmax_props['left_base']],
'mltop': profile_h[idx_top],
'mlbtm': profile_h[idx_bot]
}
peaks_idx['mlthk'] = peaks_idx['mltop'] - peaks_idx['mlbtm']
return peaks_idx
[docs]
def ml_detection(self, pol_profs, min_h=0., max_h=5., zhnorm_min=5.,
zhnorm_max=60., rhvnorm_min=0.85, rhvnorm_max=1.,
upplim_thr=0.75, param_k=0.05, param_w=0.75, comb_id=None,
phidp_peak='left', gradv_peak='left', plot_method=False):
r"""
Detect melting layer signatures within polarimetric VPs/QVPs.
Parameters
----------
pol_profs : dict
Polarimetric profiles of radar variables.
min_h : float, optional
Minimum height of usable data within the polarimetric profiles.
The default is 0.
max_h : float, optional
Maximum height to search for the bright band peak.
The default is 5.
zhnorm_min : float, optional
Min value of :math:`Z_{H}` to use for the min-max normalisation.
The default is 5.
zhnorm_max : float, optional
Max value of :math:`Z_{H}` to use for the min-max normalisation.
The default is 60.
rhvnorm_min : float, optional
Min value of :math:`\rho_{HV}` to use for the min-max
normalisation. The default is 0.85.
rhvnorm_max : float, optional
Max value of :math:`\rho_{HV}` to use for the min-max
normalisation. The default is 1.
phidp_peak : str, optional
Direction of the peak in :math:`\Phi_{DP}` related to the ML. The
method described in [1]_ assumes that the peak points to the left,
(see Figure 3 in the paper) but this can be changed using this
argument.
gradv_peak : str, optional
Direction of the peak in :math:`gradV` related to the ML. The
method described in [1]_ assumes that the peak points to the left,
(see Figure 3 in the paper) but this can be changed using this
argument.
param_k : float, optional
Threshold related to the magnitude of the peak used to detect the
ML. The default is 0.05.
param_w : float, optional
Weighting factor used to sharpen the peak within the profile.
The default is 0.75.
comb_id : int, optional
Identifier of the combination selected for the ML detection.
If None, the method provides all the possible combinations of
polarimetric variables for VPs/QVPs. The default is None.
plot_method : bool, optional
Plot the ML detection method. The default is False.
Notes
-----
1. Based on the methodology described in [1]_
References
----------
.. [1] Sanchez-Rivas, D. and Rico-Ramirez, M. A. (2021)
"Detection of the melting level with polarimetric weather radar"
in Atmospheric Measurement Techniques Journal, Volume 14, issue 4,
pp. 2873–2890, 13 Apr 2021 https://doi.org/10.5194/amt-14-2873-2021
"""
min_hidx = rut.find_nearest(pol_profs.georef['profiles_height [km]'],
min_h)
max_hidx = rut.find_nearest(pol_profs.georef['profiles_height [km]'],
max_h)
# The user shall use the combID described in the paper, thus it is
# necessary to adjust to python indexing.
if comb_id is not None:
comb_idpy = comb_id-1
else:
comb_idpy = None
if self.profs_type.lower() == 'vps':
if 'ZH [dBZ]' and 'rhoHV [-]' in pol_profs.vps:
profzh = pol_profs.vps['ZH [dBZ]'].copy()
profrhv = pol_profs.vps['rhoHV [-]'].copy()
else:
raise TowerpyError(r'Profiles of $Z_H$ and $\rho_{HV}$ are '
'required to run this function')
if 'ZDR [dB]' in pol_profs.vps:
profzdr = pol_profs.vps['ZDR [dB]'].copy()
else:
print(r'$Z_{DR}$ profile was not found. A dummy one was '
'built to run the method.')
profzdr = np.ones_like(profzh)
if 'PhiDP [deg]' in pol_profs.vps:
if phidp_peak == 'left':
profpdp = pol_profs.vps['PhiDP [deg]'].copy()
elif phidp_peak == 'right':
profpdp = pol_profs.vps['PhiDP [deg]'].copy()
profpdp *= -1
else:
print(r'$Phi_{DP}$ profile was not found. A dummy one was '
'built to run the method.')
profpdp = np.ones_like(profzh)
if 'gradV [dV/dh]' in pol_profs.vps:
if gradv_peak == 'left':
profdvel = pol_profs.vps['gradV [dV/dh]'].copy()
elif gradv_peak == 'right':
profdvel = pol_profs.vps['gradV [dV/dh]'].copy()
profdvel *= -1
else:
print('gradV [dV/dh] profile was not found. A dummy one was '
'built to run the method.')
profdvel = np.ones_like(profzh)
elif self.profs_type.lower() == 'qvps':
if 'ZH [dBZ]' and 'rhoHV [-]' in pol_profs.qvps:
profzh = pol_profs.qvps['ZH [dBZ]'].copy()
profrhv = pol_profs.qvps['rhoHV [-]'].copy()
else:
raise TowerpyError(r'Profiles of $Z_H$ and $\rho_{HV}$ are '
'required to run this function')
if 'ZDR [dB]' in pol_profs.qvps:
profzdr = pol_profs.qvps['ZDR [dB]'].copy()
else:
print(r'$Z_{DR}$ profile was not found. A dummy one was '
'built to run the method.')
profzdr = np.ones_like(profzh)
if 'PhiDP [deg]' in pol_profs.qvps:
if phidp_peak == 'left':
profpdp = pol_profs.qvps['PhiDP [deg]'].copy()
elif phidp_peak == 'right':
profpdp = pol_profs.qvps['PhiDP [deg]'].copy()
profpdp *= -1
else:
print(r'$Phi_{DP}$ profile was not found. A dummy one was '
'built to run the method.')
profpdp = np.ones_like(profzh)
elif self.profs_type.lower() == 'rd-qvps':
if 'ZH [dBZ]' and 'rhoHV [-]' in pol_profs.rd_qvps:
profzh = pol_profs.rd_qvps['ZH [dBZ]'].copy()
profrhv = pol_profs.rd_qvps['rhoHV [-]'].copy()
else:
raise TowerpyError(r'At least $Z_H$ and $\rho_{HV}$ are '
+ 'required to run this function')
if 'ZDR [dB]' in pol_profs.rd_qvps:
profzdr = pol_profs.rd_qvps['ZDR [dB]'].copy()
else:
print(r'$Z_{DR}$ profile was not found. A dummy one was '
'built to run the method.')
profzdr = np.ones_like(profzh)
if 'PhiDP [deg]' in pol_profs.rd_qvps:
if phidp_peak == 'left':
profpdp = pol_profs.rd_qvps['PhiDP [deg]'].copy()
elif phidp_peak == 'right':
profpdp = pol_profs.rd_qvps['PhiDP [deg]'].copy()
profpdp *= -1
else:
print(r'$Phi_{DP}$ profile was not found. A dummy one was '
'built to run the method.')
profpdp = np.ones_like(profzh)
# Normalise ZH and rhoHV
profzh[profzh < zhnorm_min] = zhnorm_min
profzh[profzh > zhnorm_max] = zhnorm_max
profzh_norm = rut.normalisenanvalues(profzh, zhnorm_min, zhnorm_max)
profrhv[profrhv < rhvnorm_min] = rhvnorm_min
profrhv[profrhv > rhvnorm_max] = rhvnorm_max
profrhv_norm = rut.normalisenanvalues(
profrhv, rhvnorm_min, rhvnorm_max)
# Combine ZH and rhoHV (norm) to create a new profile
profcombzh_rhv = profzh_norm[min_hidx:max_hidx]*(
1-profrhv_norm[min_hidx:max_hidx])
# Detect peaks within the new profile
pkscombzh_rhv = MeltingLayer.findpeaksboundaries(
profcombzh_rhv,
pol_profs.georef['profiles_height [km]'][min_hidx:max_hidx],
param_k=param_k)
# If no peaks were found, the profile is classified as No ML signatures
if (all(value is np.nan for value in pkscombzh_rhv.values())
or pkscombzh_rhv['peakmaxvalue'] < param_k):
mlyr = np.nan
mlrand = np.nan
combin = np.nan
idxml_top_it1 = 0
comb_mult = []
comb_mult_w = []
idxml_btm_it1 = 0
else:
peakcombzh_rhv = (pol_profs.georef['profiles_height [km]']
[min_hidx:max_hidx][pkscombzh_rhv['idxmax']])
idxml_btm_it1 = rut.find_nearest(
pol_profs.georef['profiles_height [km]'],
peakcombzh_rhv-upplim_thr)
idxml_top_it1 = rut.find_nearest(
pol_profs.georef['profiles_height [km]'],
peakcombzh_rhv+upplim_thr)
if idxml_top_it1 > min_hidx:
if self.profs_type.lower() == 'vps':
n = 5
ncomb = [1-rut.normalisenan(
profdvel[idxml_btm_it1:idxml_top_it1]),
profzh_norm[idxml_btm_it1:idxml_top_it1],
rut.normalisenan(
profzdr[idxml_btm_it1:idxml_top_it1]),
1-profrhv_norm[idxml_btm_it1:idxml_top_it1],
1-rut.normalisenan(
profpdp[idxml_btm_it1:idxml_top_it1])]
else:
n = 4
ncomb = [profzh_norm[idxml_btm_it1:idxml_top_it1],
rut.normalisenan(
profzdr[idxml_btm_it1:idxml_top_it1]),
1-profrhv_norm[idxml_btm_it1:idxml_top_it1],
rut.normalisenan(
profpdp[idxml_btm_it1:idxml_top_it1])]
combin = np.array(list(map(list, product([0, 1],
repeat=n)))[1:])
comb_mult = []
for i, j in enumerate(combin):
nfin4 = []
[idx] = np.where(combin[i] == 1)
for idxcomb in idx:
nfin = ncomb[idxcomb]
nfin4.append(nfin)
nfin5 = reduce(lambda x, y: x*y, nfin4)
comb_mult.append(nfin5)
comb_mult_w = [i-(param_w*(np.gradient(np.gradient(i))))
for i in comb_mult]
mlrand = [MeltingLayer.findpeaksboundaries(
i, pol_profs.georef['profiles_height [km]'][idxml_btm_it1:idxml_top_it1],
param_k=param_k)
for i in comb_mult_w]
for i, j in enumerate(mlrand):
if mlrand[i]['peakmaxvalue'] <= param_k:
mlrand[i] = {k: np.nan for k in mlrand[i]}
if (mlrand[i]['mltop'] < min_h
or mlrand[i]['mltop'] > max_h
# or mlrand[i]['mltop'] <= 0
):
mlrand[i] = {k: np.nan for k in mlrand[i]}
# if mlrand[i]['mlbtm'] <= 0:
# mlrand[i]['mlbtm'] = 0
mlrandf = [{'ml_top': n['mltop'],
'ml_bottom': n['mlbtm'],
'ml_thickness': n['mltop']-n['mlbtm'],
'ml_peakh': n['mlpeak'],
'ml_peakv': n['peakmaxvalue']}
for n in mlrand]
if comb_idpy is None:
mlyr = mlrandf
else:
mlyr = mlrandf[comb_idpy]
else:
mlrand = np.nan
combin = np.nan
mlyr = np.nan
comb_mult = []
if isinstance(mlrand, list) and mlrand and ~np.isnan(mlrand[7]['idxmax']):
bb_intensity = profzh[idxml_btm_it1:idxml_top_it1][mlrand[7]['idxmax']]
bb_peakh = pol_profs.georef['profiles_height [km]'][idxml_btm_it1:idxml_top_it1][mlrand[7]['idxmax']]
else:
bb_intensity = np.nan
bb_peakh = np.nan
if plot_method:
rad_interactive.ml_detectionvis(
pol_profs.georef['profiles_height [km]'], profzh_norm,
profrhv_norm, profcombzh_rhv, pkscombzh_rhv, comb_mult,
comb_mult_w, comb_idpy, mlrand, min_hidx, max_hidx,
param_k, idxml_btm_it1, idxml_top_it1)
if comb_idpy is None:
self.ml_id = mlyr
elif comb_idpy is not None and isinstance(mlyr, dict):
self.ml_top = mlyr['ml_top']
self.ml_bottom = mlyr['ml_bottom']
self.ml_thickness = mlyr['ml_thickness']
self.profpeakv = mlyr['ml_peakv']
self.profpeakh = mlyr['ml_peakh']
self.bbpeakvalue = bb_intensity
self.bb_peakh = bb_peakh
else:
self.ml_top = np.nan
self.ml_bottom = np.nan
self.ml_thickness = np.nan
self.profpeakv = np.nan
self.profpeakh = np.nan
self.bbpeakvalue = bb_intensity
self.bb_peakh = bb_peakh
[docs]
def ml_ppidelimitation(self, rad_georef, rad_params, beam_cone='centre',
classid=None, plot_method=False):
"""
Create a PPI depicting the limits of the melting layer.
Parameters
----------
rad_georef : dict
Georeferenced data containing descriptors of the azimuth, gates
and beam height, amongst others.
rad_params : dict
Radar technical details.
beam_cone : {'centre', 'top', 'bottom', 'all'}, optional
Beam‑height descriptor to use for ML gating.
'centre' uses ``beam_height [km]`` (default),
'top' uses ``beamtop_height [km]``,
'bottom' uses ``beambottom_height [km]``,
'all' returns results for all three.
classid : dict, optional
Modifies the key/values of the melting layer delimitation
(regionID). The default are the same as in regionID.
plot_method : bool, optional
Plot the results of the ML delimitation. The default is False.
Attributes
----------
regionID : dict
Key/values of the rain limits:
'rain' = 1
'mlyr' = 2
'solid_pcp' = 3
"""
ml_top = self.ml_top
ml_thickness = self.ml_thickness
ml_bottom = self.ml_bottom
self.regionID = {'rain': 1.,
'mlyr': 2.,
'solid_pcp': 3.}
if classid is not None:
self.regionID.update(classid)
gbeam2use = (['beam_height [km]'] if beam_cone == 'centre' else
['beamtop_height [km]'] if beam_cone == 'top' else
['beambottom_height [km]'] if beam_cone == 'bottom' else
['beam_height [km]', 'beamtop_height [km]',
'beambottom_height [km]'])
# =============================================================================
mlylims = {}
for gb2d in gbeam2use:
if isinstance(ml_top, (int, float)):
mlt_idx = [rut.find_nearest(nbh, ml_top)
for nbh in rad_georef[gb2d]]
elif isinstance(ml_top, (np.ndarray, list, tuple)):
mlt_idx = [rut.find_nearest(nbh, ml_top[cnt])
for cnt, nbh in
enumerate(rad_georef[gb2d])]
if isinstance(ml_bottom, (int, float)):
if np.isnan(ml_bottom):
ml_bottom = ml_top - ml_thickness
mlb_idx = [rut.find_nearest(nbh, ml_bottom)
for nbh in rad_georef[gb2d]]
elif isinstance(ml_bottom, (np.ndarray, list, tuple)):
ml_bottom = [mlt - ml_thickness[c1]
if np.isnan(ml_bottom[c1]) else ml_bottom[c1]
for c1, mlt in enumerate(ml_top)]
mlb_idx = [rut.find_nearest(nbh, ml_bottom[cnt])
for cnt, nbh in
enumerate(rad_georef[gb2d])]
ashape = np.zeros((rad_params['nrays'], rad_params['ngates']))
for cnt, azi in enumerate(ashape):
azi[:mlb_idx[cnt]] = self.regionID['rain']
azi[mlt_idx[cnt]:] = self.regionID['solid_pcp']
ashape[ashape == 0] = self.regionID['mlyr']
if gb2d == 'beam_height [km]':
mlylims['pcp_region [HC]'] = ashape
elif gb2d == 'beamtop_height [km]':
mlylims['pcp_region_tb [HC]'] = ashape
elif gb2d == 'beambottom_height [km]':
mlylims['pcp_region_bb [HC]'] = ashape
# =============================================================================
# self.mlyr_limits = {'pcp_region [HC]': ashape}
self.mlyr_limits = mlylims
if plot_method:
rad_display.plot_ppi(rad_georef, rad_params, self.mlyr_limits,
cbticks=self.regionID,
ucmap='tpylc_div_yw_gy_bu')
# def ml_ppidelimitation(self, rad_georef, rad_params, beam_cone='centre',
# classid=None, plot_method=False):
# """
# Create a PPI depicting the limits of the melting layer.
# Parameters
# ----------
# rad_georef : dict
# Georeferenced data containing descriptors of the azimuth, gates
# and beam height, amongst others.
# rad_params : dict
# Radar technical details.
# beam_cone : {'centre', 'top', 'bottom', 'all'}, optional
# Beam‑height descriptor to use for ML delimitation.
# 'centre' uses ``beam_height [km]`` (default),
# 'top' uses ``beamtop_height [km]``,
# 'bottom' uses ``beambottom_height [km]``,
# 'all' returns results for all three.
# classid : dict, optional
# Modifies the key/values of the melting layer delimitation
# (regionID). The default are the same as in regionID.
# plot_method : bool, optional
# Plot the results of the ML delimitation. The default is False.
# Attributes
# ----------
# regionID : dict
# Key/values of the rain limits:
# 'rain' = 1
# 'mlyr' = 2
# 'solid_pcp' = 3
# """
# ml_top = self.ml_top
# ml_thickness = self.ml_thickness
# ml_bottom = self.ml_bottom
# self.regionID = {'rain': 1.,
# 'mlyr': 2.,
# 'solid_pcp': 3.}
# if classid is not None:
# self.regionID.update(classid)
# beam_map = {
# 'centre': ['beam_height [km]'],
# 'top': ['beamtop_height [km]'],
# 'bottom': ['beambottom_height [km]'],
# 'all': ['beam_height [km]', 'beamtop_height [km]',
# 'beambottom_height [km]']}
# gbeam2use = beam_map.get(beam_cone, beam_map['centre'])
# # =============================================================================
# nrays = rad_params['nrays']
# ngates = rad_params['ngates']
# # --- normalise ml_top ---
# if np.isscalar(ml_top):
# ml_top_arr = np.full(nrays, ml_top, dtype=float)
# else:
# ml_top_arr = np.asarray(ml_top, dtype=float)
# if ml_top_arr.shape[0] != nrays:
# raise ValueError("ml_top must have length nrays")
# # --- handle ml_bottom according to your rules ---
# if np.isscalar(ml_bottom):
# if np.isnan(ml_bottom):
# ml_bottom_arr = ml_top_arr - ml_thickness
# else:
# ml_bottom_arr = np.full(nrays, ml_bottom, dtype=float)
# else:
# ml_bottom_arr = np.asarray(ml_bottom, dtype=float)
# if ml_bottom_arr.shape[0] != nrays:
# raise ValueError("ml_bottom must have length nrays")
# if np.isnan(ml_bottom_arr).any():
# raise ValueError("ml_bottom array must not contain NaN")
# mlylims = {}
# rain_id = self.regionID['rain']
# solid_id = self.regionID['solid_pcp']
# mlyr_id = self.regionID['mlyr']
# key_map = {'beam_height [km]': 'pcp_region [HC]',
# 'beamtop_height [km]': 'pcp_region_tb [HC]',
# 'beambottom_height [km]':'pcp_region_bb [HC]'}
# gate_idx = np.arange(ngates)[None, :]
# for gb2d in gbeam2use:
# ashape = np.full((nrays, ngates), mlyr_id, dtype=float)
# rain_mask = gate_idx < ml_bottom_arr[:, None]
# solid_mask = gate_idx >= ml_top_arr[:, None]
# ashape[rain_mask] = rain_id
# ashape[solid_mask] = solid_id
# # store result
# mlylims[key_map[gb2d]] = ashape
# # =============================================================================
# self.mlyr_limits = mlylims
# if plot_method:
# rad_display.plot_ppi(rad_georef, rad_params, self.mlyr_limits,
# cbticks=self.regionID,
# ucmap='tpylc_div_yw_gy_bu')