Source code for metobs_toolkit.dataset_core

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
This module contains the Dataset class and all its methods.

A Dataset holds all observations and is at the center of the
MetObs-toolkit.
"""

import os
import sys
import copy
from datetime import timedelta
import pytz
import logging
import pandas as pd
import numpy as np
import pickle

from metobs_toolkit.settings import Settings
from metobs_toolkit.data_import import import_data_from_csv, import_metadata_from_csv
from metobs_toolkit.template import Template

from metobs_toolkit.printing import print_dataset_info
from metobs_toolkit.landcover_functions import (
    connect_to_gee,
    lcz_extractor,
    height_extractor,
    lc_fractions_extractor,
    _validate_metadf,
)

# from metobs_toolkit.plotting_functions import (
#     geospatial_plot,
#     timeseries_plot,
#     # qc_stats_pie,
#     folium_plot,
#     add_stations_to_folium_map,
#     make_folium_html_plot,
# )

from metobs_toolkit.qc_checks import (
    # gross_value_check,
    # persistance_check,
    # repetitions_check,
    duplicate_timestamp_check,
    #     step_check,
    #     window_variation_check,
    invalid_input_check,
    #     toolkit_buddy_check,
    #     titan_buddy_check,
    #     titan_sct_resistant_check,
)


# from metobs_toolkit.qc_statistics import get_freq_statistics
from metobs_toolkit.writing_files import write_dataset_to_csv

# from metobs_toolkit.missingobs import Missingob_collection

from metobs_toolkit.gap import (
    # Gap,
    remove_gaps_from_obs,
    # remove_gaps_from_outliers,
    missing_timestamp_and_gap_check,
    get_gaps_indx_in_obs_space,
    get_station_gaps,
    # apply_interpolate_gaps,
    # make_gapfill_df,
    # apply_debias_era5_gapfill,
    # gaps_to_df,
)


from metobs_toolkit.df_helpers import (
    # multiindexdf_datetime_subsetting,
    fmt_datetime_argument,
    # init_multiindex,
    init_multiindexdf,
    init_triple_multiindexdf,
    metadf_to_gdf,
    conv_applied_qc_to_df,
    get_freqency_series,
    value_labeled_doubleidxdf_to_triple_idxdf,
    xs_save,
    concat_save,
)

from metobs_toolkit.obstypes import tlk_obstypes
from metobs_toolkit.obstypes import Obstype as Obstype_class


from metobs_toolkit.analysis import Analysis
from metobs_toolkit.modeldata import Modeldata
from metobs_toolkit.datasetbase import _DatasetBase


logger = logging.getLogger(__name__)


# =============================================================================
# Dataset class
# =============================================================================


class Dataset(_DatasetBase):
    """Objects holding observations and methods on observations."""

[docs] def __init__(self): """Construct all the necessary attributes for Dataset object.""" logger.info("Initialise dataset") _DatasetBase.__init__(self) # holds df, metadf, obstypes and settings # Dataset with outlier observations self.outliersdf = init_triple_multiindexdf() self.missing_obs = None # becomes a Missingob_collection after import self.gaps = None # becomes a list of gaps self.gapfilldf = init_multiindexdf() self.missing_fill_df = init_multiindexdf() # Template for mapping data and metadata self.template = Template() self._istype = "Dataset" self._freqs = pd.Series(dtype=object) self._applied_qc = pd.DataFrame(columns=["obstype", "checkname"]) self._qc_checked_obstypes = [] # list with qc-checked obstypes
def __str__(self): """Represent as text.""" if self.df.empty: if self._istype == "Dataset": return "Empty instance of a Dataset." elif self._istype == "Station": return "Empty instance of a Station." else: return "Empty instance of a Analysis." add_info = "" n_stations = self.df.index.get_level_values("name").unique().shape[0] n_obs_tot = self.df.shape[0] n_outl = self.outliersdf.shape[0] startdt = self.df.index.get_level_values("datetime").min() enddt = self.df.index.get_level_values("datetime").max() if (not self.metadf["lat"].isnull().all()) & ( not self.metadf["lon"].isnull().all() ): add_info += " *Coordinates are available for all stations." return ( f"{self._istype} instance containing: \n \ *{n_stations} stations \n \ *{self.df.columns.to_list()} observation types \n \ *{n_obs_tot} observation records \n \ *{n_outl} records labeled as outliers \n \ *{len(self.gaps)} gaps \n \ *{self.missing_obs.series.shape[0]} missing observations \n \ *records range: {startdt} --> {enddt} (total duration: {enddt - startdt}) \n \ *time zone of the records: {self.settings.time_settings['timezone']} \n " + add_info ) def __repr__(self): """Info representation.""" return self.__str__() def __add__(self, other, gapsize=None): """Addition of two Datasets.""" # important !!!!! # the toolkit makes a new dataframe, and assumes the df from self and other # to be the input data. # This means that missing obs, gaps, invalid and duplicated records are # being looked for in the concatenation of both dataset, using their current # resolution ! new = Dataset() self_obstypes = self.df.columns.to_list().copy() # ---- df ---- # check if observation of self are also in other assert all([(obs in other.df.columns) for obs in self_obstypes]) # subset obstype of other to self other.df = other.df[self.df.columns.to_list()] # remove duplicate rows common_indexes = self.df.index.intersection(other.df.index) other.df = other.df.drop(common_indexes) # set new df new.df = concat_save([self.df, other.df]) new.df = new.df.sort_index() # ----- outliers df --------- other_outliers = other.outliersdf.reset_index() other_outliers = other_outliers[other_outliers["obstype"].isin(self_obstypes)] other_outliers = other_outliers.set_index(["name", "datetime", "obstype"]) new.outliersdf = concat_save([self.outliersdf, other_outliers]) new.outliersdf = new.outliersdf.sort_index() # ------- Gaps ------------- # Gaps have to be recaluculated using a frequency assumtion from the # combination of self.df and other.df, thus NOT the native frequency if # their is a coarsening allied on either of them. new.gaps = [] # ---------- missing --------- # Missing observations have to be recaluculated using a frequency assumtion from the # combination of self.df and other.df, thus NOT the native frequency if # their is a coarsening allied on either of them. new.missing_obs = None # ---------- metadf ----------- # Use the metadf from self and add new rows if they are present in other new.metadf = concat_save([self.metadf, other.metadf]) new.metadf = new.metadf.drop_duplicates(keep="first") new.metadf = new.metadf.sort_index() # ------- specific attributes ---------- # Template (units and descritpions) are taken from self new.template = self.template # Inherit Settings from self new.settings = copy.deepcopy(self.settings) # Applied qc: # TODO: is this oke to do? new._applied_qc = pd.DataFrame(columns=["obstype", "checkname"]) new._qc_checked_obstypes = [] # list with qc-checked obstypes # set init_dataframe to empty # NOTE: this is not necesarry but users will use this method when they # have a datafile that is to big. So storing and overloading a copy of # the very big datafile is invalid for these cases. new.input_df = pd.DataFrame() # ----- Apply IO QC --------- # Apply only checks that are relevant on records in between self and other # OR # that are dependand on the frequency (since the freq of the .df is used, # which is not the naitive frequency if coarsening is applied on either. ) # missing and gap check if gapsize is None: gapsize = new.settings.gap["gaps_settings"]["gaps_finder"]["gapsize_n"] # note gapsize is now defined on the frequency of self new.missing_obs, new.gaps = missing_timestamp_and_gap_check( df=new.df, gapsize_n=self.settings.gap["gaps_settings"]["gaps_finder"]["gapsize_n"], ) # duplicate check new.df, dup_outl_df = duplicate_timestamp_check( df=new.df, checks_info=new.settings.qc["qc_checks_info"], checks_settings=new.settings.qc["qc_check_settings"], ) if not dup_outl_df.empty: new.update_outliersdf(add_to_outliersdf=dup_outl_df) # update the order and which qc is applied on which obstype checked_obstypes = list(self.obstypes.keys()) checknames = ["duplicated_timestamp"] # KEEP order new._applied_qc = concat_save( [ new._applied_qc, conv_applied_qc_to_df( obstypes=checked_obstypes, ordered_checknames=checknames ), ], ignore_index=True, ) return new
[docs] def show(self, show_all_settings=False, max_disp_n_gaps=5): """Show detailed information of the Dataset. A function to print out a detailed overview information about the Dataset. Parameters ---------- show_all_settings : bool, optional If True all the settings are printed out. The default is False. max_disp_n_gaps: int, optional The maximum number of gaps to display detailed information of. Returns ------- None. Examples -------- .. code-block:: python >>> import metobs_toolkit >>> >>> #Create your Dataset >>> dataset = metobs_toolkit.Dataset() #empty Dataset >>> >>> #Add observations to the Dataset >>> dataset.update_settings( ... input_data_file=metobs_toolkit.demo_datafile, ... input_metadata_file=metobs_toolkit.demo_metadatafile, ... template_file=metobs_toolkit.demo_template, ... ) >>> dataset.import_data_from_file() >>> >>> # Print out details >>> dataset.show() -------- General --------- ... """ logger.info("Show basic info of dataset.") print_dataset_info(self, show_all_settings)
[docs] def get_info(self, show_all_settings=False, max_disp_n_gaps=5): """Alias of show(). A function to print out a detailed overview information about the Dataset. Parameters ---------- show_all_settings : bool, optional If True all the settings are printed out. The default is False. max_disp_n_gaps: int, optional The maximum number of gaps to display detailed information of. Returns ------- None. Examples -------- .. code-block:: python >>> import metobs_toolkit >>> >>> #Create your Dataset >>> dataset = metobs_toolkit.Dataset() #empty Dataset >>> >>> #Add observations to the Dataset >>> dataset.update_settings( ... input_data_file=metobs_toolkit.demo_datafile, ... input_metadata_file=metobs_toolkit.demo_metadatafile, ... template_file=metobs_toolkit.demo_template, ... ) >>> dataset.import_data_from_file() >>> >>> # Print out details >>> dataset.get_info() -------- General --------- ... """ self.show(show_all_settings, max_disp_n_gaps)
[docs] def save_dataset( self, outputfolder=None, filename="saved_dataset.pkl", overwrite=False ): """Save a Dataset instance to a (pickle) file. Parameters ---------- outputfolder : str or None, optional The path to the folder to save the file. If None, the outputfolder from the Settings is used. The default is None. filename : str, optional The name of the output file. The default is 'saved_dataset.pkl'. overwrite : bool, optional If True, the target file will be overwritten if it exist. The default is False. Returns ------- None. Examples -------- .. code-block:: python >>> import metobs_toolkit >>> import os >>> >>> # Import data into a Dataset >>> dataset = metobs_toolkit.Dataset() >>> dataset.update_settings( ... input_data_file=metobs_toolkit.demo_datafile, ... input_metadata_file=metobs_toolkit.demo_metadatafile, ... template_file=metobs_toolkit.demo_template) >>> >>> dataset.import_data_from_file() >>> >>> # Save dataset to a .pkl file >>> dataset.save_dataset(outputfolder=os.getcwd(), ... filename='your_saved_dataset.pkl', ... overwrite=True) Dataset saved in ... """ # check if outputfolder is known and exists if outputfolder is None: outputfolder = self.settings.IO["output_folder"] assert ( outputfolder is not None ), "No outputfolder is given, and no outputfolder is found in the settings." assert os.path.isdir(outputfolder), f"{outputfolder} is not a directory!" # check file extension in the filename: if filename[-4:] != ".pkl": filename += ".pkl" full_path = os.path.join(outputfolder, filename) if (os.path.isfile(full_path)) & overwrite: logger.info(f"The file {full_path} will be overwritten!") os.remove(full_path) # check if file exists assert not os.path.isfile(full_path), f"{full_path} is already a file!" with open(full_path, "wb") as outp: pickle.dump(self, outp, pickle.HIGHEST_PROTOCOL) print(f"Dataset saved in {full_path}") logger.info(f"Dataset saved in {full_path}")
[docs] def import_dataset(self, folder_path=None, filename="saved_dataset.pkl"): """Import a Dataset instance from a (pickle) file. Parameters ---------- folder_path : str or None, optional The path to the folder to save the file. If None, the outputfolder from the Settings is used. The default is None. filename : str, optional The name of the output file. The default is 'saved_dataset.pkl'. Returns ------- metobs_toolkit.Dataset The Dataset instance. Examples -------- .. code-block:: python import metobs_toolkit import os # Initialize an empty Dataset empty_dataset = metobs_toolkit.Dataset() # Import the dataset dataset=empty_dataset.import_dataset(folder_path=os.getcwd(), filename='your_saved_dataset.pkl') """ # check if folder_path is known and exists if folder_path is None: folder_path = self.settings.IO["output_folder"] assert ( folder_path is not None ), "No folder_path is given, and no outputfolder is found in the settings." assert os.path.isdir(folder_path), f"{folder_path} is not a directory!" full_path = os.path.join(folder_path, filename) # check if file exists assert os.path.isfile(full_path), f"{full_path} does not exist." with open(full_path, "rb") as inp: dataset = pickle.load(inp) # convert metadf to a geodataframe (if coordinates are available) dataset.metadf = metadf_to_gdf(dataset.metadf) return dataset
[docs] def add_new_observationtype(self, Obstype): """Add a new observation type to the known observation types. The observation can only be added if it is not already present in the knonw observation types. If that is the case that you probably need to use use the Dataset.add_new_unit() method. Parameters ---------- The new Obstype to add. Returns ------- None. Examples -------- .. code-block:: python >>> import metobs_toolkit >>> co2_concentration = metobs_toolkit.Obstype(obsname='co2', ... std_unit='ppm') >>> #add other units to it (if needed) >>> co2_concentration.add_unit(unit_name='ppb', ... conversion=['x / 1000'], #1 ppb = 0.001 ppm ... ) >>> #Set a description >>> co2_concentration.set_description(desc='The CO2 concentration measured at 2m above surface') >>> #Add it to a Dataset >>> dataset = metobs_toolkit.Dataset() >>> dataset.add_new_observationtype(co2_concentration) >>> dataset.obstypes {'temp': Obstype instance of temp, 'humidity': Obstype instance of humidity, 'radiation_temp': Obstype instance of radiation_temp, 'pressure': Obstype instance of pressure, 'pressure_at_sea_level': Obstype instance of pressure_at_sea_level, 'precip': Obstype instance of precip, 'precip_sum': Obstype instance of precip_sum, 'wind_speed': Obstype instance of wind_speed, 'wind_gust': Obstype instance of wind_gust, 'wind_direction': Obstype instance of wind_direction, 'co2': Obstype instance of co2} """ # Test if the obstype is of the correct class. if not isinstance(Obstype, Obstype_class): sys.exit( f"{Obstype} is not an instance of metobs_toolkit.obstypes.Obstype." ) # Test if the obsname is already in use if Obstype.name in self.obstypes.keys(): logger.warning( f"{Obstype.name} is already a known observation type: {self.obstypes[Obstype.name]}" ) return # Update the known obstypes logger.info(f"Adding {Obstype} to the list of knonw observation types.") self.obstypes[Obstype.name] = Obstype
[docs] def add_new_unit(self, obstype, new_unit, conversion_expression=[]): """Add a new unit to a known observation type. Parameters ---------- obstype : str The observation type to add the new unit to. new_unit : str The new unit name. conversion_expression : list or str, optional The conversion expression to the standard unit of the observation type. The expression is a (list of) strings with simple algebraic operations, where x represent the value in the new unit, and the result is the value in the standard unit. Two examples for temperature (with a standard unit in Celsius): ["x - 273.15"] #if the new_unit is Kelvin ["x-32.0", "x/1.8"] #if the new unit is Farenheit The default is []. Returns ------- None. Examples -------- .. code-block:: python >>> import metobs_toolkit >>> >>> #Create your Dataset >>> dataset = metobs_toolkit.Dataset() #empty Dataset >>> >>> #Add new unit to a known obstype >>> dataset.add_new_unit(obstype = 'temp', ... new_unit= 'your_new_unit', ... conversion_expression = ['x+3', 'x * 2']) >>> # The conversion means: 1 [your_new_unit] = (1 + 3) * 2 [°C] >>> dataset.obstypes['temp'].get_info() temp observation with: * standard unit: Celsius * data column as None in None * known units and aliases: {'Celsius': ['celsius', '°C', '°c', 'celcius', 'Celcius'], 'Kelvin': ['K', 'kelvin'], 'Farenheit': ['farenheit'], 'your_new_unit': []} * description: 2m - temperature * conversions to known units: {'Kelvin': ['x - 273.15'], 'Farenheit': ['x-32.0', 'x/1.8'], 'your_new_unit': ['x+3', 'x * 2']} * originates from data column: None with None as native unit. """ # test if observation is present if not obstype in self.obstypes.keys(): logger.warning(f"{obstype} is not a known obstype! No unit can be added.") return # check if the unit is already present is_present = self.obstypes[obstype].test_if_unit_is_known(new_unit) if is_present: logger.info( f"{new_unit} is already a known unit of {self.obstypes[obstype]}" ) return self.obstypes[obstype].add_unit( unit_name=new_unit, conversion=conversion_expression )
[docs] def show_settings(self): """Show detailed information of the stored Settings. A function that prints out all the settings, structured per thematic. Returns ------- None. Examples -------- .. code-block:: python >>> import metobs_toolkit >>> >>> #Create your Dataset >>> dataset = metobs_toolkit.Dataset() #empty Dataset >>> >>> #Show default settings >>> dataset.show_settings() All settings:... """ self.settings.show()
[docs] def get_station(self, stationname): """Filter out one station of the Dataset. Extract a metobs_toolkit.Station object from the dataset by name. Parameters ---------- stationname : string The name of the station. Returns ------- metobs_toolkit.Station The station object. Examples -------- .. code-block:: python >>> import metobs_toolkit >>> >>> #Create your Dataset >>> dataset = metobs_toolkit.Dataset() #empty Dataset >>> >>> #Add observations to the Dataset >>> dataset.update_settings( ... input_data_file=metobs_toolkit.demo_datafile, ... input_metadata_file=metobs_toolkit.demo_metadatafile, ... template_file=metobs_toolkit.demo_template, ... ) >>> dataset.import_data_from_file() >>> >>> dataset.get_station('vlinder12') Station instance containing: *1 stations *['temp', 'humidity', 'wind_speed', 'wind_direction'] observation types *4320 observation records *0 records labeled as outliers *0 gaps *0 missing observations *records range: 2022-09-01 00:00:00+00:00 --> 2022-09-15 23:55:00+00:00 (total duration: 14 days 23:55:00) *time zone of the records: UTC *Coordinates are available for all stations. """ from metobs_toolkit.station import Station logger.info(f"Extract {stationname} from dataset.") # important: make sure all station attributes are of the same time as dataset. # so that all methods can be inherited. try: sta_df = self.df.xs(stationname, level="name", drop_level=False) sta_metadf = self.metadf.loc[stationname].to_frame().transpose() sta_metadf.index.name = "name" except KeyError: logger.warning(f"{stationname} not found in the dataset.") return None try: sta_outliers = self.outliersdf.xs( stationname, level="name", drop_level=False ) except KeyError: sta_outliers = init_triple_multiindexdf() sta_gaps = get_station_gaps(self.gaps, stationname) sta_missingobs = self.missing_obs.get_station_missingobs(stationname) try: sta_gapfill = self.gapfilldf.xs(stationname, level="name", drop_level=False) except KeyError: sta_gapfill = init_multiindexdf() try: sta_missingfill = self.missing_fill_df.xs( stationname, level="name", drop_level=False ) except KeyError: sta_missingfill = init_multiindexdf() return Station( name=stationname, df=sta_df, outliersdf=sta_outliers, gaps=sta_gaps, missing_obs=sta_missingobs, gapfilldf=sta_gapfill, missing_fill_df=sta_missingfill, metadf=sta_metadf, obstypes=self.obstypes, template=self.template, settings=self.settings, _qc_checked_obstypes=self._qc_checked_obstypes, _applied_qc=self._applied_qc, )
# ============================================================================= # Gap Filling # =============================================================================
[docs] def get_modeldata( self, modelname="ERA5_hourly", modeldata=None, obstype="temp", stations=None, startdt=None, enddt=None, ): """Make Modeldata for the Dataset. Make a metobs_toolkit.Modeldata object with modeldata at the locations of the stations present in the dataset. This Modeldata stores timeseries of model data for each station. Parameters ---------- modelname : str, optional Which dataset to download timeseries from. This is only used when no modeldata is provided. The default is 'ERA5_hourly'. modeldata : metobs_toolkit.Modeldata, optional Use the modelname attribute and the gee information stored in the modeldata instance to extract timeseries. obstype : String, optional Name of the observationtype you want to apply gap filling on. The modeldata must contain this observation type as well. The default is 'temp'. stations : string or list of strings, optional Stationnames to subset the modeldata to. If None, all stations will be used. The default is None. startdt : datetime.datetime, optional Start datetime of the model timeseries. If None, the start datetime of the dataset is used. The default is None. enddt : datetime.datetime, optional End datetime of the model timeseries. If None, the last datetime of the dataset is used. The default is None. Returns ------- Modl : metobs_toolkit.Modeldata The extracted modeldata for period and a set of stations. Note -------- If a timezone unaware datetime is given as an argument, it is interpreted as if it has the same timezone as the observations. Note ------ When extracting large amounts of data, the timeseries data will be written to a file and saved on your google drive. In this case, you need to provide the Modeldata with the data using the .set_model_from_csv() method. Examples -------- .. code-block:: python import metobs_toolkit # Import data into a Dataset dataset = metobs_toolkit.Dataset() dataset.update_settings( input_data_file=metobs_toolkit.demo_datafile, input_metadata_file=metobs_toolkit.demo_metadatafile, template_file=metobs_toolkit.demo_template, ) dataset.import_data_from_file() # To limit data transfer, we define a short period import datetime tstart = datetime.datetime(2022, 9, 5) tend = datetime.datetime(2022, 9, 6) # Collect ERA5 2mT timeseries at your stations ERA5_data = dataset.get_modeldata( modelname="ERA5_hourly", modeldata=None, obstype="temp", stations=None, startdt=tstart, enddt=tend) """ if modeldata is None: Modl = Modeldata(modelname) else: Modl = modeldata modelname = Modl.modelname # Filters if startdt is None: startdt = self.df.index.get_level_values("datetime").min() else: startdt = fmt_datetime_argument( startdt, self.settings.time_settings["timezone"] ) if enddt is None: enddt = self.df.index.get_level_values("datetime").max() else: enddt = fmt_datetime_argument( enddt, self.settings.time_settings["timezone"] ) # make shure bounds include required range Model_time_res = Modl.mapinfo[Modl.modelname]["time_res"] startdt = startdt.floor(Model_time_res) enddt = enddt.ceil(Model_time_res) if stations is not None: if isinstance(stations, str): metadf = self.metadf.loc[[stations]] if isinstance(stations, list): metadf = self.metadf.iloc[self.metadf.index.isin(stations)] else: metadf = self.metadf # Convert to UTC startdt_utc = startdt.astimezone(pytz.utc) enddt_utc = enddt.astimezone(pytz.utc) # fill modell with data if modelname == "ERA5_hourly": Modl.get_ERA5_data( metadf=metadf, startdt_utc=startdt_utc, enddt_utc=enddt_utc, obstypes=obstype, ) else: Modl.get_gee_dataset_data( mapname=modelname, metadf=metadf, startdt_utc=startdt_utc, enddt_utc=enddt_utc, obstypes=obstype, ) print( f"(When using the .set_model_from_csv() method, make sure the modelname of your Modeldata is {modelname})" ) logger.info( f"(When using the .set_model_from_csv() method, make sure the modelname of your Modeldata is {modelname})" ) return Modl
[docs] def get_analysis(self, add_gapfilled_values=False): """Create an Analysis instance from the Dataframe. Parameters ---------- add_gapfilled_values : bool, optional If True, all filled values (from gapfill and missing observation fill), are added to the analysis records aswell. The default is False. Returns ------- metobs_toolkit.Analysis The Analysis instance of the Dataset. Examples -------- .. code-block:: python >>> import metobs_toolkit >>> >>> # Import data into a Dataset >>> dataset = metobs_toolkit.Dataset() >>> dataset.update_settings( ... input_data_file=metobs_toolkit.demo_datafile, ... input_metadata_file=metobs_toolkit.demo_metadatafile, ... template_file=metobs_toolkit.demo_template, ... ) >>> dataset.import_data_from_file() >>> dataset.coarsen_time_resolution(freq='1h') >>> >>> # Create an Analysis from the dataset >>> analysis = dataset.get_analysis() >>> analysis Analysis instance containing: *28 stations *['temp', 'humidity', 'wind_speed', 'wind_direction'] observation types *10080 observation records *Coordinates are available for all stations. <BLANKLINE> *records range: 2022-09-01 00:00:00+00:00 --> 2022-09-15 23:00:00+00:00 (total duration: 14 days 23:00:00) *Coordinates are available for all stations. <BLANKLINE> """ # combine all to obsspace and include gapfill if add_gapfilled_values: mergedf = self.combine_all_to_obsspace() # gapsfilled labels gapfill_settings = self.settings.gap["gaps_fill_info"] gapfilllabels = [val for val in gapfill_settings["label"].values()] # missingfilled labels missingfill_settings = self.settings.missing_obs["missing_obs_fill_info"] missingfilllabels = [val for val in missingfill_settings["label"].values()] # get all labels fill_labels = gapfilllabels.copy() fill_labels.extend(missingfilllabels) fill_labels.append("ok") df = mergedf[mergedf["label"].isin(fill_labels)] df = df[["value"]] df = df.unstack(level="obstype") df = df.droplevel(level=0, axis=1) else: df = self.df return Analysis( obsdf=df, metadf=self.metadf, settings=self.settings, obstypes=self.obstypes, )
[docs] def write_to_csv( self, obstype=None, filename=None, include_outliers=True, include_fill_values=True, add_final_labels=True, use_tlk_obsnames=True, overwrite_outliers_by_gaps_and_missing=True, seperate_metadata_file=True, ): """Write Dataset to a csv file. Write the dataset to a file where the observations, metadata and (if available) the quality labels per observation type are merged together. A final qualty control label for each quality-controlled-observation type can be added in the outputfile. The file will be written to the outputfolder specified in the settings. Parameters ---------- obstype : string, optional Specify an observation type to subset all observations to. If None, all available observation types are written to file. The default is None. filename : string, optional The name of the output csv file. If none, a standard-filename is generated based on the period of data. The default is None. include_outliers : bool, optional If True, the outliers will be present in the csv file. The default is True. include_fill_values : bool, optional If True, the filled gap and missing observation values will be present in the csv file. The default is True. add_final_labels : bool, optional If True, a column is added containing the final label of an observation. The default is True. use_tlk_obsnames : bool, optional If True, the standard naming of the metobs_toolkit is used, else the original names for obstypes is used. The default is True. overwrite_outliers_by_gaps_and_missing : bool, optional If the gaps and missing observations are updated using outliers, interpret these records as gaps/missing outliers if True. Else these will be interpreted as outliers. The default is True. seperate_metadata_file : bool, optional If true, the metadat is written to a seperate file, else the metadata is merged to the observation in one file. The default is True. Returns ------- None. Examples -------- .. code-block:: python >>> import metobs_toolkit >>> import os >>> >>> # Import data into a Dataset >>> dataset = metobs_toolkit.Dataset() >>> dataset.update_settings( ... input_data_file=metobs_toolkit.demo_datafile, ... input_metadata_file=metobs_toolkit.demo_metadatafile, ... template_file=metobs_toolkit.demo_template) >>> >>> dataset.import_data_from_file() >>> >>> # Save dataset to a .csv file >>> dataset.update_settings(output_folder = os.getcwd()) >>> dataset.write_to_csv(filename='your_saved_table.csv') write metadata to file: ... write dataset to file: ... """ logger.info("Writing the dataset to a csv file") assert ( not self.settings.IO["output_folder"] is None ), "Specify Settings.output_folder in order to export a csv." assert os.path.isdir( self.settings.IO["output_folder"] ), f'The outputfolder: \ {self.settings.IO["output_folder"]} is not found. ' # combine all dataframes mergedf = self.combine_all_to_obsspace( overwrite_outliers_by_gaps_and_missing=overwrite_outliers_by_gaps_and_missing ) # with outliers # Unstack mergedf # remove duplicates mergedf = mergedf[~mergedf.index.duplicated(keep="first")] # drop outliers if required if not include_outliers: outlier_labels = [ var["outlier_flag"] for var in self.settings.qc["qc_checks_info"] ] mergedf = mergedf[~mergedf["label"].isin(outlier_labels)] # drop fill values if required if not include_fill_values: fill_labels = [ "gap fill", "missing observation fill", ] # toolkit representation labels mergedf = mergedf[~mergedf["toolkit_representation"].isin(fill_labels)] if obstype is not None: mergedf = xs_save(mergedf, obstype, level="obstype", drop_level=False) # Map obstypes columns if not use_tlk_obsnames: mapper = { col: self.obstypes[col].get_orig_name() for col in self.obstypes.keys() } mergedf = mergedf.reset_index() mergedf["new_names"] = mergedf["obstype"].map(mapper) mergedf = mergedf.drop(columns=["obstype"]) mergedf = mergedf.rename(columns={"new_names": "obstype"}) mergedf = mergedf.set_index(["name", "datetime", "obstype"]) mergedf = mergedf.unstack("obstype") # to one level for the columns mergedf.columns = [" : ".join(col).strip() for col in mergedf.columns.values] # columns to write write_dataset_to_csv( df=mergedf, metadf=self.metadf, filename=filename, outputfolder=self.settings.IO["output_folder"], seperate_metadata_file=seperate_metadata_file, )
# ============================================================================= # Quality control # =============================================================================
[docs] def combine_all_to_obsspace( self, repr_outl_as_nan=False, overwrite_outliers_by_gaps_and_missing=True, ): """Make one dataframe with all observations and their labels. Combine all observations, outliers, missing observations and gaps into one Dataframe. All observation types are combined an a label is added in a serperate column. When gaps and missing records are updated from outliers one has to choice to represent these records as outliers or gaps. There can not be duplicates in the return dataframe. By default the observation values of the outliers are saved, one can choice to use these values or NaN's. following checks! Parameters ---------- repr_outl_as_nan : bool, optional If True, Nan's are use for the values of the outliers. The default is False. overwrite_outliers_by_gaps_and_missing : Bool, optional If True, records that are labeld as gap/missing and outlier are labeled as gaps/missing. This has only effect when the gaps/missing observations are updated from the outliers. The default is True. Returns --------- combdf : pandas.DataFrame() A dataframe containing a continious time resolution of records, where each record is labeld. Examples -------- .. code-block:: python >>> import metobs_toolkit >>> >>> # Import data into a Dataset >>> dataset = metobs_toolkit.Dataset() >>> dataset.update_settings( ... input_data_file=metobs_toolkit.demo_datafile, ... input_metadata_file=metobs_toolkit.demo_metadatafile, ... template_file=metobs_toolkit.demo_template, ... ) >>> dataset.import_data_from_file() >>> dataset.coarsen_time_resolution(freq='1h') >>> >>> # Apply quality control on the temperature observations >>> dataset.apply_quality_control(obstype='temp') #Using the default QC settings >>> dataset Dataset instance containing: *28 stations *['temp', 'humidity', 'wind_speed', 'wind_direction'] observation types *10080 observation records *1676 records labeled as outliers *0 gaps *3 missing observations *records range: 2022-09-01 00:00:00+00:00 --> 2022-09-15 23:00:00+00:00 (total duration: 14 days 23:00:00) *time zone of the records: UTC *Coordinates are available for all stations. >>> >>> # Combine all records to one dataframe in Observation-resolution >>> overview_df = dataset.combine_all_to_obsspace() >>> overview_df.head(12) value ... toolkit_representation name datetime obstype ... vlinder01 2022-09-01 00:00:00+00:00 humidity 65.000000 ... observation temp 18.800000 ... observation wind_direction 65.000000 ... observation wind_speed 1.555556 ... observation 2022-09-01 01:00:00+00:00 humidity 65.000000 ... observation temp 18.400000 ... observation wind_direction 55.000000 ... observation wind_speed 1.416667 ... observation 2022-09-01 02:00:00+00:00 humidity 68.000000 ... observation temp 17.100000 ... observation wind_direction 45.000000 ... observation wind_speed 1.583333 ... observation <BLANKLINE> [12 rows x 3 columns] """ # TODO: label values from settings not hardcoding # TODO: use the repr_outl_as_nan argumenten here # ============================================================================= # Stack observations and outliers # ============================================================================= df = self.df # better save than sorry present_obstypes = list(self.obstypes.keys()) df = df[present_obstypes] # to tripple index df = ( df.stack(future_stack=True) .reset_index() .rename(columns={"level_2": "obstype", 0: "value"}) .set_index(["name", "datetime", "obstype"]) ) df["label"] = "ok" df["toolkit_representation"] = "observation" # outliers outliersdf = self.outliersdf.copy() outliersdf["toolkit_representation"] = "outlier" # Careful! Some outliers exist on inport frequency (duplicated, invalid) # So only use the outliers for which station-datetime-obstype are present in the # dataset.df outliersdf = outliersdf[outliersdf.index.isin(df.index)] # remove outliers from the observations df = df[~df.index.isin(outliersdf.index)] # ============================================================================= # Stack gaps # ============================================================================= # add gapfill and remove the filled records from gaps gapsfilldf = self.gapfilldf.copy() # to triple index gapsfilldf = value_labeled_doubleidxdf_to_triple_idxdf( gapsfilldf, known_obstypes=list(self.obstypes.keys()) ) gapsfilldf["toolkit_representation"] = "gap fill" gapsidx = get_gaps_indx_in_obs_space( gapslist=self.gaps, obsdf=self.df, outliersdf=self.outliersdf, resolutionseries=self.metadf["dataset_resolution"], ) gapsdf = pd.DataFrame(index=gapsidx, columns=present_obstypes) gapsdf = ( gapsdf.stack(future_stack=True) .reset_index() .rename(columns={"level_2": "obstype", 0: "value"}) .set_index(["name", "datetime", "obstype"]) ) gapsdf["label"] = self.settings.gap["gaps_info"]["gap"]["outlier_flag"] gapsdf["toolkit_representation"] = "gap" # Remove gaps from df df = df[~df.index.isin(gapsdf.index)] if overwrite_outliers_by_gaps_and_missing: outliersdf = outliersdf.drop(index=gapsdf.index, errors="ignore") # Remove gapfill values records from the gaps gapsdf = gapsdf.drop(index=gapsfilldf.index) # ============================================================================= # Stack missing # ============================================================================= missingfilldf = self.missing_fill_df.copy() missingfilldf = value_labeled_doubleidxdf_to_triple_idxdf( missingfilldf, known_obstypes=list(self.obstypes.keys()) ) missingfilldf["toolkit_representation"] = "missing observation fill" # add missing observations if they occure in observation space missingidx = self.missing_obs.get_missing_indx_in_obs_space( self.df, self.metadf["dataset_resolution"] ) missingdf = pd.DataFrame(index=missingidx, columns=present_obstypes) missingdf = ( missingdf.stack(future_stack=True) .reset_index() .rename(columns={"level_2": "obstype", 0: "value"}) .set_index(["name", "datetime", "obstype"]) ) missingdf["label"] = self.settings.gap["gaps_info"]["missing_timestamp"][ "outlier_flag" ] missingdf["toolkit_representation"] = "missing observation" # Remove missing from df df = df[~df.index.isin(missingdf.index)] if overwrite_outliers_by_gaps_and_missing: outliersdf = outliersdf.drop(index=missingdf.index, errors="ignore") # Remove missingfill values records from the missing missingdf = missingdf.drop(index=missingfilldf.index) # ============================================================================= # combine all # ============================================================================= combdf = concat_save( [df, outliersdf, gapsdf, gapsfilldf, missingdf, missingfilldf] ).sort_index() combdf.index.names = ["name", "datetime", "obstype"] # To be shure? combdf = combdf[~combdf.index.duplicated(keep="first")] return combdf
def update_outliersdf(self, add_to_outliersdf): """Update the outliersdf attribute.""" self.outliersdf = concat_save([self.outliersdf, add_to_outliersdf])
[docs] def coarsen_time_resolution( self, origin=None, origin_tz=None, freq=None, method=None, limit=None ): """Resample the observations to coarser timeresolution. The assumed dataset resolution (stored in the metadf attribute) will be updated. Parameters ---------- origin : datetime.datetime, optional Define the origin (first timestamp) for the obervations. The origin is timezone naive, and is assumed to have the same timezone as the obervations. If None, the earliest occuring timestamp is used as origin. The default is None. origin_tz : str, optional Timezone string of the input observations. Element of pytz.all_timezones. If None, the timezone from the settings is used. The default is None. freq : DateOffset, Timedelta or str, optional The offset string or object representing target conversion. Ex: '15min' is 15 minutes, '1h', is one hour. If None, the target time resolution of the dataset.settings is used. The default is None. method : 'nearest' or 'bfill', optional Method to apply for the resampling. If None, the resample method of the dataset.settings is used. The default is None. limit : int, optional Limit of how many values to fill with one original observations. If None, the target limit of the dataset.settings is used. The default is None. Returns ------- None. Examples -------- .. code-block:: python >>> import metobs_toolkit >>> >>> # Import data into a Dataset >>> dataset = metobs_toolkit.Dataset() >>> dataset.update_settings( ... input_data_file=metobs_toolkit.demo_datafile, ... input_metadata_file=metobs_toolkit.demo_metadatafile, ... template_file=metobs_toolkit.demo_template, ... ) >>> dataset.import_data_from_file() >>> dataset.coarsen_time_resolution(freq='15min') #to 15 minutes resolution >>> dataset.df[['temp', 'humidity']].head() temp humidity name datetime vlinder01 2022-09-01 00:00:00+00:00 18.8 65 2022-09-01 00:15:00+00:00 18.7 65 2022-09-01 00:30:00+00:00 18.7 65 2022-09-01 00:45:00+00:00 18.6 65 2022-09-01 01:00:00+00:00 18.4 65 """ if freq is None: freq = self.settings.time_settings["target_time_res"] if method is None: method = self.settings.time_settings["resample_method"] if limit is None: limit = int(self.settings.time_settings["resample_limit"]) if origin_tz is None: origin_tz = self.settings.time_settings["timezone"] logger.info( f"Coarsening the timeresolution to {freq} using \ the {method}-method (with limit={limit})." ) # test if coarsening the resolution is valid for the dataset # 1. If resolution-dep-qc is applied --> coarsening is not valid and will result in a broken dataset if ( self._applied_qc[ ~self._applied_qc["checkname"].isin( ["duplicated_timestamp", "invalid_input"] ) ].shape[0] > 0 ): logger.warning( "Coarsening time resolution is not possible because quality control checks that are resolution depening are already performed on the Dataset." ) logger.info( "(Apply coarsening_time_resolution BEFORE applying quality control.)" ) return # TODO: implement buffer method df = self.df.reset_index() if origin is None: # find earlyest timestamp, if it is on the hour, use it else use the following hour tstart = df["datetime"].min() if tstart.minute != 0 or tstart.second != 0 or tstart.microsecond != 0: # Round up to nearest hour tstart = tstart.ceil(freq=freq) else: origin_tz_aware = pytz.timezone(origin_tz).localize(origin) tstart = origin_tz_aware.astimezone( pytz.timezone(self.settings.time_settings["timezone"]) ) # Coarsen timeresolution if method == "nearest": df = ( df.set_index("datetime") .groupby("name") .resample(freq, origin=tstart) .nearest(limit=limit) ) elif method == "bfill": df = ( df.set_index("datetime") .groupby("name") .resample(freq, origin=tstart) .bfill(limit=limit) ) else: logger.warning(f"The coarsening method: {method}, is not implemented yet.") df = df.set_index(["name", "datetime"]) if "name" in df.columns: df = df.drop(columns=["name"]) # Update resolution info in metadf self.metadf["dataset_resolution"] = pd.to_timedelta(freq) # update df self.df = df # Remove gaps and missing from the observatios # most gaps and missing are already removed but when increasing timeres, # some records should be removed as well. self.df = remove_gaps_from_obs(gaplist=self.gaps, obsdf=self.df) self.df = self.missing_obs.remove_missing_from_obs(obsdf=self.df)
[docs] def sync_observations( self, tolerance, verbose=True, _force_resolution_minutes=None, _drop_target_nan_dt=False, ): """Simplify and syncronize the observation timestamps. To simplify the resolution (per station), a tolerance is use to shift timestamps. The tolerance indicates the maximum translation in time that can be applied to an observation. The sycronisation tries to group stations that have an equal simplified resolution, and syncronize them. The origin of the sycronized timestamps will be set to round hours, round 10-minutes or round-5 minutes if possible given the tolerance. The observations present in the input file are used. After syncronization, the IO outliers, missing observations and gaps are recomputed. Parameters ---------- tolerance : Timedelta or str The tolerance string or object representing the maximum translation in time. Ex: '5min' is 5 minutes, '1h', is one hour. verbose : bool, optional If True, a dataframe illustrating the mapping from original datetimes to simplified and syncronized is returned. The default is True. _drop_target_nan_dt : bool, optional If record has no target datetime, the datetimes will be listed as Nat. To remove them, set this to True. Default is False. _force_resolution_minutes : bool, optional force the resolution estimate to this frequency in minutes. If None, the frequency is estimated. The default is None. Note -------- Keep in mind that this method will overwrite the df, outliersdf, missing timestamps and gaps. Note -------- Because the used observations are from the input file, previously coarsend timeresolutions are ignored. Returns ------- pandas.DataFrame (if verbose is True) A dataframe containing the original observations with original timestamps and the corresponding target timestamps. """ # get columns pressent in metadf, because the input df can have columns # that does not have to be mapped to the toolkit assert ( not self.input_df.empty ), "To syncronize a dataset, the (pure) input dataframe cannot be empty." init_meta_cols = self.metadf.columns.copy() df = self.input_df self.df = init_multiindexdf() self.outliersdf = init_triple_multiindexdf() self.gapfilldf = init_multiindexdf() self.missing_obs = None self.gaps = None # find simplified resolution if _force_resolution_minutes is None: simplified_resolution = get_freqency_series( df=df, method="median", simplify=True, max_simplify_error=tolerance, ) else: if isinstance(_force_resolution_minutes, list): # TODO print( "foce resolution minutes as a list is not implemented yet, sorry." ) else: stations = self.metadf.index freq_series = pd.Series( index=stations, data=[timedelta(minutes=float(_force_resolution_minutes))] * len(stations), ) simplified_resolution = freq_series logger.debug(f"Syncronizing to these resolutions: {simplified_resolution}") occuring_resolutions = simplified_resolution.unique() df = df.reset_index() def find_simple_origin(tstart, tolerance): if tstart.minute == 0 and tstart.second == 0 and tstart.microsecond == 0: return tstart # already a round hour # try converting to a round hour tstart_round_hour = tstart.round("60min") if abs(tstart - tstart_round_hour) <= pd.to_timedelta(tolerance): return tstart_round_hour # try converting to a tenfold in minutes tstart_round_tenfold = tstart.round("10min") if abs(tstart - tstart_round_tenfold) <= pd.to_timedelta(tolerance): return tstart_round_tenfold # try converting to a fivefold in minutes tstart_round_fivefold = tstart.round("5min") if abs(tstart - tstart_round_fivefold) <= pd.to_timedelta(tolerance): return tstart_round_fivefold # no suitable conversion found return tstart merged_df = pd.DataFrame() _total_verbose_df = pd.DataFrame() for occur_res in occuring_resolutions: group_stations = simplified_resolution[ simplified_resolution == occur_res ].index.to_list() logger.info( f" Grouping stations with simplified resolution of {pd.to_timedelta(occur_res)}: {group_stations}" ) groupdf = df[df["name"].isin(group_stations)] tstart = groupdf["datetime"].min() tend = groupdf["datetime"].max() # find a good origin point origin = find_simple_origin(tstart=tstart, tolerance=tolerance) # Create records index target_records = pd.date_range( start=origin, end=tend, freq=pd.Timedelta(occur_res) ).to_series() target_records.name = "target_datetime" # convert records to new target records, station per station for sta in group_stations: stadf = groupdf[groupdf["name"] == sta] # Drop all nan values! these will be added later from the outliersdf stadf = stadf.set_index(["name", "datetime"]) # drop all records per statiotion for which there are no obsecvations present_obs = list(self.obstypes.keys()) stadf = stadf.loc[stadf[present_obs].dropna(axis=0, how="all").index] stadf = stadf.reset_index() mergedstadf = pd.merge_asof( left=stadf.sort_values("datetime"), right=target_records.to_frame(), right_on="target_datetime", left_on="datetime", direction="nearest", tolerance=pd.Timedelta(tolerance), ) if _drop_target_nan_dt: mergedstadf = mergedstadf.dropna(subset="target_datetime") # possibility 1: record is mapped crrectly correct_mapped = mergedstadf[~mergedstadf["target_datetime"].isnull()] # possibility2: records that ar not mapped to target # not_mapped_records =mergedstadf[mergedstadf['target_datetime'].isnull()] # possibilyt 3 : no suitable candidates found for the target # these will be cached by the missing and gap check # no_record_candidates = target_records[~target_records.isin(mergedstadf['target_datetime'])].values merged_df = concat_save([merged_df, correct_mapped]) if verbose: _total_verbose_df = concat_save([_total_verbose_df, mergedstadf]) # overwrite the df with the synced observations merged_df = ( merged_df.rename( columns={ "datetime": "original_datetime", "target_datetime": "datetime", } ) .set_index(["name", "datetime"]) .drop(["original_datetime"], errors="ignore", axis=1) .sort_index() ) # self.df = merged_df # Recompute the dataset attributes, apply qc, gap and missing searches, etc. self._construct_dataset( df=merged_df, freq_estimation_method="highest", freq_estimation_simplify=False, freq_estimation_simplify_error=None, fixed_freq_series=simplified_resolution, update_full_metadf=False, ) # Do not overwrite full metadf, only the frequencies self.metadf = self.metadf[ [col for col in self.metadf.columns if col in init_meta_cols] ] if verbose: _total_verbose_df = _total_verbose_df.rename( columns={ "datetime": "original_datetime", "target_datetime": "datetime", } ).set_index(["name", "datetime"]) return _total_verbose_df
[docs] def import_data_from_file( self, input_data_file=None, input_metadata_file=None, template_file=None, freq_estimation_method=None, freq_estimation_simplify=None, freq_estimation_simplify_error=None, kwargs_data_read={}, kwargs_metadata_read={}, ): """Read observations from a csv file. The paths (data, metadata and template) are stored in the settings if Dataset.update_settings() is called on this object. These paths can be updated by adding them as argument to this method. The input data (and metadata) are interpreted by using a template (json file). An estimation of the observational frequency is made per station. This is used to find missing observations and gaps. The Dataset attributes are set and the following checks are executed: * Duplicate check * Invalid input check * Find missing observations * Find gaps Parameters ---------- input_data_file : string, optional Path to the input data file with observations. If None, the input data path in the settings is used. input_metadata_file : string, optional Path to the input metadata file. If None, the input metadata path in the settings is used. template_file : string, optional Path to the template (json) file to be used on the observations and metadata. If None, the template path in the settings is used. freq_estimation_method : 'highest' or 'median', optional Select wich method to use for the frequency estimation. If 'highest', the highest apearing frequency is used. If 'median', the median of the apearing frequencies is used. If None, the method stored in the Dataset.settings.time_settings['freq_estimation_method'] is used. The default is None. freq_estimation_simplify : bool, optional If True, the likely frequency is converted to round hours, or round minutes. The "freq_estimation_simplify_error' is used as a constrain. If the constrain is not met, the simplification is not performed. If None, the method stored in the Dataset.settings.time_settings['freq_estimation_simplify'] is used. The default is None. freq_estimation_simplify_error : Timedelta or str, optional The tolerance string or object representing the maximum translation in time to form a simplified frequency estimation. Ex: '5min' is 5 minutes, '1h', is one hour. If None, the method stored in the Dataset.settings.time_settings['freq_estimation_simplify_error'] is used. The default is None. kwargs_data_read : dict, optional Keyword arguments collected in a dictionary to pass to the pandas.read_csv() function on the data file. The default is {}. kwargs_metadata_read : dict, optional Keyword arguments collected in a dictionary to pass to the pandas.read_csv() function on the metadata file. The default is {}. Note -------- In pracktice, the default arguments will be sufficient for most applications. Note -------- If options are present in the template, these will have priority over the arguments of this function. Warning --------- All CSV data files must be in *UTF-8 encoding*. For most CSV files, this condition is already met. To make sure, in Microsoft Excel (or similar), you can specify to export as **`CSV UTF-8`**. If you encounter an error, mentioning a `"/ueff..."` tag in a CSV file, it is often solved by converting the CSV to UTF-8. Returns ------- None. Examples -------- .. code-block:: python >>> import metobs_toolkit >>> >>> # Import data into a Dataset >>> dataset = metobs_toolkit.Dataset() >>> dataset.update_settings( ... input_data_file=metobs_toolkit.demo_datafile, ... input_metadata_file=metobs_toolkit.demo_metadatafile, ... template_file=metobs_toolkit.demo_template, ... ) >>> dataset.import_data_from_file() """ logger.info(f'Importing data from file: {self.settings.IO["input_data_file"]}') # Update paths to the input files, if given. if input_data_file is not None: self.update_settings(input_data_file=input_data_file) if input_metadata_file is not None: self.update_settings(input_metadata_file=input_metadata_file) if template_file is not None: self.update_settings(template_file=template_file) if freq_estimation_method is None: freq_estimation_method = self.settings.time_settings[ "freq_estimation_method" ] if freq_estimation_simplify is None: freq_estimation_simplify = self.settings.time_settings[ "freq_estimation_simplify" ] if freq_estimation_simplify_error is None: freq_estimation_simplify_error = self.settings.time_settings[ "freq_estimation_simplify_error" ] assert self.settings.templatefile is not None, "No templatefile is specified." # Read template self.template.read_template_from_file(jsonpath=self.settings.templatefile) # overload the timezone from template to the settings if not self.template._get_tz() is None: self.update_timezone(self.template.get_tz()) logger.info( f"Set timezone = {self.template.get_tz()} from options in template." ) # Read observations into pandas dataframe df = import_data_from_csv( input_file=self.settings.IO["input_data_file"], template=self.template, known_obstypes=list(self.obstypes.keys()), kwargs_data_read=kwargs_data_read, ) # Set timezone information df.index = df.index.tz_localize( tz=self.settings.time_settings["timezone"], ambiguous="infer", nonexistent="shift_forward", ) # drop Nat datetimes if present df = df.loc[pd.notnull(df.index)] logger.debug( f'Data from {self.settings.IO["input_data_file"]} \ imported to dataframe {df.head()}.' ) if self.settings.IO["input_metadata_file"] is None: logger.warning( "No metadata file is defined,\ no metadata attributes can be set!" ) else: logger.info( f'Importing metadata from file: {self.settings.IO["input_metadata_file"]}' ) meta_df = import_metadata_from_csv( input_file=self.settings.IO["input_metadata_file"], template=self.template, kwargs_metadata_read=kwargs_metadata_read, ) # in dataset of one station if self.template._is_data_single_station(): # logger.warning("No station names find in the observations!") # If there is ONE name in the metadf, than we use that name for # the df, else we use the default name if ("name" in meta_df.columns) & (meta_df.shape[0] == 1): name = meta_df["name"].iloc[0] df["name"] = name logger.warning( f"One stationname found in the metadata: {name}, this name is used for the data." ) else: df["name"] = str(self.settings.app["default_name"]) # for later merging, we add the name column with the default # also in the metadf meta_df["name"] = str(self.settings.app["default_name"]) logger.warning( f'Assume the dataset is for ONE station with the \ default name: {self.settings.app["default_name"]}.' ) # merge additional metadata to observations logger.debug(f"Head of data file, before merge: {df.head()}") logger.debug(f"Head of metadata file, before merge: {meta_df.head()}") meta_cols = [ colname for colname in meta_df.columns if not colname.startswith("_") ] additional_meta_cols = list(set(meta_cols).difference(df.columns)) if bool(additional_meta_cols): logger.debug( f"Merging metadata ({additional_meta_cols}) to dataset data by name." ) additional_meta_cols.append("name") # merging on name # merge deletes datetime index somehow? so add it back. df_index = df.index df = df.merge( right=meta_df[additional_meta_cols], how="left", on="name" ) df.index = df_index # update dataset object # Remove stations whith only one observation (no freq estimation) station_counts = df["name"].value_counts() issue_station = station_counts[station_counts < 2].index.to_list() logger.warning( f"These stations will be removed because of only having one record: {issue_station}" ) df = df[~df["name"].isin(issue_station)] # convert dataframe to multiindex (datetime - name) df = df.set_index(["name", df.index]) # Sort by name and then by datetime (to avoid negative freq) df = df.sort_index(level=["name", "datetime"]) # dataframe with all data of input file self.input_df = df.sort_index(level=["name", "datetime"]) # Construct all attributes of the Dataset self._construct_dataset( df=df, freq_estimation_method=freq_estimation_method, freq_estimation_simplify=freq_estimation_simplify, freq_estimation_simplify_error=freq_estimation_simplify_error, )
def _construct_dataset( self, df, freq_estimation_method, freq_estimation_simplify, freq_estimation_simplify_error, fixed_freq_series=None, update_full_metadf=True, ): """Construct the Dataset class from a IO dataframe. The df, metadf, outliersdf, gaps, missing timestamps and observationtypes attributes are set. The observations are converted to the toolkit standard units if possible. Qc on IO is applied (duplicated check and invalid check) + gaps and missing values are defined by assuming a frequency per station. Parameters ---------- df : pandas.dataframe The dataframe containing the input observations and metadata. freq_estimation_method : 'highest' or 'median' Select wich method to use for the frequency estimation. If 'highest', the highest apearing frequency is used. If 'median', the median of the apearing frequencies is used. freq_estimation_simplify : bool If True, the likely frequency is converted to round hours, or round minutes. The "freq_estimation_simplify_error' is used as a constrain. If the constrain is not met, the simplification is not performed. freq_estimation_simplify_error : Timedelta or str, optional The tolerance string or object representing the maximum translation in time to form a simplified frequency estimation. Ex: '5min' is 5 minutes, '1h', is one hour. fixed_freq_series : pandas.series or None, optional If you do not want the frequencies to be recalculated, one can pass the frequency series to update the metadf["dataset_resolution"]. If None, the frequencies will be estimated. The default is None. update_full_metadf : bool, optional If True, the full Dataset.metadf will be updated. If False, only the frequency columns in the Dataset.metadf will be updated. The default is True. Returns ------- None. """ # Convert dataframe to dataset attributes self._initiate_df_attribute(dataframe=df, update_metadf=update_full_metadf) # Check observation types and convert units if needed. self._setup_of_obstypes_and_units() # Apply quality control on Import resolution self._apply_qc_on_import() if fixed_freq_series is None: freq_series = get_freqency_series( df=self.df, method=freq_estimation_method, simplify=freq_estimation_simplify, max_simplify_error=freq_estimation_simplify_error, ) freq_series_import = freq_series else: if "assumed_import_frequency" in self.metadf.columns: freq_series_import = self.metadf[ "assumed_import_frequency" ] # No update else: freq_series_import = fixed_freq_series freq_series = fixed_freq_series # add import frequencies to metadf (after import qc!) self.metadf["assumed_import_frequency"] = freq_series_import self.metadf["dataset_resolution"] = freq_series # Remove gaps and missing from the observations AFTER timecoarsening self.df = remove_gaps_from_obs(gaplist=self.gaps, obsdf=self.df) self.df = self.missing_obs.remove_missing_from_obs(obsdf=self.df) def _initiate_df_attribute(self, dataframe, update_metadf=True): """Initialize dataframe attributes.""" logger.info(f"Updating dataset by dataframe with shape: {dataframe.shape}.") # Create dataframe with fixed order of observational columns obs_col_order = [ col for col in list(self.obstypes.keys()) if col in dataframe.columns ] self.df = dataframe[obs_col_order].sort_index() if update_metadf: metadf = dataframe # create metadataframe metacolumns = list(self.template._get_metadata_column_map().values()) metacolumns.remove("name") # This is the index metadf = metadf.reindex(columns=metacolumns) metadf.index = metadf.index.droplevel("datetime") # drop datetimeindex # drop dubplicates due to datetime metadf = metadf[~metadf.index.duplicated(keep="first")] # "lat' and 'lon' column are required, so add them as empty if not present if "lat" not in metadf.columns: metadf["lat"] = np.nan if "lon" not in metadf.columns: metadf["lon"] = np.nan self.metadf = metadf_to_gdf(metadf) def _apply_qc_on_import(self): # if the name is Nan, remove these records from df, and metadf (before) # they end up in the gaps and missing obs if np.nan in self.df.index.get_level_values("name"): logger.warning( f'Following observations are not linked to a station name and will be removed: {xs_save(self.df, np.nan, "name")}' ) self.df = self.df[~self.df.index.get_level_values("name").isna()] if np.nan in self.metadf.index: logger.warning( f"Following station will be removed from the Dataset {self.metadf[self.metadf.index.isna()]}" ) self.metadf = self.metadf[~self.metadf.index.isna()] # find missing obs and gaps, and remove them from the df self.missing_obs, self.gaps = missing_timestamp_and_gap_check( df=self.df, gapsize_n=self.settings.gap["gaps_settings"]["gaps_finder"]["gapsize_n"], ) # Create gaps and missing obs objects # self.gaps = gaps_list # self.missing_obs = Missingob_collection(missing_obs) # Perform QC checks on original observation frequencies self.df, dup_outl_df = duplicate_timestamp_check( df=self.df, checks_info=self.settings.qc["qc_checks_info"], checks_settings=self.settings.qc["qc_check_settings"], ) if not dup_outl_df.empty: self.update_outliersdf(add_to_outliersdf=dup_outl_df) self.df, nan_outl_df = invalid_input_check( self.df, checks_info=self.settings.qc["qc_checks_info"] ) if not nan_outl_df.empty: self.update_outliersdf(nan_outl_df) self.outliersdf = self.outliersdf.sort_index() # update the order and which qc is applied on which obstype checked_obstypes = [ obs for obs in self.df.columns if obs in self.obstypes.keys() ] checknames = ["duplicated_timestamp", "invalid_input"] # KEEP order self._applied_qc = concat_save( [ self._applied_qc, conv_applied_qc_to_df( obstypes=checked_obstypes, ordered_checknames=checknames ), ], ignore_index=True, ) def _setup_of_obstypes_and_units(self): """Function to setup all attributes related to observation types and convert to standard units.""" # Check if all present observation types are known. unknown_obs_cols = [ obs_col for obs_col in self.df.columns if obs_col not in self.obstypes.keys() ] if len(unknown_obs_cols) > 0: sys.exit(f"The following observation types are unknown: {unknown_obs_cols}") for obs_col in self.df.columns: # Convert the units to the toolkit standards (if unit is known) self.df[obs_col] = self.obstypes[obs_col].convert_to_standard_units( input_data=self.df[obs_col], input_unit=self.template._get_input_unit_of_tlk_obstype(obs_col), ) # Update the description of the obstype description = self.template._get_description_of_tlk_obstype(obs_col) if pd.isna(description): description = None self.obstypes[obs_col].set_description(desc=description) # Update the original column name and original units self.obstypes[obs_col].set_original_name( self.template._get_original_obstype_columnname(obs_col) ) self.obstypes[obs_col].set_original_unit( self.template._get_input_unit_of_tlk_obstype(obs_col) ) # subset the obstypes attribute self.obstypes = { name: obj for name, obj in self.obstypes.items() if name in self.df.columns } # ============================================================================= # Physiography extractions # =============================================================================
[docs] def get_lcz(self): """Extract local climate zones for all stations. Function to extract the Local CLimate zones (LCZ) from the wudapt global LCZ map on the Google engine for all stations. A 'LCZ' column will be added to the metadf, and series is returned. Returns ------- lcz_series : pandas.Series() A series with the stationnames as index and the LCZ as values. Examples -------- .. code-block:: python import metobs_toolkit # Import data into a Dataset dataset = metobs_toolkit.Dataset() dataset.update_settings( input_data_file=metobs_toolkit.demo_datafile, input_metadata_file=metobs_toolkit.demo_metadatafile, template_file=metobs_toolkit.demo_template, ) dataset.import_data_from_file() # Get the local climate zones for all stations lcz_series = dataset.get_lcz() # in addition to the returned series, the metadf attribute is updated aswell print(dataset.metadf) """ # connect to gee connect_to_gee() # Extract LCZ for all stations lcz_series = lcz_extractor( metadf=self.metadf, mapinfo=self.settings.gee["gee_dataset_info"]["global_lcz_map"], ) # drop column if it was already present if "lcz" in self.metadf: self.metadf = self.metadf.drop(columns=["lcz"]) # update metadata self.metadf = self.metadf.merge( lcz_series.to_frame(), how="left", left_index=True, right_index=True, ) return lcz_series
[docs] def get_altitude(self): """Extract Altitudes for all stations. Function to extract the Altitude from the SRTM Digital Elevation Data global map on the Google engine for all stations. A 'altitude' column will be added to the metadf, and series is returned. Returns ------- altitude_series : pandas.Series() A series with the stationnames as index and the altitudes as values. Examples -------- .. code-block:: python import metobs_toolkit # Import data into a Dataset dataset = metobs_toolkit.Dataset() dataset.update_settings( input_data_file=metobs_toolkit.demo_datafile, input_metadata_file=metobs_toolkit.demo_metadatafile, template_file=metobs_toolkit.demo_template, ) dataset.import_data_from_file() # Get the altitude for all stations alt_series = dataset.get_altitude() # in addition to the returned series, the metadf attribute is updated aswell print(dataset.metadf) """ # connect to gee connect_to_gee() # Extract LCZ for all stations altitude_series = height_extractor( metadf=self.metadf, mapinfo=self.settings.gee["gee_dataset_info"]["DEM"], ) # drop column if it was already present if "altitude" in self.metadf: self.metadf = self.metadf.drop(columns=["altitude"]) # update metadata self.metadf = self.metadf.merge( altitude_series.to_frame(), how="left", left_index=True, right_index=True, ) return altitude_series
[docs] def get_landcover( self, buffers=[100], aggregate=True, overwrite=True, gee_map="worldcover", ): """Extract landcover for all stations. Extract the landcover fractions in a buffer with a specific radius for all stations. If an aggregation scheme is define, one can choose to aggregate the landcoverclasses. The landcover fractions will be added to the Dataset.metadf if overwrite is True. Presented as seperate columns where each column represent the landcovertype and corresponding buffer. Parameters ---------- buffers : num, optional The list of buffer radia in dataset units (meters for ESA worldcover) . The default is 100. aggregate : bool, optional If True, the classes will be aggregated with the corresponding aggregation scheme. The default is True. overwrite : bool, optional If True, the Datset.metadf will be updated with the generated landcoverfractions. The default is True. gee_map : str, optional The name of the dataset to use. This name should be present in the settings.gee['gee_dataset_info']. If aggregat is True, an aggregation scheme should included as well. The default is 'worldcover' Returns ------- frac_df : pandas.DataFrame A Dataframe with index: name, buffer_radius and the columns are the fractions. Examples -------- .. code-block:: python import metobs_toolkit # Import data into a Dataset dataset = metobs_toolkit.Dataset() dataset.update_settings( input_data_file=metobs_toolkit.demo_datafile, input_metadata_file=metobs_toolkit.demo_metadatafile, template_file=metobs_toolkit.demo_template, ) dataset.import_data_from_file() # Get the landcover fractions for multiple buffers, for all stations lc_frac_series = dataset.get_landcover(buffers=[50, 100, 250, 500], aggregate=False) # in addition to the returned dataframe, the metadf attribute is updated aswell print(dataset.metadf) """ # connect to gee connect_to_gee() df_list = [] for buffer in buffers: logger.info( f"Extracting landcover from {gee_map} with buffer radius = {buffer}" ) # Extract landcover fractions for all stations lc_frac_df, buffer = lc_fractions_extractor( metadf=self.metadf, mapinfo=self.settings.gee["gee_dataset_info"][gee_map], buffer=buffer, agg=aggregate, ) # add buffer to the index lc_frac_df["buffer_radius"] = buffer lc_frac_df = lc_frac_df.reset_index().set_index(["name", "buffer_radius"]) lc_frac_df = lc_frac_df.sort_index() # add to the list df_list.append(lc_frac_df) # concat all df for different buffers to one frac_df = concat_save(df_list) frac_df = frac_df.sort_index() if overwrite: for buf in frac_df.index.get_level_values("buffer_radius").unique(): buf_df = xs_save(frac_df, buf, level="buffer_radius") buf_df.columns = [col + f"_{int(buf)}m" for col in buf_df.columns] # overwrite the columns or add them if they did not exist self.metadf[buf_df.columns] = buf_df return frac_df