"""
created matt_dumont
on: 21/09/23
"""
import traceback
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.transforms import blended_transform_factory
from matplotlib.lines import Line2D
from pathlib import Path
from copy import deepcopy
from scipy.stats import mstats
import itertools
from komanawa.kendall_stats.mann_kendall import _mann_kendall_from_sarray, _seasonal_mann_kendall_from_sarray, \
_calc_seasonal_senslope, get_colors, _make_s_array
def _generate_startpoints(n, min_size, nparts, check_step=1, check_window=None, test=False):
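    """
    generate the candidate breakpoint combinations to test.

    :param n: number of samples in the time series
    :param min_size: minimum number of samples in each part
    :param nparts: number of parts to split the time series into
    :param check_step: step between candidate breakpoints
    :param check_window: None (check everywhere allowed by min_size) or a (nparts-1, 2) array of start/end indices restricting where each breakpoint is searched
    :param test: if True, assert every candidate leaves at least min_size samples in each part
    :return: array of shape (n_candidates, nparts - 1) of breakpoint indices, e.g. _generate_startpoints(30, 10, 2) yields [[10], [11], ... [19]]
    """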
if check_window is None:
if nparts == 2:
all_start_points_out = np.arange(min_size, n - min_size, check_step)[:, np.newaxis]
if test:
assert ((all_start_points_out - 0 >= min_size) & (n - all_start_points_out >= min_size)).all()
else:
all_start_points = []
for part in range(nparts - 1):
start_points = np.arange(min_size + min_size * part, n - (min_size * (nparts - 1 - part)), check_step)
all_start_points.append(start_points)
all_start_points_out = np.array(list(itertools.product(*all_start_points)))
all_start_points_out = all_start_points_out[np.all(
[all_start_points_out[:, i] < all_start_points_out[:, i + 1] for i in range(nparts - 2)],
axis=0)]
temp = np.concatenate((
np.array(all_start_points_out),
np.full((len(all_start_points_out), 1), n)
), axis=1)
sizes = np.diff(temp, axis=1)
all_start_points_out = all_start_points_out[np.all(sizes >= min_size, axis=1)]
else:
check_window = np.atleast_2d(check_window)
if nparts == 2:
all_start_points_out = np.arange(check_window[0, 0], check_window[0, 1] + check_step, check_step)[:,
np.newaxis]
else:
all_start_points = []
for part in range(nparts - 1):
start_points = np.arange(check_window[part, 0], check_window[part, 1] + check_step, check_step)
all_start_points.append(start_points)
all_start_points_out = np.array(list(itertools.product(*all_start_points)))
temp = np.concatenate((
np.array(all_start_points_out),
np.full((len(all_start_points_out), 1), n)
), axis=1)
sizes = np.diff(temp, axis=1)
all_start_points_out = all_start_points_out[np.all(sizes >= min_size, axis=1)]
if test:
temp = np.concatenate((
np.array(all_start_points_out),
np.full((len(all_start_points_out), 1), n)
), axis=1)
sizes = np.diff(temp, axis=1)
assert np.all(sizes >= min_size)
return all_start_points_out
class MultiPartKendall():
def __init__(self, data, nparts=2, expect_part=(1, -1), min_size=10,
alpha=0.05, no_trend_alpha=0.5,
data_col=None, rm_na=True,
serialise_path=None,
check_step=1, check_window=None,
recalc=False, initalize=True):
"""
        multi part mann kendall test to identify change point(s) in a time series, after Frollini et al., 2020, DOI: 10.1007/s11356-020-11998-0. Note that where the expected trend is zero, the lack of a trend is considered significant if p > no_trend_alpha
        :param data: time series data; if DataFrame or Series, expects the index to be sample order (will sort on index); if np.array or list, expects the data to be in sample order
:param nparts: number of parts to split the time series into
:param expect_part: expected trend in each part of the time series (1 increasing, -1 decreasing, 0 no trend)
:param min_size: minimum size for the first and last section of the time series
:param alpha: significance level
        :param no_trend_alpha: significance level for no trend, e.g. will accept no trend if p > no_trend_alpha
:param data_col: if data is a DataFrame or Series, the column to use
:param rm_na: remove na values from the data
:param serialise_path: path to serialised file (as hdf), if None will not serialise
:param check_step: int, the step to check for breakpoints, e.g. if 1 will check every point, if 2 will check every second point
        :param check_window: the window to check for breakpoints. If None, checks the whole data; this is used to significantly speed up the mann kendall test. Note that check_step still applies within the check_window (e.g. a check_window of (2, 6) with a check_step of 2 will check the points (2, 4, 6)). One of:
* None or tuple (start_idx, end_idx) (one breakpoint only)
* list of tuples of len nparts-1 with a start/end idx for each part,
* or a 2d array shape (nparts-1, 2) with a start/end idx for each part,
:param recalc: if True will recalculate the mann kendall even if the serialised file exists
        :param initalize: if True will initialize the class from the data; only set to False when used in self.from_file
:return:
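
        Example (a minimal sketch; ``y`` is a hypothetical 1d series with an increasing then decreasing trend)::

            >>> y = np.concatenate([np.arange(50, dtype=float),
            ...                     50. - np.arange(50)])
            >>> mpk = MultiPartKendall(y, nparts=2, expect_part=(1, -1), min_size=10)
            >>> best_breakpoints = mpk.get_maxz_breakpoints()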
"""
self.freq_limit = None
self.trend_dict = {1: 'increasing', -1: 'decreasing', 0: 'no trend'}
if not initalize:
assert all([e is None for e in
[data, nparts, expect_part, min_size, alpha, no_trend_alpha, data_col, rm_na, serialise_path,
recalc]])
else:
loaded = False
if serialise_path is not None:
serialise_path = Path(serialise_path)
self.serialise_path = serialise_path
self.serialise = True
if Path(serialise_path).exists() and not recalc:
loaded = True
self._set_from_file(
data=data,
nparts=nparts,
expect_part=expect_part,
min_size=min_size,
alpha=alpha,
no_trend_alpha=no_trend_alpha,
data_col=data_col,
rm_na=rm_na,
check_step=check_step,
check_window=check_window,
season_col=None)
else:
self.serialise = False
self.serialise_path = None
if not loaded:
self._set_from_data(data=data,
nparts=nparts,
expect_part=expect_part,
min_size=min_size,
alpha=alpha,
no_trend_alpha=no_trend_alpha,
data_col=data_col,
rm_na=rm_na,
check_step=check_step,
check_window=check_window,
season_col=None)
if self.serialise and not loaded:
self.to_file()
def __eq__(self, other):
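        """equality check: compares parameters, input data, and computed results; returns a bool"""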
out = True
out *= isinstance(other, self.__class__)
out *= self.check_step == other.check_step
out *= self.data_col == other.data_col
out *= self.rm_na == other.rm_na
out *= self.season_col == other.season_col
out *= self.nparts == other.nparts
out *= self.min_size == other.min_size
out *= self.alpha == other.alpha
out *= self.no_trend_alpha == other.no_trend_alpha
out *= all(np.atleast_1d(self.expect_part) == np.atleast_1d(other.expect_part))
datatype = type(self.data).__name__
datatype_other = type(other.data).__name__
out *= datatype == datatype_other
if self.check_window is None:
out *= other.check_window is None
else:
out *= np.allclose(self.check_window, other.check_window)
if datatype == datatype_other:
try:
# check datasets
if datatype == 'DataFrame':
pd.testing.assert_frame_equal(self.data, other.data, check_dtype=False, check_like=True,
)
elif datatype == 'Series':
pd.testing.assert_series_equal(self.data, other.data, check_dtype=False, check_like=True,
)
elif datatype == 'ndarray':
assert np.allclose(self.data, other.data)
else:
raise AssertionError(f'unknown datatype {datatype}')
except AssertionError:
out *= False
out *= np.allclose(self.x, other.x)
out *= np.allclose(self.idx_values, other.idx_values)
out *= np.all(self.acceptable_matches.values == other.acceptable_matches.values)
if self.season_col is not None:
out *= np.allclose(self.season_data, other.season_data)
out *= np.allclose(self.s_array, other.s_array)
out *= np.allclose(self.all_start_points, other.all_start_points)
try:
for part in range(self.nparts):
pd.testing.assert_frame_equal(self.datasets[f'p{part}'], other.datasets[f'p{part}'])
except AssertionError:
out *= False
return bool(out)
def print_mk_diffs(self, other):
"""
convenience function to print the differences between two MultiPartKendall classes
:param other: another MultiPartKendall class
"""
if not isinstance(other, self.__class__):
print('problem with class: not same class: got ', type(other))
if not self.check_step == other.check_step:
print(f'problem with check step {self.check_step=} {other.check_step=}')
if not self.data_col == other.data_col:
print(f'problem with data col {self.data_col=} {other.data_col=}')
if not self.rm_na == other.rm_na:
print(f'problem with rm_na {self.rm_na=} other: {other.rm_na=}')
if not self.season_col == other.season_col:
print(f'problem with season col {self.season_col=} {other.season_col=}')
if not self.nparts == other.nparts:
print(f'problem with nparts: {self.nparts=} {other.nparts=}')
if not self.min_size == other.min_size:
print(f'problem with min_size {self.min_size=} {other.min_size=}')
if not self.alpha == other.alpha:
print(f'problem with alpha {self.alpha=} {other.alpha=}')
if not self.no_trend_alpha == other.no_trend_alpha:
print(f'problem with no_trend_alpha {self.no_trend_alpha=} {other.no_trend_alpha=}')
if not all(np.atleast_1d(self.expect_part) == np.atleast_1d(other.expect_part)):
print(f'problem with expect_part {self.expect_part=} {other.expect_part=}')
datatype = type(self.data).__name__
datatype_other = type(other.data).__name__
if not datatype == datatype_other:
print(f'problem with datatype {datatype=} {datatype_other=}')
if self.check_window is None:
            if other.check_window is not None:
print(f'problem with check_window, should both be None {self.check_window=} {other.check_window=}')
else:
if not np.allclose(self.check_window, other.check_window):
print(f'problem with check_window {self.check_window=} {other.check_window=}')
if datatype == datatype_other:
try:
# check datasets
if datatype == 'DataFrame':
pd.testing.assert_frame_equal(self.data, other.data, check_dtype=False, check_like=True,
)
elif datatype == 'Series':
pd.testing.assert_series_equal(self.data, other.data, check_dtype=False, check_like=True,
)
elif datatype == 'ndarray':
assert np.allclose(self.data, other.data)
else:
raise AssertionError(f'unknown datatype {datatype}')
except AssertionError:
print(f'problem with data')
print(traceback.format_exc())
if not np.allclose(self.x, other.x):
print(f'problem with x')
if not np.allclose(self.idx_values, other.idx_values):
print(f'problem with idx_values')
if not np.all(self.acceptable_matches.values == other.acceptable_matches.values):
print(f'problem with acceptable_matches')
if self.season_col is not None:
if not np.allclose(self.season_data, other.season_data):
print(f'problem with season_data')
if not np.allclose(self.s_array, other.s_array):
print('problem with s_array')
if not np.allclose(self.all_start_points, other.all_start_points):
print('problem with all_start_points')
try:
for part in range(self.nparts):
pd.testing.assert_frame_equal(self.datasets[f'p{part}'], other.datasets[f'p{part}'])
except AssertionError:
print(f'problem with datasets')
print(traceback.format_exc())
def get_acceptable_matches(self):
"""
get the acceptable matches for the multipart kendall test
:return: pd.DataFrame
"""
return self._get_matches(acceptable_only=True)
def get_all_matches(self):
"""
        get all matches for the multipart kendall test (including those that are not significant)
:return: pd.DataFrame
"""
return self._get_matches(acceptable_only=False)
def _get_matches(self, acceptable_only):
if acceptable_only:
use_idx = self.acceptable_matches
else:
use_idx = np.ones(self.acceptable_matches.shape, dtype=bool)
outdata = self.datasets['p0'].loc[use_idx]
outdata = outdata.set_index([f'split_point_{i}' for i in range(1, self.nparts)])
outdata.rename(columns={f'{e}': f'{e}_p0' for e in ['trend', 'h', 'p', 'z', 's', 'var_s']}, inplace=True)
for i in range(1, self.nparts):
next_data = self.datasets[f'p{i}'].loc[use_idx]
next_data = next_data.set_index([f'split_point_{j}' for j in range(1, self.nparts)])
next_data.rename(columns={f'{e}': f'{e}_p{i}' for e in ['trend', 'h', 'p', 'z', 's', 'var_s']},
inplace=True)
outdata = pd.merge(outdata, next_data, left_index=True, right_index=True)
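        # min-max normalise the |z| of each part; where no trend is expected the
        # normalised value is inverted, so a larger znorm always favours the expected trend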
for p in range(self.nparts):
temp = ((outdata[f'z_p{p}'].abs() - outdata[f'z_p{p}'].abs().min())
/
(outdata[f'z_p{p}'].abs().max() - outdata[f'z_p{p}'].abs().min()))
if self.expect_part[p] == 0:
outdata[f'znorm_p{p}'] = 1 - temp
else:
outdata[f'znorm_p{p}'] = temp
outdata['znorm_joint'] = outdata[[f'znorm_p{p}' for p in range(self.nparts)]].sum(axis=1)
return deepcopy(outdata)
def get_maxz_breakpoints(self, raise_on_none=False):
"""
        get the breakpoints for the maximum joint normalised (min-max for each part) z. The best match is the maximum znorm_joint value, where:
* if expected trend == 1 or -1:
* znorm = the min-max normalised z value for each part
* else: (no trend expected)
* znorm = 1 - the min-max normalised z value for each part
* and
* znorm_joint = the sum of the znorm values for each part
:param raise_on_none: bool, if True will raise an error if no acceptable matches, otherwise will return None
:return: array of breakpoint tuples
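
        e.g. (a sketch, assuming a fitted 2-part test ``mpk`` with acceptable matches)::

            >>> bps = mpk.get_maxz_breakpoints()  # all ties for the max znorm_joint
            >>> data_parts, stats = mpk.get_data_from_breakpoints(bps[0])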
"""
acceptable = self.get_acceptable_matches()
if acceptable.empty:
if raise_on_none:
raise ValueError('no acceptable matches')
else:
return None
best = acceptable[acceptable['znorm_joint'] == acceptable['znorm_joint'].max()]
return best.index.values
def get_data_from_breakpoints(self, breakpoints):
"""
get the data from the breakpoints
        :param breakpoints: breakpoints to split the data, e.g. from self.get_acceptable_matches
:return: outdata: list of dataframes for each part of the time series
        :return: kendal_stats: dataframe of kendall stats for each part of the time series
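
        e.g. (a sketch; ``52`` is a hypothetical breakpoint, e.g. from get_maxz_breakpoints)::

            >>> data_parts, kendal_stats = mpk.get_data_from_breakpoints(52)
            >>> kendal_stats.loc['p0', 'sen_slope']  # senslope fit of the first part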
"""
breakpoints = np.atleast_1d(breakpoints)
assert len(breakpoints) == self.nparts - 1
outdata = []
        kendal_stats = pd.DataFrame(index=[f'p{i}' for i in range(self.nparts)],
                                    columns=['trend', 'h', 'p', 'z', 's', 'var_s', 'sen_slope',
                                             'sen_intercept'])
for p, (pkey, ds) in enumerate(self.datasets.items()):
assert pkey == f'p{p}'
temp = ds.set_index([f'split_point_{i}' for i in range(1, self.nparts)])
outcols = ['trend', 'h', 'p', 'z', 's', 'var_s']
kendal_stats.loc[f'p{p}', outcols] = temp.loc[tuple(breakpoints), outcols].values
start = 0
for i in range(self.nparts):
if i == self.nparts - 1:
end = self.n
else:
end = breakpoints[i]
if isinstance(self.data, pd.DataFrame):
outdata.append(self.data.loc[self.idx_values[start:end]])
else:
outdata.append(deepcopy(
pd.Series(index=self.idx_values[start:end], data=self.data[self.idx_values[start:end]])))
start = end
# calculate the senslope stats
for i, ds in enumerate(outdata):
senslope, senintercept = self._calc_senslope(ds)
kendal_stats.loc[f'p{i}', 'sen_slope'] = senslope
kendal_stats.loc[f'p{i}', 'sen_intercept'] = senintercept
return outdata, kendal_stats
def plot_acceptable_matches(self, key):
"""
quickly plot the acceptable matches
        :param key: key to plot, one of ['p', 'z', 's', 'var_s', 'znorm', 'znorm_joint'], or 'all' (a figure for each value); note joint stats only have 1 value
        :return: fig, ax (or lists of figs and axs if key='all')
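
        e.g. ``fig, ax = mpk.plot_acceptable_matches('znorm_joint')`` (a sketch, assuming a fitted test ``mpk``)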
"""
poss_keys = ['p', 'z', 's', 'var_s', 'znorm', 'znorm_joint']
assert key in poss_keys or key == 'all'
if key == 'all':
keys = poss_keys
else:
keys = np.atleast_1d(key)
figs, axs = [], []
for key in keys:
fig, ax = plt.subplots(figsize=(10, 8))
acceptable = self.get_acceptable_matches()
if 'joint' in key:
use_keys = [key]
else:
use_keys = [f'{key}_p{i}' for i in range(self.nparts)]
acceptable = acceptable[use_keys]
acceptable.plot(ax=ax, ls='none', marker='o')
figs.append(fig)
axs.append(ax)
if len(figs) == 1:
return fig, ax
else:
return figs, axs
def plot_data_from_breakpoints(self, breakpoints, ax=None, txt_vloc=-0.05, add_labels=True, **kwargs):
"""
plot the data from the breakpoints including the senslope fits
:param breakpoints:
:param ax: ax to plot on if None then create the ax
:param txt_vloc: vertical location of the text (in ax.transAxes)
:param add_labels: boolean, if True add labels (slope, pval) to the plot
:param kwargs: passed to ax.scatter (all parts)
:return: fig, ax
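
        e.g. (a sketch, assuming a fitted test ``mpk``)::

            >>> fig, ax = mpk.plot_data_from_breakpoints(mpk.get_maxz_breakpoints()[0])
            >>> fig.tight_layout()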
"""
breakpoints = np.atleast_1d(breakpoints)
if ax is None:
fig, ax = plt.subplots(figsize=(10, 8))
else:
fig = ax.figure
data, kendal_stats = self.get_data_from_breakpoints(breakpoints)
trans = blended_transform_factory(ax.transData, ax.transAxes)
# axhlines at breakpoints
prev_bp = 0
for i, bp in enumerate(np.concatenate((breakpoints, [self.n]))):
if not bp == self.n:
ax.axvline(self.idx_values[bp], color='k', ls=':')
sslope = kendal_stats.loc[f"p{i}", "sen_slope"]
sintercept = kendal_stats.loc[f"p{i}", "sen_intercept"]
x = self.idx_values[prev_bp:bp]
if add_labels:
xval = pd.Series(x).mean()
ax.text(xval,
txt_vloc,
f'expected: {self.trend_dict[self.expect_part[i]]}\n'
f'got: slope: {sslope:.3e}, '
f'pval:{round(kendal_stats.loc[f"p{i}", "p"], 3)}',
transform=trans, ha='center', va='top')
# plot the senslope fit and intercept
y = (x).astype(float) * sslope + sintercept
ax.plot(x, y, color='k', ls='--')
prev_bp = bp
if self.season_data is None:
colors = get_colors(data)
for i, (ds, c) in enumerate(zip(data, colors)):
if isinstance(self.data, pd.DataFrame):
ax.scatter(ds.index, ds[self.data_col], c=c, label=f'part {i}', **kwargs)
else:
ax.scatter(ds.index, ds, color=c, label=f'part {i}', **kwargs)
else:
seasons = np.unique(self.season_data)
colors = get_colors(seasons)
for i, ds in enumerate(data):
for s, c in zip(seasons, colors):
temp = ds[ds[self.season_col] == s]
ax.scatter(temp.index, temp[self.data_col], color=c, label=f'season: {s}', **kwargs)
legend_handles = [Line2D([0], [0], color='k', ls=':'),
Line2D([0], [0], color='k', ls='--')]
legend_labels = ['breakpoint', 'sen slope fit', ]
nhandles, nlabels = ax.get_legend_handles_labels()
temp = dict(zip(nlabels, nhandles))
legend_handles.extend(temp.values())
legend_labels.extend(temp.keys())
ax.legend(legend_handles, legend_labels, loc='best')
return fig, ax
def _set_from_file(self, data, nparts, expect_part, min_size, alpha, no_trend_alpha, data_col, rm_na,
check_step, check_window, season_col=None, check_inputs=True):
"""
setup the class data from a serialised file, values are passed to ensure they are consistent
:param check_inputs: bool, if True will check the inputs match the serialised file
"""
assert self.serialise_path is not None, 'serialise path not set, should not get here'
params = pd.read_hdf(self.serialise_path, key='params')
assert isinstance(params, pd.Series)
# other parameters
self.alpha = params['alpha']
if 'check_step' in params.index:
self.check_step = int(params['check_step'])
else:
self.check_step = 1 # support legacy files
self.no_trend_alpha = params['no_trend_alpha']
self.nparts = int(params['nparts'])
self.min_size = int(params['min_size'])
self.rm_na = bool(params['rm_na'])
self.n = int(params['n'])
self.expect_part = [int(params[f'expect_part{i}']) for i in range(self.nparts)]
if 'freq_limit' in params.index:
self.freq_limit = params['freq_limit']
else:
self.freq_limit = None
params_str = pd.read_hdf(self.serialise_path, key='params_str')
assert isinstance(params_str, pd.Series)
datatype = params_str['datatype']
self.season_col = params_str['season_col']
if self.season_col == 'None':
self.season_col = None
self.data_col = params_str['data_col']
if self.data_col == 'None':
self.data_col = None
# d1 data
d1_data = pd.read_hdf(self.serialise_path, key='d1_data')
assert isinstance(d1_data, pd.DataFrame)
self.x = d1_data['x'].values
self.idx_values = d1_data['idx_values'].values
self.acceptable_matches = pd.read_hdf(self.serialise_path, key='acceptable_matches')
# check window
if 'check_window' in params_str.index:
temp = params_str['check_window']
assert temp == 'None'
self.check_window = None
self.int_check_window = None
else:
with pd.HDFStore(self.serialise_path, 'r') as store:
keys = [e.replace('/', '') for e in store.keys()]
                if 'check_window' not in keys:  # support legacy files
self.check_window = None
self.int_check_window = None
else:
temp = pd.read_hdf(self.serialise_path, key='check_window')
assert isinstance(temp, pd.DataFrame)
self.check_window = temp.values
temp = pd.Series(index=self.idx_values, data=np.arange(len(self.idx_values)))
self.int_check_window = temp[self.check_window.flatten()].values.reshape(self.check_window.shape)
if datatype == 'pd.DataFrame':
self.data = pd.read_hdf(self.serialise_path, key='data')
assert isinstance(self.data, pd.DataFrame)
elif datatype == 'pd.Series':
self.data = pd.read_hdf(self.serialise_path, key='data')
assert isinstance(self.data, pd.Series)
elif datatype == 'np.array':
self.data = pd.read_hdf(self.serialise_path, key='data').values
assert isinstance(self.data, np.ndarray)
assert self.data.ndim == 1
else:
raise ValueError('unknown datatype, thou shall not pass')
if self.season_col is not None:
self.season_data = self.data.loc[self.idx_values, self.season_col]
else:
self.season_data = None
# s array
self.s_array = pd.read_hdf(self.serialise_path, key='s_array').values
assert self.s_array.shape == (self.n, self.n)
# all start points
self.all_start_points = pd.read_hdf(self.serialise_path, key='all_start_points').values
assert self.all_start_points.shape == (len(self.all_start_points), self.nparts - 1)
# datasets
dtypes = {'trend': 'float64', 'h': 'bool', 'p': 'float64',
'z': 'float64', 's': 'float64', 'var_s': 'float64'}
for part in range(1, self.nparts):
dtypes.update({f'split_point_{part}': 'int64'})
self.datasets = {}
for part in range(self.nparts):
self.datasets[f'p{part}'] = pd.read_hdf(self.serialise_path, key=f'part{part}').astype(dtypes)
if check_inputs:
# check parameters have not changed
assert self.data_col == data_col, 'data_col does not match'
assert self.rm_na == rm_na, 'rm_na does not match'
assert self.season_col == season_col, 'season_col does not match'
assert self.nparts == nparts, 'nparts does not match'
assert self.min_size == min_size, 'min_size does not match'
assert self.alpha == alpha, 'alpha does not match'
assert self.no_trend_alpha == no_trend_alpha, 'no_trend_alpha does not match'
assert self.check_step == check_step, 'check_step does not match'
assert all(np.atleast_1d(self.expect_part) == np.atleast_1d(expect_part)), 'expect_part does not match'
if self.check_window is None:
assert check_window is None, 'check_window does not match'
else:
check_window = np.atleast_2d(check_window)
assert np.allclose(self.check_window, check_window), 'check_window does not match'
# check datasets
if datatype == 'pd.DataFrame':
pd.testing.assert_frame_equal(self.data, data, check_dtype=False, check_like=True)
elif datatype == 'pd.Series':
pd.testing.assert_series_equal(self.data, data, check_dtype=False, check_like=True)
elif datatype == 'np.array':
assert np.allclose(self.data, data)
def _set_from_data(self, data, nparts, expect_part, min_size, alpha, no_trend_alpha, data_col, rm_na,
check_step, check_window, season_col=None):
"""
set up the class data from the input data
:param data:
:param nparts:
:param expect_part:
:param min_size:
:param alpha:
:param data_col:
:param rm_na:
:param season_col:
:return:
"""
self.data = deepcopy(data)
self.alpha = alpha
self.no_trend_alpha = no_trend_alpha
self.nparts = nparts
self.min_size = min_size
self.expect_part = expect_part
self.data_col = data_col
self.rm_na = rm_na
assert len(expect_part) == nparts
# handle data (including options for season)
self.season_col = season_col
if season_col is not None:
assert isinstance(data, pd.DataFrame) or isinstance(data, dict), ('season_col passed but data is not a '
'DataFrame or dictionary')
assert season_col in data.keys(), 'season_col not in data'
assert data_col is not None, 'data_col must be passed if season_col is passed'
assert data_col in data.keys(), 'data_col not in data'
if rm_na:
data = data.dropna(subset=[data_col, season_col])
data = data.sort_index()
self.season_data = data[season_col]
self.idx_values = data.index.values
x = np.array(data[data_col])
self.x = x
else:
self.season_data = None
if data_col is not None:
x = pd.Series(data[data_col])
else:
x = pd.Series(data)
if rm_na:
x = x.dropna(how='any')
x = x.sort_index()
self.idx_values = x.index.values
x = np.array(x)
self.x = x
        assert x.ndim == 1, 'data must be 1d, or data_col must be passed for multi-dimensional data'
n = len(x)
self.n = n
if n / self.nparts < min_size:
raise ValueError('the time series is too short for the minimum size')
self.s_array = _make_s_array(x)
assert isinstance(check_step, int), f'{check_step=} must be an int'
assert check_step > 0, f'{check_step=} must be > 0'
self.check_step = check_step
if check_window is not None:
check_window = np.atleast_2d(check_window)
assert check_window.shape == (nparts - 1, 2), f'{check_window=} must have shape (nparts-1, 2)'
self.check_window = check_window
if isinstance(self.data, pd.DataFrame) or isinstance(self.data, pd.Series):
assert np.isin(check_window.flatten(), self.data.index).all(), (
'check_window contains values not in data index')
if data_col:
check_data = self.data[self.data_col]
else:
check_data = self.data
assert not check_data.loc[check_window.flatten()].isna().any(), (
'check_window references nan values')
if self.season_col is not None:
assert not self.data.loc[check_window.flatten(), self.season_col].isna().any(), (
'check_window references nan values')
elif isinstance(self.data, np.ndarray):
assert set(check_window.flatten()).issubset(np.arange(len(self.data)))
assert not np.isnan(self.data[check_window.flatten()]).any(), 'check_window references nan values'
temp = pd.Series(index=self.idx_values, data=np.arange(len(self.idx_values)))
self.int_check_window = temp[self.check_window.flatten()].values.reshape(self.check_window.shape)
assert (self.int_check_window.flatten() >= self.min_size).all(), 'check_window contains values < min_size'
assert (self.n - self.int_check_window.flatten() >= self.min_size).all(), (
'n - check_window contains values <min_size')
else:
self.check_window = None
self.int_check_window = None
all_start_points = _generate_startpoints(n, self.min_size, self.nparts, self.check_step, self.int_check_window)
datasets = {f'p{i}': [] for i in range(nparts)}
self.all_start_points = all_start_points
self.datasets = datasets
self._calc_mann_kendall()
# find all acceptable matches
idx = np.ones(len(self.all_start_points), bool)
for part, expect in enumerate(self.expect_part):
if expect == 0:
idx = (idx
& (self.datasets[f'p{part}'].trend == expect)
& (self.datasets[f'p{part}'].p > self.no_trend_alpha)
)
else:
idx = (idx
& (self.datasets[f'p{part}'].trend == expect)
& (self.datasets[f'p{part}'].p < self.alpha)
)
self.acceptable_matches = idx
def _calc_senslope(self, data):
if isinstance(self.data, pd.DataFrame):
senslope, senintercept, lo_slope, up_slope = mstats.theilslopes(data[self.data_col], data.index,
alpha=self.alpha)
else:
senslope, senintercept, lo_slope, up_slope = mstats.theilslopes(data, data.index, alpha=self.alpha)
return senslope, senintercept
def _calc_mann_kendall(self):
"""
        actually calculate the mann kendall from the sarray; this should be the only thing that needs to be updated for the seasonal kendall
:return:
"""
temp_data = {}
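        # cache results per (start, end) segment: many candidate splits share the same
        # segment, so each unique segment's mann kendall is only computed once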
for sp in np.atleast_2d(self.all_start_points):
start = 0
for i in range(self.nparts):
if i == self.nparts - 1:
end = self.n
else:
end = sp[i]
temp_key = (start, end)
if temp_key in temp_data:
datai = temp_data[temp_key]
else:
datai = _mann_kendall_from_sarray(self.x[start:end], alpha=self.alpha,
sarray=self.s_array[start:end, start:end])
temp_data[temp_key] = datai
data = (*sp, *datai)
self.datasets[f'p{i}'].append(data)
start = end
for part in range(self.nparts):
self.datasets[f'p{part}'] = pd.DataFrame(self.datasets[f'p{part}'],
columns=[f'split_point_{i}' for i in range(1, self.nparts)]
+ ['trend', 'h', 'p', 'z', 's', 'var_s'])
def to_file(self, save_path=None, complevel=9, complib='blosc:lz4'):
"""
save the data to a hdf file
:param save_path: None (save to self.serialise_path) or path to save the file
:param complevel: compression level for hdf
:param complib: compression library for hdf
:return:
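
        e.g. ``mpk.to_file('mpk_results.hdf')`` (a sketch; 'mpk_results.hdf' is a hypothetical path)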
"""
if save_path is None:
assert self.serialise_path is not None, 'serialise path not set, should not get here'
save_path = self.serialise_path
with pd.HDFStore(save_path, 'w') as hdf:
# setup single value parameters
            params = pd.Series(dtype=float)
            params_str = pd.Series(dtype=object)
# should be 1d+ of same length
d1_data = pd.DataFrame(index=range(len(self.x)))
d1_data['x'] = self.x
d1_data['idx_values'] = self.idx_values
d1_data.to_hdf(hdf, key='d1_data')
self.acceptable_matches.to_hdf(hdf, 'acceptable_matches', complevel=complevel, complib=complib)
# save as own datasets
if isinstance(self.data, pd.DataFrame):
self.data.to_hdf(hdf, key='data', complevel=complevel, complib=complib)
params_str['datatype'] = 'pd.DataFrame'
elif isinstance(self.data, pd.Series):
self.data.to_hdf(hdf, key='data', complevel=complevel, complib=complib)
params_str['datatype'] = 'pd.Series'
else:
params_str['datatype'] = 'np.array'
pd.Series(self.data).to_hdf(hdf, key='data', complevel=complevel, complib=complib)
assert isinstance(self.s_array, np.ndarray)
pd.DataFrame(self.s_array).to_hdf(hdf, key='s_array', complevel=complevel, complib=complib)
assert isinstance(self.all_start_points, np.ndarray)
pd.DataFrame(self.all_start_points).to_hdf(hdf, key='all_start_points', complevel=complevel, complib=complib)
for part in range(self.nparts):
self.datasets[f'p{part}'].astype(float).to_hdf(hdf, key=f'part{part}', complevel=complevel, complib=complib)
# check_window
if self.check_window is not None:
pd.DataFrame(self.check_window).to_hdf(hdf, key='check_window', complevel=complevel, complib=complib)
else:
params_str['check_window'] = 'None'
# other parameters
params['alpha'] = self.alpha
params['no_trend_alpha'] = self.no_trend_alpha
params['nparts'] = float(self.nparts)
params['min_size'] = float(self.min_size)
params['rm_na'] = float(self.rm_na)
params['n'] = float(self.n)
params['check_step'] = float(self.check_step)
if self.freq_limit is not None:
params['freq_limit'] = float(self.freq_limit)
for i in range(self.nparts):
params[f'expect_part{i}'] = float(self.expect_part[i])
params_str['season_col'] = str(self.season_col)
params_str['data_col'] = str(self.data_col)
params.to_hdf(hdf, key='params', complevel=complevel, complib=complib)
params_str.to_hdf(hdf, key='params_str', complevel=complevel, complib=complib)
@staticmethod
def from_file(path):
"""
load the class from a serialised file
:param path: path to the serialised file
:return: MultiPartKendall
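
        e.g. ``mpk = MultiPartKendall.from_file('mpk_results.hdf')`` (a sketch; the path must point to a file written by to_file)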
"""
mpk = MultiPartKendall(
data=None, nparts=None, expect_part=None, min_size=None, alpha=None, no_trend_alpha=None, data_col=None,
serialise_path=None, recalc=None, rm_na=None, initalize=False)
mpk.serialise_path = Path(path)
mpk.serialise = True
mpk._set_from_file(data=None, nparts=None, expect_part=None, min_size=None, alpha=None, no_trend_alpha=None,
data_col=None, rm_na=None,
check_step=None, check_window=None,
season_col=None, check_inputs=False)
return mpk
class SeasonalMultiPartKendall(MultiPartKendall):
def __init__(self, data, data_col, season_col, nparts=2, expect_part=(1, -1), min_size=10,
alpha=0.05, no_trend_alpha=0.5,
rm_na=True,
serialise_path=None, freq_limit=0.05,
check_step=1, check_window=None,
recalc=False, initalize=True):
"""
        multi part seasonal mann kendall test to identify change point(s) in a time series after Frollini et al., 2020, DOI: 10.1007/s11356-020-11998-0
        :param data: time series data; if DataFrame or Series, expects the index to be sample order (will sort on index); if np.array or list, expects the data to be in sample order
:param data_col: if data is a DataFrame or Series, the column to use
:param season_col: the column to use for the season
:param nparts: number of parts to split the time series into
:param expect_part: expected trend in each part of the time series (1 increasing, -1 decreasing, 0 no trend)
:param min_size: minimum size for the first and last section of the time series
:param alpha: significance level
        :param no_trend_alpha: significance level for no trend, e.g. will accept no trend if p > no_trend_alpha
:param rm_na: remove na values from the data
:param serialise_path: path to serialised file (as hdf), if None will not serialise
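        :param freq_limit: frequency limit passed to the seasonal mann kendall test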
:param check_step: int, the step to check for breakpoints, e.g. if 1 will check every point, if 2 will check every second point
        :param check_window: the window to check for breakpoints. If None, checks the whole data; this is used to significantly speed up the mann kendall test. Note that check_step still applies within the check_window (e.g. a check_window of (2, 6) with a check_step of 2 will check the points (2, 4, 6)). One of:
* None or tuple (start_idx, end_idx) (one breakpoint only)
* or list of tuples of len nparts-1 with a start/end idx for each part,
* or a 2d array shape (nparts-1, 2) with a start/end idx for each part
:param recalc: if True will recalculate the mann kendall even if the serialised file exists
        :param initalize: if True will initialize the class from the data; only set to False when used in self.from_file
:return:
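
        Example (a minimal sketch; ``df`` is a hypothetical DataFrame with value column 'conc' and season column 'month')::

            >>> smpk = SeasonalMultiPartKendall(df, data_col='conc', season_col='month',
            ...                                 nparts=2, expect_part=(1, -1), min_size=10)
            >>> best_breakpoints = smpk.get_maxz_breakpoints()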
"""
self.trend_dict = {1: 'increasing', -1: 'decreasing', 0: 'no trend'}
self.freq_limit = freq_limit
if not initalize:
assert all([e is None for e in
[data, nparts, expect_part, min_size, alpha, no_trend_alpha, data_col, rm_na, serialise_path,
recalc]])
else:
loaded = False
if serialise_path is not None:
serialise_path = Path(serialise_path)
self.serialise_path = serialise_path
self.serialise = True
if Path(serialise_path).exists() and not recalc:
loaded = True
self._set_from_file(
data=data,
nparts=nparts,
expect_part=expect_part,
min_size=min_size,
alpha=alpha,
no_trend_alpha=no_trend_alpha,
data_col=data_col,
rm_na=rm_na,
check_step=check_step,
check_window=check_window,
season_col=season_col)
else:
self.serialise = False
self.serialise_path = None
if not loaded:
self._set_from_data(data=data,
nparts=nparts,
expect_part=expect_part,
min_size=min_size,
alpha=alpha,
no_trend_alpha=no_trend_alpha,
data_col=data_col,
rm_na=rm_na,
check_step=check_step,
check_window=check_window,
season_col=season_col)
if self.serialise and not loaded:
self.to_file()
@staticmethod
def from_file(path):
"""
load the class from a serialised file
        :param path: path to the serialised file
        :return: SeasonalMultiPartKendall
"""
mpk = SeasonalMultiPartKendall(data=None, data_col=None, season_col=None, nparts=None, expect_part=None,
min_size=None, alpha=None, no_trend_alpha=None, rm_na=None,
serialise_path=None, recalc=None, initalize=False)
mpk.serialise_path = Path(path)
mpk.serialise = True
mpk._set_from_file(data=None, nparts=None, expect_part=None, min_size=None,
alpha=None, no_trend_alpha=None, data_col=None,
rm_na=None, check_step=None,
check_window=None,
season_col=None, check_inputs=False)
return mpk
def _calc_mann_kendall(self):
"""
actually calculate the mann kendall from the sarray, this should be the only thing that needs
to be updated for the seasonal kendall
:return:
"""
temp_data = {}
for sp in self.all_start_points:
start = 0
for i in range(self.nparts):
if i == self.nparts - 1:
end = self.n
else:
end = sp[i]
temp_key = (start, end)
if temp_key in temp_data:
datai = temp_data[temp_key]
else:
datai = _seasonal_mann_kendall_from_sarray(self.x[start:end], alpha=self.alpha,
season_data=self.season_data.values[start:end],
sarray=self.s_array[start:end,
start:end],
freq_limit=self.freq_limit) # and passing the s array
temp_data[temp_key] = datai
data = (*sp, *datai)
self.datasets[f'p{i}'].append(data)
start = end
for part in range(self.nparts):
self.datasets[f'p{part}'] = pd.DataFrame(self.datasets[f'p{part}'],
columns=[f'split_point_{i}' for i in range(1, self.nparts)]
+ ['trend', 'h', 'p', 'z', 's', 'var_s'])
def _calc_senslope(self, data):
senslope, senintercept, lo_slope, lo_intercept = _calc_seasonal_senslope(data[self.data_col],
data[self.season_col],
x=data.index, alpha=self.alpha)
return senslope, senintercept