Source code for gnssvod.io.preprocessing

"""
preprocess reads files and returns analysis-ready DataSet

gather_stations merges observations from sites according to specified pairing rules over the desired time intervals
"""
# ===========================================================
# ========================= imports =========================
import os
import glob
import numpy as np
import pandas as pd
import xarray as xr
import tempfile
import fnmatch
from pathlib import Path
from typing import Union, Literal, Any
from gnssvod.io.io import Observation
from gnssvod.io.readFile import read_obsFile
from gnssvod.io.exporters import export_as_nc
from gnssvod.position.interpolation import sp3_interp_fast
from gnssvod.position.position import gnssDataframe
from gnssvod.funcs.constants import _system_name
#-------------------------------------------------------------------------
#----------------- FILE SELECTION AND BATCH PROCESSING -------------------
#-------------------------------------------------------------------------
[docs] def preprocess(filepattern: dict, orbit: bool = True, interval: Union[str,pd.Timedelta,None] = None, keepvars: Union[list,None] = None, outputdir: Union[dict, None] = None, overwrite: bool = False, encoding: Union[None, Literal['default'], dict] = 'default', outputresult: bool = False, aux_path: Union[str, None] = None, approx_position: list[float] = None) -> dict[Any,list[Observation]]: """ Reads and processes structured lists of RINEX observation files. Parameters ---------- filepattern : dict Dictionary mapping station names to UNIX-style patterns matching RINEX observation files. For example:: filepattern={'station1':'/path/to/files/of/station1/*O'} orbit : bool, optional If True, downloads orbit solutions and calculates Azimuth and Elevation parameters. If False, no additional GNSS parameters are calculated. interval : str, pandas.Timedelta, or None, optional If None, observations are returned at their original sampling rate. If a string or pandas.Timedelta is provided, observations are resampled (averaged) over that interval. keepvars : list of str or None, optional List of columns to keep after processing, reducing the size of saved data. If None, all columns are kept. outputdir : dict or None, optional Dictionary mapping station names to folders where preprocessed data should be saved. Dictionary keys must match the `filepattern` argument. Data are saved as NetCDF files, reusing the original filenames. If None, data are not saved. overwrite : bool, optional If False (default), files that already exist in the output directory are skipped. encoding : None, str, or dict, optional Controls compression and encoding options when saving NetCDF files. - None: no variable encodings applied. - "default": applies default encoding to SNR, VOD, Azimuth, and Elevation variables. Default encoding is:: { "dtype": "int16", "scale_factor": 0.1, "zlib": True, "_FillValue": -9999, } - dict: per-variable encodings that are passed to :meth:`xarray.Dataset.to_netcdf` for fine-grained control by the user. outputresult : bool, optional If True and `outputdir` is None, observation objects are returned as a dictionary. aux_path : str or None, optional Directory for auxiliary orbit and clock files. If None, a temporary directory is created and cleaned up after processing. approx_position : list of float, optional Cartesian coordinates [X, Y, Z] of the antenna. Used if source RINEX files lack "APPROX POSITION XYZ" and `orbit` is True. To convert geographic coordinates (lat, lon, h) to Cartesian use :func:`gnssvod.geodesy.ell2cart`. Returns ------- dict or None If `outputresult` is True, returns a dictionary with one key per station name. Each value is a list of GNSS observation objects read from the input RINEX files. Examples -------- .. code-block:: python filepattern = { "station1": "/path/to/files/of/station1/*O", "station2": "/path/to/files/of/station2/*O" } interval = "15s" keepvars = ["S1*", "S2*", "Azimuth", "Elevation"] outputdir = { "station1": "/path/where/to/save/preprocessed/data", "station2": "/path/where/to/save/preprocessed/data" } output = preprocess( filepattern=filepattern, orbit=True, interval=interval, keepvars=keepvars, outputdir=outputdir outputresult=True ) """ # set up temporary directory if necessary if orbit and (aux_path is None): tmp_folder = tempfile.TemporaryDirectory() aux_path = tmp_folder.name print(f"Created a temporary directory at {aux_path}") else: tmp_folder = None # grab all files matching the patterns stations = get_filelist(filepattern) out = dict() for item in stations.items(): station_name = item[0] filenames = item[1] # checking which files will be skipped (if necessary) if (not overwrite) and (outputdir is not None): # gather all files that already exist in the outputdir files_to_skip = get_filelist({station_name:f"{outputdir[station_name]}*.nc"}) files_to_skip = [os.path.basename(x) for x in files_to_skip[station_name]] else: files_to_skip = [] # for each file result = [] for i,filename in enumerate(filenames): # determine the name of the output file that will be saved at the end of the loop out_name = os.path.splitext(os.path.basename(filename))[0]+'.nc' # if the name of the saved output file is in the files to skip, skip processing if out_name in files_to_skip: print(f"{out_name} already exists, skipping.. (pass overwrite=True to overwrite)") continue # skip remainder of loop and go directly to next filename # read in the file x = read_obsFile(filename) print(f"Processing {len(x.observation):n} individual observations") # only keep required vars if keepvars is not None: x.observation = subset_vars(x.observation,keepvars) # update the observation_types list x.observation_types = x.observation.columns.to_list() # resample if required if interval is not None: x = resample_obs(x,interval) # calculate Azimuth and Elevation if required if orbit: # use a prescribed position if one was passed as argument if approx_position is not None: x.approx_position = approx_position # check that an approximate position exists before proceeding if (x.approx_position == [0,0,0]) or (x.approx_position is None): raise ValueError("Missing an approximate antenna position. Provide the argument 'approx_position' to preprocess()") print(f"Calculating Azimuth and Elevation") # note: orbit cannot be parallelized easily because it # downloads and unzips third-party files in the current directory if not 'orbit_data' in locals(): # if there is no previous orbit data, the orbit data is returned as well x, orbit_data = add_azi_ele(x, aux_path = aux_path) else: # on following iterations the orbit data is tentatively recycled to reduce computational time x, orbit_data = add_azi_ele(x, orbit_data, aux_path = aux_path) # make sure we drop any duplicates x.observation=x.observation[~x.observation.index.duplicated(keep='first')] # store result in memory if required if outputresult: result.append(x) # write to file if required if outputdir is not None: outpath = str(Path(outputdir[station_name],out_name)) export_as_nc(ds = x.to_xarray(), outpath = outpath, encoding = encoding) print(f"Saved {len(x.observation):n} individual observations in {outpath}") # store station in memory if required if outputresult: out[station_name]=result # clean up temporary directory if one exists if tmp_folder is not None: tmp_folder.cleanup() print(f"Removed the temporary directory at {aux_path}") if outputresult: return out else: return
[docs] def subset_vars(df: pd.DataFrame, keepvars: list, force_epoch_system: bool = True) -> pd.DataFrame: """ Subset an observation DataFrame to keep only selected columns. This function filters columns of a pandas DataFrame based on a list of variable patterns. Optionally, it ensures that the 'epoch' and 'SYSTEM' columns are always retained, as they are required for computing GNSS-derived quantities such as Azimuth and Elevation. Parameters ---------- df : pandas.DataFrame Observation DataFrame, typically obtained from :attr:`~gnssvod.io.Observation.observation`. keepvars : list of str List of variable names or patterns to keep. Pattern matching uses Unix shell-style wildcards (see :func:`fnmatch.filter`). force_epoch_system : bool, optional If True (default), always retain the 'epoch' and 'SYSTEM' columns in addition to `keepvars`. Set to False to keep only columns matching `keepvars`. Returns ------- pandas.DataFrame Subset of `df` containing only the requested columns. Rows with all NaNs in the selected columns are dropped. """ # find all matches for all elements of keepvars keepvars = np.concatenate([fnmatch.filter(df.columns.tolist(),x) for x in keepvars]) # subselect those of the required columns that are present tokeep = np.intersect1d(keepvars,df.columns.tolist()) # + always keep 'epoch' and 'SYSTEM' as they are required for calculating azimuth and elevation if force_epoch_system: tokeep = np.unique(np.concatenate((keepvars,['epoch','SYSTEM']))) else: tokeep = np.unique(keepvars) # find columns not to keep todrop = np.setdiff1d(df.columns.tolist(),tokeep) # drop unneeded columns if len(todrop)>0: df = df.drop(columns=todrop) # drop rows for which all of the required vars are NA df = df.dropna(how='all') return df
[docs] def resample_obs(obs: Observation, interval: str) -> Observation: """ Temporally resample an observation object. This function averages the variables in an :class:`~gnssvod.io.Observation` over a specified time interval while preserving the multi-index (Epoch/SV) structure. The 'epoch' and 'SYSTEM' columns are reconstructed after resampling. Parameters ---------- obs : Observation Observation object to resample. Data are taken from :attr:`~gnssvod.io.Observation.observation`. interval : str Resampling interval as a pandas-compatible frequency string (e.g., '15s', '1min'). Returns ------- Observation The input observation object with the resampled :attr:`~gnssvod.io.Observation.observation` DataFrame and updated :attr:`~gnssvod.io.Observation.interval` (in seconds). """ # list all variables except SYSTEM and epoch as these are handled differently subset = np.setdiff1d(obs.observation.columns.to_list(),['epoch','SYSTEM']) # resample those variables using temporal averaging obs.observation = obs.observation[subset].groupby([pd.Grouper(freq=interval, level='Epoch'),pd.Grouper(level='SV')]).mean() # restore SYSTEM and epoch obs.observation['epoch'] = obs.observation.index.get_level_values('Epoch') obs.observation['SYSTEM'] = _system_name(obs.observation.index.get_level_values("SV")) obs.interval = pd.Timedelta(interval).seconds return obs
[docs] def add_azi_ele(obs: Observation, orbit_data: Union[pd.DataFrame,None] = None, aux_path: Union[str,None] = None) -> tuple[Observation,pd.DataFrame]: """ Adds GNSS azimuth and elevation to an Observation object. This function computes Azimuth and Elevation for all measurements in the provided :class:`Observation` object. If necessary, it will download orbit and clock data to the directory specified by ``aux_path``. Parameters ---------- obs : Observation The GNSS observation object to update. orbit_data : pandas.DataFrame or None, optional Precomputed orbit information. If provided and valid, it will be reused to avoid repeated downloads. aux_path : str or None, optional Directory where auxiliary orbit and clock files will be stored or retrieved from. If None, a temporary directory is used. Returns ------- tuple A tuple containing: - Updated :class:`Observation` object with Azimuth and Elevation columns. - Orbit data used for the computation (may be newly calculated or reused from a previous call). """ start_time = min(obs.observation.index.get_level_values('Epoch')) end_time = max(obs.observation.index.get_level_values('Epoch')) if orbit_data is None: do = True elif (orbit_data.start_time<start_time) and (orbit_data.end_time>end_time) and (orbit_data.interval==obs.interval): # if the orbit for the day corresponding to the epoch and interval is the same as the one that was passed, just reuse it. This drastically reduces the number of times orbit files have to be read and interpolated. do = False else: do = True if do: # read (=usually download) orbit data orbit = sp3_interp_fast(start_time, end_time, interval=obs.interval, aux_path=aux_path) # prepare an orbit object as well orbit_data = orbit orbit_data.start_time = orbit.index.get_level_values('Epoch').min() orbit_data.end_time = orbit.index.get_level_values('Epoch').max() orbit_data.interval = obs.interval else: orbit = orbit_data # calculate the gnss parameters (including azimuth and elevation) gnssdf = gnssDataframe(obs,orbit,cut_off=-10) # add the gnss parameters to the observation dataframe obs.observation = obs.observation.join(gnssdf[['Azimuth','Elevation']]) # drop variables 'epoch' and 'SYSTEM' as they are not needed anymore by gnssDataframe obs.observation = obs.observation.drop(columns=['epoch','SYSTEM']) # update the observation_types list obs.observation_types = obs.observation.columns.to_list() return obs, orbit_data
[docs] def get_filelist(filepatterns: dict) -> dict: """ Retrieve lists of files matching UNIX-style patterns for one or more stations. Parameters ---------- filepatterns : dict Dictionary mapping station names to file search patterns. Each pattern should be a valid glob expression (e.g., '\*.O'). Returns ------- dict Dictionary mapping each station name to a list of matching files. If no files match a given pattern, an empty list is returned. Examples -------- Single station: .. code-block:: python filepatterns = { "station1": "/path/to/files/station1/*.O" } get_filelist(filepatterns) # Output: # { # "station1": [ # "/path/to/files/station1/obs1.O", # "/path/to/files/station1/obs2.O" # ] # } Multiple stations: .. code-block:: python filepatterns = { "station1": "/path/to/files/station1/*.O", "station2": "/path/to/files/station2/*.O" } get_filelist(filepatterns) # Output: # { # "station1": [ # "/path/to/files/station1/obs1.O", # "/path/to/files/station1/obs2.O" # ], # "station2": [ # "/path/to/files/station2/obs1.O", # "/path/to/files/station2/obs2.O" # ] # } """ if not isinstance(filepatterns,dict): raise Exception(f"Expected the input of get_filelist to be a dictionary, got a {type(filepatterns)} instead") filelists = dict() for item in filepatterns.items(): station_name = item[0] search_pattern = item[1] flist = glob.glob(search_pattern) if len(flist)==0: print(f"Could not find any files matching the pattern {search_pattern}") filelists[station_name] = flist return filelists
#-------------------------------------------------------------------------- #----------------- PAIRING OBSERVATION FILES FROM SITES ------------------- #--------------------------------------------------------------------------
[docs] def gather_stations(filepattern: dict, pairings: dict, timeintervals: Union[pd.IntervalIndex,None] = None, keepvars: Union[list,None] = None, outputdir: Union[dict, None] = None, encoding: Union[None, Literal['default'], dict] = None, outputresult: bool = False) -> dict[Any,pd.DataFrame]: """ Merge observations from different sites according to specified pairing rules. The returned dataframe contains a new index level corresponding to each site, with keys given by station names. Parameters ---------- filepattern : dict Dictionary mapping station names to UNIX-style file patterns used to locate preprocessed NetCDF observation files. For example:: filepattern={'station1':'/path/to/files/of/station1/*.nc','station2':'/path/to/files/of/station2/*.nc'} pairings : dict Dictionary mapping case names to tuples of station names indicating which stations should be gathered together. For example:: pairings={'case1':('station1','station2')} If data is saved, the case name is used as the output filename. timeintervals : None or pandas.IntervalIndex, optional Time interval(s) over which data are sequentially gathered (see example below). Sequential processing avoids loading and pairing too much data at once. If ``outputdir`` is not ``None``, the interval frequency also defines how data are saved (e.g. daily files). If ``None``, all available files are used. keepvars : list of str or None, optional List of column names to keep after gathering. Helps reduce the size of the dataset when saving. If ``None``, no columns are removed. outputdir : dict or None, optional Dictionary mapping case names to output directories where gathered data should be saved. For example:: outputdir={'case1':'/path/where/to/save/data'} Data are saved as NetCDF files. The dictionary must be consistent with the ``pairings`` argument. If ``None``, data are not saved. encoding : None, str, or dict, optional Controls compression and encoding options when saving NetCDF files. - ``None``: no variable encodings are applied. - ``"default"``: applies a default encoding to SNR, Azimuth, and Elevation variables. The default encoding is:: { "dtype": "int16", "scale_factor": 0.1, "zlib": True, "_FillValue": -9999, } - ``dict``: per-variable encodings passed directly to :meth:`xarray.Dataset.to_netcdf`, allowing fine-grained customization. outputresult : bool, optional If ``True``, observation objects are also returned as a dictionary. Returns ------- dict or None If ``outputresult`` is ``True``, returns a dictionary with one key per case. Each value is a :class:`pandas.DataFrame` containing the paired data. Examples -------- .. code-block:: python filepattern = { "station1": "/path/to/files/of/station1/*.nc", "station2": "/path/to/files/of/station2/*.nc", } pairings = { "case1": ("station1", "station2") } timeintervals = pd.interval_range( start=pd.Timestamp("2018-01-01"), periods=8, freq="D" ) keepvars = ["S1", "S2", "Azimuth", "Elevation"] outputdir = { "case1": "/path/where/to/save/data" } result = gather_stations( filepattern=filepattern, pairings=pairings, timeintervals=timeintervals, keepvars=keepvars, outputdir=outputdir, outputresult=True ) """ # get all files for all stations filenames = get_filelist(filepattern) print(f'Extracting Epochs from files') # extract only Epoch timestamps from all files (should be fast enough) epochs = {key:[xr.open_mfdataset(x)['Epoch'].values for x in items] for key,items in filenames.items()} # get min and max timestamp for each file (will be used to select which files to read later) epochs_min = {key:[np.min(x) for x in items] for key,items in epochs.items()} epochs_max = {key:[np.max(x) for x in items] for key,items in epochs.items()} result=dict() for case_name, station_names in pairings.items(): out = [] print(f'----- Processing {case_name}') if timeintervals is None: timeintervals = pd.interval_range(start=epochs_min, end=epochs_max) for interval in timeintervals: print(f'-- Processing interval {interval}') iout = [] # gather all data required for that interval for station_name in station_names: # check which files have data that overlaps with the desired time intervals isin = [interval.overlaps(pd.Interval(left=pd.Timestamp(tmin), right=pd.Timestamp(tmax))) for tmin,tmax in zip(epochs_min[station_name],epochs_max[station_name])] print(f'Found {sum(isin)} file(s) for {station_name}') if sum(isin)>0: print(f'Reading') # open those files and convert them to pandas dataframes idata = [xr.open_mfdataset(x).to_dataframe().dropna(how='all') \ for x in np.array(filenames[station_name])[isin]] # concatenate idata = pd.concat(idata) # keep only data falling within the interval idata = idata.loc[[x in interval for x in idata.index.get_level_values('Epoch')]] # drop duplicates and sort the dataframes idata = idata[~idata.index.duplicated()].sort_index(level=['Epoch','SV']) # add the station data in the iout list iout.append(idata) else: iout.append(pd.DataFrame(index=pd.MultiIndex(levels=[[], []], codes=[[], []], names=['Epoch', 'SV']))) print(f"No data for station {station_name}.") continue if not all([x.empty for x in iout]): print(f'Concatenating stations') iout = pd.concat(iout, keys=station_names, names=['Station']) # only keep required vars and drop potential empty rows if keepvars is not None: iout = subset_vars(iout,keepvars,force_epoch_system=False) if len(iout)==0: print(f"No observations left after subsetting columns (argument 'keepvars')") continue # output the data as .nc if required if outputdir: ioutputdir = outputdir[case_name] print(f'Saving result in {ioutputdir}') # make destination path ts = f"{interval.left.strftime('%Y%m%d%H%M%S')}_{interval.right.strftime('%Y%m%d%H%M%S')}" filename = f"{case_name}_{ts}.nc" outpath = str(Path(ioutputdir,filename)) # sort dimensions ds = iout.to_xarray() ds = ds.sortby(['Epoch','SV','Station']) # write nc file export_as_nc(ds = ds, outpath = outpath, encoding = encoding) print(f"Saved {len(iout)} observations in {filename}") # add interval in memory if required if outputresult: out.append(iout) else: print(f"No data at all for that interval, skipping..") # store case in memory if required if outputresult and len(out)>0: result[case_name] = pd.concat(out) return result