Source code for towerpy.io.ukmo

"""Towerpy: an open-source toolbox for processing polarimetric radar data."""

import ctypes as ctp
import platform
from pathlib import Path
import datetime as dt
from zoneinfo import ZoneInfo
import numpy as np
import numpy.ctypeslib as npct
from ..georad import georef_rdata as geo
from ..base import TowerpyError


# TODO: add xarray compatibility
[docs] class Rad_scan: """ A Towerpy class to store radar data. Attributes ---------- elev_angle : float Elevation angle at which the scan was taken, in deg. file_name : str Name of the file containing radar data. scandatetime : datetime Date and time of scan. site_name : str Name of the radar site. georef : dict Georeferenced data containing descriptors of the azimuth, gates and beam height, amongst others. params : dict Radar technical details. vars : dict Radar variables retrieved from the file. """ def __init__(self, filename, radar_site):
[docs] self.file_name = filename
[docs] self.site_name = radar_site
[docs] def ppi_ukmoraw(self, get_polvar='all', exclude_vars=None, tz='Europe/London'): """ Read raw polarimetric variables from current UKMO PPI binary files. Parameters ---------- get_polvar : str, optional Define variables to read by the function. The default is 'all'. exclude_vars : list, optional Define variables to discard. The default is None. tz : str Key/name of the radar data timezone. The given tz value is then retrieved from the ZoneInfo module. Default is 'Europe/London' Notes ----- 1. This function uses the shared object 'lnxlibreadpolarradardata' or the dynamic link library 'w64libreadpolarradardata' depending on the operating system (OS). Examples -------- >>> rdata = io.ukmo.Rad_scan('metoffice-c-band-rain-radar_chenies_201804090938_raw-dual-polar-augldr-lp-el0.dat') >>> rdata.ukmo_rawpol() """ # Define Ctypes parameters array1d = npct.ndpointer(dtype=np.double, ndim=1, flags='CONTIGUOUS') array2d = npct.ndpointer(dtype=np.double, ndim=2, flags='CONTIGUOUS') if platform.system() == 'Linux': librp = npct.load_library('lnxlibreadpolarradardata.so', Path(__file__).parent.absolute()) elif platform.system() == 'Windows': librp = ctp.cdll.LoadLibrary(f'{Path(__file__).parent.absolute()}' + '/w64libreadpolarradardata.dll') else: librp = None raise TowerpyError(f'Oops!... The {platform.system} OS ' 'is not currently ' 'compatible with this version of Towerpy') librp.readpolarradardata.restype = None librp.readpolarradardata.argtypes = [ctp.c_char_p, array1d, array2d, array1d, array1d, array1d, array1d, array1d, ctp.c_char_p] fname = str.encode(self.file_name) # Create empty arrays to read nrays/ngates emptyarr1 = [np.empty(20) for i in range(8)] emptyarr1[1] = np.empty((1, 1)) emptyarr1[7] = bytes(16) librp.readpolarradardata(ctp.c_char_p(fname), emptyarr1[0], emptyarr1[1], emptyarr1[2], emptyarr1[3], emptyarr1[4], emptyarr1[5], emptyarr1[6], ctp.c_char_p(emptyarr1[7])) nrays, ngates = int(emptyarr1[6][2]), int(emptyarr1[6][1]) # read all radar variables if get_polvar == 'all' or get_polvar is None: emptyarr2 = [np.empty(20) for i in range(8)] emptyarr2[0] = np.array([0, nrays, ngates], dtype=float) emptyarr2[1] = np.empty((nrays, ngates)) emptyarr2[2] = np.empty((nrays)) emptyarr2[3] = np.empty((nrays)) emptyarr2[4] = np.empty((ngates)) emptyarr2[5] = np.empty(6) emptyarr2[7] = bytes(16) librp.readpolarradardata(ctp.c_char_p(fname), emptyarr2[0], emptyarr2[1], emptyarr2[2], emptyarr2[3], emptyarr2[4], emptyarr2[5], emptyarr2[6], ctp.c_char_p(emptyarr2[7])) nvar = int(emptyarr2[6][0]) emptyarr = [np.empty(20) for i in range(8)] emptyarr[0] = np.array([0, nrays, ngates], dtype=float) emptyarr[1] = np.empty((nrays, ngates)) emptyarr[2] = np.empty((nrays)) emptyarr[3] = np.empty((nrays)) emptyarr[4] = np.empty((ngates)) emptyarr[5] = np.empty(6) emptyarr[7] = bytes(16) vardat = {} varnam = {} dicaxs = {} for i in range(nvar): emptyarr[0][0] = i librp.readpolarradardata(ctp.c_char_p(fname), emptyarr[0], emptyarr[1], emptyarr[2], emptyarr[3], emptyarr[4], emptyarr[5], emptyarr[6], ctp.c_char_p(emptyarr[7])) varname = emptyarr[7].decode() varnam[i] = varname[0:varname.find(']')+1] vardat[i] = np.array(emptyarr[1]) dicaxs[i] = [emptyarr[6][4], emptyarr[6][5]] if emptyarr[0][0] == 0: outpar = np.array(emptyarr[6]) else: # read rad variable defined by user emptyarr = [np.empty(20) for i in range(8)] emptyarr[0] = np.array([0, nrays, ngates], dtype=float) emptyarr[1] = np.empty((nrays, ngates)) emptyarr[2] = np.empty((nrays)) emptyarr[3] = np.empty((nrays)) emptyarr[4] = np.empty((ngates)) emptyarr[5] = np.empty(6) emptyarr[7] = bytes(16) vardat = {} varnam = {} dicaxs = {} if get_polvar == 'ZH [dBZ]': nvar = 0 elif get_polvar == 'ZDR [dB]': nvar = 1 elif get_polvar == 'PhiDP [deg]': nvar = 2 elif get_polvar == 'rhoHV [-]': nvar = 3 elif get_polvar == 'V [m/s]': nvar = 4 elif get_polvar == 'W [m/s]': nvar = 5 elif get_polvar == 'CI [dB]': nvar = 6 elif get_polvar == 'SQI [-]': nvar = 7 elif get_polvar == 'LDR [dB]': nvar = 1 else: raise TowerpyError(f'Oops!... The variable {nvar}' 'cannot be retreived') emptyarr[0][0] = nvar librp.readpolarradardata(ctp.c_char_p(fname), emptyarr[0], emptyarr[1], emptyarr[2], emptyarr[3], emptyarr[4], emptyarr[5], emptyarr[6], ctp.c_char_p(emptyarr[7])) varname = emptyarr[7].decode() varnam[nvar] = varname[0:varname.find(']')+1] vardat[nvar] = np.array(emptyarr[1]) dicaxs[nvar] = [emptyarr[6][4], emptyarr[6][5]] outpar = np.array(emptyarr[6]) poldata = {varnam[i]: j for (i, j) in vardat.items()} if any(v.startswith('Zh') for k, v in varnam.items()): poldata['ZH [dBZ]'] = poldata.pop('Zh [dBZ]') if any(v.startswith('Zdr') for k, v in varnam.items()): poldata['ZDR [dB]'] = poldata.pop('Zdr [dB ]') if any(v.startswith('RhoHV') for k, v in varnam.items()): poldata['rhoHV [-]'] = poldata.pop('RhoHV [ ]') if any(v.startswith('LDR') for k, v in varnam.items()): poldata['LDR [dB]'] = poldata.pop('LDR [dB ]') if any(v.startswith('Phi') for k, v in varnam.items()): poldata['PhiDP [deg]'] = poldata.pop('Phidp [deg]') if any(not v for k, v in varnam.items()): poldata['Absphase_V [ ]'] = poldata.pop('') if any(v.startswith('CI [-') for k, v in varnam.items()): poldata['CI [dB]'] = poldata.pop('CI [- ]') if any(v.startswith('V') for k, v in varnam.items()): poldata['V [m/s]'] = poldata.pop('V [m/s]') # poldata = dict(sorted(poldata.items(), reverse=True)) if exclude_vars is not None: evars = exclude_vars poldata = {k: val for k, val in poldata.items() if k not in evars} # Create dict to store radparameters dttime = dt.datetime(int(emptyarr[5][0]), int(emptyarr[5][1]), int(emptyarr[5][2]), int(emptyarr[5][3]), int(emptyarr[5][4]), int(emptyarr[5][5]), tzinfo=ZoneInfo(tz)) outpar[17] = np.rad2deg(emptyarr[3][0]) parameters = {'nvars': int(outpar[0]), 'ngates': int(outpar[1]), 'nrays': int(outpar[2]), 'gateres [m]': outpar[3], 'rpm': outpar[6], 'prf [Hz]': outpar[7], 'pulselength [ns]': outpar[8], 'avsamples': outpar[9], 'wavelength [cm]': outpar[10]*100, 'latitude [dd]': outpar[11], 'longitude [dd]': outpar[12], 'altitude [m]': outpar[13], 'easting [km]': outpar[14]/1000, 'northing [km]': outpar[15]/1000, 'radar constant [dB]': outpar[16], 'elev_ang [deg]': outpar[17], 'datetime': dttime, 'datetimearray': emptyarr[5]} if 'metoffice' in self.file_name: parameters['beamwidth [deg]'] = 1. parameters['site_name'] = self.site_name parameters['range_start [m]'] = emptyarr[4][0] # Create dict to store geospatial data # rh, th = np.meshgrid(emptyarr[4]/1000, emptyarr[2]) geogrid = {'range [m]': emptyarr[4], 'elev [rad]': emptyarr[3], 'azim [rad]': emptyarr[2]} self.elev_angle = parameters['elev_ang [deg]'] self.scandatetime = parameters['datetime'] self.georef = geogrid self.params = parameters self.vars = poldata
[docs] def ppi_ukmogeoref(self): """ Create georeferenced data from the UKMO PPI scan. Notes ----- 1. This method wraps :func:`geo.ppi_georef` and adds OSGB coordinates based on radar easting and northing offsets. 2. OSGB coordinates are computed by shifting the relative Cartesian grid ('grid_rectx', 'grid_recty') by the radar easting/northing (in kilometres) and converting to metres. """ geogrid, _ = geo.ppi_georef(self.params, georef=self.georef) geogrid['grid_osgbx'] = ( geogrid['grid_rectx'] + self.params['easting [km]'])*1000 geogrid['grid_osgby'] = ( geogrid['grid_recty'] + self.params['northing [km]'])*1000 self.georef.update(geogrid)