blob: 34765bbafda0824a91ac0482e81622bf0fad39a5 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import numpy as np
from otava.analysis import TTestSignificanceTester
from otava.change_point_divisive.calculator import PairDistanceCalculator
from otava.change_point_divisive.detector import ChangePointDetector
from otava.change_point_divisive.significance_test import PermutationsSignificanceTester
SEQUENCE = np.array([
0.3, 2.4, 1.5, -0.9, -0.5,
99.7, 98.3, 99.1,
149.0, 149.7, 149.5, 149.1, 148.8, 150.0,
])
CHANGE_POINTS_INDS = [5, 8]
def compute_Q_and_candidate_slow(sequence):
N = len(sequence)
Q = np.zeros((N - 1, N - 1))
Q_max = 0
candidate_ind = None
for tau in range(1, N):
for kappa in range(tau + 1, N + 1):
X, Y = sequence[: tau], sequence[tau : kappa]
n, m = len(X), len(Y)
A = 0
for x in X:
for y in Y:
A += abs(x - y)
A *= 2 / (m + n)
B = 0
for i in range(n - 1):
for k in range(i + 1, n):
B += abs(X[i] - X[k])
B *= 2 * m / (m + n) / (n - 1) if n > 1 else 0
C = 0
for j in range(m - 1):
for k in range(j, m):
C += abs(Y[j] - Y[k])
C *= 2 * n / (m + n) / (m - 1) if m > 1 else 0
Q[tau - 1, kappa - 2] = A - B - C
if Q[tau - 1, kappa - 2] > Q_max:
Q_max = Q[tau - 1, kappa - 2]
candidate_ind = tau
return Q, Q_max, candidate_ind
def test_calculator_candidate():
sequence = SEQUENCE.copy()
calc = PairDistanceCalculator(sequence)
Q = calc._get_Q_vals(start=0, end=len(sequence))
test_Q, test_Q_max, test_candidate_ind = compute_Q_and_candidate_slow(sequence)
assert np.allclose(test_Q, Q)
whole_interval = slice(None, None)
candidate = calc.get_candidate_change_point(whole_interval)
assert np.allclose(test_Q_max, candidate.qhat)
assert test_candidate_ind == candidate.index
def test_permutation_calculation():
sequence = SEQUENCE.copy()
calc = PairDistanceCalculator(sequence)
whole_interval = slice(None, None)
candidate = calc.get_candidate_change_point(whole_interval)
seed = 1
st = PermutationsSignificanceTester(max_pvalue=0.05, permutations=1, calculator=PairDistanceCalculator, seed=seed)
change_point = st.change_point(candidate=candidate, series=sequence, intervals=[whole_interval])
test_rng = np.random.default_rng(seed)
rand_sequence = sequence.copy()
test_rng.shuffle(rand_sequence)
_, rand_Q_max, rand_candidate_ind = compute_Q_and_candidate_slow(rand_sequence)
assert int(rand_Q_max >= candidate.qhat) == change_point.stats.extreme_qhat_perm
assert np.allclose(rand_Q_max, change_point.stats.permuted_qhats[0])
def test_permutation_test():
seed = 1
sequence = SEQUENCE.copy()
st = PermutationsSignificanceTester(max_pvalue=0.01, permutations=100, calculator=PairDistanceCalculator, seed=seed)
cpd = ChangePointDetector(significance_tester=st, calculator=PairDistanceCalculator)
cps = cpd.get_change_points(series=sequence)
assert [cp.index for cp in cps] == CHANGE_POINTS_INDS
def test_ttest():
sequence = SEQUENCE.copy()
st = TTestSignificanceTester(max_pvalue=0.01)
cpd = ChangePointDetector(significance_tester=st, calculator=PairDistanceCalculator)
cps = cpd.get_change_points(series=sequence)
assert [cp.index for cp in cps] == CHANGE_POINTS_INDS