blob: 31a448402b73bd659bd6c38d0fd51860f27bc621 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Unit tests for the transform.util classes."""
from __future__ import absolute_import
from __future__ import division
import logging
import random
import time
import unittest
from builtins import object
from builtins import range
import apache_beam as beam
from apache_beam.coders import coders
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import TestWindowedValue
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import contains_in_any_order
from apache_beam.testing.util import equal_to
from apache_beam.transforms import util
from apache_beam.transforms import window
from apache_beam.transforms.window import GlobalWindow
from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms.window import IntervalWindow
from apache_beam.transforms.window import Sessions
from apache_beam.transforms.window import SlidingWindows
from apache_beam.transforms.window import TimestampedValue
from apache_beam.utils import timestamp
from apache_beam.utils.windowed_value import WindowedValue
class FakeClock(object):
  """Manually advanced clock that stands in for ``time.time`` in tests.

  Calling the instance returns the current fake time; ``sleep`` moves that
  time forward immediately instead of blocking.
  """

  def __init__(self):
    # Start from the real wall-clock time; afterwards the value only changes
    # through sleep().
    self._current = time.time()

  def __call__(self):
    """Returns the current fake time."""
    return self._current

  def sleep(self, duration):
    """Advances the fake time by ``duration`` without actually waiting."""
    self._current = self._current + duration
class BatchElementsTest(unittest.TestCase):
  """Tests for util.BatchElements and util._BatchSizeEstimator."""

  def _simulate_batches(self, estimator, clock, num_batches):
    """Drives ``estimator`` through ``num_batches`` simulated batches.

    Every batch of size ``s`` is made to last ``1 + .7 * s`` fake seconds by
    advancing ``clock`` inside ``estimator.record_time``.

    Args:
      estimator: the util._BatchSizeEstimator under test.
      clock: the FakeClock the estimator was constructed with.
      num_batches: how many batches to simulate.

    Returns:
      The batch sizes requested by the estimator, in order.
    """
    requested_sizes = []
    for _ in range(num_batches):
      size = estimator.next_batch_size()
      requested_sizes.append(size)
      with estimator.record_time(size):
        clock.sleep(1 + .7 * size)
    return requested_sizes

  def test_constant_batch(self):
    """Equal min and max batch sizes give fixed-size batches plus a tail."""
    # Assumes a single bundle...
    with TestPipeline() as pipeline:
      batches = (
          pipeline
          | beam.Create(range(35))
          | util.BatchElements(min_batch_size=10, max_batch_size=10))
      batch_lengths = batches | beam.Map(len)
      assert_that(batch_lengths, equal_to([10, 10, 10, 5]))

  def test_grows_to_max_batch(self):
    """Batch sizes grow from min_batch_size until they hit max_batch_size."""
    # Assumes a single bundle...
    with TestPipeline() as pipeline:
      batches = (
          pipeline
          | beam.Create(range(164))
          | util.BatchElements(
              min_batch_size=1, max_batch_size=50, clock=FakeClock()))
      batch_lengths = batches | beam.Map(len)
      assert_that(
          batch_lengths, equal_to([1, 1, 2, 4, 8, 16, 32, 50, 50]))

  def test_windowed_batches(self):
    """Batches are formed separately for each fixed window."""
    # Assumes a single bundle, in order...
    with TestPipeline() as pipeline:
      windowed = (
          pipeline
          | beam.Create(range(47))
          | beam.Map(lambda value: window.TimestampedValue(value, value))
          | beam.WindowInto(window.FixedWindows(30)))
      batch_lengths = (
          windowed
          | util.BatchElements(
              min_batch_size=5, max_batch_size=10, clock=FakeClock())
          | beam.Map(len))
      expected_lengths = [
          5, 5, 10, 10,  # elements in [0, 30)
          10, 7,  # elements in [30, 47)
      ]
      assert_that(batch_lengths, equal_to(expected_lengths))

  def test_target_duration(self):
    """With a duration target, sizes settle just under that duration."""
    clock = FakeClock()
    estimator = util._BatchSizeEstimator(
        target_batch_overhead=None, target_batch_duration_secs=10,
        clock=clock)
    # A batch of 12 takes 1 + 12 * .7 = 9.4 seconds; one more element
    # (10.1 seconds) would exceed the 10 second target.
    expected_sizes = [1, 2, 4, 8, 12, 12, 12]
    actual_sizes = self._simulate_batches(
        estimator, clock, len(expected_sizes))
    self.assertEqual(expected_sizes, actual_sizes)

  def test_target_overhead(self):
    """With an overhead target, sizes settle where overhead is about 5%."""
    clock = FakeClock()
    estimator = util._BatchSizeEstimator(
        target_batch_overhead=.05, target_batch_duration_secs=None,
        clock=clock)
    # At 27 items, a batch takes ~20 seconds with 5% (~1 second) overhead.
    expected_sizes = [1, 2, 4, 8, 16, 27, 27, 27]
    actual_sizes = self._simulate_batches(
        estimator, clock, len(expected_sizes))
    self.assertEqual(expected_sizes, actual_sizes)

  def test_variance(self):
    """With variance set, stable sizes spread out but stay near the target."""
    clock = FakeClock()
    variance = 0.25
    estimator = util._BatchSizeEstimator(
        target_batch_overhead=.05, target_batch_duration_secs=None,
        variance=variance, clock=clock)
    expected_target = 27
    actual_sizes = self._simulate_batches(
        estimator, clock, util._BatchSizeEstimator._MAX_DATA_POINTS - 1)
    # Check that we're testing a good range of values.
    stable_set = set(actual_sizes[-20:])
    self.assertGreater(len(stable_set), 3)
    lower_bound = expected_target - expected_target * variance
    upper_bound = expected_target + expected_target * variance
    self.assertGreater(min(stable_set), lower_bound)
    self.assertLess(max(stable_set), upper_bound)

  def _run_regression_test(self, linear_regression_fn, test_outliers):
    """Checks a linear regression implementation against known fits.

    Args:
      linear_regression_fn: callable taking ``(xs, ys)`` and returning a pair
        ``(a, b)``; the assertions below treat ``a`` as the intercept and
        ``b`` as the slope of ``y = a + b * x``.
      test_outliers: if true, additionally check that a couple of outlying
        points are ignored while a dozen of them shift the fit.
    """
    # Noise-free line y = 2x + 1.
    xs = [random.random() for _ in range(10)]
    ys = [2*x + 1 for x in xs]
    intercept, slope = linear_regression_fn(xs, ys)
    self.assertAlmostEqual(intercept, 1)
    self.assertAlmostEqual(slope, 2)

    # Slightly noisy line y = 7x + 5.
    xs = [1 + random.random() for _ in range(100)]
    ys = [7*x + 5 + 0.01 * random.random() for x in xs]
    intercept, slope = linear_regression_fn(xs, ys)
    self.assertAlmostEqual(intercept, 5, delta=0.02)
    self.assertAlmostEqual(slope, 7, delta=0.02)

    # Test repeated xs
    xs = [1 + random.random()] * 100
    ys = [7 * x + 5 + 0.01 * random.random() for x in xs]
    intercept, slope = linear_regression_fn(xs, ys)
    self.assertAlmostEqual(intercept, 0, delta=0.02)
    expected_slope = sum(ys)/(len(ys) * xs[0])
    self.assertAlmostEqual(slope, expected_slope, delta=0.02)

    if not test_outliers:
      return

    xs = [1 + random.random() for _ in range(100)]
    ys = [2*x + 1 for x in xs]
    intercept, slope = linear_regression_fn(xs, ys)
    self.assertAlmostEqual(intercept, 1)
    self.assertAlmostEqual(slope, 2)

    # An outlier or two doesn't affect the result.
    for _ in range(2):
      xs.append(10)
      ys.append(30)
      intercept, slope = linear_regression_fn(xs, ys)
      self.assertAlmostEqual(intercept, 1)
      self.assertAlmostEqual(slope, 2)

    # But enough of them, and they're no longer outliers.
    xs.extend([10] * 10)
    ys.extend([30] * 10)
    intercept, slope = linear_regression_fn(xs, ys)
    self.assertLess(intercept, 0.5)
    self.assertGreater(slope, 2.5)

  def test_no_numpy_regression(self):
    """Runs the regression checks, without outliers, on the no-numpy fit."""
    regression_fn = util._BatchSizeEstimator.linear_regression_no_numpy
    self._run_regression_test(regression_fn, False)

  def test_numpy_regression(self):
    """Runs the regression checks, with outliers, on the numpy fit."""
    try:
      # pylint: disable=wrong-import-order, wrong-import-position
      import numpy as _
    except ImportError:
      self.skipTest('numpy not available')
    regression_fn = util._BatchSizeEstimator.linear_regression_numpy
    self._run_regression_test(regression_fn, True)
class IdentityWindowTest(unittest.TestCase):
  """Tests for util._IdentityWindowFn."""

  def test_window_preserved(self):
    """Windowing with _IdentityWindowFn keeps timestamps and windows."""
    expected_timestamp = timestamp.Timestamp(5)
    expected_window = window.IntervalWindow(1.0, 2.0)

    class AddWindowDoFn(beam.DoFn):
      """Emits every element with the expected timestamp and window."""

      def process(self, element):
        yield WindowedValue(
            element, expected_timestamp, [expected_window])

    data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)]
    expected_windows = [
        TestWindowedValue(kv, expected_timestamp, [expected_window])
        for kv in data]

    # The context manager runs the pipeline when the block exits, so the
    # assert_that checks cannot be skipped by a forgotten run() call.
    with TestPipeline() as pipeline:
      before_identity = (pipeline
                         | 'start' >> beam.Create(data)
                         | 'add_windows' >> beam.ParDo(AddWindowDoFn()))
      assert_that(before_identity, equal_to(expected_windows),
                  label='before_identity', reify_windows=True)
      after_identity = (before_identity
                        | 'window' >> beam.WindowInto(
                            util._IdentityWindowFn(
                                coders.IntervalWindowCoder())))
      assert_that(after_identity, equal_to(expected_windows),
                  label='after_identity', reify_windows=True)

  def test_no_window_context_fails(self):
    """Running fails when _IdentityWindowFn is given no window."""
    expected_timestamp = timestamp.Timestamp(5)
    # Assuming the default window function is window.GlobalWindows.
    expected_window = window.GlobalWindow()

    class AddTimestampDoFn(beam.DoFn):
      """Emits every element as a TimestampedValue."""

      def process(self, element):
        yield window.TimestampedValue(element, expected_timestamp)

    pipeline = TestPipeline()
    data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)]
    expected_windows = [
        TestWindowedValue(kv, expected_timestamp, [expected_window])
        for kv in data]
    before_identity = (pipeline
                       | 'start' >> beam.Create(data)
                       | 'add_timestamps' >> beam.ParDo(AddTimestampDoFn()))
    assert_that(before_identity, equal_to(expected_windows),
                label='before_identity', reify_windows=True)
    after_identity = (before_identity
                      | 'window' >> beam.WindowInto(
                          util._IdentityWindowFn(
                              coders.GlobalWindowCoder()))
                      # This DoFn will return TimestampedValues, making
                      # WindowFn.AssignContext passed to IdentityWindowFn
                      # contain a window of None. IdentityWindowFn should
                      # raise an exception.
                      | 'add_timestamps2' >> beam.ParDo(AddTimestampDoFn()))
    assert_that(after_identity, equal_to(expected_windows),
                label='after_identity', reify_windows=True)
    # The pipeline is run explicitly here, rather than through a ``with``
    # block, so that only execution (not construction) is covered by the
    # expected-failure context.
    # NOTE(review): assertRaisesRegexp is the Python 2 spelling, which
    # Python 3 deprecates in favour of assertRaisesRegex; it is kept because
    # this module still carries Python 2 compatibility imports.
    with self.assertRaisesRegexp(ValueError, r'window.*None.*add_timestamps2'):
      pipeline.run()
class ReshuffleTest(unittest.TestCase):
  """Tests that beam.Reshuffle preserves values, timestamps and windows.

  Every pipeline is built inside ``with TestPipeline() as pipeline``; the
  context manager runs the pipeline when the block exits, so the assert_that
  checks cannot be skipped by a forgotten run() call.
  """

  def _check_global_window_reshuffle(self, pipeline):
    """Adds the global-window group-then-reshuffle checks to ``pipeline``.

    Shared by the batch and streaming global-window tests, which differ only
    in how the pipeline is constructed.
    """
    data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)]
    expected_data = [(1, [1, 2, 4]), (2, [1, 2]), (3, [1])]
    before_reshuffle = (pipeline
                        | beam.Create(data)
                        | beam.WindowInto(GlobalWindows())
                        | beam.GroupByKey())
    assert_that(before_reshuffle, equal_to(expected_data),
                label='before_reshuffle')
    after_reshuffle = before_reshuffle | beam.Reshuffle()
    assert_that(after_reshuffle, equal_to(expected_data),
                label='after reshuffle')

  def test_reshuffle_contents_unchanged(self):
    """Reshuffle outputs exactly the elements it was given."""
    data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)]
    with TestPipeline() as pipeline:
      result = (pipeline
                | beam.Create(data)
                | beam.Reshuffle())
      assert_that(result, equal_to(data))

  def test_reshuffle_after_gbk_contents_unchanged(self):
    """Reshuffle after a GroupByKey leaves the grouped output unchanged."""
    data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)]
    expected_result = [(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]
    with TestPipeline() as pipeline:
      after_gbk = (pipeline
                   | beam.Create(data)
                   | beam.GroupByKey())
      assert_that(after_gbk, equal_to(expected_result), label='after_gbk')
      after_reshuffle = after_gbk | beam.Reshuffle()
      assert_that(after_reshuffle, equal_to(expected_result),
                  label='after_reshuffle')

  def test_reshuffle_timestamps_unchanged(self):
    """Reshuffle keeps each element's timestamp."""
    # Deliberately not named ``timestamp``: that would shadow the
    # apache_beam.utils.timestamp module imported by this file.
    element_timestamp = 5
    data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)]
    expected_result = [
        TestWindowedValue(v, element_timestamp, [GlobalWindow()])
        for v in data]
    with TestPipeline() as pipeline:
      before_reshuffle = (pipeline
                          | 'start' >> beam.Create(data)
                          | 'add_timestamp' >> beam.Map(
                              lambda v: TimestampedValue(
                                  v, element_timestamp)))
      assert_that(before_reshuffle, equal_to(expected_result),
                  label='before_reshuffle', reify_windows=True)
      after_reshuffle = before_reshuffle | beam.Reshuffle()
      assert_that(after_reshuffle, equal_to(expected_result),
                  label='after_reshuffle', reify_windows=True)

  def test_reshuffle_windows_unchanged(self):
    """Reshuffle keeps the merged session windows of grouped output."""
    data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)]
    expected_data = [TestWindowedValue(v, t, [w]) for (v, t, w) in [
        ((1, contains_in_any_order([2, 1])), 4.0, IntervalWindow(1.0, 4.0)),
        ((2, contains_in_any_order([2, 1])), 4.0, IntervalWindow(1.0, 4.0)),
        ((3, [1]), 3.0, IntervalWindow(1.0, 3.0)),
        ((1, [4]), 6.0, IntervalWindow(4.0, 6.0))]]
    with TestPipeline() as pipeline:
      before_reshuffle = (pipeline
                          | 'start' >> beam.Create(data)
                          | 'add_timestamp' >> beam.Map(
                              lambda v: TimestampedValue(v, v[1]))
                          | 'window' >> beam.WindowInto(Sessions(gap_size=2))
                          | 'group_by_key' >> beam.GroupByKey())
      assert_that(before_reshuffle, equal_to(expected_data),
                  label='before_reshuffle', reify_windows=True)
      after_reshuffle = before_reshuffle | beam.Reshuffle()
      assert_that(after_reshuffle, equal_to(expected_data),
                  label='after reshuffle', reify_windows=True)

  def test_reshuffle_window_fn_preserved(self):
    """Session windows still merge in a GroupByKey placed after Reshuffle."""
    data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)]
    expected_windows = [TestWindowedValue(v, t, [w]) for (v, t, w) in [
        ((1, 1), 1.0, IntervalWindow(1.0, 3.0)),
        ((2, 1), 1.0, IntervalWindow(1.0, 3.0)),
        ((3, 1), 1.0, IntervalWindow(1.0, 3.0)),
        ((1, 2), 2.0, IntervalWindow(2.0, 4.0)),
        ((2, 2), 2.0, IntervalWindow(2.0, 4.0)),
        ((1, 4), 4.0, IntervalWindow(4.0, 6.0))]]
    expected_merged_windows = [
        TestWindowedValue(v, t, [w]) for (v, t, w) in [
            ((1, contains_in_any_order([2, 1])), 4.0,
             IntervalWindow(1.0, 4.0)),
            ((2, contains_in_any_order([2, 1])), 4.0,
             IntervalWindow(1.0, 4.0)),
            ((3, [1]), 3.0, IntervalWindow(1.0, 3.0)),
            ((1, [4]), 6.0, IntervalWindow(4.0, 6.0))]]
    with TestPipeline() as pipeline:
      before_reshuffle = (pipeline
                          | 'start' >> beam.Create(data)
                          | 'add_timestamp' >> beam.Map(
                              lambda v: TimestampedValue(v, v[1]))
                          | 'window' >> beam.WindowInto(Sessions(gap_size=2)))
      assert_that(before_reshuffle, equal_to(expected_windows),
                  label='before_reshuffle', reify_windows=True)
      after_reshuffle = before_reshuffle | beam.Reshuffle()
      assert_that(after_reshuffle, equal_to(expected_windows),
                  label='after_reshuffle', reify_windows=True)
      after_group = after_reshuffle | beam.GroupByKey()
      assert_that(after_group, equal_to(expected_merged_windows),
                  label='after_group', reify_windows=True)

  def test_reshuffle_global_window(self):
    """Reshuffle leaves grouped output in the global window unchanged."""
    with TestPipeline() as pipeline:
      self._check_global_window_reshuffle(pipeline)

  def test_reshuffle_sliding_window(self):
    """Reshuffle does not re-apply a sliding window function."""
    data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)]
    window_size = 2
    expected_data = [(1, [1, 2, 4]), (2, [1, 2]), (3, [1])] * window_size
    with TestPipeline() as pipeline:
      before_reshuffle = (pipeline
                          | beam.Create(data)
                          | beam.WindowInto(SlidingWindows(
                              size=window_size, period=1))
                          | beam.GroupByKey())
      assert_that(before_reshuffle, equal_to(expected_data),
                  label='before_reshuffle')
      after_reshuffle = before_reshuffle | beam.Reshuffle()
      # If Reshuffle applies the sliding window function a second time there
      # should be extra values for each key.
      assert_that(after_reshuffle, equal_to(expected_data),
                  label='after reshuffle')

  def test_reshuffle_streaming_global_window(self):
    """Same as the global-window test, with the streaming option set."""
    options = PipelineOptions()
    options.view_as(StandardOptions).streaming = True
    with TestPipeline(options=options) as pipeline:
      self._check_global_window_reshuffle(pipeline)
if __name__ == '__main__':
  # Set the root logger to INFO before handing control to unittest.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  unittest.main()