Source code for towerpy.eclass.nme

"""Towerpy: an open-source toolbox for processing polarimetric radar data."""

from pathlib import Path
import platform
import warnings
import copy
import ctypes as ctp
import numpy as np
import numpy.ctypeslib as npct
from ..datavis import rad_display
from ..base import TowerpyError


warnings.filterwarnings("ignore", category=RuntimeWarning)


[docs] class NME_ID: r""" A class to identify non-meteorlogical echoes within radar data. Attributes ---------- elev_angle : float Elevation angle at which the scan was taken, in deg. file_name : str Name of the file containing radar data. scandatetime : datetime Date and time of scan. site_name : str Name of the radar site. echoesID : dict Key/values of the ME/NME classification: 'pcpn' = 0 'noise' = 3 'clutter' = 5 nme_classif : dict Results of the clutter classification. vars : dict Radar variables with clutter echoes removed. """ def __init__(self, radobj=None):
[docs] self.elev_angle = getattr(radobj, 'elev_angle', None) if radobj else None
[docs] self.file_name = getattr(radobj, 'file_name', None) if radobj else None
[docs] self.scandatetime = getattr(radobj, 'scandatetime', None) if radobj else None
[docs] self.site_name = getattr(radobj, 'site_name', None) if radobj else None
[docs] def lsinterference_filter(self, rad_georef, rad_vars, data2correct=None, rhv_min=0.3, classid=None, plot_method=False): """ Filter linear signatures and speckles. Parameters ---------- rad_georef : dict Georeferenced data containing descriptors of the azimuth, gates and beam height, amongst others. rad_vars : dict Radar variables used to identify the LS and speckles. rhv_min : float, optional Minimal threshold in rhoHV [-] used to discard non-meteorological scatterers. The default is 0.3 classid : dict, optional Modifies the key/values of the LS/Despeckling results (echoesID). The default are the same as in echoesID (see class definition). data2correct : dict, optional Variables into which LS ans speckles are removed. The default is None. plot_method : bool, optional Plot the LS/speckles classification method. The default is False. Notes ----- 1. Radar variables should already be (at least) filtered for noise to ensure accurate and reliable results. """ self.echoesID = {'pcpn': 0, 'noise': 3, 'clutter': 5} if classid is not None: self.echoesID.update(classid) window = (3, 3) mode = 'constant' arr_rhohv = rad_vars['rhoHV [-]'].copy() constant_values = np.nan # Create a padded array if mode == 'edge': apad = np.pad(arr_rhohv, ((0, 0), (window[1]//2, window[1]//2)), mode='edge') elif mode == 'constant': apad = np.pad(arr_rhohv, ((0, 0), (window[1]//2, window[1]//2)), mode='constant', constant_values=(constant_values)) if window[0] > 1: apad = np.pad(apad, ((window[0]//2, window[0]//2), (0, 0)), mode='wrap') # Check that all sorrounding values of pixel are nan to remove speckles spckl1 = np.array([[np.nan if ~np.isnan(vbin) and np.isnan(apad[nray-1][nbin-1]) and np.isnan(apad[nray-1][nbin]) and np.isnan(apad[nray-1][nbin+1]) and np.isnan(apad[nray][nbin-1]) and np.isnan(apad[nray][nbin+1]) and np.isnan(apad[nray+1][nbin-1]) and np.isnan(apad[nray+1][nbin]) and np.isnan(apad[nray+1][nbin+1]) else 1 for nbin, vbin in enumerate(apad[nray]) if nbin != 0 and nbin != apad.shape[1]-1] for nray in range(apad.shape[0]) if nray != 0 and nray != apad.shape[0]-1], dtype=np.float64) spckl1[:, 0] = np.nan # Filter using rhohv threshold. spckl1[rad_vars['rhoHV [-]'] <= rhv_min] = np.nan # Detect linear signatures. spckl2 = np.array([[np.nan if ~np.isnan(vbin) and np.isnan(apad[nray-1][nbin]) and np.isnan(apad[nray+1][nbin]) else 1 for nbin, vbin in enumerate(apad[nray]) if nbin != 0 and nbin != apad.shape[1]-1] for nray in range(apad.shape[0]) if nray != 0 and nray != apad.shape[0]-1], dtype=np.float64) spckl1[:, 0] = 1 # Classifies the pixels according to echoesID fclass = np.where(np.isnan(rad_vars['ZH [dBZ]']), 3., 0.) fclass2 = np.where(np.isnan(spckl1 * spckl2), 5., 0.) fclass = np.where(fclass2 == 5., 5., fclass) fclass[:, :5] = 0 if classid is not None: fclass[fclass == 0] = self.echoesID['pcpn'] fclass[fclass == 3] = self.echoesID['noise'] fclass[fclass == 5] = self.echoesID['clutter'] lsc_data = {'classif [EC]': fclass} if data2correct is not None: data2cc = copy.deepcopy(data2correct) for key, values in data2cc.items(): values[fclass != self.echoesID['pcpn']] = np.nan self.vars = data2cc self.ls_dsp_class = lsc_data if plot_method: rad_params = {} if self.elev_angle: rad_params['elev_ang [deg]'] = self.elev_angle else: rad_params['elev_ang [deg]'] = 'surveillance scan' if self.scandatetime: rad_params['datetime'] = self.scandatetime else: rad_params['datetime'] = None rad_display.plot_ppi(rad_georef, rad_params, lsc_data, cbticks=self.echoesID, ucmap='tpylc_div_yw_gy_bu')
[docs] def clutter_id(self, rad_georef, rad_params, rad_vars, path_mfs=None, min_snr=0, binary_class=255, clmap=None, classid=None, data2correct=None, plot_method=False): r""" Classify between weather and clutter echoes. Parameters ---------- rad_georef : dict Georeferenced data containing descriptors of the azimuth, gates and beam height, amongst others. rad_params : dict Radar technical details. rad_vars : dict Radar variables used to identify the clutter echoes. path_mfs : str, optional Location of the membership function files. min_snr : float, optional Reference noise value. The default is 0. binary_class : int Binary code used for clutter classification: :math:`\rho_{HV} = 128` :math:`CM = 64` :math:`LDR = 32` :math:`V = 16` :math:`\sigma(\rho_{HV}) = 8` :math:`\sigma(\Phi_{DP}) = 4` :math:`\sigma(Z_{DR}) = 2` :math:`\sigma(Z_{H}) = 1` The default is 255, i.e. all the variables are used. clmap : array, optional Clutter frequency map in the interval [0-1]. The default is None. classid : dict, optional Modifies the key/values of the clutter classification results (echoesID). The default are the same as in echoesID (see class definition). data2correct : dict, optional Variables into which clutter echoes are removed. The default is None. plot_method : bool, optional Plot the clutter classification method. The default is False. Notes ----- 1. Make sure to define which radar variables are used in the classification by setting up the parameter 'binary_class'. 2. This function uses the shared object 'lnxlibclutterclassifier' or the dynamic link library 'w64libclutterclassifier' depending on the operating system (OS). 3. Based on the method described in [1]_ References ---------- .. [1] Rico-Ramirez, M. A., & Cluckie, I. D. (2008). Classification of ground clutter and anomalous propagation using dual-polarization weather radar. IEEE Transactions on Geoscience and Remote Sensing, 46(7), 1892-1904. https://doi.org/10.1109/TGRS.2008.916979 Examples -------- >>> rnme = tp.eclass.nme.NME_ID(rdata) >>> rnme.clutter_id(rdata.georef, rdata.params, rsnr.vars, binary_class=159, min_snr=rsnr.min_snr) binary_class = 159 -> (128+16+8+4+2+1) i.e. :math:`\rho_{HV} + V + \sigma(\rho_{HV}) + \sigma(\Phi_{DP}) + \sigma(Z_{DR}) + \sigma(Z_{H})` """ self.echoesID = {'pcpn': 0, 'noise': 3, 'clutter': 5} if classid is not None: self.echoesID.update(classid) if path_mfs is None: pathmfs = str.encode(str(Path(__file__).parent.absolute()) + '/mfs_cband/') else: pathmfs = str.encode(path_mfs) array1d = npct.ndpointer(dtype=np.double, ndim=1, flags='CONTIGUOUS') array2d = npct.ndpointer(dtype=np.double, ndim=2, flags='CONTIGUOUS') if platform.system() == 'Linux': libcc = npct.load_library('lnxlibclutterclassifier.so', Path(__file__).parent.absolute()) elif platform.system() == 'Windows': libcc = ctp.cdll.LoadLibrary(f'{Path(__file__).parent.absolute()}' + '/w64libclutterclassifier.dll') else: libcc = None raise TowerpyError(f'The {platform.system} OS is not currently' 'compatible with this version of Towerpy') libcc.clutterclassifier.restype = None libcc.clutterclassifier.argtypes = [ctp.c_char_p, ctp.c_int, ctp.c_int, array2d, array2d, array2d, array2d, array2d, array2d, array2d, array1d, array1d, array1d, array1d, array2d] param_clc = np.zeros(5) param_clc[0] = rad_params['radar constant [dB]'] param_clc[1] = min_snr param_clc[2] = binary_class rdatv = copy.deepcopy(rad_vars) rdatp = copy.deepcopy(rad_params) rdatg = copy.deepcopy(rad_georef) clc = np.full(rdatv['ZH [dBZ]'].shape, 0.) if 'LDR [dB]' in rad_vars.keys(): ldr = rdatv['LDR [dB]'] else: ldr = np.full(rdatv['ZH [dBZ]'].shape, 0.)-35 if clmap is None: pc = np.full(rdatv['ZH [dBZ]'].shape, 1.) else: pc = clmap if 'ZDR [dB]' not in rdatv.keys(): rdatv['ZDR [dB]'] = ldr if 'PhiDP [deg]' not in rdatv.keys(): rdatv['PhiDP [deg]'] = ldr if 'rhoHV [-]' not in rdatv.keys(): rdatv['rhoHV [-]'] = ldr np.nan_to_num(rdatv['ZH [dBZ]'], copy=False, nan=-50.) libcc.clutterclassifier(pathmfs, rdatp['nrays'], rdatp['ngates'], rdatv['ZH [dBZ]'], rdatv['ZDR [dB]'], rdatv['PhiDP [deg]'], rdatv['rhoHV [-]'], rdatv['V [m/s]'], ldr, pc, rdatg['range [m]'], rdatg['azim [rad]'], rdatg['elev [rad]'], param_clc, clc) if classid is not None: clc[clc == 0] = self.echoesID['pcpn'] clc[clc == 3] = self.echoesID['noise'] clc[clc == 5] = self.echoesID['clutter'] ccpoldata = {'classif [EC]': clc, 'clutter_map': clmap} if data2correct is not None: data2cc = copy.deepcopy(data2correct) for key, values in data2cc.items(): values[clc != self.echoesID['pcpn']] = np.nan self.vars = data2cc self.nme_classif = ccpoldata if plot_method: if clmap is not None: rad_display.plot_nmeclassif(rad_georef, rad_params, clc, self.echoesID, clmap) else: rad_display.plot_nmeclassif(rad_georef, rad_params, clc, self.echoesID)