#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
This module contains the Analysis class and all its methods.
An Analysis holds a set of 'good' observations and the methods will analyse it.
"""
from datetime import datetime
import pandas as pd
import numpy as np
import logging
import copy
from scipy.stats import pearsonr
from metobs_toolkit.plotting_functions import (
cycle_plot,
heatmap_plot,
correlation_scatter,
)
from metobs_toolkit.df_helpers import (
datetime_subsetting,
subset_stations,
fmt_datetime_argument,
)
logger = logging.getLogger(__name__)
from metobs_toolkit.datasetbase import _DatasetBase
# [docs]
class Analysis(_DatasetBase):
"""The Analysis class contains methods for analysing observations."""
# [docs]
def __init__(self, obsdf, metadf, settings, obstypes):
    """Initialize an Analysis from observations, metadata, settings and obstypes.

    Parameters
    ----------
    obsdf : pandas.DataFrame
        Multi-indexed (name -- datetime) dataframe holding the 'good'
        observation records to analyse.
    metadf : pandas.DataFrame
        Metadata per station; the index are the station names.
    settings : metobs_toolkit Settings
        The settings object to use (timezone, plot settings, ...).
    obstypes : dict
        Mapping of observation-type name to its Obstype definition.
    """
    _DatasetBase.__init__(self)
    self._set_df(df=obsdf)
    self._set_metadf(metadf)
    self._set_obstypes(obstypes)
    self._set_settings(settings)
    # analysis objects
    self.lc_cor_dict = {}  # landcover correlation matrices, keyed per group
    self._lc_cor_obstype = None  # obstype(s) the correlations were computed on
    self._lc_groupby_labels = None  # grouping labels used for the correlations
    # add empty lcz column to metadf if it is not present
    if "lcz" not in self.metadf.columns:
        self.metadf["lcz"] = np.nan
def __str__(self):
    """Return a text overview of the analysis."""
    if self.df.empty:
        return "Empty Analysis instance."
    add_info = ""
    n_stations = self.df.index.get_level_values("name").unique().shape[0]
    n_obs_tot = self.df.shape[0]
    startdt = self.df.index.get_level_values("datetime").min()
    enddt = self.df.index.get_level_values("datetime").max()
    # NOTE(review): these checks test "not all missing" (i.e. available for at
    # least one station) while the printed text claims "all stations".
    if (not self.metadf["lat"].isnull().all()) & (
        not self.metadf["lon"].isnull().all()
    ):
        add_info += " *Coordinates are available for all stations. \n"
    if not self.metadf["lcz"].isnull().all():
        add_info += " *LCZ's are available for all stations. \n"
    if bool(self.lc_cor_dict):
        add_info += f" *landcover correlations are computed on group: {self._lc_groupby_labels} \n"
    # Bugfix: add_info was previously included twice (inside the f-string AND
    # concatenated after it); it is now included exactly once.
    return (
        f"Analysis instance containing: \n"
        f" *{n_stations} stations \n"
        f" *{self.df.columns.to_list()} observation types \n"
        f" *{n_obs_tot} observation records \n"
        f"{add_info} \n"
        f" *records range: {startdt} --> {enddt} (total duration: {enddt - startdt})"
    )
def __repr__(self):
    """Return the same overview as ``__str__``."""
    return str(self)
# =============================================================================
# Setters
# =============================================================================
# [docs]
def subset_period(self, startdt, enddt):
    """Subset the observations of the Analysis to a specific period.

    The same timezone is assumed as the data.

    Parameters
    ----------
    startdt : datetime.datetime
        The start datetime to filter the observations to.
    enddt : datetime.datetime
        The end datetime to filter the observations to.

    Returns
    -------
    None.

    Note
    ----
    If a timezone unaware datetime is given as an argument, it is interpreted
    as if it has the same timezone as the observations.
    """
    # Idiom fix: compare against the datetime class directly instead of
    # the roundabout type(datetime(2020, 1, 1)) construct (same semantics).
    if not isinstance(startdt, datetime):
        logger.info(f" {startdt} not a datetime type. Ignore subsetting!")
        return
    if not isinstance(enddt, datetime):
        logger.info(f" {enddt} not a datetime type. Ignore subsetting!")
        return
    # interpret naive datetimes in the dataset's configured timezone
    tz = self.settings.time_settings["timezone"]
    startdt = fmt_datetime_argument(startdt, tz)
    enddt = fmt_datetime_argument(enddt, tz)
    self.df = datetime_subsetting(self.df, startdt, enddt)
# =============================================================================
# Helpers
# =============================================================================
# [docs]
def apply_filter(self, expression):
    r"""Filter an Analysis by a user defined string expression.

    This can be used to filter the observations to specific meteorological
    conditions (i.e. low windspeeds, high humidity, cold temperatures, ...).
    The filter expression may only reference columns present in the
    Analysis.df and/or the Analysis.metadf.

    A new Analysis object is returned; the current one is left untouched.

    Parameters
    ----------
    expression : str
        A filter expression using columnnames present in either df or metadf.
        The following timestamp derivatives can be used as well: [minute,
        hour, month, year, day_of_year, week_of_year, season]. The expression
        may contain numbers and operators like <, >, ==, >=, \*, +, ....
        Multiple filters can be combined to one expression by using & (AND)
        and | (OR).

    Returns
    -------
    filtered_analysis : metobs_toolkit.Analysis
        The filtered Analysis.

    Note
    ----
    All timestamp derivative values are numeric except for 'season',
    possible values are ['winter', 'spring', 'summer', 'autumn'].

    Note
    ----
    Make sure to use \" or \' to indicate string values in the expression if
    needed.
    """
    # delegate the actual filtering to the module-level helper
    filtered_df, filtered_metadf = filter_data(
        df=self.df, metadf=self.metadf, quarry_str=expression
    )
    # wrap the filtered data in a fresh Analysis with the same configuration
    return Analysis(
        obsdf=filtered_df,
        metadf=filtered_metadf,
        settings=self.settings,
        obstypes=self.obstypes,
    )
# [docs]
def aggregate_df(self, df=None, agg=None, method="mean"):
    """Aggregate observations to a (list of) categories.

    The output will be a dataframe that is aggregated to one, or more
    categories. A common example is aggregating to LCZ's.

    Parameters
    ----------
    df : pandas.DataFrame or None
        The observations to aggregate. If None, the df attribute of the
        Analysis instance is used. The default is None.
    agg : list, optional
        The list of columnnames to aggregate to. If 'lcz' is included, the
        lcz information is extracted from the Analysis.metadf. The default
        is ['lcz', 'hour'].
    method : str, optional
        list of functions and/or function names, e.g. [np.sum, 'mean']. The
        default is 'mean'.

    Returns
    -------
    pandas.DataFrame
        A dataframe with the agg columns as an index. The values are the
        aggregated values.

    Note
    ----
    Present columns that are non-numeric and are not in the agg list, are
    not present in the return, since these values cannot be aggregated.
    """
    # Bugfix: replace the shared mutable default list by a None sentinel.
    if agg is None:
        agg = ["lcz", "hour"]
    if df is None:
        df = copy.deepcopy(self.df)
    df = df.reset_index()
    time_agg_keys = [
        "minute",
        "hour",
        "month",
        "year",
        "day_of_year",
        "week_of_year",
        "season",
    ]
    # scan through the metadf for aggregation keys and merge them in
    for agg_key in agg:
        if agg_key not in df.columns:
            # look in metadf
            if agg_key in self.metadf.columns:
                df = pd.merge(
                    df,
                    self.metadf[[agg_key]],
                    how="left",
                    left_on="name",
                    right_index=True,
                )
    # Check if all agg keys are present or defined.
    # (Build a new list instead of extending time_agg_keys in place.)
    possible_agg_keys = time_agg_keys + list(df.columns)
    unmapped = [agg_key for agg_key in agg if agg_key not in possible_agg_keys]
    assert len(unmapped) == 0, f"cannot aggregate to unknown labels: {unmapped}."
    # make time-derivative columns if required
    df = _make_time_derivatives(df, agg)
    # check if not all values are NaN
    for agg_name in agg:
        assert (
            not df[agg_name].isnull().all()
        ), f"Aggregation to {agg_name} not possible because no valid values found for {agg_name}."
    # remove datetime column if present, because no aggregation can be done on
    # datetime and it gives a deprecation warning
    if "datetime" in df.columns:
        df = df.drop(columns=["datetime"])
    # Remove name column if present and not in the aggregation scheme,
    # this happens because name was in the index
    if "name" not in agg:
        df = df.drop(columns=["name"], errors="ignore")
    # Aggregate the df
    agg_df = df.groupby(agg).agg(method, numeric_only=True)
    # sort index
    agg_df = agg_df.reset_index()
    agg_df = agg_df.set_index(agg)
    return agg_df
# =============================================================================
# Analyse method
# =============================================================================
# [docs]
def get_anual_statistics(
    self,
    groupby=None,
    obstype="temp",
    agg_method="mean",
    stations=None,
    startdt=None,
    enddt=None,
    plot=True,
    errorbands=False,
    title=None,
    y_label=None,
    legend=True,
    _return_all_stats=False,
):
    """
    Create an anual cycle for aggregated groups.

    (In the plot, each unique combination of groupby categories is presented
    as a line.)

    Parameters
    ----------
    groupby : list string, optional
        Variables to aggregate to. These can be columns in the metadf, or
        time aggregations ('hour', 'year', 'week_of_year', ...). 'name' will
        aggregate to the stationnames. The default is ['name'].
    obstype : str, optional
        Element of the metobs_toolkit.observation_types. The default is 'temp'.
    agg_method : str, optional
        Function names to use for aggregation, e.g. [np.sum, 'mean']. The
        default is 'mean'.
    stations : list, optional
        List of station names to use. If None, all present stations will be used. The default is None.
    startdt : datetime.datetime, optional
        The start datetime of the observations to use. If None, all timestamps will be used. The default is None.
    enddt : datetime.datetime, optional
        The end datetime of the observations to use. If None, all timestamps will be used. The default is None.
    plot : bool, optional
        If True, an anual plot is made. The default is True.
    errorbands : bool, optional
        If True, the std is represented in the plot by colored bands. The default is False.
    title : string, optional
        Title of the figure, if None a default title is generated. The default is None.
    y_label : string, optional
        y-axes label of the figure, if None a default label is generated. The default is None.
    legend : bool, optional
        If True, a legend is added to the plot. The default is True.

    Returns
    -------
    df : pandas.DataFrame()
        The dataframe containing the aggregated values.

    Note
    ----
    If a timezone unaware datetime is given as an argument, it is interpreted
    as if it has the same timezone as the observations.
    """
    # Bugfix: the default was a shared mutable list (["name"]) that downstream
    # code mutates in place; use a None sentinel instead.
    if groupby is None:
        groupby = ["name"]
    assert (
        obstype in self.obstypes
    ), f"{obstype} is not a known observation type: \n {self.obstypes}"
    Obstype = self.obstypes[obstype]
    # title
    if title is None:
        title = f"Anual {Obstype.name} cycle plot per {groupby}."
    else:
        title = str(title)
    # ylabel
    if y_label is None:
        y_label = f"{Obstype.get_plot_y_label()}"
    else:
        y_label = str(y_label)
    stats = self.get_aggregated_cycle_statistics(
        obstype=obstype,
        stations=stations,
        aggregation=groupby,
        aggregation_method=agg_method,
        horizontal_axis="month",
        startdt=startdt,
        enddt=enddt,
        plot=plot,
        title=title,
        y_label=y_label,
        legend=legend,
        errorbands=errorbands,
        verbose=_return_all_stats,
    )
    return stats
# [docs]
def get_diurnal_statistics(
    self,
    colorby="name",
    obstype="temp",
    stations=None,
    startdt=None,
    enddt=None,
    plot=True,
    title=None,
    y_label=None,
    legend=True,
    errorbands=False,
    _return_all_stats=False,
):
    """
    Create an average diurnal cycle for the observations.

    (In the plot, each station is represented by a line.)

    Parameters
    ----------
    colorby : 'name' or 'lcz', optional
        If 'name' the plotted lines will be colored per station, if 'lcz' the colors represent the stations lcz. The default is 'name'.
    obstype : str, optional
        Element of the metobs_toolkit.observation_types. The default is 'temp'.
    stations : list, optional
        List of station names to use. If None, all present stations will be used. The default is None.
    startdt : datetime.datetime, optional
        The start datetime of the observations to use. If None, all timestamps will be used. The default is None.
    enddt : datetime.datetime, optional
        The end datetime of the observations to use. If None, all timestamps will be used. The default is None.
    plot : bool, optional
        If True, a diurnal plot is made. The default is True.
    title : string, optional
        Title of the figure, if None a default title is generated. The default is None.
    y_label : string, optional
        y-axes label of the figure, if None a default label is generated. The default is None.
    legend : bool, optional
        If True, a legend is added to the plot. The default is True.
    errorbands : bool, optional
        If True, the std is represented in the plot by colored bands. The default is False.

    Returns
    -------
    df : pandas.DataFrame()
        The dataframe containing the aggregated values.

    Note
    ----
    If a timezone unaware datetime is given as an argument, it is interpreted
    as if it has the same timezone as the observations.
    """
    assert (
        obstype in self.obstypes
    ), f"{obstype} is not a known observation type: \n {self.obstypes}"
    target_obstype = self.obstypes[obstype]
    # build a default title describing the (optional) period bounds
    if title is not None:
        title = str(title)
    else:
        base_title = f"Hourly average {target_obstype.name} diurnal cycle"
        if (startdt is None) and (enddt is None):
            title = base_title
        elif startdt is None:
            title = f"{base_title} until {enddt}"
        elif enddt is None:
            title = f"{base_title} from {startdt}"
        else:
            title = f"{base_title} for period {startdt} - {enddt}"
    # default y-label comes from the obstype definition
    y_label = (
        f"{target_obstype.get_plot_y_label()}" if y_label is None else str(y_label)
    )
    return self.get_aggregated_cycle_statistics(
        obstype=obstype,
        stations=stations,
        aggregation=[colorby],
        aggregation_method="mean",
        horizontal_axis="hour",
        startdt=startdt,
        enddt=enddt,
        plot=plot,
        title=title,
        y_label=y_label,
        legend=legend,
        errorbands=errorbands,
        verbose=_return_all_stats,
    )
# [docs]
def get_diurnal_statistics_with_reference(
    self,
    refstation,
    colorby="name",
    obstype="temp",
    tolerance="30min",
    stations=None,
    startdt=None,
    enddt=None,
    plot=True,
    title=None,
    y_label=None,
    legend=True,
    errorbands=False,
    show_zero_horizontal=True,
    _return_all_stats=False,
):
    """
    Create an average diurnal cycle for the observation differences of a reference station.

    All observational values are converted to differences with the closest
    (in time) reference observation. No reference observation is found when
    the time difference is larger than the tolerance.

    (In the plot, each station is represented by a line.)

    Parameters
    ----------
    refstation : str
        Name of the station to use as a reference.
    colorby : 'name' or 'lcz', optional
        If 'name' the plotted lines will be colored per station, if 'lcz' the colors represent the stations lcz. The default is 'name'.
    obstype : str, optional
        Element of the metobs_toolkit.observation_types. The default is 'temp'.
    tolerance : Timedelta or str, optional
        The tolerance string or object representing the maximum translation in time to find a reference
        observation for each observation. Ex: '5min' is 5 minutes, '1h' is one hour. The default is '30min'.
    stations : list, optional
        List of station names to use. If None, all present stations will be used. The default is None.
    startdt : datetime.datetime, optional
        The start datetime of the observations to use. If None, all timestamps will be used. The default is None.
    enddt : datetime.datetime, optional
        The end datetime of the observations to use. If None, all timestamps will be used. The default is None.
    plot : bool, optional
        If True, a diurnal plot is made. The default is True.
    title : string, optional
        Title of the figure, if None a default title is generated. The default is None.
    y_label : string, optional
        y-axes label of the figure, if None a default label is generated. The default is None.
    legend : bool, optional
        If True, a legend is added to the plot. The default is True.
    errorbands : bool, optional
        If True, the std is represented in the plot by colored bands. The upper bound represents +1 x std, the lower bound -1 x std. The default is False.
    show_zero_horizontal : bool, optional
        If True a horizontal line is drawn in the plot at zero. The default is True.

    Returns
    -------
    df : pandas.DataFrame()
        The dataframe containing the aggregated values.

    Note
    ----
    If a timezone unaware datetime is given as an argument, it is interpreted
    as if it has the same timezone as the observations.
    """
    assert (
        obstype in self.obstypes
    ), f"{obstype} is not a known observation type: \n {self.obstypes}"
    Obstype = self.obstypes[obstype]
    obsdf = self.df
    obsdf = obsdf[Obstype.name].reset_index()
    # extract reference from observations
    refdf = obsdf[obsdf["name"] == refstation]
    obsdf = obsdf[obsdf["name"] != refstation]
    assert (
        not refdf.empty
    ), f"Error: No reference observation found (after filtering) for {refstation}"
    assert not obsdf.empty, "Error: No observation found (after filtering)"
    # Synchronize observations with the reference observations
    refdf = refdf.rename(
        columns={Obstype.name: "ref_" + Obstype.name, "datetime": "ref_datetime"}
    )
    mergedf = pd.merge_asof(
        left=obsdf.sort_values("datetime"),
        right=refdf[["ref_datetime", "ref_" + Obstype.name]].sort_values(
            "ref_datetime"
        ),
        right_on="ref_datetime",
        left_on="datetime",
        direction="nearest",
        tolerance=pd.Timedelta(tolerance),
    )
    # Get differences.
    # Bugfix: the column names were hard-coded to "temp"/"ref_temp", which
    # broke this method for every obstype other than temperature.
    mergedf[Obstype.name] = mergedf[Obstype.name] - mergedf["ref_" + Obstype.name]
    # Subset to relevant columns
    mergedf = mergedf.reset_index()
    mergedf = mergedf[["name", "datetime", Obstype.name]]
    mergedf = mergedf.set_index(["name", "datetime"])
    # title
    if title is None:
        if startdt is None:
            if enddt is None:
                title = f"Hourly average {Obstype.name} diurnal cycle, with {refstation} as reference,"
            else:
                title = f"Hourly average {Obstype.name} diurnal cycle, with {refstation} as reference, until {enddt}"
        else:
            if enddt is None:
                title = f"Hourly average {Obstype.name} diurnal cycle, with {refstation} as reference, from {startdt}"
            else:
                title = f"Hourly average {Obstype.name} diurnal cycle, with {refstation} as reference, for period {startdt} - {enddt}"
    else:
        title = str(title)
    # ylabel
    if y_label is None:
        y_label = f"{Obstype.get_plot_y_label()}"
    else:
        y_label = str(y_label)
    stats = self.get_aggregated_cycle_statistics(
        obstype=Obstype.name,
        stations=stations,
        aggregation=[colorby],
        aggregation_method="mean",
        horizontal_axis="hour",
        startdt=startdt,
        enddt=enddt,
        plot=plot,
        title=title,
        y_label=y_label,
        legend=legend,
        errorbands=errorbands,
        verbose=_return_all_stats,
        _obsdf=mergedf,
        _show_zero_line=show_zero_horizontal,
    )
    return stats
# [docs]
def get_aggregated_cycle_statistics(
    self,
    obstype="temp",
    aggregation=None,
    aggregation_method="mean",
    horizontal_axis="hour",
    stations=None,
    startdt=None,
    enddt=None,
    plot=True,
    title=None,
    y_label=None,
    legend=True,
    errorbands=False,
    verbose=False,
    _obsdf=None,
    _show_zero_line=False,
):
    """Create an average cycle for an aggregated category.

    A common example is to aggregate to the LCZ's, so to get the diurnal
    cycle per LCZ rather than per station.

    (In the plot, each aggregated category different from datetime is
    represented by a line.)

    Parameters
    ----------
    obstype : str, optional
        Element of the metobs_toolkit.observation_types. The default is 'temp'.
    aggregation : list, optional
        List of variables to aggregate to. These variables should either be a
        categorical observation type, a categorical column in the metadf or
        a time aggregation. All possible time aggregations are: ['minute',
        'hour', 'month', 'year', 'day_of_year', 'week_of_year', 'season'].
        The default is ['lcz', 'datetime'].
    aggregation_method : str, optional
        Which (numpy) function is used to aggregate the observations. The default is 'mean'.
    horizontal_axis : str, optional
        Which aggregated value will be represented on the horizontal axis
        of the plot. The default is 'hour'.
    stations : list, optional
        List of station names to use. If None, all present stations will be used. The default is None.
    startdt : datetime.datetime, optional
        The start datetime of the observations to use. If None, all timestamps will be used. The default is None.
    enddt : datetime.datetime, optional
        The end datetime of the observations to use. If None, all timestamps will be used. The default is None.
    plot : bool, optional
        If True, a diurnal plot is made. The default is True.
    title : string, optional
        Title of the figure, if None a default title is generated. The default is None.
    y_label : string, optional
        y-axes label of the figure, if None a default label is generated. The default is None.
    legend : bool, optional
        If True, a legend is added to the plot. The default is True.
    errorbands : bool, optional
        If True, the std is represented in the plot by colored bands. The default is False.
    verbose : bool, optional
        If True, an additional dataframe with aggregation information is returned. The default is False.

    Returns
    -------
    df : pandas.DataFrame()
        The dataframe containing the aggregated values.

    Note
    ----
    If a timezone unaware datetime is given as an argument, it is interpreted
    as if it has the same timezone as the observations.
    """
    # Bugfix: the default was a shared mutable list and the caller's list was
    # mutated in place (insert/remove of horizontal_axis could permanently
    # remove an element from the caller's list). Work on a private copy.
    if aggregation is None:
        aggregation = ["lcz", "datetime"]
    aggregation = list(aggregation)
    assert (
        obstype in self.obstypes
    ), f"{obstype} is not a known observation type: \n {self.obstypes}"
    Obstype = self.obstypes[obstype]
    if _obsdf is None:
        obsdf = self.df[[obstype]]
    else:
        obsdf = _obsdf
    assert not obsdf.empty, f"Error: No observations in the analysis.df: {self.df}"
    # Filter stations
    if stations is not None:
        if isinstance(stations, str):
            stations = [stations]
        obsdf = subset_stations(obsdf, stations)
    assert (
        not obsdf.empty
    ), f"Error: No more observations after subsetting to {stations}"
    # Filter datetimes
    obsdf = datetime_subsetting(df=obsdf, starttime=startdt, endtime=enddt)
    assert (
        not obsdf.empty
    ), f"Error: No more observations after subsetting to {startdt} and {enddt}"
    startdt = obsdf.index.get_level_values("datetime").min()
    enddt = obsdf.index.get_level_values("datetime").max()
    # add horizontal axis to aggregation (will be the x-axis)
    if horizontal_axis not in aggregation:
        aggregation.insert(0, horizontal_axis)
    # add other methods for errorbands and stats
    methods = ["mean", "std", "median"]
    methods.append(aggregation_method)
    methods = list(set(methods))
    # compute the aggregation statistics
    aggdf = self.aggregate_df(df=obsdf, agg=aggregation, method=methods)
    # since only one observation type is in the stats, drop the column
    # level with the obstype, this is not relevant
    aggdf = aggdf.droplevel(0, axis="columns")
    # format dataframe for plotting
    # Categories to string format (horizontal axis stays numeric for plotting)
    aggdf = aggdf.reset_index()
    for idx_col in aggdf:
        if idx_col == horizontal_axis:
            continue  # if numeric, let it be numeric!
        aggdf[idx_col] = aggdf[idx_col].astype(str)
    aggdf = aggdf.set_index(aggregation)
    # sorting categories (months and seasons)
    seasons = ["winter", "spring", "summer", "autumn"]
    months = [
        "January",
        "February",
        "March",
        "April",
        "May",
        "June",
        "July",
        "August",
        "September",
        "October",
        "November",
        "December",
    ]
    season_order_dict = {}
    months_order_dict = {}
    for i, item in enumerate(seasons):
        season_order_dict[item] = i
    for i, item in enumerate(months):
        months_order_dict[item] = i
    # Sort columns
    aggdf = aggdf.reset_index()
    sort_list = aggregation.copy()
    if "season" in aggdf.columns:
        aggdf["season_num"] = aggdf["season"].map(season_order_dict)
        sort_list = ["season_num" if x == "season" else x for x in sort_list]
    if "month" in aggdf.columns:
        aggdf["month_num"] = aggdf["month"].map(months_order_dict)
        sort_list = ["month_num" if x == "month" else x for x in sort_list]
    # sort dataframe
    aggdf = aggdf.sort_values(sort_list, axis=0)
    # drop dummy num columns (if they are present)
    aggdf = aggdf.drop(columns=["season_num", "month_num"], errors="ignore")
    # reset the index
    aggdf = aggdf.set_index(aggregation)
    # unstack aggregation
    aggregation.remove(horizontal_axis)  # let horizontal axis be the index
    all_stats = aggdf.unstack(aggregation)  # return on verbose
    # Sort index if categorical
    if all_stats.index.name == "season":
        all_stats = all_stats.reindex(seasons)
    if all_stats.index.name == "month":
        all_stats = all_stats.reindex(months)
    # split in values and std
    values_df = all_stats[aggregation_method]
    std_df = all_stats["std"]
    # make sure all data is numeric
    values_df = values_df.astype(float)
    std_df = std_df.astype(float)
    # squeeze all column levels to one category for plotting
    if len(aggregation) > 1:  # more than one level for the columns
        values_df.columns = [
            " ,".join(col).strip() for col in values_df.columns.values
        ]
        std_df.columns = [" ,".join(col).strip() for col in std_df.columns.values]
    if plot:
        # generate title
        if title is None:
            startdtstr = datetime.strftime(
                startdt, format=self.settings.app["print_fmt_datetime"]
            )
            enddtstr = datetime.strftime(
                enddt, format=self.settings.app["print_fmt_datetime"]
            )
            title = f"{aggregation_method} - {horizontal_axis } {Obstype.name} cycle for period {startdtstr} - {enddtstr} grouped by {aggregation}"
        # ylabel
        if y_label is None:
            y_label = f"{Obstype.get_plot_y_label()}"
        else:
            y_label = str(y_label)
        # generate errorbands df
        if errorbands:
            stddf = std_df
        else:
            stddf = None
        # Make plot
        ax = cycle_plot(
            cycledf=values_df,
            errorbandsdf=stddf,
            title=title,
            plot_settings=self.settings.app["plot_settings"]["diurnal"],
            aggregation=aggregation,
            obstype=Obstype.name,
            y_label=y_label,
            legend=legend,
            show_zero_horizontal=_show_zero_line,
        )
        ax.set_ylabel(y_label)
        if horizontal_axis == "hour":
            # extract timezone
            tzstring = str(self.df.index.get_level_values("datetime").tz)
            ax.xaxis.set_major_formatter("{x:.0f} h")
            ax.set_xlabel(f"Hours (timezone: {tzstring})")
    if verbose:
        if plot:
            return values_df, all_stats, ax
        return values_df, all_stats
    return values_df
# =============================================================================
# Correlations analysis
# =============================================================================
# [docs]
def get_lc_correlation_matrices(self, obstype=None, groupby_labels=None):
    r"""Compute Pearson correlation coefficients.

    A method to compute the Pearson correlation between an observation type
    and present landcover fractions in the metadf.

    The correlations are computed per group as defined by unique combinations
    of the groupby_labels.

    A dictionary is returned where each key represents a unique combination of
    the groupby_labels. The value is a dictionary with the following keys
    and values:

    * cor matrix: the Pearson correlation matrix
    * significance matrix: the significance (p-)values of the correlations.
    * combined matrix: A human readable combination of the correlations and their p values. Indicated by \*, \*\* or \*\*\* representing p-values < 0.05, 0.01 and 0.001 respectively.

    This dictionary is also stored as a lc_cor_dict attribute.

    Parameters
    ----------
    obstype : str, or list optional
        The observation type(s) to compute the correlations on. The default is ['temp'].
    groupby_labels : list, optional
        List of variables to form one group, resulting in one correlation.
        These variables should either be a categorical observation type, a categorical column in the metadf or
        a time aggregation. All possible time aggregations are: ['minute',
        'hour', 'month', 'year', 'day_of_year',
        'week_of_year', 'season']. The default is ['hour'].

    Returns
    -------
    cor_dict : dict
        A nested dictionary with unique combinations of groupby values.
    """
    # Bugfix: replace shared mutable default lists with None sentinels.
    if obstype is None:
        obstype = ["temp"]
    if groupby_labels is None:
        groupby_labels = ["hour"]
    if not isinstance(obstype, list):
        obstype = [obstype]
    # get data
    df = self.df[obstype].reset_index()
    df = _make_time_derivatives(df, groupby_labels)
    # merge categorical metadata columns used as groupby labels
    for group_lab in groupby_labels:
        if group_lab in self.metadf.columns:
            df = df.merge(
                self.metadf[[group_lab]],
                how="left",
                left_on="name",
                right_index=True,
            )
    for group_lab in groupby_labels:
        # Bugfix: the assert message was inverted ("is found" -> "is not found").
        assert (
            group_lab in df.columns
        ), f'"{group_lab}" is not found in the observations or possible groupby_labels.'
    # subset columns
    relev_columns = [label for label in groupby_labels]  # to avoid deep copy import
    relev_columns.append("name")
    relev_columns.extend(obstype)
    df = df[relev_columns]
    # find landcover columnnames in the metadf
    # (assumes the naming convention "<class>_<buffer>m" -- TODO confirm)
    lc_columns = [
        col for col in self.metadf.columns if (("_" in col) & (col.endswith("m")))
    ]
    # get landcover data
    lc_df = self.metadf[lc_columns]
    if lc_df.empty:
        logger.warning(
            "No landcover columns found in the metadf. Landcover correlations cannot be computed."
        )
        return None
    # merge together
    df = df.merge(lc_df, how="left", left_on="name", right_index=True)
    # remove name column if it is not explicit in the groupby labels
    if "name" not in groupby_labels:
        df = df.drop(columns=["name"])
    # create return
    cor_dict = {}
    # Iterate over all groups
    # avoid future pandas warning for groupby labels of len==1
    if len(groupby_labels) == 1:
        groups = df.groupby(groupby_labels[0])
    else:
        groups = df.groupby(groupby_labels)
    for group_lab, groupdf in groups:
        # correlations need at least two records in the group
        if groupdf.shape[0] <= 1:
            logger.warning(
                f"No variance found in correlationd group {group_lab}. Correlation thus not be computed for this group: {groupdf}."
            )
            continue
        # drop groupby labels
        groupdf = groupdf.drop(columns=groupby_labels, errors="ignore")
        rho = groupdf.corr(method="pearson")
        # p-values via pearsonr; subtract the identity to zero the diagonal
        pval = groupdf.corr(method=lambda x, y: pearsonr(x, y)[1]) - np.eye(
            *rho.shape
        )
        # represent p values by stars
        # NOTE(review): DataFrame.applymap is deprecated in pandas >= 2.1
        # (DataFrame.map is the replacement) -- kept for compatibility.
        p_stars = pval.applymap(
            lambda x: "".join(["*" for t in [0.05, 0.01, 0.001] if x <= t])
        )
        # combined human readable df
        comb_df = pd.DataFrame(index=rho.index)
        for col in rho.columns:
            comb_df[col] = (
                rho[col].apply(lambda x: f"{x:.02f}") + " " + p_stars[col]
            )
        cor_dict[group_lab] = {
            "cor matrix": rho,
            "significance matrix": pval,
            "combined matrix": comb_df,
        }
    # Update attributes
    self.lc_cor_dict = cor_dict
    self._lc_cor_obstype = obstype
    self._lc_groupby_labels = groupby_labels
    return cor_dict
# [docs]
def plot_correlation_heatmap(self, groupby_value=None, title=None, _return_ax=False):
    """Make a heatmap plot of a correlation matrix.

    To specify which correlation matrix to plot, specify the group value
    using the groupby_value argument.

    All possible groupby_values are the keys of the lc_cor_dict attribute.

    Parameters
    ----------
    groupby_value : str, num, None, optional
        A groupby value to indicate which correlation matrix to visualise.
        If None is given, the first groupby value that is present is
        chosen. The default is None.
    title : str, optional
        Title of the figure. If None, a default title is constructed. The
        default is None.

    Returns
    -------
    None.

    Note
    ----
    To list all possible groupby_values, one can use
    ` print(Analysis_instance.lc_cor_dict.keys())`
    """
    # check if there are correlation matrices
    assert bool(
        self.lc_cor_dict
    ), "No correlation matrices found, use the metod get_lc_correlation_matrices first."
    if groupby_value is None:
        groupby_value = list(self.lc_cor_dict.keys())[0]
        # Bugfix: the warning string was missing the f-prefix, so the
        # placeholder was printed literally instead of the chosen value.
        logger.warning(
            f"No groupby_value is given, so the first groupby value (={groupby_value}) will be used!"
        )
        logger.info(
            f"The correlations are computed over {self._lc_groupby_labels} with the following unique values: {list(self.lc_cor_dict.keys())}"
        )
    # check if groupby value exists
    assert (
        groupby_value in self.lc_cor_dict.keys()
    ), f"{groupby_value} not found as a groupby value. These are all the possible values: {self.lc_cor_dict.keys()}"
    if title is None:
        title = f"Correlation heatmap for group: {self._lc_groupby_labels} = {groupby_value}"
    ax = heatmap_plot(
        cor_dict=self.lc_cor_dict[groupby_value],
        title=title,
        heatmap_settings=self.settings.app["plot_settings"]["correlation_heatmap"],
    )
    if _return_ax:
        return ax
# [docs]
def plot_correlation_variation(self, title=None):
    """Create a correlation scatter plot.

    Make a scatter plot of the correlations to visualise differences between
    multiple group values. Group values are represented on the horizontal
    axis, and correlations on the vertical axis.

    All correlations that are not constant are plotted as scatters with
    unique colors. The scatter marker indicates the p-value of the
    correlations.

    Parameters
    ----------
    title : str, optional
        Title of the figure. If None, a default title is constructed. The
        default is None.

    Returns
    -------
    None.

    Note
    ----
    If too many possible group values exist, one can use the apply_filter()
    method to reduce the group values.
    """
    # correlations must have been computed beforehand
    assert bool(
        self.lc_cor_dict
    ), "No correlation matrices found, use the metod get_lc_correlation_matrices first."
    # at least two groups are needed to show any variation
    if len(self.lc_cor_dict.keys()) <= 1:
        logger.warning(
            f"Only one correlation group is found: {self.lc_cor_dict.keys()}"
        )
        logger.warning("The variance plot can not be made.")
        return
    if title is None:
        title = f"Correlation scatter for group: {self._lc_groupby_labels}"
    scatter_ax = correlation_scatter(
        full_cor_dict=self.lc_cor_dict,
        groupby_labels=self._lc_groupby_labels,
        obstypes=self._lc_cor_obstype,
        title=title,
        cor_scatter_settings=self.settings.app["plot_settings"][
            "correlation_scatter"
        ],
    )
    return scatter_ax
def _make_time_derivatives(df, required, get_all=False):
"""Construct time derivated columns if required.
datetime must be a column.
"""
if ("minute" in required) | (get_all):
df["minute"] = df["datetime"].dt.minute
if ("hour" in required) | (get_all):
df["hour"] = df["datetime"].dt.hour
if ("month" in required) | (get_all):
df["month"] = df["datetime"].dt.month_name()
if ("year" in required) | (get_all):
df["year"] = df["datetime"].dt.year
if ("day_of_year" in required) | (get_all):
df["day_of_year"] = df["datetime"].dt.day_of_year
if ("week_of_year" in required) | (get_all):
df["week_of_year"] = df["datetime"].dt.isocalendar()["week"]
if ("season" in required) | (get_all):
df["season"] = get_seasons(df["datetime"])
return df
def get_seasons(
    datetimeseries,
    start_day_spring="01/03",
    start_day_summer="01/06",
    start_day_autumn="01/09",
    start_day_winter="01/12",
):
    """Convert a datetimeseries to season labels (i.e. categorical).

    Parameters
    ----------
    datetimeseries : pandas.Series of datetimes
        The timeseries that you want to split up in seasons.
    start_day_spring : str, optional
        Start date for spring, default is '01/03' and if changed the input
        should have the same format as the default value.
    start_day_summer : str, optional
        Start date for summer, default is '01/06' and if changed the input
        should have the same format as the default value.
    start_day_autumn : str, optional
        Start date for autumn, default is '01/09' and if changed the input
        should have the same format as the default value.
    start_day_winter : str, optional
        Start date for winter, default is '01/12' and if changed the input
        should have the same format as the default value.

    Returns
    -------
    pandas.Series
        A categorical series with a season label per timestamp.
    """
    # Parse the "dd/mm" start dates; strptime defaults the year to 1900
    # (non-leap), so day-of-year boundaries are fixed.
    season_starts = (
        start_day_spring,
        start_day_summer,
        start_day_autumn,
        start_day_winter,
    )
    boundaries = [
        datetime.strptime(daystr, "%d/%m").timetuple().tm_yday - 1
        for daystr in season_starts
    ]
    # bins: [0, spring, summer, autumn, winter, 366]; days before the spring
    # boundary and after the winter boundary are both labelled 'winter'
    bin_edges = [0, *boundaries, 366]
    bin_labels = ["winter", "spring", "summer", "autumn", "winter"]
    return pd.cut(
        x=datetimeseries.dt.day_of_year,
        bins=bin_edges,
        labels=bin_labels,
        ordered=False,
    )
def filter_data(df, metadf, quarry_str):
    r"""Filter a dataframe by a user defined string expression.

    This can be used to filter the observations to specific meteorological
    conditions (i.e. low windspeeds, high humidity, cold temperatures, ...).
    The filter expression may only reference columns present in the df
    and/or the metadf.

    The filtered df and metadf are returned.

    Parameters
    ----------
    df : pandas.DataFrame
        The dataframe containing all the observations to be filtered.
    metadf : pandas.DataFrame
        The dataframe containing all the metadata per station.
    quarry_str : str
        A filter expression using columnnames present in either df or metadf.
        The following timestamp derivatives can be used as well: [minute,
        hour, month, year, day_of_year, week_of_year, season]. The quarry_str
        may contain numbers and operators like <, >, ==, >=, \*, +, ....
        Multiple filters can be combined to one expression by using & (AND)
        and | (OR).

    Returns
    -------
    filter_df : pandas.DataFrame
        The filtered df.
    filter_metadf : pandas.DataFrame
        The filtered metadf.
    """
    # remember the index layout so both frames can be rebuilt afterwards
    df_index_names = list(df.index.names)
    metadf_index_names = list(metadf.index.names)
    # flatten the indexes, which makes splitting the merge result easier
    df_flat = df.reset_index()
    metadf_flat = metadf.reset_index()
    # remember the column order of each frame
    df_columns = df_flat.columns
    metadf_columns = metadf_flat.columns
    # add all time-derivative columns so the query can reference them
    df_flat = _make_time_derivatives(df_flat, required=" ", get_all=True)
    # join observations with metadata on the station name
    combined = df_flat.merge(metadf_flat, how="left", on="name")
    # apply the user expression
    selection = combined.query(expr=quarry_str)
    # split back into observations and metadata, restoring the indexes
    filter_df = selection[df_columns].set_index(df_index_names)
    filter_metadf = selection[metadf_columns].set_index(metadf_index_names)
    return filter_df, filter_metadf