# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
from __future__ import division
import math
import random
import unittest
from builtins import range
from collections import defaultdict
import hamcrest as hc
from parameterized import parameterized
from tenacity import retry
from tenacity import stop_after_attempt
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import BeamAssertException
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms.core import Create
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
from apache_beam.transforms.stats import ApproximateQuantilesCombineFn
class ApproximateUniqueTest(unittest.TestCase):
"""Unit tests for ApproximateUnique.Globally and ApproximateUnique.PerKey.
Hash() with Python3 is nondeterministic, so Approximation algorithm generates
different result each time and sometimes error rate is out of range, so add
retries for all tests who actually running approximation algorithm."""
def test_approximate_unique_global_by_invalid_size(self):
# test if the transformation throws an error as expected with an invalid
# small input size (< 16).
sample_size = 10
test_input = [random.randint(0, 1000) for _ in range(100)]
with self.assertRaises(ValueError) as e:
pipeline = TestPipeline()
_ = (pipeline
| 'create'
>> beam.Create(test_input)
| 'get_estimate'
>> beam.ApproximateUnique.Globally(size=sample_size))
pipeline.run()
expected_msg = beam.ApproximateUnique._INPUT_SIZE_ERR_MSG % (sample_size)
assert e.exception.args[0] == expected_msg
def test_approximate_unique_global_by_invalid_type_size(self):
# test if the transformation throws an error as expected with an invalid
# type of input size (not int).
sample_size = 100.0
test_input = [random.randint(0, 1000) for _ in range(100)]
with self.assertRaises(ValueError) as e:
pipeline = TestPipeline()
_ = (pipeline
| 'create' >> beam.Create(test_input)
| 'get_estimate'
>> beam.ApproximateUnique.Globally(size=sample_size))
pipeline.run()
expected_msg = beam.ApproximateUnique._INPUT_SIZE_ERR_MSG % (sample_size)
assert e.exception.args[0] == expected_msg
def test_approximate_unique_global_by_invalid_small_error(self):
# test if the transformation throws an error as expected with an invalid
# small input error (< 0.01).
est_err = 0.0
test_input = [random.randint(0, 1000) for _ in range(100)]
with self.assertRaises(ValueError) as e:
pipeline = TestPipeline()
_ = (pipeline
| 'create' >> beam.Create(test_input)
| 'get_estimate'
>> beam.ApproximateUnique.Globally(error=est_err))
pipeline.run()
expected_msg = beam.ApproximateUnique._INPUT_ERROR_ERR_MSG % (est_err)
assert e.exception.args[0] == expected_msg
def test_approximate_unique_global_by_invalid_big_error(self):
# test if the transformation throws an error as expected with an invalid
# big input error (> 0.50).
est_err = 0.6
test_input = [random.randint(0, 1000) for _ in range(100)]
with self.assertRaises(ValueError) as e:
pipeline = TestPipeline()
_ = (pipeline
| 'create' >> beam.Create(test_input)
| 'get_estimate'
>> beam.ApproximateUnique.Globally(error=est_err))
pipeline.run()
expected_msg = beam.ApproximateUnique._INPUT_ERROR_ERR_MSG % (est_err)
assert e.exception.args[0] == expected_msg
def test_approximate_unique_global_by_invalid_no_input(self):
# test if the transformation throws an error as expected with no input.
test_input = [random.randint(0, 1000) for _ in range(100)]
with self.assertRaises(ValueError) as e:
pipeline = TestPipeline()
_ = (pipeline
| 'create' >> beam.Create(test_input)
| 'get_estimate'
>> beam.ApproximateUnique.Globally())
pipeline.run()
expected_msg = beam.ApproximateUnique._NO_VALUE_ERR_MSG
assert e.exception.args[0] == expected_msg
def test_approximate_unique_global_by_invalid_both_input(self):
# test if the transformation throws an error as expected when both size
# and error are specified.
test_input = [random.randint(0, 1000) for _ in range(100)]
est_err = 0.2
sample_size = 30
with self.assertRaises(ValueError) as e:
pipeline = TestPipeline()
_ = (pipeline
| 'create' >> beam.Create(test_input)
| 'get_estimate'
>> beam.ApproximateUnique.Globally(size=sample_size, error=est_err))
pipeline.run()
expected_msg = beam.ApproximateUnique._MULTI_VALUE_ERR_MSG % (
sample_size, est_err)
assert e.exception.args[0] == expected_msg
def test_get_sample_size_from_est_error(self):
# test if the correct sample size is derived from the input error.
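# Other tests in this class use the bound max_err = 2 / sqrt(sample_size);
# its inverse, sample_size = (2 / est_err) ** 2, matches the values
# asserted here, e.g. (2 / 0.5) ** 2 == 16 and (2 / 0.01) ** 2 == 40000.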
assert beam.ApproximateUnique._get_sample_size_from_est_error(0.5) == 16
assert beam.ApproximateUnique._get_sample_size_from_est_error(0.4) == 25
assert beam.ApproximateUnique._get_sample_size_from_est_error(0.2) == 100
assert beam.ApproximateUnique._get_sample_size_from_est_error(0.1) == 400
assert beam.ApproximateUnique._get_sample_size_from_est_error(0.05) == 1600
assert beam.ApproximateUnique._get_sample_size_from_est_error(0.01) == 40000
@unittest.skip('Skip because the hash function is not good enough. '
'TODO: BEAM-7654')
def test_approximate_unique_global_by_sample_size(self):
# test if estimation error with a given sample size is not greater than
# expected max error.
sample_size = 16
max_err = 2 / math.sqrt(sample_size)
test_input = [4, 34, 29, 46, 80, 66, 51, 81, 31, 9, 26, 36, 10, 41, 90, 35,
33, 19, 88, 86, 28, 93, 38, 76, 15, 87, 12, 39, 84, 13, 32,
49, 65, 100, 16, 27, 23, 30, 96, 54]
actual_count = len(set(test_input))
pipeline = TestPipeline()
result = (pipeline
| 'create' >> beam.Create(test_input)
| 'get_estimate'
>> beam.ApproximateUnique.Globally(size=sample_size)
| 'compare'
>> beam.FlatMap(lambda x: [abs(x - actual_count) * 1.0
/ actual_count <= max_err]))
assert_that(result, equal_to([True]),
label='assert:global_by_size')
pipeline.run()
@retry(reraise=True, stop=stop_after_attempt(5))
def test_approximate_unique_global_by_sample_size_with_duplicates(self):
# test if estimation error with a given sample size is not greater than
# expected max error with duplicated input.
sample_size = 30
max_err = 2 / math.sqrt(sample_size)
test_input = [10] * 50 + [20] * 50
actual_count = len(set(test_input))
pipeline = TestPipeline()
result = (pipeline
| 'create' >> beam.Create(test_input)
| 'get_estimate'
>> beam.ApproximateUnique.Globally(size=sample_size)
| 'compare'
>> beam.FlatMap(lambda x: [abs(x - actual_count) * 1.0
/ actual_count <= max_err]))
assert_that(result, equal_to([True]),
label='assert:global_by_size_with_duplicates')
pipeline.run()
@retry(reraise=True, stop=stop_after_attempt(5))
def test_approximate_unique_global_by_sample_size_with_small_population(self):
# test if the estimate is exactly the same as the actual value when the
# sample size is not smaller than the population size (sample size >= 100%
# of population).
sample_size = 31
test_input = [144, 160, 229, 923, 390, 756, 674, 769, 145, 888,
809, 159, 222, 101, 943, 901, 876, 194, 232, 631,
221, 829, 965, 729, 35, 33, 115, 894, 827, 364]
actual_count = len(set(test_input))
pipeline = TestPipeline()
result = (pipeline
| 'create' >> beam.Create(test_input)
| 'get_estimate'
>> beam.ApproximateUnique.Globally(size=sample_size))
assert_that(result, equal_to([actual_count]),
label='assert:global_by_sample_size_with_small_population')
pipeline.run()
@unittest.skip('Skip because the hash function is not good enough. '
'TODO: BEAM-7654')
def test_approximate_unique_global_by_error(self):
# test if the estimation error is not greater than the given input error.
est_err = 0.3
test_input = [291, 371, 271, 126, 762, 391, 222, 565, 428, 786,
801, 867, 337, 690, 261, 436, 311, 568, 946, 722,
973, 386, 506, 546, 991, 450, 226, 889, 514, 693]
actual_count = len(set(test_input))
pipeline = TestPipeline()
result = (pipeline
| 'create' >> beam.Create(test_input)
| 'get_estimate'
>> beam.ApproximateUnique.Globally(error=est_err)
| 'compare'
>> beam.FlatMap(lambda x: [abs(x - actual_count) * 1.0
/ actual_count <= est_err]))
assert_that(result, equal_to([True]), label='assert:global_by_error')
pipeline.run()
@retry(reraise=True, stop=stop_after_attempt(5))
def test_approximate_unique_global_by_error_with_small_population(self):
# test if the estimation error of a small dataset is not greater than the
# input error. The sample size is never smaller than 16, so when the
# population size is smaller than 16 the estimate should be exactly the
# same as the actual value.
est_err = 0.01
test_input = [585, 104, 613, 503, 658, 640, 118, 492, 189, 798,
756, 755, 839, 79, 393]
actual_count = len(set(test_input))
pipeline = TestPipeline()
result = (pipeline
| 'create' >> beam.Create(test_input)
| 'get_estimate'
>> beam.ApproximateUnique.Globally(error=est_err))
assert_that(result, equal_to([actual_count]),
label='assert:global_by_error_with_small_population')
pipeline.run()
@retry(reraise=True, stop=stop_after_attempt(5))
def test_approximate_unique_perkey_by_size(self):
# test if the estimation error per key from the sample size is in an
# expected range.
sample_size = 20
max_err = 2 / math.sqrt(sample_size)
test_input = [(8, 73), (6, 724), (7, 70), (1, 576), (10, 120), (2, 662),
(7, 115), (3, 731), (6, 340), (6, 623), (1, 74), (9, 280),
(8, 298), (6, 440), (10, 243), (1, 125), (9, 754), (8, 833),
(9, 751), (4, 818), (6, 176), (9, 253), (2, 721), (8, 936),
(3, 691), (10, 685), (1, 69), (3, 155), (8, 86), (5, 693),
(2, 809), (4, 723), (8, 102), (9, 707), (8, 558), (4, 537),
(5, 371), (7, 432), (2, 51), (10, 397)]
actual_count_dict = defaultdict(set)
for (x, y) in test_input:
actual_count_dict[x].add(y)
pipeline = TestPipeline()
result = (pipeline
| 'create' >> beam.Create(test_input)
| 'get_estimate'
>> beam.ApproximateUnique.PerKey(size=sample_size)
| 'compare'
>> beam.FlatMap(lambda x: [abs(x[1]
- len(actual_count_dict[x[0]]))
* 1.0 / len(actual_count_dict[x[0]])
<= max_err]))
assert_that(result, equal_to([True] * len(actual_count_dict)),
label='assert:perkey_by_size')
pipeline.run()
@retry(reraise=True, stop=stop_after_attempt(5))
def test_approximate_unique_perkey_by_error(self):
# test if the estimation error per key from the input error is in the
# expected range.
est_err = 0.01
test_input = [(9, 6), (5, 5), (6, 9), (2, 4), (8, 3), (9, 0), (6, 10),
(8, 8), (9, 7), (2, 0), (9, 2), (1, 3), (4, 0), (7, 6),
(10, 6), (4, 7), (5, 8), (7, 2), (7, 10), (5, 10)]
actual_count_dict = defaultdict(set)
for (x, y) in test_input:
actual_count_dict[x].add(y)
pipeline = TestPipeline()
result = (pipeline
| 'create' >> beam.Create(test_input)
| 'get_estimate'
>> beam.ApproximateUnique.PerKey(error=est_err)
| 'compare'
>> beam.FlatMap(lambda x: [abs(x[1]
- len(actual_count_dict[x[0]]))
* 1.0 / len(actual_count_dict[x[0]])
<= est_err]))
assert_that(result, equal_to([True] * len(actual_count_dict)),
label='assert:perkey_by_error')
pipeline.run()
@retry(reraise=True, stop=stop_after_attempt(5))
def test_approximate_unique_globally_by_error_with_skewed_data(self):
# test if estimation error is within the expected range with skewed data.
est_err = 0.01
test_input = [19, 21, 32, 29, 5, 31, 52, 50, 59, 80, 7, 3, 34, 19, 13,
6, 55, 1, 13, 90, 4, 18, 52, 33, 0, 77, 21, 26, 5, 18]
actual_count = len(set(test_input))
pipeline = TestPipeline()
result = (pipeline
| 'create' >> beam.Create(test_input)
| 'get_estimate'
>> beam.ApproximateUnique.Globally(error=est_err)
| 'compare'
>> beam.FlatMap(lambda x: [abs(x - actual_count) * 1.0
/ actual_count <= est_err]))
assert_that(result, equal_to([True]),
label='assert:globally_by_error_with_skewed_data')
pipeline.run()
class ApproximateQuantilesTest(unittest.TestCase):
_kv_data = [("a", 1), ("a", 2), ("a", 3), ("b", 1), ("b", 10), ("b", 10),
("b", 100)]
_kv_str_data = [("a", "a"), ("a", "a"*2), ("a", "a"*3), ("b", "b"),
("b", "b"*10), ("b", "b"*10), ("b", "b"*100)]
@staticmethod
def _quantiles_matcher(expected):
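"""Returns a matcher for use with assert_that. Each actual quantile is
checked against the corresponding entry in `expected`: a list entry is
treated as an inclusive [lower, upper] tolerance range, while any other
entry must match exactly."""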
expected_len = len(expected)
def assert_true(exp):
if not exp:
raise BeamAssertException('%s Failed assert True' % repr(exp))
def match(actual):
actual = actual[0]
for i in range(expected_len):
if isinstance(expected[i], list):
assert_true(expected[i][0] <= actual[i] <= expected[i][1])
else:
equal_to([expected[i]])([actual[i]])
return match
@staticmethod
def _approx_quantile_generator(size, num_of_quantiles, absoluteError):
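"""Builds the expected quantiles for the sorted input range(size). The
first and last quantiles are exact (0 and size - 1); each interior
quantile k is a tolerance range centered on
(size - 1) * k / (num_of_quantiles - 1) with width 2 * absoluteError."""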
quantiles = [0]
for k in range(1, num_of_quantiles - 1):
expected = (size - 1) * k / (num_of_quantiles - 1)
quantiles.append([expected - absoluteError, expected + absoluteError])
quantiles.append(size - 1)
return quantiles
def test_quantiles_globally(self):
with TestPipeline() as p:
pc = p | Create(list(range(101)))
quantiles = (pc | 'Quantiles globally' >>
beam.ApproximateQuantiles.Globally(5))
quantiles_reversed = (pc | 'Quantiles globally reversed' >>
beam.ApproximateQuantiles.Globally(5, reverse=True))
assert_that(quantiles, equal_to([[0, 25, 50, 75, 100]]),
label='checkQuantilesGlobally')
assert_that(quantiles_reversed, equal_to([[100, 75, 50, 25, 0]]),
label='checkReversedQuantiles')
def test_quantiles_per_key(self):
with TestPipeline() as p:
data = self._kv_data
pc = p | Create(data)
per_key = pc | 'Quantiles PerKey' >> beam.ApproximateQuantiles.PerKey(2)
per_key_reversed = (pc | 'Quantiles PerKey Reversed' >>
beam.ApproximateQuantiles.PerKey(2, reverse=True))
assert_that(per_key, equal_to([('a', [1, 3]), ('b', [1, 100])]),
label='checkQuantilePerKey')
assert_that(per_key_reversed, equal_to([('a', [3, 1]), ('b', [100, 1])]),
label='checkReversedQuantilesPerKey')
def test_quantiles_per_key_with_key_argument(self):
with TestPipeline() as p:
data = self._kv_str_data
pc = p | Create(data)
per_key = pc | 'Per Key' >> beam.ApproximateQuantiles.PerKey(2, key=len)
per_key_reversed = (pc | 'Per Key Reversed' >>
beam.ApproximateQuantiles.PerKey(2, key=len, reverse=True))
assert_that(per_key, equal_to([('a', ['a', 'a' * 3]),
('b', ['b', 'b' * 100])]),
label='checkPerKey')
assert_that(per_key_reversed, equal_to([('a', ['a'*3, 'a']),
('b', ['b'*100, 'b'])]),
label='checkPerKeyReversed')
def test_singleton(self):
with TestPipeline() as p:
data = [389]
pc = p | Create(data)
quantiles = pc | beam.ApproximateQuantiles.Globally(5)
assert_that(quantiles, equal_to([[389, 389, 389, 389, 389]]))
def test_uneven_quantiles(self):
with TestPipeline() as p:
pc = p | Create(list(range(5000)))
quantiles = pc | beam.ApproximateQuantiles.Globally(37)
approx_quantiles = self._approx_quantile_generator(size=5000,
num_of_quantiles=37,
absoluteError=20)
assert_that(quantiles, self._quantiles_matcher(approx_quantiles))
def test_large_quantiles(self):
with TestPipeline() as p:
pc = p | Create(list(range(10001)))
quantiles = pc | beam.ApproximateQuantiles.Globally(50)
approx_quantiles = self._approx_quantile_generator(size=10001,
num_of_quantiles=50,
absoluteError=20)
assert_that(quantiles, self._quantiles_matcher(approx_quantiles))
def test_random_quantiles(self):
with TestPipeline() as p:
data = list(range(101))
random.shuffle(data)
pc = p | Create(data)
quantiles = pc | beam.ApproximateQuantiles.Globally(5)
assert_that(quantiles, equal_to([[0, 25, 50, 75, 100]]))
def test_duplicates(self):
data = list(range(101)) * 10
with TestPipeline() as p:
pc = p | Create(data)
quantiles = (pc | 'Quantiles Globally' >>
beam.ApproximateQuantiles.Globally(5))
quantiles_reversed = (pc | 'Quantiles Reversed' >>
beam.ApproximateQuantiles.Globally(5, reverse=True))
assert_that(quantiles, equal_to([[0, 25, 50, 75, 100]]),
label="checkQuantilesGlobally")
assert_that(quantiles_reversed, equal_to([[100, 75, 50, 25, 0]]),
label="checkQuantileReversed")
def test_lots_of_duplicates(self):
with TestPipeline() as p:
data = [1]
data.extend([2] * 299)
data.extend([3] * 799)
pc = p | Create(data)
quantiles = pc | beam.ApproximateQuantiles.Globally(5)
assert_that(quantiles, equal_to([[1, 2, 3, 3, 3]]))
def test_log_distribution(self):
with TestPipeline() as p:
data = [int(math.log(x)) for x in range(1, 1000)]
pc = p | Create(data)
quantiles = pc | beam.ApproximateQuantiles.Globally(5)
assert_that(quantiles, equal_to([[0, 5, 6, 6, 6]]))
def test_zipfian_distribution(self):
with TestPipeline() as p:
data = [int(1000 / i) for i in range(1, 1000)]
pc = p | Create(data)
quantiles = pc | beam.ApproximateQuantiles.Globally(5)
assert_that(quantiles, equal_to([[1, 1, 2, 4, 1000]]))
def test_alternate_quantiles(self):
data = ["aa", "aaa", "aaaa", "b", "ccccc", "dddd", "zz"]
with TestPipeline() as p:
pc = p | Create(data)
globally = pc | 'Globally' >> beam.ApproximateQuantiles.Globally(3)
with_key = (pc | 'Globally with key' >>
beam.ApproximateQuantiles.Globally(3, key=len))
key_with_reversed = (pc | 'Globally with key and reversed' >>
beam.ApproximateQuantiles.Globally(
3, key=len, reverse=True))
assert_that(globally, equal_to([["aa", "b", "zz"]]),
label='checkGlobally')
assert_that(with_key, equal_to([["b", "aaa", "ccccc"]]),
label='checkGloballyWithKey')
assert_that(key_with_reversed, equal_to([["ccccc", "aaa", "b"]]),
label='checkWithKeyAndReversed')
@staticmethod
def _display_data_matcher(instance):
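"""Builds the DisplayData item matchers expected for an
ApproximateQuantiles transform instance: num_quantiles, key and reverse."""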
expected_items = [
DisplayDataItemMatcher('num_quantiles', instance._num_quantiles),
DisplayDataItemMatcher('key', str(instance._key.__name__)),
DisplayDataItemMatcher('reverse', str(instance._reverse))
]
return expected_items
def test_global_display_data(self):
transform = beam.ApproximateQuantiles.Globally(3, key=len, reverse=True)
data = DisplayData.create_from(transform)
expected_items = self._display_data_matcher(transform)
hc.assert_that(data.items, hc.contains_inanyorder(*expected_items))
def test_perkey_display_data(self):
transform = beam.ApproximateQuantiles.PerKey(3, key=len, reverse=True)
data = DisplayData.create_from(transform)
expected_items = self._display_data_matcher(transform)
hc.assert_that(data.items, hc.contains_inanyorder(*expected_items))
def _build_quantilebuffer_test_data():
"""
Test data taken from the "Munro-Paterson Algorithm" reference values table
in the paper "Approximate Medians and other Quantiles in One Pass and with
Limited Memory". See ApproximateQuantilesCombineFn for the paper reference.
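Each generated case is [epsilon, maxInputSize, expectedNumBuffers,
expectedBufferSize], as consumed by the parameterized tests below.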
"""
epsilons = [0.1, 0.05, 0.01, 0.005, 0.001]
maxElementExponents = [5, 6, 7, 8, 9]
expectedNumBuffersValues = [
[11, 14, 17, 21, 24],
[11, 14, 17, 20, 23],
[9, 11, 14, 17, 21],
[8, 11, 14, 17, 20],
[6, 9, 11, 14, 17]
]
expectedBufferSizeValues = [
[98, 123, 153, 96, 120],
[98, 123, 153, 191, 239],
[391, 977, 1221, 1526, 954],
[782, 977, 1221, 1526, 1908],
[3125, 3907, 9766, 12208, 15259]
]
test_data = []
for i, epsilon in enumerate(epsilons):
for j, maxElementExponent in enumerate(maxElementExponents):
test_data.append([
epsilon,
(10 ** maxElementExponent),
expectedNumBuffersValues[i][j],
expectedBufferSizeValues[i][j]
])
return test_data
class ApproximateQuantilesBufferTest(unittest.TestCase):
""" Approximate Quantiles Buffer Tests to ensure we are calculating the
optimal buffers."""
@parameterized.expand(_build_quantilebuffer_test_data)
def test_efficiency(self, epsilon, maxInputSize, expectedNumBuffers,
expectedBufferSize):
"""
Verify the buffers are efficiently calculated according to the reference
table values.
"""
combine_fn = ApproximateQuantilesCombineFn.create(
num_quantiles=10, max_num_elements=maxInputSize, epsilon=epsilon)
self.assertEqual(expectedNumBuffers, combine_fn._num_buffers,
"Number of buffers")
self.assertEqual(expectedBufferSize, combine_fn._buffer_size, "Buffer size")
@parameterized.expand(_build_quantilebuffer_test_data)
def test_correctness(self, epsilon, maxInputSize, *args):
"""
Verify that buffers are correct according to the two constraint equations.
"""
combine_fn = ApproximateQuantilesCombineFn.create(
num_quantiles=10, max_num_elements=maxInputSize, epsilon=epsilon)
b = combine_fn._num_buffers
k = combine_fn._buffer_size
n = maxInputSize
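# The two Munro-Paterson constraints checked below, as stated in the
# assertion messages: (b - 2) * 2^(b - 2) + 1/2 <= epsilon * N and
# k * 2^(b - 1) >= N.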
self.assertLessEqual((b - 2) * (1 << (b - 2)) + 0.5, (epsilon * n),
'(b-2)2^(b-2) + 1/2 <= eN')
self.assertGreaterEqual((k * 2) ** (b - 1), n, 'k2^(b-1) >= N')
if __name__ == '__main__':
unittest.main()