#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import itertools
import logging
import math
import os
import pickle
import shutil
import tempfile
import unittest
from collections.abc import Iterable
from typing import Any

import mock
import numpy
from parameterized import parameterized

import apache_beam as beam
from apache_beam.ml.anomaly.aggregations import AnyVote
from apache_beam.ml.anomaly.base import AnomalyPrediction
from apache_beam.ml.anomaly.base import AnomalyResult
from apache_beam.ml.anomaly.base import EnsembleAnomalyDetector
from apache_beam.ml.anomaly.detectors.offline import OfflineDetector
from apache_beam.ml.anomaly.detectors.zscore import ZScore
from apache_beam.ml.anomaly.specifiable import Spec
from apache_beam.ml.anomaly.specifiable import Specifiable
from apache_beam.ml.anomaly.specifiable import _spec_type_to_subspace
from apache_beam.ml.anomaly.specifiable import specifiable
from apache_beam.ml.anomaly.thresholds import FixedThreshold
from apache_beam.ml.anomaly.thresholds import QuantileThreshold
from apache_beam.ml.anomaly.transforms import AnomalyDetection
from apache_beam.ml.anomaly.transforms import _StatefulThresholdDoFn
from apache_beam.ml.anomaly.transforms import _StatelessThresholdDoFn
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.base import _PostProcessingModelHandler
from apache_beam.ml.inference.base import _PreProcessingModelHandler
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
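

# The helper functions below implement order- and NaN-tolerant equality checks
# for AnomalyPrediction/AnomalyResult values so they can be plugged into
# `equal_to` within `assert_that`. A custom comparison is needed because
# float('NaN') never compares equal to itself in Python, and because the
# predictions attached to a result are matched by model_id rather than by
# position.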
def _prediction_iterable_is_equal_to(
    a: Iterable[AnomalyPrediction], b: Iterable[AnomalyPrediction]):
  a_list = list(a)
  b_list = list(b)

  if len(a_list) != len(b_list):
    return False

  a_dict = {}
  b_dict = {}
  for i in a_list:
    a_dict[i.model_id] = i
  for i in b_list:
    b_dict[i.model_id] = i

  for k, a_val in a_dict.items():
    if k not in b_dict or not _prediction_is_equal_to(a_val, b_dict[k]):
      return False
  return True


def _prediction_is_equal_to(a: AnomalyPrediction, b: AnomalyPrediction):
  if a.model_id != b.model_id:
    return False

  if a.threshold != b.threshold:
    return False

  if a.score != b.score:
    if not (a.score is not None and b.score is not None and
            math.isnan(a.score) and math.isnan(b.score)):
      return False

  if a.label != b.label:
    return False

  if a.info != b.info:
    return False

  if a.source_predictions is None and b.source_predictions is None:
    return True

  if a.source_predictions is not None and b.source_predictions is not None:
    return _prediction_iterable_is_equal_to(
        a.source_predictions, b.source_predictions)

  return False


def _unkeyed_result_is_equal_to(a: AnomalyResult, b: AnomalyResult):
  return a.example._asdict() == b.example._asdict() and \
      _prediction_iterable_is_equal_to(a.predictions, b.predictions)


def _keyed_result_is_equal_to(
    a: tuple[int, AnomalyResult], b: tuple[int, AnomalyResult]):
  return a[0] == b[0] and _unkeyed_result_is_equal_to(a[1], b[1])
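

# TestAnomalyDetection exercises the AnomalyDetection transform with the
# ZScore detector. TestData holds one shared input set in both keyed and
# unkeyed form, plus the expected per-model predictions; the NaN scores with
# label -2 appear for the earliest rows of each key, before a detector
# instance has accumulated enough data to produce a score.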
class TestAnomalyDetection(unittest.TestCase):
  class TestData:
    unkeyed_input = [
        beam.Row(x1=1, x2=4),
        beam.Row(x1=2, x2=4),
        beam.Row(x1=3, x2=5),
        beam.Row(x1=10, x2=4),  # outlier in key=1, with respect to x1
        beam.Row(x1=2, x2=10),  # outlier in key=1, with respect to x2
        beam.Row(x1=3, x2=4),
    ]
    keyed_input = list(zip(itertools.repeat(1),
                           unkeyed_input)) + [(2, beam.Row(x1=100, x2=5))]

    zscore_x1_expected_predictions = [
        AnomalyPrediction(
            model_id='zscore_x1', score=float('NaN'), label=-2, threshold=3),
        AnomalyPrediction(
            model_id='zscore_x1', score=float('NaN'), label=-2, threshold=3),
        AnomalyPrediction(
            model_id='zscore_x1',
            score=2.1213203435596424,
            label=0,
            threshold=3),
        AnomalyPrediction(
            model_id='zscore_x1', score=8.0, label=1, threshold=3),
        AnomalyPrediction(
            model_id='zscore_x1',
            score=0.4898979485566356,
            label=0,
            threshold=3),
        AnomalyPrediction(
            model_id='zscore_x1',
            score=0.16452254913212455,
            label=0,
            threshold=3),
        AnomalyPrediction(
            model_id='zscore_x1', score=float('NaN'), label=-2, threshold=3),
    ]
    zscore_x2_expected_predictions = [
        AnomalyPrediction(
            model_id='zscore_x2', score=float('NaN'), label=-2, threshold=2),
        AnomalyPrediction(
            model_id='zscore_x2', score=float('NaN'), label=-2, threshold=2),
        AnomalyPrediction(model_id='zscore_x2', score=0, label=0, threshold=2),
        AnomalyPrediction(
            model_id='zscore_x2',
            score=0.5773502691896252,
            label=0,
            threshold=2),
        AnomalyPrediction(
            model_id='zscore_x2', score=11.5, label=1, threshold=2),
        AnomalyPrediction(
            model_id='zscore_x2',
            score=0.5368754921931594,
            label=0,
            threshold=2),
        AnomalyPrediction(
            model_id='zscore_x2', score=float('NaN'), label=-2, threshold=2),
    ]
    aggregated_expected_predictions = [
        AnomalyPrediction(model_id="custom", label=-2),
        AnomalyPrediction(model_id="custom", label=-2),
        AnomalyPrediction(model_id="custom", label=0),
        AnomalyPrediction(model_id="custom", label=1),
        AnomalyPrediction(model_id="custom", label=1),
        AnomalyPrediction(model_id="custom", label=0),
        AnomalyPrediction(model_id="custom", label=-2),
    ]

    keyed_zscore_x1_expected = [
        (input[0], AnomalyResult(example=input[1], predictions=[decision]))
        for input, decision in zip(keyed_input, zscore_x1_expected_predictions)
    ]
    unkeyed_zscore_x1_expected = [
        AnomalyResult(example=input, predictions=[decision])
        for input, decision in zip(
            unkeyed_input, zscore_x1_expected_predictions)
    ]

    keyed_ensemble_expected = [(
        input[0],
        AnomalyResult(example=input[1], predictions=[decision1, decision2]))
        for input, decision1, decision2 in zip(
            keyed_input, zscore_x1_expected_predictions,
            zscore_x2_expected_predictions)]
    unkeyed_ensemble_expected = [
        AnomalyResult(example=input, predictions=[decision1, decision2])
        for input, decision1, decision2 in zip(
            unkeyed_input, zscore_x1_expected_predictions,
            zscore_x2_expected_predictions)
    ]

    keyed_ensemble_agg_expected = [
        (input[0], AnomalyResult(example=input[1], predictions=[prediction]))
        for input, prediction in zip(
            keyed_input, aggregated_expected_predictions)
    ]
    unkeyed_ensemble_agg_expected = [
        AnomalyResult(example=input, predictions=[prediction])
        for input, prediction in zip(
            unkeyed_input, aggregated_expected_predictions)
    ]
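
  # Each test below is run twice by parameterized.expand: once with keyed
  # input (explicit (key, Row) tuples) and once with unkeyed Rows, with the
  # matching expected results selected accordingly.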
  @parameterized.expand([
      (TestData.keyed_input, TestData.keyed_zscore_x1_expected),
      (TestData.unkeyed_input, TestData.unkeyed_zscore_x1_expected),
  ])
  def test_one_detector(self, input, expected):
    detector = ZScore(features=["x1"], model_id="zscore_x1")
    with TestPipeline() as p:
      result = (p | beam.Create(input) | AnomalyDetection(detector))
      if isinstance(input[0], tuple):
        assert_that(result, equal_to(expected, _keyed_result_is_equal_to))
      else:
        assert_that(result, equal_to(expected, _unkeyed_result_is_equal_to))

  @parameterized.expand([
      (TestData.keyed_input, TestData.keyed_ensemble_expected),
      (TestData.unkeyed_input, TestData.unkeyed_ensemble_expected),
  ])
  def test_multiple_detectors_without_aggregation(self, input, expected):
    sub_detectors = []
    sub_detectors.append(ZScore(features=["x1"], model_id="zscore_x1"))
    sub_detectors.append(
        ZScore(
            features=["x2"],
            threshold_criterion=FixedThreshold(2),
            model_id="zscore_x2"))

    with beam.Pipeline() as p:
      result = (
          p | beam.Create(input)
          | AnomalyDetection(EnsembleAnomalyDetector(sub_detectors)))
      if isinstance(input[0], tuple):
        assert_that(result, equal_to(expected, _keyed_result_is_equal_to))
      else:
        assert_that(result, equal_to(expected, _unkeyed_result_is_equal_to))

  @parameterized.expand([
      (TestData.keyed_input, TestData.keyed_ensemble_agg_expected),
      (TestData.unkeyed_input, TestData.unkeyed_ensemble_agg_expected),
  ])
  def test_multiple_sub_detectors_with_aggregation(self, input, expected):
    sub_detectors = []
    sub_detectors.append(ZScore(features=["x1"], model_id="zscore_x1"))
    sub_detectors.append(
        ZScore(
            features=["x2"],
            threshold_criterion=FixedThreshold(2),
            model_id="zscore_x2"))

    with beam.Pipeline() as p:
      result = (
          p | beam.Create(input)
          | AnomalyDetection(
              EnsembleAnomalyDetector(
                  sub_detectors, aggregation_strategy=AnyVote())))
      if isinstance(input[0], tuple):
        assert_that(result, equal_to(expected, _keyed_result_is_equal_to))
      else:
        assert_that(result, equal_to(expected, _unkeyed_result_is_equal_to))
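

# FakeNumpyModel and _to_keyed_numpy_array support the OfflineDetector tests:
# the fake model computes x * 10 - y for each input row (and counts how many
# times predict is called), while the preprocess function turns each
# (key, Row) element into the (key, numpy array) pair expected by the numpy
# model handler.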
class FakeNumpyModel():
  def __init__(self):
    self.total_predict_calls = 0

  def predict(self, input_vector: numpy.ndarray):
    self.total_predict_calls += 1
    return [input_vector[0][0] * 10 - input_vector[0][1]]


def _to_keyed_numpy_array(t: tuple[Any, beam.Row]):
  """Converts the Row in a keyed (key, Row) tuple to a NumPy array."""
  return t[0], numpy.array(list(t[1]))
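

# TestOfflineDetector checks that an OfflineDetector wrapping a
# KeyedModelHandler (with pre- and post-processing) can be converted to a
# Spec, reconstructed from that Spec, and then run inside AnomalyDetection to
# produce the expected scores.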
class TestOfflineDetector(unittest.TestCase):
  class TestData:
    unkeyed_input = [
        beam.Row(x=1, y=2),
        beam.Row(x=2, y=4),
        beam.Row(x=3, y=6),
    ]
    keyed_input = list(zip(itertools.repeat(1), unkeyed_input))

    expected_predictions = [
        AnomalyPrediction(
            model_id='OfflineDetector',
            score=8.0,
            label=None,
            threshold=None,
            info='',
            source_predictions=None),
        AnomalyPrediction(
            model_id='OfflineDetector',
            score=16.0,
            label=None,
            threshold=None,
            info='',
            source_predictions=None),
        AnomalyPrediction(
            model_id='OfflineDetector',
            score=24.0,
            label=None,
            threshold=None,
            info='',
            source_predictions=None),
    ]
    keyed_expected = [
        (input[0], AnomalyResult(example=input[1], predictions=[prediction]))
        for input, prediction in zip(keyed_input, expected_predictions)
    ]
    unkeyed_expected = [
        AnomalyResult(example=input, predictions=[prediction])
        for input, prediction in zip(unkeyed_input, expected_predictions)
    ]
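
  # The model handler classes are not Specifiable by default; setUp wraps them
  # with specifiable() so that OfflineDetector.to_spec() can serialize the
  # handler chain, and tearDown undoes the wrapping to avoid leaking the
  # patched classes into other tests.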
  def setUp(self):
    global SklearnModelHandlerNumpy, KeyedModelHandler
    global _PreProcessingModelHandler, _PostProcessingModelHandler

    # Make the model handlers Specifiable.
    SklearnModelHandlerNumpy = specifiable(SklearnModelHandlerNumpy)
    KeyedModelHandler = specifiable(KeyedModelHandler)
    _PreProcessingModelHandler = specifiable(_PreProcessingModelHandler)
    _PostProcessingModelHandler = specifiable(_PostProcessingModelHandler)

    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

    # Restore the model handlers to their normal, non-Specifiable form.
    SklearnModelHandlerNumpy.unspecifiable()
    KeyedModelHandler.unspecifiable()
    _PreProcessingModelHandler.unspecifiable()
    _PostProcessingModelHandler.unspecifiable()

  @parameterized.expand([
      (TestData.keyed_input, TestData.keyed_expected),
      (TestData.unkeyed_input, TestData.unkeyed_expected),
  ])
  def test_default_inference_fn(self, input, expected):
    temp_file_name = self.tmpdir + os.sep + 'pickled_file'
    with open(temp_file_name, 'wb') as file:
      pickle.dump(FakeNumpyModel(), file)

    keyed_model_handler = KeyedModelHandler(
        SklearnModelHandlerNumpy(model_uri=temp_file_name)).with_preprocess_fn(
            _to_keyed_numpy_array).with_postprocess_fn(
                OfflineDetector.score_prediction_adapter)
    detector = OfflineDetector(keyed_model_handler=keyed_model_handler)

    detector_spec = detector.to_spec()
    expected_spec = Spec(
        type='OfflineDetector',
        config={
            'keyed_model_handler': Spec(
                type='_PostProcessingModelHandler',
                config={
                    'base': Spec(
                        type='_PreProcessingModelHandler',
                        config={
                            'base': Spec(
                                type='KeyedModelHandler',
                                config={
                                    'unkeyed': Spec(
                                        type='SklearnModelHandlerNumpy',
                                        config={'model_uri': temp_file_name})
                                }),
                            'preprocess_fn': Spec(
                                type='_to_keyed_numpy_array', config=None)
                        }),
                    'postprocess_fn': Spec(
                        type='score_prediction_adapter', config=None)
                })
        })
    self.assertEqual(detector_spec, expected_spec)

    self.assertEqual(_spec_type_to_subspace('SklearnModelHandlerNumpy'), '*')
    self.assertEqual(_spec_type_to_subspace('_PreProcessingModelHandler'), '*')
    self.assertEqual(
        _spec_type_to_subspace('_PostProcessingModelHandler'), '*')
    self.assertEqual(_spec_type_to_subspace('_to_keyed_numpy_array'), '*')
    self.assertEqual(_spec_type_to_subspace('score_prediction_adapter'), '*')

    # Make sure the spec from the detector can be used to reconstruct the same
    # detector.
    detector_new = Specifiable.from_spec(detector_spec)

    with TestPipeline() as p:
      result = (p | beam.Create(input) | AnomalyDetection(detector_new))
      if isinstance(input[0], tuple):
        assert_that(result, equal_to(expected, _keyed_result_is_equal_to))
      else:
        assert_that(result, equal_to(expected, _unkeyed_result_is_equal_to))

  def test_run_inference_args(self):
    model_handler = SklearnModelHandlerNumpy(model_uri="unused")
    detector = OfflineDetector(
        keyed_model_handler=model_handler,
        run_inference_args={"inference_args": {"multiplier": 10}})

    # Patch RunInference in "apache_beam.ml.anomaly.transforms", where it is
    # imported and called.
    with mock.patch('apache_beam.ml.anomaly.transforms.RunInference'
                    ) as mock_run_inference:
      # Use the real RunInference as the side effect so that the call
      # arguments are recorded while a true RunInference instance is still
      # created.
      mock_run_inference.side_effect = RunInference

      try:
        p = TestPipeline()
        _ = (
            p | beam.Create(TestOfflineDetector.TestData.unkeyed_input)
            | AnomalyDetection(detector))
      except:  # pylint: disable=bare-except
        pass

      call_args = mock_run_inference.call_args[1]
      self.assertEqual(
          call_args,
          {
              'inference_args': {'multiplier': 10},
              'model_identifier': 'OfflineDetector'
          })
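

# R is a dummy row shared by the threshold DoFn tests below. Elements fed to
# the threshold DoFns are nested keyed tuples of the form
# (key, (temp_key, AnomalyResult)), the keyed layout these internal DoFns
# consume.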
R = beam.Row(x=10, y=20)


class TestStatelessThresholdDoFn(unittest.TestCase):
  def test_dofn_on_single_prediction(self):
    input = [
        (1, (2, AnomalyResult(R, [AnomalyPrediction(score=1)]))),
        (1, (3, AnomalyResult(R, [AnomalyPrediction(score=2)]))),
        (1, (4, AnomalyResult(R, [AnomalyPrediction(score=3)]))),
    ]
    expected = [
        (1, (2, AnomalyResult(
            R, [AnomalyPrediction(score=1, label=0, threshold=2)]))),
        (1, (3, AnomalyResult(
            R, [AnomalyPrediction(score=2, label=1, threshold=2)]))),
        (1, (4, AnomalyResult(
            R, [AnomalyPrediction(score=3, label=1, threshold=2)]))),
    ]
    with TestPipeline() as p:
      result = (
          p
          | beam.Create(input)
          | beam.ParDo(
              _StatelessThresholdDoFn(
                  FixedThreshold(2, normal_label=0,
                                 outlier_label=1).to_spec())))
      assert_that(result, equal_to(expected))

  def test_dofn_on_multiple_predictions(self):
    input = [
        (1, (2, AnomalyResult(
            R, [AnomalyPrediction(score=1), AnomalyPrediction(score=4)]))),
        (1, (3, AnomalyResult(
            R, [AnomalyPrediction(score=2), AnomalyPrediction(score=0.5)]))),
    ]
    expected = [
        (1, (2, AnomalyResult(
            R,
            [
                AnomalyPrediction(score=1, label=0, threshold=2),
                AnomalyPrediction(score=4, label=1, threshold=2)
            ]))),
        (1, (3, AnomalyResult(
            R,
            [
                AnomalyPrediction(score=2, label=1, threshold=2),
                AnomalyPrediction(score=0.5, label=0, threshold=2)
            ]))),
    ]
    with TestPipeline() as p:
      result = (
          p
          | beam.Create(input)
          | beam.ParDo(
              _StatelessThresholdDoFn(
                  FixedThreshold(2, normal_label=0,
                                 outlier_label=1).to_spec())))
      assert_that(result, equal_to(expected))
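

# Unlike the stateless version, _StatefulThresholdDoFn keeps per-key state:
# with quantile=0.5 the expected thresholds below are the running medians of
# the scores seen so far for each key, so keys 1 and 2 evolve independently.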
class TestStatefulThresholdDoFn(unittest.TestCase):
  def test_dofn_on_single_prediction(self):
    # Use input data with two keys to test the stateful threshold function.
    input = [
        (1, (2, AnomalyResult(R, [AnomalyPrediction(score=1)]))),
        (1, (3, AnomalyResult(R, [AnomalyPrediction(score=2)]))),
        (1, (4, AnomalyResult(R, [AnomalyPrediction(score=3)]))),
        (2, (2, AnomalyResult(R, [AnomalyPrediction(score=10)]))),
        (2, (3, AnomalyResult(R, [AnomalyPrediction(score=20)]))),
        (2, (4, AnomalyResult(R, [AnomalyPrediction(score=30)]))),
    ]
    expected = [
        (1, (2, AnomalyResult(
            R, [AnomalyPrediction(score=1, label=1, threshold=1)]))),
        (1, (3, AnomalyResult(
            R, [AnomalyPrediction(score=2, label=1, threshold=1.5)]))),
        (2, (2, AnomalyResult(
            R, [AnomalyPrediction(score=10, label=1, threshold=10)]))),
        (2, (3, AnomalyResult(
            R, [AnomalyPrediction(score=20, label=1, threshold=15)]))),
        (1, (4, AnomalyResult(
            R, [AnomalyPrediction(score=3, label=1, threshold=2)]))),
        (2, (4, AnomalyResult(
            R, [AnomalyPrediction(score=30, label=1, threshold=20)]))),
    ]
    with TestPipeline() as p:
      result = (
          p
          | beam.Create(input)
          # Use the median (quantile=0.5) just for test convenience.
          | beam.ParDo(
              _StatefulThresholdDoFn(
                  QuantileThreshold(
                      quantile=0.5, normal_label=0,
                      outlier_label=1).to_spec())))
      assert_that(result, equal_to(expected))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.WARNING)
  unittest.main()