"""
created matt_dumont
on: 14/09/23
"""
import numpy as np
import pandas as pd
from scipy.stats import norm, mstats
import warnings
import matplotlib.pyplot as plt
try:
from matplotlib.cm import get_cmap
except ImportError:
from matplotlib.pyplot import get_cmap
from matplotlib.lines import Line2D
def _make_s_array(x):
"""
make the s array for the mann kendall test
:param x:
:return:
"""
n = len(x)
k_array = np.repeat(x[:, np.newaxis], n, axis=1)
j_array = k_array.transpose()
s_stat_array = np.sign(k_array - j_array)
s_stat_array = np.tril(s_stat_array, -1) # remove the diagonal and upper triangle
return s_stat_array
def _seasonal_mann_kendall_from_sarray(x, season_data, alpha=0.05, sarray=None,
freq_limit=0.05):
"""
calculate the seasonal mann kendall test for a time series after: https://doi.org/10.1029/WR020i006p00727
:param x: the data
:param season_data: the season data, will be converted to integers
:param alpha: significance level
:param sarray: the s array, if None will be calculated from _make_s_array
:param freq_limit: the maximum difference in frequency between seasons (as a fraction), if greater than this will raise a warning
:return:
"""
# calculate the unique data
x = np.atleast_1d(x)
season_data = np.atleast_1d(season_data)
assert np.issubdtype(season_data.dtype, int) or np.issubdtype(season_data.dtype, np.string_), (
'season data must be a string, or integer to avoid errors associated with float precision'
)
# get unique values convert to integers
unique_seasons, season_data = np.unique(season_data, return_inverse=True)
# get unique integer values
unique_season_ints, counts = np.unique(season_data, return_counts=True)
relaive_freq = np.abs(counts - counts.mean()) / counts.mean()
if (relaive_freq > freq_limit).any():
warnings.warn(f'the discrepancy of frequency of seasons is greater than the limit({freq_limit})'
f' this may affect the test'
f' the frequency of seasons are {counts}')
assert season_data.shape == x.shape, 'season data and x must be the same shape'
assert x.ndim == 1
n = len(x)
assert n >= 3, 'need at least 3 data points'
# calculate s
if sarray is None:
sarray = _make_s_array(x)
assert sarray.shape == (n, n)
# make the season array
season_k_array = np.repeat(season_data[:, np.newaxis], n, axis=1)
season_j_array = season_k_array.transpose()
s = 0
var_s = 0
# run the mann kendall for each season
for season in unique_season_ints:
season_idx = (season_k_array == season) & (season_j_array == season)
temp_s = sarray[season_idx].sum()
temp_x = x[season_data == season]
n0 = len(temp_x)
# calculate the var(s)
unique_x, unique_counts = np.unique(temp_x, return_counts=True)
unique_mod = (unique_counts * (unique_counts - 1) * (2 * unique_counts + 5)).sum() * (unique_counts > 1).sum()
temp_var_s = (n0 * (n0 - 1) * (2 * n0 + 5) + unique_mod) / 18
s += temp_s
var_s += temp_var_s
# calculate the z value
z = np.abs(np.sign(s)) * (s - np.sign(s)) / np.sqrt(var_s)
p = 2 * (1 - norm.cdf(abs(z))) # two tail test
h = abs(z) > norm.ppf(1 - alpha / 2)
trend = np.sign(z) * h
# -1 decreasing, 0 no trend, 1 increasing
return trend, h, p, z, s, var_s
def _mann_kendall_from_sarray(x, alpha=0.05, sarray=None):
"""
code optimised mann kendall
:param x:
:param alpha:
:param sarray:
:return:
"""
# calculate the unique data
x = np.atleast_1d(x)
assert x.ndim == 1
n = len(x)
assert n >= 3, 'need at least 3 data points'
# calculate s
if sarray is None:
sarray = _make_s_array(x)
assert sarray.shape == (n, n)
s = sarray.sum()
# calculate the var(s)
unique_x, unique_counts = np.unique(x, return_counts=True)
unique_mod = (unique_counts * (unique_counts - 1) * (2 * unique_counts + 5)).sum() * (unique_counts > 1).sum()
var_s = (n * (n - 1) * (2 * n + 5) + unique_mod) / 18
z = np.abs(np.sign(s)) * (s - np.sign(s)) / np.sqrt(var_s)
# calculate the p_value
p = 2 * (1 - norm.cdf(abs(z))) # two tail test
h = abs(z) > norm.ppf(1 - alpha / 2)
trend = np.sign(z) * h
# -1 decreasing, 0 no trend, 1 increasing
return trend, h, p, z, s, var_s
def _mann_kendall_old(x, alpha=0.05):
"""
the duplicate from above is to return more parameters and put into the mann kendall class
retrieved from https://mail.scipy.org/pipermail/scipy-dev/2016-July/021413.html
this was depreciated as _mann_kendall_from_sarray is MUCH faster
Input:
x: a vector of data
alpha: significance level (0.05 default)
Output:
trend: tells the trend (increasing, decreasing or no trend)
h: True (if trend is present) or False (if trend is absence)
p: p value of the significance test
z: normalized test statistics
"""
warnings.warn('this function is depreciated, use _mann_kendall_from_sarray', DeprecationWarning)
x = np.array(x)
n = len(x)
# calculate S
s = 0
for k in range(n - 1):
for j in range(k + 1, n):
s += np.sign(x[j] - x[k])
# calculate the unique data
unique_x = np.unique(x)
g = len(unique_x)
# calculate the var(s)
if n == g: # there is no tie
var_s = (n * (n - 1) * (2 * n + 5)) / 18
else: # there are some ties in data
tp = np.zeros(unique_x.shape)
for i in range(len(unique_x)):
tp[i] = sum(unique_x[i] == x)
var_s = (n * (n - 1) * (2 * n + 5) + np.sum(tp * (tp - 1) * (2 * tp + 5))) / 18
if s > 0:
z = (s - 1) / np.sqrt(var_s)
elif s == 0:
z = 0
elif s < 0:
z = (s + 1) / np.sqrt(var_s)
else:
raise ValueError('shouldnt get here')
# calculate the p_value
p = 2 * (1 - norm.cdf(abs(z))) # two tail test
h = abs(z) > norm.ppf(1 - alpha / 2)
if (z < 0) and h:
trend = -1
elif (z > 0) and h:
trend = 1
else:
trend = 0
return trend, h, p, z, s, var_s
def _old_smk(df, data_col, season_col, alpha=0.05, rm_na=True):
warnings.warn('this function is depreciated, use _mann_kendall_from_sarray', DeprecationWarning)
if rm_na:
data = df.dropna(subset=[data_col, season_col])
else:
data = df.copy(deep=True)
data_col = data_col
season_col = season_col
alpha = alpha
# get list of seasons
season_vals = np.unique(data[season_col])
# calulate the seasonal MK values
_season_outputs = {}
s = 0
var_s = 0
for season in season_vals:
tempdata = data[data_col][data[season_col] == season].sort_index()
_season_outputs[season] = MannKendall(data=tempdata, alpha=alpha, rm_na=rm_na)
var_s += _season_outputs[season].var_s
s += _season_outputs[season].s
# calculate the z value
z = np.abs(np.sign(s)) * (s - np.sign(s)) / np.sqrt(var_s)
h = abs(z) > norm.ppf(1 - alpha / 2)
p = 2 * (1 - norm.cdf(abs(z))) # two tail test
trend = np.sign(z) * h
# -1 decreasing, 0 no trend, 1 increasing
return trend, h, p, z, s, var_s
def _calc_seasonal_senslope(y, season, x=None, alpha=0.95, method='separate'):
"""
modified from scipy/stats/_stats_mstats_common.py
Computes the Theil-Sen estimator for a set of points (x, y).
`theilslopes` implements a method for robust linear regression. It
computes the slope as the median of all slopes between paired values.
Parameters
----------
y : array_like
Dependent variable.
x : array_like or None, optional
Independent variable. If None, use ``arange(len(y))`` instead.
alpha : float, optional
Confidence degree between 0 and 1. Default is 95% confidence.
Note that `alpha` is symmetric around 0.5, i.e. both 0.1 and 0.9 are
interpreted as "find the 90% confidence interval".
method : {'joint', 'separate'}, optional
Method to be used for computing estimate for intercept.
Following methods are supported,
* 'joint': Uses np.median(y - slope * x) as intercept.
* 'separate': Uses np.median(y) - slope * np.median(x)
as intercept.
The default is 'separate'.
.. versionadded:: 1.8.0
Returns
-------
result : ``TheilslopesResult`` instance
The return value is an object with the following attributes:
slope : float
Theil slope.
intercept : float
Intercept of the Theil line.
low_slope : float
Lower bound of the confidence interval on `slope`.
high_slope : float
Upper bound of the confidence interval on `slope`.
See Also
--------
siegelslopes : a similar technique using repeated medians
Notes
-----
The implementation of `theilslopes` follows [1]_. The intercept is
not defined in [1]_, and here it is defined as ``median(y) -
slope*median(x)``, which is given in [3]_. Other definitions of
the intercept exist in the literature such as ``median(y - slope*x)``
in [4]_. The approach to compute the intercept can be determined by the
parameter ``method``. A confidence interval for the intercept is not
given as this question is not addressed in [1]_.
For compatibility with older versions of SciPy, the return value acts
like a ``namedtuple`` of length 4, with fields ``slope``, ``intercept``,
``low_slope``, and ``high_slope``, so one can continue to write::
slope, intercept, low_slope, high_slope = theilslopes(y, x)
References
----------
.. [1] P.K. Sen, "Estimates of the regression coefficient based on
Kendall's tau", J. Am. Stat. Assoc., Vol. 63, pp. 1379-1389, 1968.
.. [2] H. Theil, "A rank-invariant method of linear and polynomial
regression analysis I, II and III", Nederl. Akad. Wetensch., Proc.
53:, pp. 386-392, pp. 521-525, pp. 1397-1412, 1950.
.. [3] W.L. Conover, "Practical nonparametric statistics", 2nd ed.,
John Wiley and Sons, New York, pp. 493.
.. [4] https://en.wikipedia.org/wiki/Theil%E2%80%93Sen_estimator
"""
if method not in ['joint', 'separate']:
raise ValueError("method must be either 'joint' or 'separate'."
"'{}' is invalid.".format(method))
# We copy both x and y so we can use _find_repeats.
y = np.array(y).flatten()
season = np.array(season).flatten()
if len(season) != len(y):
raise ValueError("Incompatible lengths ! (%s<>%s)" %
(len(y), len(season)))
if x is None:
x = np.arange(len(y), dtype=float)
else:
x = np.array(x, dtype=float).flatten()
if len(x) != len(y):
raise ValueError("Incompatible lengths ! (%s<>%s)" %
(len(y), len(x)))
if len(x) == 1:
msg = "Theil-Sen estimator is not defined for a single point."
warnings.warn(msg, RuntimeWarning, stacklevel=2)
return np.nan, np.nan, np.nan, np.nan
# Compute sorted slopes only when deltax > 0
deltax = x[:, np.newaxis] - x
deltay = y[:, np.newaxis] - y
# remove slopes where the seasons do not match
seasons_array_i = np.repeat(season[:, np.newaxis], len(season), axis=1)
seasons_array_j = np.repeat(season[np.newaxis, :], len(season), axis=0)
seasons_array = seasons_array_i == seasons_array_j
deltax[~seasons_array] = 0
deltay[~seasons_array] = 0
slopes = deltay[deltax > 0] / deltax[deltax > 0]
if not slopes.size:
msg = "All `x` coordinates are identical."
warnings.warn(msg, RuntimeWarning, stacklevel=2)
slopes.sort()
medslope = np.nanmedian(slopes)
if method == 'joint':
medinter = np.median(y - medslope * x)
else:
medinter = np.median(y) - medslope * np.median(x)
# Now compute confidence intervals
if alpha > 0.5:
alpha = 1. - alpha
from scipy.stats import distributions
z = distributions.norm.ppf(alpha / 2.)
# This implements (2.6) from Sen (1968)
_, nxreps = _find_repeats(x)
_, nyreps = _find_repeats(y)
nt = len(slopes) # N in Sen (1968)
ny = len(y) # n in Sen (1968)
# Equation 2.6 in Sen (1968):
sigsq = 1 / 18. * (ny * (ny - 1) * (2 * ny + 5) -
sum(k * (k - 1) * (2 * k + 5) for k in nxreps) -
sum(k * (k - 1) * (2 * k + 5) for k in nyreps))
# Find the confidence interval indices in `slopes`
try:
sigma = np.sqrt(sigsq)
Ru = min(int(np.round((nt - z * sigma) / 2.)), len(slopes) - 1)
Rl = max(int(np.round((nt + z * sigma) / 2.)) - 1, 0)
delta = slopes[[Rl, Ru]]
except (ValueError, IndexError):
delta = (np.nan, np.nan)
low_slope = delta[0]
high_slope = delta[1]
slope = medslope
intercept = medinter
return slope, intercept, low_slope, high_slope
def get_colors(vals, cmap='tab10'):
n_scens = len(vals)
if n_scens < 20:
cmap = get_cmap(cmap)
colors = [cmap(e / (n_scens + 1)) for e in range(n_scens)]
else:
colors = []
i = 0
cmap = get_cmap(cmap)
for v in vals:
colors.append(cmap(i / 20))
i += 1
if i == 20:
i = 0
return colors
def _find_repeats(arr):
# taken from scipy.stats._stats_mstats_common._find_repeats
# This function assumes it may clobber its input.
if len(arr) == 0:
return np.array(0, np.float64), np.array(0, np.intp)
# XXX This cast was previously needed for the Fortran implementation,
# should we ditch it?
arr = np.asarray(arr, np.float64).ravel()
arr.sort()
# Taken from NumPy 1.9's np.unique.
change = np.concatenate(([True], arr[1:] != arr[:-1]))
unique = arr[change]
change_idx = np.concatenate(np.nonzero(change) + ([arr.size],))
freq = np.diff(change_idx)
atleast2 = freq > 1
return unique[atleast2], freq[atleast2]
[docs]
class MannKendall(object):
"""
an object to hold and calculate kendall trends assumes a pandas dataframe or series with a time index
:param trend: the trend of the data, -1 decreasing, 0 no trend, 1 increasing
:param h: boolean, True if the trend is significant
:param p: the p value of the trend
:param z: the z value of the trend
:param s: the s value of the trend
:param var_s: the variance of the s value
:param alpha: the alpha value used to calculate the trend
:param data: the data used to calculate the trend
:param data_col: the column of the data used to calculate the trend
"""
trend_dict = {1: 'increasing', -1: 'decreasing', 0: 'no trend'}
def __init__(self, data, alpha=0.05, data_col=None, rm_na=True):
self.alpha = alpha
if data_col is not None:
test_data = data[data_col]
else:
test_data = pd.Series(data)
if rm_na:
test_data = test_data.dropna(how='any')
test_data = test_data.sort_index()
self.data = test_data
self.data_col = data_col
self.trend, self.h, self.p, self.z, self.s, self.var_s = _mann_kendall_from_sarray(test_data, alpha=alpha)
[docs]
def calc_senslope(self):
"""
calculate the senslope of the data
:return: senslope, senintercept, lo_slope, up_slope
"""
senslope, senintercept, lo_slope, up_slope = mstats.theilslopes(self.data, self.data.index, alpha=self.alpha)
return senslope, senintercept, lo_slope, up_slope
[docs]
def plot_data(self, ax=None, **kwargs):
"""
plot the data and the senslope fit
:param ax: optional matplotlib axis to plot the data on
:param kwargs: kwargs to pass to plt.scatter for the raw data
:return:
"""
if ax is None:
fig, ax = plt.subplots(figsize=(10, 8))
else:
fig = ax.figure
sslope, sintercept, lo_slope, up_slope = self.calc_senslope()
# plot the senslope fit and intercept
x = self.data.index.values
y = (x).astype(float) * sslope + sintercept
ax.plot(x, y, color='k', ls='--', label=f'sen slope fit')
if 'color' not in kwargs:
kwargs['color'] = 'k'
ax.scatter(x, self.data.values, label=f'raw data', **kwargs)
handles, labels = ax.get_legend_handles_labels()
handles.append(Line2D([0], [0], color='w', ls='--'))
labels.append(f'sen slope fit: {sslope:.2e}')
handles.append(Line2D([0], [0], color='w', ls='--'))
labels.append(f'sen intercept: {sintercept:.2e}')
handles.append(Line2D([0], [0], color='w', ls='--'))
labels.append(f'trend: {self.trend_dict[self.trend]}')
handles.append(Line2D([0], [0], color='w', ls='--'))
labels.append(f'p: {self.p:0.3f}')
ax.legend(handles, labels)
return fig, ax, (handles, labels)
@classmethod
@staticmethod
[docs]
[docs]
def map_trend(val):
"""
map the trend value to a string (1: increasing, -1: decreasing, 0: no trend)
:param val: trend value
:return:
"""
return MannKendall.trend_dict[int(val)]
[docs]
class SeasonalKendall(MannKendall):
"""
an object to hold and calculate seasonal kendall trends
:param trend: the trend of the data, -1 decreasing, 0 no trend, 1 increasing
:param h: boolean, True if the trend is significant
:param p: the p value of the trend
:param z: the z value of the trend
:param s: the s value of the trend
:param var_s: the variance of the s value
:param alpha: the alpha value used to calculate the trend
:param data: the data used to calculate the trend
:param data_col: the column of the data used to calculate the trend
:param season_col: the column of the season data used to calculate the trend
:param freq_limit: the maximum difference in frequency between seasons (as a fraction), if greater than this will raise a warning
"""
def __init__(self, df, data_col, season_col, alpha=0.05, rm_na=True,
freq_limit=0.05):
self.trend_dict = {1: 'increasing', -1: 'decreasing', 0: 'no trend'}
assert isinstance(df, pd.DataFrame), 'df must be a pandas DataFrame'
self.freq_limit = freq_limit
if rm_na:
self.data = df.dropna(subset=[data_col, season_col]).sort_index()
else:
self.data = df.copy(deep=True).sort_index()
self.data_col = data_col
self.season_col = season_col
self.alpha = alpha
x = self.data[data_col]
self.season_data = season_data = self.data[season_col]
trend, h, p, z, s, var_s = _seasonal_mann_kendall_from_sarray(x, season_data, alpha=self.alpha, sarray=None,
freq_limit=self.freq_limit)
self.trend = trend
self.h = h
self.p = p
self.z = z
self.s = s
self.var_s = var_s
# -1 decreasing, 0 no trend, 1 increasing
[docs]
def calc_senslope(self):
"""
calculate the senslope of the data
:return: senslope, senintercept, lo_slope, lo_intercept
"""
senslope, senintercept, lo_slope, lo_intercept = _calc_seasonal_senslope(self.data[self.data_col],
self.season_data, x=self.data.index,
alpha=self.alpha)
return senslope, senintercept, lo_slope, lo_intercept
[docs]
def plot_data(self, ax=None, **kwargs):
"""
plot the data and the senslope fit
:param ax: optional matplotlib axis to plot the data on
:param kwargs: kwargs to pass to plt.scatter for the raw data (note that the seasonal column is passed to scatter as c)
:return:
"""
if ax is None:
fig, ax = plt.subplots(figsize=(10, 8))
else:
fig = ax.figure
sslope, sintercept, lo_slope, up_slope = self.calc_senslope()
# plot the senslope fit and intercept
x = self.data.index.values
y = (x).astype(float) * sslope + sintercept
ax.plot(x, y, color='k', ls='--', label=f'sen slope fit')
ax.scatter(x, self.data[self.data_col], c=self.season_data, label=f'raw data', **kwargs)
handles, labels = ax.get_legend_handles_labels()
handles.append(Line2D([0], [0], color='w', ls='--'))
labels.append(f'sen slope fit: {sslope:.2e}')
handles.append(Line2D([0], [0], color='w', ls='--'))
labels.append(f'sen intercept: {sintercept:.2e}')
handles.append(Line2D([0], [0], color='w', ls='--'))
labels.append(f'trend: {self.trend_dict[self.trend]}')
handles.append(Line2D([0], [0], color='w', ls='--'))
labels.append(f'p: {self.p:.3f}')
ax.legend(handles, labels)
return fig, ax, (handles, labels)