Source code for towerpy.io.ncas

"""Towerpy: an open-source toolbox for processing polarimetric radar data."""

import datetime as dt
from zoneinfo import ZoneInfo
import numpy as np
import netCDF4 as nc
from scipy import constants as sc
from ..utils import radutilities as rut
from ..georad import georef_rdata as geo
from ..base import TowerpyError


[docs] class Rad_scan: """A Towerpy class to store radar scans data.""" def __init__(self, filename): """ Init Rad_scan Class with a filename. Parameters ---------- fname : string """
[docs] self.file_name = filename
[docs] def ppi_ncasx(self, ncfilename, elevangle=None, bvars=False, tz='Europe/London'): """ Read-in NCAS NetCDF radar data. Parameters ---------- ncfilename : dict NetCDF radar file. elevangle : float, optional Elevation angle to work with, in deg. The default is None. Returns ------- None. """ ncdat = {k: v[:] for k, v in nc.Dataset(ncfilename).variables.items()} print(f'This file contains data taken at the following elevation' f" angles: \n {ncdat['fixed_angle']}") if elevangle is None: elev = ncdat['fixed_angle'][0] else: if elevangle in ncdat['fixed_angle']: elev = elevangle else: raise TowerpyError('Oops!... Choose one of the following' f" elevations: \n {ncdat['fixed_angle']}") idxelev = np.where(ncdat['elevation'].data == elev) nrays = ncdat['azimuth'][idxelev].shape[0] ngates = ncdat['range [m]'].shape[0] # ====================================================================== # Reads in the netcdf parameters and store them in a dict # ====================================================================== rparams = {k: v for k, v in ncdat.items()} for k, v in rparams.items(): if v.size == ncdat['azimuth'].size: rparams[k] = ncdat[k][idxelev] rparams['ngates'] = ngates rparams['nrays'] = nrays rparams['gateres [m]'] = ncdat['ray_gate_spacing'][idxelev][0] rparams['frequency [GHz]'] = rparams.pop('frequency', np.nan).data/1000000000 rparams['rpm'] = rparams.pop('rpm', np.nan) rparams['prf [Hz]'] = 1/ncdat['prt'][idxelev] rparams['pulselength [ns]'] = rparams.pop('r_calib_pulse_width', np.nan) rparams['avsamples'] = rparams.pop('n_samples', np.nan) rparams['wavelength [cm]'] = (sc.c / rparams['Frequency [GHz]'])/10000000 rparams['latitude [dd]'] = rparams.pop('latitude', np.nan) rparams['longitude [dd]'] = rparams.pop('longitude', np.nan) rparams['altitude [m]'] = rparams.pop('altitude', np.nan) rparams['easting [km]'] = rparams.pop('easting', np.nan) rparams['northing [km]'] = rparams.pop('northing', np.nan) rparams['radar constant [dB]'] = rparams.pop('r_calib_radar_constant_h', np.nan) rparams['elev_ang [deg]'] = rparams.pop('elevation', np.nan)[0] date_string = ''.join([str(a, encoding='UTF-8') for a in ncdat['time_coverage_start'].data]) rparams['datetime'] = dt.datetime.strptime(date_string, "%Y-%m-%dT%H:%M:%SZ", ).replace(tzinfo=ZoneInfo(tz)) rparams['datetimearray'] = list(rparams['datetime'].timetuple())[: -3] rparams['beamwidth [deg]'] = rparams.pop('radar_beam_width_v', np.nan) rparams['status_xml'] = str(rparams.pop('status_xml', np.nan), encoding='UTF-8') rvars = {k: rparams.pop(k, np.nan)[idxelev].data for k, v in ncdat.items() if v.shape == (ncdat['azimuth'].size, ncdat['range [m]'].size)} for k, v in rvars.items(): rvars[k][ncdat[k][idxelev].mask == 1] = np.nan rvars['ZH [dBZ]'] = rvars.pop('dBZ', np.nan) rvars['ZDR [dB]'] = rvars.pop('ZDR', np.nan) rvars['rhoHV [-]'] = rvars.pop('rhoHV', np.nan) rvars['PhiDP [deg]'] = rvars.pop('PhiDP', np.nan) rvars['V [m/s]'] = rvars.pop('V', np.nan) rvars['KDP [deg/km]'] = rvars.pop('KDP', np.nan) # rvars['W [m/s]'] = rvars.pop('W', np.nan) # rvars['SQI [-]'] = rvars.pop('SQI', np.nan) if bvars: rvars = {k: v for k, v in rvars.items() if '[' in k} # ====================================================================== # Rolls the array, so the azimuth 0 matches the row 0 # ====================================================================== azim = np.deg2rad(ncdat['azimuth'][idxelev]) idx0 = abs(rut.find_nearest(azim, 0) - ncdat['azimuth'].size) azim = np.roll(azim, idx0) for k, v in rvars.items(): rvars[k] = np.roll(v, idx0, axis=0) elevrad = np.deg2rad(ncdat['elevation'][idxelev]) gatei = ncdat['ray_start_range'][0] rng = np.arange(gatei, rparams['ngates']*rparams['gateres [m]'], rparams['gateres [m]'], dtype=float) rh, th = np.meshgrid(rng/1000, azim) xgrid, ygrid = geo.pol2cart(rh, np.pi/2-th) geogrid = {'azim [rad]': azim, 'elev [rad]': elevrad, 'range [m]': rng, 'rho': rh, 'theta': th, 'grid_rectx': xgrid, 'grid_recty': ygrid} geogrid['beam_height [km]'] = np.array([geo.height_beamc(ray, rng/1000) for ray in np.rad2deg(elevrad)]) geogrid['beambottom_height [km]'] = np.array([geo.height_beamc(ray-rparams['beamwidth [deg]']/2, rng/1000) for ray in np.rad2deg(elevrad)]) geogrid['beamtop_height [km]'] = np.array([geo.height_beamc(ray+rparams['beamwidth [deg]']/2, rng/1000) for ray in np.rad2deg(elevrad)]) self.georef = geogrid self.params = rparams self.vars = rvars