# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# pytype: skip-file

import math
import random
import sys
import unittest
from collections import defaultdict

import hamcrest as hc
from parameterized import parameterized
from parameterized import parameterized_class

import apache_beam as beam
from apache_beam.coders import coders
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import BeamAssertException
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms.core import Create
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
from apache_beam.transforms.stats import ApproximateQuantilesCombineFn
from apache_beam.transforms.stats import ApproximateUniqueCombineFn

try:
import mmh3
mmh3_options = [(mmh3, ), (None, )]
except ImportError:
  mmh3_options = [(None, )]


@parameterized_class(('mmh3_option', ), mmh3_options)
class ApproximateUniqueTest(unittest.TestCase):
"""Unit tests for ApproximateUnique.Globally and ApproximateUnique.PerKey."""
  random.seed(0)

def setUp(self):
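    # Patch the 'mmh3' entry in sys.modules with the parameterized option so
    # the tests run both with mmh3 available (the module itself) and without
    # it (None, which makes a subsequent import fail).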
    sys.modules['mmh3'] = self.mmh3_option

@parameterized.expand([
(
'small_population_by_size',
list(range(30)),
32,
None,
'assert:global_by_sample_size_with_small_population'),
(
'large_population_by_size',
list(range(100)),
16,
None,
'assert:global_by_sample_size_with_large_population'),
(
'with_duplicates_by_size', [10] * 50 + [20] * 50,
30,
None,
'assert:global_by_sample_size_with_duplicates'),
(
'small_population_by_error',
list(range(30)),
None,
0.3,
'assert:global_by_error_with_small_population'),
(
'large_population_by_error',
[random.randint(1, 1000) for _ in range(500)],
None,
0.1,
'assert:global_by_error_with_large_population'),
])
def test_approximate_unique_global(
self, name, test_input, sample_size, est_error, label):
    # Check that exactly one of sample_size and est_error is specified.
assert bool(sample_size) != bool(est_error)
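    # ApproximateUnique's estimation error for a sample of size k is expected
    # to be about 2 / sqrt(k) (see _get_sample_size_from_est_error), so use
    # that bound as the comparison tolerance when only a size is given.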
if sample_size:
error = 2 / math.sqrt(sample_size)
else:
error = est_error
random.shuffle(test_input)
actual_count = len(set(test_input))
with TestPipeline() as pipeline:
result = (
pipeline
| 'create' >> beam.Create(test_input)
| 'get_estimate' >> beam.ApproximateUnique.Globally(
size=sample_size, error=est_error)
| 'compare' >> beam.FlatMap(
lambda x: [abs(x - actual_count) * 1.0 / actual_count <= error]))
      assert_that(result, equal_to([True]), label=label)

@parameterized.expand([
('by_size', 20, None, 'assert:unique_perkey_by_sample_size'),
('by_error', None, 0.02, 'assert:unique_perkey_by_error')
])
def test_approximate_unique_perkey(self, name, sample_size, est_error, label):
    # Check that exactly one of sample_size and est_error is specified.
assert bool(sample_size) != bool(est_error)
if sample_size:
error = 2 / math.sqrt(sample_size)
else:
error = est_error
test_input = [(8, 73), (6, 724), (7, 70), (1, 576), (10, 120), (2, 662),
(7, 115), (3, 731), (6, 340), (6, 623), (1, 74), (9, 280),
(8, 298), (6, 440), (10, 243), (1, 125), (9, 754), (8, 833),
(9, 751), (4, 818), (6, 176), (9, 253), (2, 721), (8, 936),
(3, 691), (10, 685), (1, 69), (3, 155), (8, 86), (5, 693),
(2, 809), (4, 723), (8, 102), (9, 707), (8, 558), (4, 537),
(5, 371), (7, 432), (2, 51), (10, 397)]
actual_count_dict = defaultdict(set)
for (x, y) in test_input:
actual_count_dict[x].add(y)
with TestPipeline() as pipeline:
result = (
pipeline
| 'create' >> beam.Create(test_input)
| 'get_estimate' >> beam.ApproximateUnique.PerKey(
size=sample_size, error=est_error)
| 'compare' >> beam.FlatMap(
lambda x: [
abs(x[1] - len(actual_count_dict[x[0]])) * 1.0 / len(
actual_count_dict[x[0]]) <= error
]))
assert_that(
          result, equal_to([True] * len(actual_count_dict)), label=label)

@parameterized.expand([
(
'invalid_input_size',
list(range(30)),
10,
None,
beam.ApproximateUnique._INPUT_SIZE_ERR_MSG % 10),
(
'invalid_type_size',
list(range(30)),
100.0,
None,
beam.ApproximateUnique._INPUT_SIZE_ERR_MSG % 100.0),
(
'invalid_small_error',
list(range(30)),
None,
0.0,
beam.ApproximateUnique._INPUT_ERROR_ERR_MSG % 0.0),
(
'invalid_big_error',
list(range(30)),
None,
0.6,
beam.ApproximateUnique._INPUT_ERROR_ERR_MSG % 0.6),
(
'no_input',
list(range(30)),
None,
None,
beam.ApproximateUnique._NO_VALUE_ERR_MSG),
(
'both_input',
list(range(30)),
30,
0.2,
beam.ApproximateUnique._MULTI_VALUE_ERR_MSG % (30, 0.2)),
])
def test_approximate_unique_global_value_error(
self, name, test_input, sample_size, est_error, expected_msg):
with self.assertRaises(ValueError) as e:
with TestPipeline() as pipeline:
_ = (
pipeline
| 'create' >> beam.Create(test_input)
| 'get_estimate' >> beam.ApproximateUnique.Globally(
size=sample_size, error=est_error))
    assert e.exception.args[0] == expected_msg

  def test_approximate_unique_combine_fn_requires_deterministic_coder(self):
sample_size = 30
coder = coders.Base64PickleCoder()
with self.assertRaises(ValueError) as e:
_ = ApproximateUniqueCombineFn(sample_size, coder)
self.assertRegex(
e.exception.args[0],
'The key coder "Base64PickleCoder" '
        'for ApproximateUniqueCombineFn is not deterministic.')

def test_approximate_unique_combine_fn_requires_compatible_coder(self):
test_input = 'a'
sample_size = 30
coder = coders.FloatCoder()
combine_fn = ApproximateUniqueCombineFn(sample_size, coder)
accumulator = combine_fn.create_accumulator()
with self.assertRaises(RuntimeError) as e:
accumulator = combine_fn.add_input(accumulator, test_input)
    self.assertRegex(e.exception.args[0], 'Runtime exception')

def test_get_sample_size_from_est_error(self):
    # Check that the correct sample size is derived from the estimation error.
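    # The sample size is obtained by inverting error = 2 / sqrt(size):
    # size = (2 / error)**2, e.g. (2 / 0.1)**2 == 400.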
assert beam.ApproximateUnique._get_sample_size_from_est_error(0.5) == 16
assert beam.ApproximateUnique._get_sample_size_from_est_error(0.4) == 25
assert beam.ApproximateUnique._get_sample_size_from_est_error(0.2) == 100
assert beam.ApproximateUnique._get_sample_size_from_est_error(0.1) == 400
assert beam.ApproximateUnique._get_sample_size_from_est_error(0.05) == 1600
    assert beam.ApproximateUnique._get_sample_size_from_est_error(0.01) == 40000


class ApproximateQuantilesTest(unittest.TestCase):
_kv_data = [("a", 1), ("a", 2), ("a", 3), ("b", 1), ("b", 10), ("b", 10),
("b", 100)]
_kv_str_data = [("a", "a"), ("a", "a" * 2), ("a", "a" * 3), ("b", "b"),
("b", "b" * 10), ("b", "b" * 10), ("b", "b" * 100)]
@staticmethod
def _quantiles_matcher(expected):
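    """Returns a matcher for approximate quantiles output.

    Each entry of `expected` is either an exact value, or a two-element list
    [min, max] giving an inclusive range the corresponding quantile must
    fall in.
    """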
    num_quantiles = len(expected)

def assert_true(exp):
if not exp:
        raise BeamAssertException('%s Failed assert True' % repr(exp))

def match(actual):
actual = actual[0]
      for i in range(num_quantiles):
if isinstance(expected[i], list):
assert_true(expected[i][0] <= actual[i] <= expected[i][1])
else:
equal_to([expected[i]])([actual[i]])
    return match

@staticmethod
def _approx_quantile_generator(size, num_of_quantiles, absoluteError):
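    """Generates expected quantiles for input data range(size).

    The first and last quantiles are exact; every interior quantile is given
    as an inclusive [min, max] interval of half-width absoluteError, in the
    form accepted by _quantiles_matcher.
    """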
quantiles = [0]
    for k in range(1, num_of_quantiles - 1):
      expected = (size - 1) * k / (num_of_quantiles - 1)
      quantiles.append([expected - absoluteError, expected + absoluteError])
quantiles.append(size - 1)
    return quantiles

  def test_quantiles_globally(self):
with TestPipeline() as p:
pc = p | Create(list(range(101)))
      quantiles = (
          pc | 'Quantiles globally' >> beam.ApproximateQuantiles.Globally(5))
      quantiles_reversed = (
          pc | 'Quantiles globally reversed' >>
          beam.ApproximateQuantiles.Globally(5, reverse=True))
assert_that(
quantiles,
equal_to([[0, 25, 50, 75, 100]]),
label='checkQuantilesGlobally')
assert_that(
quantiles_reversed,
equal_to([[100, 75, 50, 25, 0]]),
          label='checkReversedQuantiles')

def test_quantiles_globally_weighted(self):
num_inputs = 1e3
a = -3
b = 3
# Weighting function coincides with the pdf of the standard normal
# distribution up to a constant. Since 99.7% of the probability mass for
# this pdf is concentrated in the interval [a, b] = [-3, 3], the quantiles
# for a sample from this interval with the given weight function are
# expected to be close to the quantiles of the standard normal distribution.
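    # In particular, the inner expected quantiles of roughly +/-0.672 are
    # close to the standard normal quartiles of +/-0.6745.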
def weight(x):
return math.exp(-(x**2) / 2)
input_data = [
(a + (b - a) * i / num_inputs, weight(a + (b - a) * i / num_inputs))
for i in range(int(num_inputs) + 1)
]
with TestPipeline() as p:
pc = p | Create(input_data)
      weighted_quantiles = (
          pc | "Quantiles globally weighted" >>
          beam.ApproximateQuantiles.Globally(5, weighted=True))
reversed_weighted_quantiles = (
pc | 'Quantiles globally weighted reversed' >>
beam.ApproximateQuantiles.Globally(5, reverse=True, weighted=True))
assert_that(
weighted_quantiles,
equal_to([[-3., -0.6720000000000002, 0., 0.6720000000000002, 3.]]),
label="checkWeightedQuantilesGlobally")
assert_that(
reversed_weighted_quantiles,
equal_to([[3., 0.6720000000000002, 0., -0.6720000000000002, -3.]]),
label="checkWeightedReversedQuantilesGlobally")
def test_quantiles_per_key(self):
with TestPipeline() as p:
data = self._kv_data
pc = p | Create(data)
per_key = pc | 'Quantiles PerKey' >> beam.ApproximateQuantiles.PerKey(2)
per_key_reversed = (
pc | 'Quantiles PerKey Reversed' >> beam.ApproximateQuantiles.PerKey(
2, reverse=True))
assert_that(
per_key,
equal_to([('a', [1, 3]), ('b', [1, 100])]),
label='checkQuantilePerKey')
assert_that(
per_key_reversed,
equal_to([('a', [3, 1]), ('b', [100, 1])]),
          label='checkReversedQuantilesPerKey')

def test_quantiles_per_key_weighted(self):
with TestPipeline() as p:
data = [(k, (v, 2.)) for k, v in self._kv_data]
pc = p | Create(data)
      per_key = (
          pc | 'Weighted Quantiles PerKey' >> beam.ApproximateQuantiles.PerKey(
              2, weighted=True))
      per_key_reversed = (
          pc | 'Weighted Quantiles PerKey Reversed' >>
          beam.ApproximateQuantiles.PerKey(2, reverse=True, weighted=True))
assert_that(
per_key,
equal_to([('a', [1, 3]), ('b', [1, 100])]),
label='checkWeightedQuantilesPerKey')
assert_that(
per_key_reversed,
equal_to([('a', [3, 1]), ('b', [100, 1])]),
          label='checkWeightedReversedQuantilesPerKey')

def test_quantiles_per_key_with_key_argument(self):
with TestPipeline() as p:
data = self._kv_str_data
pc = p | Create(data)
per_key = pc | 'Per Key' >> beam.ApproximateQuantiles.PerKey(2, key=len)
per_key_reversed = (
pc | 'Per Key Reversed' >> beam.ApproximateQuantiles.PerKey(
2, key=len, reverse=True))
assert_that(
per_key,
equal_to([('a', ['a', 'a' * 3]), ('b', ['b', 'b' * 100])]),
label='checkPerKey')
assert_that(
per_key_reversed,
equal_to([('a', ['a' * 3, 'a']), ('b', ['b' * 100, 'b'])]),
          label='checkPerKeyReversed')

def test_singleton(self):
with TestPipeline() as p:
data = [389]
pc = p | Create(data)
quantiles = pc | beam.ApproximateQuantiles.Globally(5)
      assert_that(quantiles, equal_to([[389, 389, 389, 389, 389]]))

def test_uneven_quantiles(self):
with TestPipeline() as p:
pc = p | Create(list(range(5000)))
quantiles = pc | beam.ApproximateQuantiles.Globally(37)
approx_quantiles = self._approx_quantile_generator(
size=5000, num_of_quantiles=37, absoluteError=20)
      assert_that(quantiles, self._quantiles_matcher(approx_quantiles))

def test_large_quantiles(self):
with TestPipeline() as p:
pc = p | Create(list(range(10001)))
quantiles = pc | beam.ApproximateQuantiles.Globally(50)
approx_quantiles = self._approx_quantile_generator(
size=10001, num_of_quantiles=50, absoluteError=20)
      assert_that(quantiles, self._quantiles_matcher(approx_quantiles))

def test_random_quantiles(self):
with TestPipeline() as p:
data = list(range(101))
random.shuffle(data)
pc = p | Create(data)
quantiles = pc | beam.ApproximateQuantiles.Globally(5)
      assert_that(quantiles, equal_to([[0, 25, 50, 75, 100]]))

def test_duplicates(self):
    data = list(range(101)) * 10
with TestPipeline() as p:
pc = p | Create(data)
quantiles = (
pc | 'Quantiles Globally' >> beam.ApproximateQuantiles.Globally(5))
quantiles_reversed = (
pc | 'Quantiles Reversed' >> beam.ApproximateQuantiles.Globally(
5, reverse=True))
assert_that(
quantiles,
equal_to([[0, 25, 50, 75, 100]]),
label="checkQuantilesGlobally")
assert_that(
quantiles_reversed,
equal_to([[100, 75, 50, 25, 0]]),
label="checkQuantileReversed")
def test_lots_of_duplicates(self):
with TestPipeline() as p:
      data = [1] + [2] * 299 + [3] * 799
pc = p | Create(data)
quantiles = pc | beam.ApproximateQuantiles.Globally(5)
      assert_that(quantiles, equal_to([[1, 2, 3, 3, 3]]))

def test_log_distribution(self):
with TestPipeline() as p:
data = [int(math.log(x)) for x in range(1, 1000)]
pc = p | Create(data)
quantiles = pc | beam.ApproximateQuantiles.Globally(5)
      assert_that(quantiles, equal_to([[0, 5, 6, 6, 6]]))

def test_zipfian_distribution(self):
with TestPipeline() as p:
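      # int(1000 / i) for i in [1, 1000) gives a heavy-tailed, Zipf-like
      # distribution: mostly 1s and 2s with a single 1000 at the top.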
      data = [int(1000 / i) for i in range(1, 1000)]
pc = p | Create(data)
quantiles = pc | beam.ApproximateQuantiles.Globally(5)
      assert_that(quantiles, equal_to([[1, 1, 2, 4, 1000]]))

def test_alternate_quantiles(self):
data = ["aa", "aaa", "aaaa", "b", "ccccc", "dddd", "zz"]
with TestPipeline() as p:
pc = p | Create(data)
globally = pc | 'Globally' >> beam.ApproximateQuantiles.Globally(3)
with_key = (
pc |
'Globally with key' >> beam.ApproximateQuantiles.Globally(3, key=len))
key_with_reversed = (
pc | 'Globally with key and reversed' >>
beam.ApproximateQuantiles.Globally(3, key=len, reverse=True))
assert_that(
globally, equal_to([["aa", "b", "zz"]]), label='checkGlobally')
assert_that(
with_key,
equal_to([["b", "aaa", "ccccc"]]),
label='checkGloballyWithKey')
assert_that(
key_with_reversed,
equal_to([["ccccc", "aaa", "b"]]),
          label='checkWithKeyAndReversed')

def test_batched_quantiles(self):
with TestPipeline() as p:
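      # With input_batched=True, each input element is itself a batch (list)
      # of values rather than a single value.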
data = []
for i in range(100):
data.append([(j / 10, abs(j - 500))
for j in range(i * 10, (i + 1) * 10)])
pc = p | Create(data)
globally = (
pc | 'Globally' >> beam.ApproximateQuantiles.Globally(
3, input_batched=True))
with_key = (
pc | 'Globally with key' >> beam.ApproximateQuantiles.Globally(
3, key=sum, input_batched=True))
key_with_reversed = (
pc | 'Globally with key and reversed' >>
beam.ApproximateQuantiles.Globally(
3, key=sum, reverse=True, input_batched=True))
assert_that(
globally,
equal_to([[(0.0, 500), (49.9, 1), (99.9, 499)]]),
label='checkGlobally')
assert_that(
with_key,
equal_to([[(50.0, 0), (72.5, 225), (99.9, 499)]]),
label='checkGloballyWithKey')
assert_that(
key_with_reversed,
equal_to([[(99.9, 499), (72.5, 225), (50.0, 0)]]),
          label='checkGloballyWithKeyAndReversed')

def test_batched_weighted_quantiles(self):
with TestPipeline() as p:
      data = []
      for i in range(100):
        # Each batched element is a pair of ([values], [weights]); renaming
        # the inner loop variable avoids shadowing the batch index i.
        data.append([[(j / 10, abs(j - 500))
                      for j in range(i * 10, (i + 1) * 10)], [i] * 10])
pc = p | Create(data)
globally = (
pc | 'Globally' >> beam.ApproximateQuantiles.Globally(
3, weighted=True, input_batched=True))
with_key = (
pc | 'Globally with key' >> beam.ApproximateQuantiles.Globally(
3, key=sum, weighted=True, input_batched=True))
key_with_reversed = (
pc | 'Globally with key and reversed' >>
beam.ApproximateQuantiles.Globally(
3, key=sum, reverse=True, weighted=True, input_batched=True))
assert_that(
globally,
equal_to([[(0.0, 500), (70.8, 208), (99.9, 499)]]),
label='checkGlobally')
assert_that(
with_key,
equal_to([[(50.0, 0), (21.0, 290), (99.9, 499)]]),
label='checkGloballyWithKey')
assert_that(
key_with_reversed,
equal_to([[(99.9, 499), (21.0, 290), (50.0, 0)]]),
          label='checkGloballyWithKeyAndReversed')

def test_quantiles_merge_accumulators(self):
    # This test exercises merging multiple accumulators and approximation
    # accuracy. max_num_elements is set to a small value to trigger buffer
    # collapse and interpolation. Under the conditions below, buffer_size=125
    # and num_buffers=4, so we are only allowed to keep half of the input
    # values.
num_accumulators = 100
num_quantiles = 5
eps = 0.01
max_num_elements = 1000
combine_fn = ApproximateQuantilesCombineFn.create(
num_quantiles, eps, max_num_elements)
combine_fn_weighted = ApproximateQuantilesCombineFn.create(
num_quantiles, eps, max_num_elements, weighted=True)
data = list(range(1000))
weights = list(reversed(range(1000)))
step = math.ceil(len(data) / num_accumulators)
accumulators = []
accumulators_weighted = []
for i in range(num_accumulators):
accumulator = combine_fn.create_accumulator()
accumulator_weighted = combine_fn_weighted.create_accumulator()
for element, weight in zip(data[i*step:(i+1)*step],
weights[i*step:(i+1)*step]):
accumulator = combine_fn.add_input(accumulator, element)
accumulator_weighted = combine_fn_weighted.add_input(
accumulator_weighted, (element, weight))
accumulators.append(accumulator)
accumulators_weighted.append(accumulator_weighted)
accumulator = combine_fn.merge_accumulators(accumulators)
accumulator_weighted = combine_fn_weighted.merge_accumulators(
accumulators_weighted)
quantiles = combine_fn.extract_output(accumulator)
quantiles_weighted = combine_fn_weighted.extract_output(
accumulator_weighted)
# In fact, the final accuracy is much higher than eps, but we test for a
# minimal accuracy here.
for q, actual_q in zip(quantiles, [0, 249, 499, 749, 999]):
self.assertAlmostEqual(q, actual_q, delta=max_num_elements * eps)
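    # With linearly decreasing weights, the weighted CDF is approximately
    # F(x) = 1 - (1 - x / 1000)**2, whose quartiles are about 134, 293 and
    # 500, matching the expected weighted quantiles below.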
for q, actual_q in zip(quantiles_weighted, [0, 133, 292, 499, 999]):
      self.assertAlmostEqual(q, actual_q, delta=max_num_elements * eps)

@staticmethod
def _display_data_matcher(instance):
expected_items = [
DisplayDataItemMatcher('num_quantiles', instance._num_quantiles),
DisplayDataItemMatcher('weighted', str(instance._weighted)),
DisplayDataItemMatcher('key', str(instance._key.__name__)),
DisplayDataItemMatcher('reverse', str(instance._reverse)),
DisplayDataItemMatcher('input_batched', str(instance._input_batched)),
]
    return expected_items

def test_global_display_data(self):
transform = beam.ApproximateQuantiles.Globally(
3, weighted=True, key=len, reverse=True)
data = DisplayData.create_from(transform)
expected_items = self._display_data_matcher(transform)
    hc.assert_that(data.items, hc.contains_inanyorder(*expected_items))

def test_perkey_display_data(self):
transform = beam.ApproximateQuantiles.PerKey(
3, weighted=True, key=len, reverse=True)
data = DisplayData.create_from(transform)
expected_items = self._display_data_matcher(transform)
    hc.assert_that(data.items, hc.contains_inanyorder(*expected_items))


def _build_quantilebuffer_test_data():
"""
Test data taken from "Munro-Paterson Algorithm" reference values table of
"Approximate Medians and other Quantiles in One Pass and with Limited Memory"
paper. See ApproximateQuantilesCombineFn for paper reference.
"""
epsilons = [0.1, 0.05, 0.01, 0.005, 0.001]
maxElementExponents = [5, 6, 7, 8, 9]
  expectedNumBuffersValues = [[11, 14, 17, 21, 24], [11, 14, 17, 20, 23],
                              [9, 11, 14, 17, 21], [8, 11, 14, 17, 20],
                              [6, 9, 11, 14, 17]]
  expectedBufferSizeValues = [[98, 123, 153, 96, 120],
                              [98, 123, 153, 191, 239],
                              [391, 977, 1221, 1526, 954],
                              [782, 977, 1221, 1526, 1908],
                              [3125, 3907, 9766, 12208, 15259]]
  test_data = []
  for i, epsilon in enumerate(epsilons):
    for j, maxElementExponent in enumerate(maxElementExponents):
      test_data.append([
          epsilon, (10**maxElementExponent),
          expectedNumBuffersValues[i][j],
          expectedBufferSizeValues[i][j]
      ])
  return test_data


class ApproximateQuantilesBufferTest(unittest.TestCase):
""" Approximate Quantiles Buffer Tests to ensure we are calculating the
optimal buffers."""
@parameterized.expand(_build_quantilebuffer_test_data)
def test_efficiency(
self, epsilon, maxInputSize, expectedNumBuffers, expectedBufferSize):
"""
Verify the buffers are efficiently calculated according to the reference
table values.
"""
combine_fn = ApproximateQuantilesCombineFn.create(
num_quantiles=10, max_num_elements=maxInputSize, epsilon=epsilon)
self.assertEqual(
expectedNumBuffers, combine_fn._spec.num_buffers, "Number of buffers")
self.assertEqual(
expectedBufferSize, combine_fn._spec.buffer_size, "Buffer size")
@parameterized.expand(_build_quantilebuffer_test_data)
def test_correctness(self, epsilon, maxInputSize, *args):
"""
Verify that buffers are correct according to the two constraint equations.
"""
combine_fn = ApproximateQuantilesCombineFn.create(
num_quantiles=10, max_num_elements=maxInputSize, epsilon=epsilon)
b = combine_fn._spec.num_buffers
k = combine_fn._spec.buffer_size
n = maxInputSize
    self.assertLessEqual((b - 2) * (1 << (b - 2)) + 0.5, (epsilon * n),
                         '(b-2)2^(b-2) + 1/2 <= eN')
    self.assertGreaterEqual(k * (1 << (b - 1)), n, 'k2^(b-1) >= N')


if __name__ == '__main__':
unittest.main()