Source code for towerpy.utils.radutilities

"""Towerpy: an open-source toolbox for processing polarimetric radar data."""

# import copy
import cartopy.io.shapereader as shpreader
import numpy as np
import xarray as xr
from scipy import interpolate


[docs] def find_nearest(iarray, val2search, mode="any"): """ Return the index of the closest value to a given number. Parameters ---------- iarray : array Input array. val2search : float or int Value to search into the array. mode : {"any", "major", "minor"}, optional - "any": closest value in the array (default) - "major": closest local maximum - "minor": closest local minimum Returns ------- idx : float or int Index into the array, or None if no candidate found. """ a = np.asarray(iarray) if mode == "any": return int(np.abs(a - val2search).argmin()) # interior extrema candidates if mode == "major": mask = (a[1:-1] > a[:-2]) & (a[1:-1] > a[2:]) elif mode == "minor": mask = (a[1:-1] < a[:-2]) & (a[1:-1] < a[2:]) else: raise ValueError("mode must be 'any', 'major', or 'minor'") candidates = np.where(mask)[0] + 1 # shift to original indices # if none found, fall back to nearest anywhere if candidates.size == 0: return int(np.abs(a - val2search).argmin()) return int(candidates[np.abs(a[candidates] - val2search).argmin()])
[docs] def normalisenan(a): """ Scale input vectors to unit norm, ignoring any NaNs. Parameters ---------- a : array The data to normalise, element by element. Returns ------- normarray : array Normalised input a. """ normarray = (a-np.nanmin(a))/(np.nanmax(a)-np.nanmin(a)) return normarray
[docs] def normalisenanvalues(a, vmin, vmax): """ Scale input vectors to unit norm, by scaling the vector to given values. Parameters ---------- a : array The data to normalize. vmin : float or int Minimum value used to scale the data. vmax : float or int Maximum value used to scale the data. Returns ------- normarray : array Normalised data. """ normarray = (a-vmin)/(vmax-vmin) return normarray
[docs] def fillnan1d(x): """ Fill nan value with last non nan value. Parameters ---------- x : array The data to be filled. Returns ------- xf Array with nan values filtered. """ x = np.array(x, dtype=float) mask = np.isnan(x) idx = np.where(~mask, np.arange(mask.size), 0) np.maximum.accumulate(idx, out=idx) return x[idx]
[docs] def interp_nan(x, y, kind='linear', nan_type='mask'): """ Interpolate 1-D arrays to fill nan-masked values. Parameters ---------- x : array_like 1-D array. y : array_like 1-D array. kind : str, optional Specifies the kind of interpolation used in the scipy.interpolate.interp1d function. The default is 'linear'. nan_type : str, optional Type of non-valid values, either 'nan' or 'mask'. The default is 'mask'. """ if nan_type == 'mask': idx_valid = np.ma.where(np.isfinite(y)) elif nan_type == 'nan': idx_valid = np.where(np.isfinite(y)) f = interpolate.interp1d(x[idx_valid], y[idx_valid], bounds_error=False, kind=kind) if nan_type == 'mask': nanitp = np.where(~np.isfinite(y).mask, y, f(x)) elif nan_type == 'nan': nanitp = np.where(np.isfinite(y), y, f(x)) return nanitp
[docs] def maf_radial(rad_vars, maf_len=3, maf_ignorenan=True, maf_extendvalid=False, maf_params=None): r""" Apply a Moving-Average Filter to variables along the radial direction. Parameters ---------- rad_vars : dict Radar variables to be smoothed. maf_len : int, optional Odd number used to apply a moving average filter to each beam and smooth the signal. The default is 3. maf_ignorenan : bool, optional Set to False if nan values shall not be filtered. The default is True. maf_params : dict, optional Filters the radar variable using min and max constraints. The default are: :math:`ZH` [dBZ]: [-np.inf, np.inf] :math:`Z_{DR}` [dB]: [-np.inf, np.inf] :math:`\Phi_{DP}` [deg]: [-np.inf, np.inf] :math:`\rho_{HV}` [-]: [-np.inf, np.inf] :math:`V` [m/s]: [-np.inf, np.inf] :math:`LDR` [dB]: [-np.inf, np.inf]35, 35, 3] Returns ------- mafvars : dict Transformed data. """ lpv = {'ZH [dBZ]': [-np.inf, np.inf], 'ZDR [dB]': [-np.inf, np.inf], 'PhiDP [deg]': [-np.inf, np.inf], 'rhoHV [-]': [-np.inf, np.inf], 'V [m/s]': [-np.inf, np.inf], 'LDR [dB]': [-np.inf, np.inf], 'Rainfall [mm/h]': [-np.inf, np.inf], 'KDP [deg/km]': [-np.inf, np.inf]} for rkey in rad_vars.keys(): if rkey not in lpv: lpv[rkey] = [-np.inf, np.inf] if maf_params is not None: lpv.update(maf_params) for k, v in lpv.items(): v.append(maf_len) if maf_ignorenan: m = np.full(rad_vars[list(rad_vars.keys())[0]].shape, 0.) for k, v in rad_vars.items(): m[v < lpv[k][0]] = np.nan m[v > lpv[k][1]] = np.nan vars_mask = {keys: np.ma.masked_invalid(np.ma.masked_array(values, m)) for keys, values in rad_vars.items()} else: m = np.full(rad_vars[list(rad_vars.keys())[0]].shape, 1.) for k, v in rad_vars.items(): m[v < lpv[k][0]] = np.nan m[v > lpv[k][1]] = np.nan vars_mask = {keys: values * m for keys, values in rad_vars.items()} mafvars = {} if maf_extendvalid and not maf_ignorenan: for k, v in vars_mask.items(): rvarmaf = [] for beam in v: # extend last valid value with ignore nan False mask = np.isnan(beam) wl = np.ones(maf_len, dtype=int) amaf = np.convolve(np.where(mask, 0, beam), wl, mode='same')/np.convolve(~mask, wl, mode='same') rvarmaf.append(amaf) mafvars[k] = np.array(rvarmaf) elif not maf_extendvalid: for k, v in vars_mask.items(): rvarmaf = [] for beam in v: amaf = np.ma.convolve(beam, np.ones(lpv[k][2])/lpv[k][2], mode='same') rvarmaf.append(amaf) mafvars[k] = np.array(rvarmaf) return mafvars
[docs] def get_datashp(fname, key2read=None): """ Read in data from *.shp files using cartopy. Parameters ---------- fname : str Name of the *.shp file. key2read : str, optional Name of the feature to retrieve from the *.shp file. The default is None. Returns ------- shpdatalist : list Features extrated from the file. """ shpdata = shpreader.Reader(fname) shpdata1 = shpdata.records() shpattr = next(shpdata1) print('The available key-attributes of the shapefile are: \n' + f' {sorted(shpattr.attributes.keys())}') if key2read is None: key_att = input('Enter key attribute:') else: key_att = key2read print(f'Reading shapefile using -{key_att}- as key-attribute') # getshpdata = lambda shpdata1: shpdata1.attributes[key_att] gshpdat = sorted(shpdata.records(), key=lambda shpdata1: shpdata1.attributes[key_att]) shpdatalist = [i.attributes for i in gshpdat] return shpdatalist
[docs] def get_windows_data(wdw_size, wdw_coords, array2extract): """ Retrieve data from a PPI scan using size-defined windows. Parameters ---------- wdw_size : 2-element tuple or list of int Size of the window [row, cols]. Must be odd numbers. wdw_coords : 2-element tuple or list of int/floats Coordinates within the PPI scan of the centre of the window to extract. array2extract : array Data array from which the data will bve retrieved. Returns ------- wdw : list Retrieved data. """ if all([i % 2 for i in wdw_size]): start_row_index = wdw_coords[0] - wdw_size[0]//2 end_row_index = wdw_coords[0] + (wdw_size[0]//2) + 1 start_column_index = wdw_coords[1] - wdw_size[1]//2 end_column_index = wdw_coords[1] + (wdw_size[1]//2) + 1 wdw = array2extract[start_row_index:end_row_index, start_column_index:end_column_index] else: wdw = print('The window rows/columns must be and odd numer') return wdw
[docs] def rolling_window(a, window, mode='constant', constant_values=np.nan): """ Compute a rolling window using np.lib.stride_tricks for fast computation. Parameters ---------- a : array Array to be smoothed. window : 2-element tuple or list, optional Window size (m, n) used to apply a moving average filter. m and n must be odd numbers for the m-rays and n-gates. The default is (1, 3). mode : str, optional Mode used to pad the array. See numpy.pad for more information. The default is 'constant'. constant_values : int or float, optional Used in 'constant'. The values to set the padded values for each axis. The default is np.nan. Notes ----- It is expected to pad arrays that represent radar data in polar format. Thus, the rays are wrapped, and the gates are extended for consistency. """ if mode == 'edge': apad = np.pad(a, ((0, 0), (window[1]//2, window[1]//2)), mode='edge') elif mode == 'constant': apad = np.pad(a, ((0, 0), (window[1]//2, window[1]//2)), mode='constant', constant_values=(constant_values)) if window[0] > 1: apad = np.pad(apad, ((window[0]//2, window[0]//2), (0, 0)), mode='wrap') return np.lib.stride_tricks.sliding_window_view(apad, window)[:, :, 0]
[docs] def compute_texture(tpy_coordlist, rad_vars, wdw_size=[3, 3], classid=None): """ Compute the texture of given arrays. Parameters ---------- tpy_coordlist : 3-element tuple or list of int Coordinates and classID of a given pixel. rad_vars : dict Radar variables used to compute the texture. wdw_size : 2-element tuple or list of int Size of the window [row, cols]. Must be odd numbers. The default is [3, 3]. classid : dict Key/values of the echoes classification: 'precipi' = 0 'clutter' = 5 Returns ------- rvars : dict Texture values. """ echoesID = {'precipi': 0, 'clutter': 5} if classid is not None: echoesID.update(classid) vval = {keid: {nvar: [vvar[nval[0], nval[1]] for nidx, nval in enumerate(tpy_coordlist) if nval[2] == veid] for nvar, vvar in rad_vars.items()} for keid, veid in echoesID.items()} vtxt = {keid: {'s'+nvar: [np.nanstd(get_windows_data(wdw_size, nval[:-1], vvar), ddof=1) for nidx, nval in enumerate(tpy_coordlist) if nval[2] == veid] for nvar, vvar in rad_vars.items()} for keid, veid in echoesID.items()} rvars = {k: vtxt[k] | vval[k] for k in echoesID.keys()} return rvars
[docs] def idx_consecutive(array1d, step_size=1, group_size=1): """ Find the index of consecutive non-nan values within a 1D array. Parameters ---------- array1d : array_like Input 1-D array. stepsize : int or float, optional Difference between consecutive elements. The default is 1. group_size : int, optional Minimum size of the grouped consecutive-valid values. The default is 3. """ # IDX of non-nan values idx_valid = np.nonzero(~np.isnan(array1d))[0] # Group consecutive non-nan values cons_group = np.split( idx_valid, np.where(np.diff(idx_valid) != step_size)[0]+1) cons_idx = np.hstack([i for i in cons_group if len(i) >= group_size]) return cons_idx
[docs] def linspace_step(start, stop, step): """Like np.linspace but uses step instead of num.""" return np.linspace(start, stop, int((stop - start) / step + 1))
# ============================================================================= # %% xarray implementation # =============================================================================
[docs] def _to_kilometers(var: xr.DataArray): '''Convert units of DataArray in metres to kilometres.''' units = var.attrs.get("units", "").lower() if units in ["m", "meter", "meters", "metre", "metres"]: out = var / 1000 out.attrs = var.attrs.copy() out.attrs["units"] = "km" return out elif units in ["km", "kilometer", "kilometers", "kilometre", "kilometres"]: return var # fallback: assume metres out = var / 1000 out.attrs = var.attrs.copy() out.attrs["units"] = "km" return out
[docs] def xr_hist2d(x, y, x_edges, y_edges, dim): """ Compute a vectorised 2D histogram of two DataArrays. Parameters ---------- x, y : xr.DataArray Input variables to histogram over, sharing the same core dimensions. x_edges, y_edges : array-like Bin edges for the x and y axes, defining the histogram grid. dim : list[str] Dimensions over which the histogram is computed and reduced. Returns ------- xr.DataArray A 2D histogram with labelled ``x_bin`` and ``y_bin`` dimensions, vectorised over all non-core dimensions. """ def _hist2d(a, b): H, _, _ = np.histogram2d(a.ravel(), b.ravel(), bins=[x_edges, y_edges]) return H return xr.apply_ufunc(_hist2d, x, y, input_core_dims=[dim, dim], output_core_dims=[["x_bin", "y_bin"]], vectorize=True, dask="parallelized", output_dtypes=[float]).assign_coords( x_bin=x_edges[:-1], y_bin=y_edges[:-1])
[docs] def record_provenance(ds, function, outputs, parameters, step=None, inputs=None, extra_attrs=None): """ Update a Dataset’s provenance chain by recording function steps, inputs, outputs, and parameters. Parameters ---------- ds : xr.Dataset Dataset whose provenance metadata is to be updated. function : str Name of the function contributing to the correction chain. outputs : list[str] Variables produced or modified by the function. parameters : dict Parameter values used during the function call. step : str, optional Logical step name for grouping related function calls; defaults to ``function``. inputs : list[str], optional Variables read or required by the function. extra_attrs : dict, optional Additional attributes to attach directly to the Dataset. Returns ------- xr.Dataset The Dataset with an updated ``correction_chain`` attribute reflecting the new provenance entry. """ # Im not sure if timestamp is necessary # timestamp = dt.datetime.utcnow().isoformat() + "Z" # Normalise outputs to unique sorted list outputs = sorted(set(outputs)) inputs = sorted(set(inputs or [])) chain = list(ds.attrs.get("correction_chain", [])) # Use (function, step) as grouping key key_func = function key_step = step or function # default: step == function existing = None for entry in chain: if (entry.get("function") == key_func and entry.get("step") == key_step): # and entry.get("scope") == "dataset") existing = entry break if existing: # Merge outputs existing_outputs = set(existing.get("outputs", [])) existing["outputs"] = sorted(existing_outputs.union(outputs)) # Merge parameters (update with latest values) existing["parameters"].update(parameters) # Merge inputs existing_inputs = set(existing.get("inputs", [])) existing["inputs"] = sorted(existing_inputs.union(inputs)) else: # Create new entry entry = { # "timestamp": timestamp, "function": key_func, "step": key_step, "outputs": outputs, "inputs": inputs, "parameters": parameters, # "scope": "dataset", "provenance": "Towerpy", } chain.append(entry) ds.attrs["correction_chain"] = chain if extra_attrs: for k, v in extra_attrs.items(): ds.attrs[k] = v return ds
[docs] def apply_correction_chain(ds, varname, step, params, mask=None, suffix="_corr"): """ Apply a correction mask to a variable and add it back into the Dataset, appending provenance-aware metadata to track the full correction chain. Parameters ---------- ds : xarray.Dataset Dataset containing the variable to correct. varname : str Name of the variable to correct. mask : xarray.DataArray Boolean or categorical mask aligned with ds[varname]. step : str Short tag describing the correction step (e.g. "SNR_correction"). params : dict Parameters used in the correction (e.g. {"min_snr": 5}). suffix : str, optional Suffix for the corrected variable name. Default "_corr". Returns ------- ds : xarray.Dataset Dataset with new corrected variable added. """ corrected_name = f"{varname}{suffix}" if mask is not None: corrected = ds[varname].where(mask == 0) else: corrected = ds[varname] new_entry = { "step": step, "params": params, "parent": varname, "outputs": [corrected_name], # <-- explicit outputs "mode": "overwrite" if suffix == "" else "preserve" } chain = ds[varname].attrs.get("correction_chain", []) chain.append(new_entry) # Base metadata update (variable-level only) meta_update = { "correction": step, "correction_params": params, "parent": varname, "history": ds[varname].attrs.get("history", "") + f" | {new_entry}", "correction_chain": chain, "provenance": "Towerpy", "provenance_step": step, "provenance_base": varname, "mode": new_entry["mode"], } # Variable-specific refinements (inline) if varname == "DBTH" and step == "offset_correction": meta_update.update({ "standard_name": "radar_equivalent_reflectivity_factor_h", "long_name": "Equivalent reflectivity factor H", "short_name": "DBZH", "units": "dBZ", "provenance_offset_applied": float(params.get("offset", 0.0)), }) # Apply variable-level attrs and insert into dataset corrected.attrs = {**ds[varname].attrs, **meta_update} ds[corrected_name] = corrected return ds