"""Towerpy: an open-source toolbox for processing polarimetric radar data."""
# import copy
import cartopy.io.shapereader as shpreader
import numpy as np
import xarray as xr
from scipy import interpolate
[docs]
def find_nearest(iarray, val2search, mode="any"):
"""
Return the index of the closest value to a given number.
Parameters
----------
iarray : array
Input array.
val2search : float or int
Value to search into the array.
mode : {"any", "major", "minor"}, optional
- "any": closest value in the array (default)
- "major": closest local maximum
- "minor": closest local minimum
Returns
-------
idx : float or int
Index into the array, or None if no candidate found.
"""
a = np.asarray(iarray)
if mode == "any":
return int(np.abs(a - val2search).argmin())
# interior extrema candidates
if mode == "major":
mask = (a[1:-1] > a[:-2]) & (a[1:-1] > a[2:])
elif mode == "minor":
mask = (a[1:-1] < a[:-2]) & (a[1:-1] < a[2:])
else:
raise ValueError("mode must be 'any', 'major', or 'minor'")
candidates = np.where(mask)[0] + 1 # shift to original indices
# if none found, fall back to nearest anywhere
if candidates.size == 0:
return int(np.abs(a - val2search).argmin())
return int(candidates[np.abs(a[candidates] - val2search).argmin()])
[docs]
def normalisenan(a):
"""
Scale input vectors to unit norm, ignoring any NaNs.
Parameters
----------
a : array
The data to normalise, element by element.
Returns
-------
normarray : array
Normalised input a.
"""
normarray = (a-np.nanmin(a))/(np.nanmax(a)-np.nanmin(a))
return normarray
[docs]
def normalisenanvalues(a, vmin, vmax):
"""
Scale input vectors to unit norm, by scaling the vector to given values.
Parameters
----------
a : array
The data to normalize.
vmin : float or int
Minimum value used to scale the data.
vmax : float or int
Maximum value used to scale the data.
Returns
-------
normarray : array
Normalised data.
"""
normarray = (a-vmin)/(vmax-vmin)
return normarray
[docs]
def fillnan1d(x):
"""
Fill nan value with last non nan value.
Parameters
----------
x : array
The data to be filled.
Returns
-------
xf
Array with nan values filtered.
"""
x = np.array(x, dtype=float)
mask = np.isnan(x)
idx = np.where(~mask, np.arange(mask.size), 0)
np.maximum.accumulate(idx, out=idx)
return x[idx]
[docs]
def interp_nan(x, y, kind='linear', nan_type='mask'):
"""
Interpolate 1-D arrays to fill nan-masked values.
Parameters
----------
x : array_like
1-D array.
y : array_like
1-D array.
kind : str, optional
Specifies the kind of interpolation used in the
scipy.interpolate.interp1d function. The default is 'linear'.
nan_type : str, optional
Type of non-valid values, either 'nan' or 'mask'.
The default is 'mask'.
"""
if nan_type == 'mask':
idx_valid = np.ma.where(np.isfinite(y))
elif nan_type == 'nan':
idx_valid = np.where(np.isfinite(y))
f = interpolate.interp1d(x[idx_valid], y[idx_valid], bounds_error=False,
kind=kind)
if nan_type == 'mask':
nanitp = np.where(~np.isfinite(y).mask, y, f(x))
elif nan_type == 'nan':
nanitp = np.where(np.isfinite(y), y, f(x))
return nanitp
[docs]
def maf_radial(rad_vars, maf_len=3, maf_ignorenan=True, maf_extendvalid=False,
maf_params=None):
r"""
Apply a Moving-Average Filter to variables along the radial direction.
Parameters
----------
rad_vars : dict
Radar variables to be smoothed.
maf_len : int, optional
Odd number used to apply a moving average filter to each beam and
smooth the signal. The default is 3.
maf_ignorenan : bool, optional
Set to False if nan values shall not be filtered. The default is True.
maf_params : dict, optional
Filters the radar variable using min and max constraints.
The default are:
:math:`ZH` [dBZ]: [-np.inf, np.inf]
:math:`Z_{DR}` [dB]: [-np.inf, np.inf]
:math:`\Phi_{DP}` [deg]: [-np.inf, np.inf]
:math:`\rho_{HV}` [-]: [-np.inf, np.inf]
:math:`V` [m/s]: [-np.inf, np.inf]
:math:`LDR` [dB]: [-np.inf, np.inf]35, 35, 3]
Returns
-------
mafvars : dict
Transformed data.
"""
lpv = {'ZH [dBZ]': [-np.inf, np.inf], 'ZDR [dB]': [-np.inf, np.inf],
'PhiDP [deg]': [-np.inf, np.inf], 'rhoHV [-]': [-np.inf, np.inf],
'V [m/s]': [-np.inf, np.inf], 'LDR [dB]': [-np.inf, np.inf],
'Rainfall [mm/h]': [-np.inf, np.inf],
'KDP [deg/km]': [-np.inf, np.inf]}
for rkey in rad_vars.keys():
if rkey not in lpv:
lpv[rkey] = [-np.inf, np.inf]
if maf_params is not None:
lpv.update(maf_params)
for k, v in lpv.items():
v.append(maf_len)
if maf_ignorenan:
m = np.full(rad_vars[list(rad_vars.keys())[0]].shape, 0.)
for k, v in rad_vars.items():
m[v < lpv[k][0]] = np.nan
m[v > lpv[k][1]] = np.nan
vars_mask = {keys: np.ma.masked_invalid(np.ma.masked_array(values, m))
for keys, values in rad_vars.items()}
else:
m = np.full(rad_vars[list(rad_vars.keys())[0]].shape, 1.)
for k, v in rad_vars.items():
m[v < lpv[k][0]] = np.nan
m[v > lpv[k][1]] = np.nan
vars_mask = {keys: values * m
for keys, values in rad_vars.items()}
mafvars = {}
if maf_extendvalid and not maf_ignorenan:
for k, v in vars_mask.items():
rvarmaf = []
for beam in v:
# extend last valid value with ignore nan False
mask = np.isnan(beam)
wl = np.ones(maf_len, dtype=int)
amaf = np.convolve(np.where(mask, 0, beam), wl,
mode='same')/np.convolve(~mask, wl,
mode='same')
rvarmaf.append(amaf)
mafvars[k] = np.array(rvarmaf)
elif not maf_extendvalid:
for k, v in vars_mask.items():
rvarmaf = []
for beam in v:
amaf = np.ma.convolve(beam,
np.ones(lpv[k][2])/lpv[k][2],
mode='same')
rvarmaf.append(amaf)
mafvars[k] = np.array(rvarmaf)
return mafvars
[docs]
def get_datashp(fname, key2read=None):
"""
Read in data from *.shp files using cartopy.
Parameters
----------
fname : str
Name of the *.shp file.
key2read : str, optional
Name of the feature to retrieve from the *.shp file.
The default is None.
Returns
-------
shpdatalist : list
Features extrated from the file.
"""
shpdata = shpreader.Reader(fname)
shpdata1 = shpdata.records()
shpattr = next(shpdata1)
print('The available key-attributes of the shapefile are: \n' +
f' {sorted(shpattr.attributes.keys())}')
if key2read is None:
key_att = input('Enter key attribute:')
else:
key_att = key2read
print(f'Reading shapefile using -{key_att}- as key-attribute')
# getshpdata = lambda shpdata1: shpdata1.attributes[key_att]
gshpdat = sorted(shpdata.records(),
key=lambda shpdata1: shpdata1.attributes[key_att])
shpdatalist = [i.attributes for i in gshpdat]
return shpdatalist
[docs]
def get_windows_data(wdw_size, wdw_coords, array2extract):
"""
Retrieve data from a PPI scan using size-defined windows.
Parameters
----------
wdw_size : 2-element tuple or list of int
Size of the window [row, cols]. Must be odd numbers.
wdw_coords : 2-element tuple or list of int/floats
Coordinates within the PPI scan of the centre of the window to extract.
array2extract : array
Data array from which the data will bve retrieved.
Returns
-------
wdw : list
Retrieved data.
"""
if all([i % 2 for i in wdw_size]):
start_row_index = wdw_coords[0] - wdw_size[0]//2
end_row_index = wdw_coords[0] + (wdw_size[0]//2) + 1
start_column_index = wdw_coords[1] - wdw_size[1]//2
end_column_index = wdw_coords[1] + (wdw_size[1]//2) + 1
wdw = array2extract[start_row_index:end_row_index,
start_column_index:end_column_index]
else:
wdw = print('The window rows/columns must be and odd numer')
return wdw
[docs]
def rolling_window(a, window, mode='constant', constant_values=np.nan):
"""
Compute a rolling window using np.lib.stride_tricks for fast computation.
Parameters
----------
a : array
Array to be smoothed.
window : 2-element tuple or list, optional
Window size (m, n) used to apply a moving average filter. m and n must
be odd numbers for the m-rays and n-gates. The default is (1, 3).
mode : str, optional
Mode used to pad the array. See numpy.pad for more information.
The default is 'constant'.
constant_values : int or float, optional
Used in 'constant'. The values to set the padded values for each axis.
The default is np.nan.
Notes
-----
It is expected to pad arrays that represent radar data in polar format.
Thus, the rays are wrapped, and the gates are extended for consistency.
"""
if mode == 'edge':
apad = np.pad(a, ((0, 0), (window[1]//2, window[1]//2)),
mode='edge')
elif mode == 'constant':
apad = np.pad(a, ((0, 0), (window[1]//2, window[1]//2)),
mode='constant', constant_values=(constant_values))
if window[0] > 1:
apad = np.pad(apad, ((window[0]//2, window[0]//2), (0, 0)),
mode='wrap')
return np.lib.stride_tricks.sliding_window_view(apad, window)[:, :, 0]
[docs]
def compute_texture(tpy_coordlist, rad_vars, wdw_size=[3, 3], classid=None):
"""
Compute the texture of given arrays.
Parameters
----------
tpy_coordlist : 3-element tuple or list of int
Coordinates and classID of a given pixel.
rad_vars : dict
Radar variables used to compute the texture.
wdw_size : 2-element tuple or list of int
Size of the window [row, cols]. Must be odd numbers. The default is
[3, 3].
classid : dict
Key/values of the echoes classification:
'precipi' = 0
'clutter' = 5
Returns
-------
rvars : dict
Texture values.
"""
echoesID = {'precipi': 0, 'clutter': 5}
if classid is not None:
echoesID.update(classid)
vval = {keid: {nvar: [vvar[nval[0], nval[1]] for nidx, nval
in enumerate(tpy_coordlist) if nval[2] == veid]
for nvar, vvar in rad_vars.items()}
for keid, veid in echoesID.items()}
vtxt = {keid: {'s'+nvar: [np.nanstd(get_windows_data(wdw_size, nval[:-1],
vvar), ddof=1)
for nidx, nval
in enumerate(tpy_coordlist) if nval[2] == veid]
for nvar, vvar in rad_vars.items()}
for keid, veid in echoesID.items()}
rvars = {k: vtxt[k] | vval[k] for k in echoesID.keys()}
return rvars
[docs]
def idx_consecutive(array1d, step_size=1, group_size=1):
"""
Find the index of consecutive non-nan values within a 1D array.
Parameters
----------
array1d : array_like
Input 1-D array.
stepsize : int or float, optional
Difference between consecutive elements. The default is 1.
group_size : int, optional
Minimum size of the grouped consecutive-valid values.
The default is 3.
"""
# IDX of non-nan values
idx_valid = np.nonzero(~np.isnan(array1d))[0]
# Group consecutive non-nan values
cons_group = np.split(
idx_valid, np.where(np.diff(idx_valid) != step_size)[0]+1)
cons_idx = np.hstack([i for i in cons_group if len(i) >= group_size])
return cons_idx
[docs]
def linspace_step(start, stop, step):
"""Like np.linspace but uses step instead of num."""
return np.linspace(start, stop, int((stop - start) / step + 1))
# =============================================================================
# %% xarray implementation
# =============================================================================
[docs]
def _to_kilometers(var: xr.DataArray):
'''Convert units of DataArray in metres to kilometres.'''
units = var.attrs.get("units", "").lower()
if units in ["m", "meter", "meters", "metre", "metres"]:
out = var / 1000
out.attrs = var.attrs.copy()
out.attrs["units"] = "km"
return out
elif units in ["km", "kilometer", "kilometers", "kilometre",
"kilometres"]:
return var
# fallback: assume metres
out = var / 1000
out.attrs = var.attrs.copy()
out.attrs["units"] = "km"
return out
[docs]
def xr_hist2d(x, y, x_edges, y_edges, dim):
"""
Compute a vectorised 2D histogram of two DataArrays.
Parameters
----------
x, y : xr.DataArray
Input variables to histogram over, sharing the same core dimensions.
x_edges, y_edges : array-like
Bin edges for the x and y axes, defining the histogram grid.
dim : list[str]
Dimensions over which the histogram is computed and reduced.
Returns
-------
xr.DataArray
A 2D histogram with labelled ``x_bin`` and ``y_bin`` dimensions,
vectorised over all non-core dimensions.
"""
def _hist2d(a, b):
H, _, _ = np.histogram2d(a.ravel(), b.ravel(), bins=[x_edges, y_edges])
return H
return xr.apply_ufunc(_hist2d, x, y, input_core_dims=[dim, dim],
output_core_dims=[["x_bin", "y_bin"]],
vectorize=True, dask="parallelized",
output_dtypes=[float]).assign_coords(
x_bin=x_edges[:-1], y_bin=y_edges[:-1])
[docs]
def record_provenance(ds, function, outputs, parameters, step=None, inputs=None,
extra_attrs=None):
"""
Update a Dataset’s provenance chain by recording function steps, inputs, outputs, and parameters.
Parameters
----------
ds : xr.Dataset
Dataset whose provenance metadata is to be updated.
function : str
Name of the function contributing to the correction chain.
outputs : list[str]
Variables produced or modified by the function.
parameters : dict
Parameter values used during the function call.
step : str, optional
Logical step name for grouping related function calls; defaults to ``function``.
inputs : list[str], optional
Variables read or required by the function.
extra_attrs : dict, optional
Additional attributes to attach directly to the Dataset.
Returns
-------
xr.Dataset
The Dataset with an updated ``correction_chain`` attribute reflecting the new provenance entry.
"""
# Im not sure if timestamp is necessary
# timestamp = dt.datetime.utcnow().isoformat() + "Z"
# Normalise outputs to unique sorted list
outputs = sorted(set(outputs))
inputs = sorted(set(inputs or []))
chain = list(ds.attrs.get("correction_chain", []))
# Use (function, step) as grouping key
key_func = function
key_step = step or function # default: step == function
existing = None
for entry in chain:
if (entry.get("function") == key_func
and entry.get("step") == key_step):
# and entry.get("scope") == "dataset")
existing = entry
break
if existing:
# Merge outputs
existing_outputs = set(existing.get("outputs", []))
existing["outputs"] = sorted(existing_outputs.union(outputs))
# Merge parameters (update with latest values)
existing["parameters"].update(parameters)
# Merge inputs
existing_inputs = set(existing.get("inputs", []))
existing["inputs"] = sorted(existing_inputs.union(inputs))
else:
# Create new entry
entry = {
# "timestamp": timestamp,
"function": key_func,
"step": key_step,
"outputs": outputs,
"inputs": inputs,
"parameters": parameters,
# "scope": "dataset",
"provenance": "Towerpy",
}
chain.append(entry)
ds.attrs["correction_chain"] = chain
if extra_attrs:
for k, v in extra_attrs.items():
ds.attrs[k] = v
return ds
[docs]
def apply_correction_chain(ds, varname, step, params, mask=None, suffix="_corr"):
"""
Apply a correction mask to a variable and add it back into the Dataset,
appending provenance-aware metadata to track the full correction chain.
Parameters
----------
ds : xarray.Dataset
Dataset containing the variable to correct.
varname : str
Name of the variable to correct.
mask : xarray.DataArray
Boolean or categorical mask aligned with ds[varname].
step : str
Short tag describing the correction step (e.g. "SNR_correction").
params : dict
Parameters used in the correction (e.g. {"min_snr": 5}).
suffix : str, optional
Suffix for the corrected variable name. Default "_corr".
Returns
-------
ds : xarray.Dataset
Dataset with new corrected variable added.
"""
corrected_name = f"{varname}{suffix}"
if mask is not None:
corrected = ds[varname].where(mask == 0)
else:
corrected = ds[varname]
new_entry = {
"step": step,
"params": params,
"parent": varname,
"outputs": [corrected_name], # <-- explicit outputs
"mode": "overwrite" if suffix == "" else "preserve"
}
chain = ds[varname].attrs.get("correction_chain", [])
chain.append(new_entry)
# Base metadata update (variable-level only)
meta_update = {
"correction": step,
"correction_params": params,
"parent": varname,
"history": ds[varname].attrs.get("history", "") + f" | {new_entry}",
"correction_chain": chain,
"provenance": "Towerpy",
"provenance_step": step,
"provenance_base": varname,
"mode": new_entry["mode"],
}
# Variable-specific refinements (inline)
if varname == "DBTH" and step == "offset_correction":
meta_update.update({
"standard_name": "radar_equivalent_reflectivity_factor_h",
"long_name": "Equivalent reflectivity factor H",
"short_name": "DBZH",
"units": "dBZ",
"provenance_offset_applied": float(params.get("offset", 0.0)),
})
# Apply variable-level attrs and insert into dataset
corrected.attrs = {**ds[varname].attrs, **meta_update}
ds[corrected_name] = corrected
return ds