# blob: 6cec4a5bf361b3ab7af709b71ea686f1cd115d73 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unit tests for the apache_beam.transforms.util classes."""
from __future__ import absolute_import
import logging
import time
import unittest
from builtins import object
from builtins import range
import apache_beam as beam
from apache_beam.coders import coders
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import TestWindowedValue
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import contains_in_any_order
from apache_beam.testing.util import equal_to
from apache_beam.transforms import util
from apache_beam.transforms import window
from apache_beam.transforms.window import GlobalWindow
from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms.window import IntervalWindow
from apache_beam.transforms.window import Sessions
from apache_beam.transforms.window import SlidingWindows
from apache_beam.transforms.window import TimestampedValue
from apache_beam.utils import timestamp
from apache_beam.utils.windowed_value import WindowedValue
class FakeClock(object):
  """Manually advanced clock for tests.

  Construction snapshots the real ``time.time()``; afterwards the value only
  moves when ``sleep`` is called. Calling the instance returns the current
  fake time, so it can be passed wherever a zero-argument clock callable is
  expected (e.g. the ``clock=`` arguments used in the tests below).
  """

  def __init__(self):
    start = time.time()
    self._now = start

  def __call__(self):
    current = self._now
    return current

  def sleep(self, duration):
    """Advance the fake time by ``duration`` without blocking."""
    self._now = self._now + duration
class BatchElementsTest(unittest.TestCase):
  """Tests for util.BatchElements and its batch-size estimator."""

  @staticmethod
  def _batch_duration(batch_size):
    """Simulated processing time, in seconds, for a batch of `batch_size`."""
    return 1 + .7 * batch_size

  @classmethod
  def _simulate_batches(cls, batch_estimator, clock, num_batches):
    """Drive `batch_estimator` for `num_batches` rounds; return chosen sizes.

    Each round asks the estimator for its next batch size, then advances the
    fake `clock` by the simulated duration inside `record_time`, so the time
    recorded for that batch is the fake elapsed time (assumes the estimator
    was constructed with this same `clock`).
    """
    sizes = []
    for _ in range(num_batches):
      sizes.append(batch_estimator.next_batch_size())
      with batch_estimator.record_time(sizes[-1]):
        clock.sleep(cls._batch_duration(sizes[-1]))
    return sizes

  def test_constant_batch(self):
    # Assumes a single bundle...
    with TestPipeline() as p:
      res = (
          p
          | beam.Create(range(35))
          | util.BatchElements(min_batch_size=10, max_batch_size=10)
          | beam.Map(len))
      assert_that(res, equal_to([10, 10, 10, 5]))

  def test_grows_to_max_batch(self):
    # Assumes a single bundle...
    with TestPipeline() as p:
      res = (
          p
          | beam.Create(range(164))
          | util.BatchElements(
              min_batch_size=1, max_batch_size=50, clock=FakeClock())
          | beam.Map(len))
      assert_that(res, equal_to([1, 1, 2, 4, 8, 16, 32, 50, 50]))

  def test_windowed_batches(self):
    # Assumes a single bundle, in order...
    with TestPipeline() as p:
      res = (
          p
          | beam.Create(range(47))
          | beam.Map(lambda t: window.TimestampedValue(t, t))
          | beam.WindowInto(window.FixedWindows(30))
          | util.BatchElements(
              min_batch_size=5, max_batch_size=10, clock=FakeClock())
          | beam.Map(len))
      assert_that(res, equal_to([
          5, 5, 10, 10,  # elements in [0, 30)
          10, 7,         # elements in [30, 47)
      ]))

  def test_target_duration(self):
    clock = FakeClock()
    batch_estimator = util._BatchSizeEstimator(
        target_batch_overhead=None, target_batch_duration_secs=10, clock=clock)
    # 1 + 12 * .7 = 9.4s; 13 items would take 10.1s and overshoot the 10s
    # target, so the estimator should settle on 12.
    expected_sizes = [1, 2, 4, 8, 12, 12, 12]
    actual_sizes = self._simulate_batches(
        batch_estimator, clock, len(expected_sizes))
    self.assertEqual(expected_sizes, actual_sizes)

  def test_target_overhead(self):
    clock = FakeClock()
    batch_estimator = util._BatchSizeEstimator(
        target_batch_overhead=.05, target_batch_duration_secs=None, clock=clock)
    # At 27 items, a batch takes ~20 seconds with 5% (~1 second) overhead.
    expected_sizes = [1, 2, 4, 8, 16, 27, 27, 27]
    actual_sizes = self._simulate_batches(
        batch_estimator, clock, len(expected_sizes))
    self.assertEqual(expected_sizes, actual_sizes)
class IdentityWindowTest(unittest.TestCase):
  """Tests for util._IdentityWindowFn, which re-applies existing windows."""

  def test_window_preserved(self):
    expected_timestamp = timestamp.Timestamp(5)
    expected_window = window.IntervalWindow(1.0, 2.0)

    class AddWindowDoFn(beam.DoFn):
      def process(self, element):
        yield WindowedValue(
            element, expected_timestamp, [expected_window])

    pipeline = TestPipeline()
    data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)]
    expected_windows = [
        TestWindowedValue(kv, expected_timestamp, [expected_window])
        for kv in data]
    before_identity = (pipeline
                       | 'start' >> beam.Create(data)
                       | 'add_windows' >> beam.ParDo(AddWindowDoFn()))
    assert_that(before_identity, equal_to(expected_windows),
                label='before_identity', reify_windows=True)
    after_identity = (before_identity
                      | 'window' >> beam.WindowInto(
                          util._IdentityWindowFn(
                              coders.IntervalWindowCoder())))
    assert_that(after_identity, equal_to(expected_windows),
                label='after_identity', reify_windows=True)
    # The pipeline is built without a `with` block, so it must be run
    # explicitly for the assert_that checks above to execute.
    pipeline.run()

  def test_no_window_context_fails(self):
    expected_timestamp = timestamp.Timestamp(5)
    # Assuming the default window function is window.GlobalWindows.
    expected_window = window.GlobalWindow()

    class AddTimestampDoFn(beam.DoFn):
      def process(self, element):
        yield window.TimestampedValue(element, expected_timestamp)

    pipeline = TestPipeline()
    data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)]
    expected_windows = [
        TestWindowedValue(kv, expected_timestamp, [expected_window])
        for kv in data]
    before_identity = (pipeline
                       | 'start' >> beam.Create(data)
                       | 'add_timestamps' >> beam.ParDo(AddTimestampDoFn()))
    assert_that(before_identity, equal_to(expected_windows),
                label='before_identity', reify_windows=True)
    after_identity = (before_identity
                      | 'window' >> beam.WindowInto(
                          util._IdentityWindowFn(
                              coders.GlobalWindowCoder()))
                      # This DoFn will return TimestampedValues, making
                      # WindowFn.AssignContext passed to IdentityWindowFn
                      # contain a window of None. IdentityWindowFn should
                      # raise an exception.
                      | 'add_timestamps2' >> beam.ParDo(AddTimestampDoFn()))
    assert_that(after_identity, equal_to(expected_windows),
                label='after_identity', reify_windows=True)
    # assertRaisesRegexp (rather than assertRaisesRegex) is kept because this
    # file appears to still target Python 2 (see its __future__/builtins
    # imports).
    with self.assertRaisesRegexp(ValueError, r'window.*None.*add_timestamps2'):
      pipeline.run()
class ReshuffleTest(unittest.TestCase):
  """Tests that beam.Reshuffle preserves contents, timestamps and windows.

  Every test here builds a `TestPipeline()` explicitly (not as a context
  manager), so each must end with `pipeline.run()`; otherwise none of its
  `assert_that` checks are ever executed.
  """

  def test_reshuffle_contents_unchanged(self):
    pipeline = TestPipeline()
    data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)]
    result = (pipeline
              | beam.Create(data)
              | beam.Reshuffle())
    assert_that(result, equal_to(data))
    pipeline.run()

  def test_reshuffle_after_gbk_contents_unchanged(self):
    pipeline = TestPipeline()
    data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)]
    expected_result = [(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]
    after_gbk = (pipeline
                 | beam.Create(data)
                 | beam.GroupByKey())
    assert_that(after_gbk, equal_to(expected_result), label='after_gbk')
    after_reshuffle = after_gbk | beam.Reshuffle()
    assert_that(after_reshuffle, equal_to(expected_result),
                label='after_reshuffle')
    pipeline.run()

  def test_reshuffle_timestamps_unchanged(self):
    pipeline = TestPipeline()
    # Deliberately not named `timestamp`, which would shadow the module-level
    # apache_beam.utils.timestamp import.
    expected_timestamp = 5
    data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)]
    expected_result = [
        TestWindowedValue(v, expected_timestamp, [GlobalWindow()])
        for v in data]
    before_reshuffle = (
        pipeline
        | 'start' >> beam.Create(data)
        | 'add_timestamp' >> beam.Map(
            lambda v: beam.window.TimestampedValue(v, expected_timestamp)))
    assert_that(before_reshuffle, equal_to(expected_result),
                label='before_reshuffle', reify_windows=True)
    after_reshuffle = before_reshuffle | beam.Reshuffle()
    assert_that(after_reshuffle, equal_to(expected_result),
                label='after_reshuffle', reify_windows=True)
    pipeline.run()

  def test_reshuffle_windows_unchanged(self):
    pipeline = TestPipeline()
    data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)]
    expected_data = [TestWindowedValue(v, t, [w]) for (v, t, w) in [
        ((1, contains_in_any_order([2, 1])), 4.0, IntervalWindow(1.0, 4.0)),
        ((2, contains_in_any_order([2, 1])), 4.0, IntervalWindow(1.0, 4.0)),
        ((3, [1]), 3.0, IntervalWindow(1.0, 3.0)),
        ((1, [4]), 6.0, IntervalWindow(4.0, 6.0))]]
    before_reshuffle = (pipeline
                        | 'start' >> beam.Create(data)
                        | 'add_timestamp' >> beam.Map(
                            lambda v: beam.window.TimestampedValue(v, v[1]))
                        | 'window' >> beam.WindowInto(Sessions(gap_size=2))
                        | 'group_by_key' >> beam.GroupByKey())
    assert_that(before_reshuffle, equal_to(expected_data),
                label='before_reshuffle', reify_windows=True)
    after_reshuffle = before_reshuffle | beam.Reshuffle()
    assert_that(after_reshuffle, equal_to(expected_data),
                label='after_reshuffle', reify_windows=True)
    pipeline.run()

  def test_reshuffle_window_fn_preserved(self):
    pipeline = TestPipeline()
    data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)]
    expected_windows = [TestWindowedValue(v, t, [w]) for (v, t, w) in [
        ((1, 1), 1.0, IntervalWindow(1.0, 3.0)),
        ((2, 1), 1.0, IntervalWindow(1.0, 3.0)),
        ((3, 1), 1.0, IntervalWindow(1.0, 3.0)),
        ((1, 2), 2.0, IntervalWindow(2.0, 4.0)),
        ((2, 2), 2.0, IntervalWindow(2.0, 4.0)),
        ((1, 4), 4.0, IntervalWindow(4.0, 6.0))]]
    expected_merged_windows = [TestWindowedValue(v, t, [w]) for (v, t, w) in [
        ((1, contains_in_any_order([2, 1])), 4.0, IntervalWindow(1.0, 4.0)),
        ((2, contains_in_any_order([2, 1])), 4.0, IntervalWindow(1.0, 4.0)),
        ((3, [1]), 3.0, IntervalWindow(1.0, 3.0)),
        ((1, [4]), 6.0, IntervalWindow(4.0, 6.0))]]
    before_reshuffle = (pipeline
                        | 'start' >> beam.Create(data)
                        | 'add_timestamp' >> beam.Map(
                            lambda v: TimestampedValue(v, v[1]))
                        | 'window' >> beam.WindowInto(Sessions(gap_size=2)))
    assert_that(before_reshuffle, equal_to(expected_windows),
                label='before_reshuffle', reify_windows=True)
    after_reshuffle = before_reshuffle | beam.Reshuffle()
    assert_that(after_reshuffle, equal_to(expected_windows),
                label='after_reshuffle', reify_windows=True)
    # Grouping after the reshuffle must still merge the session windows,
    # showing that the original WindowFn survived the reshuffle.
    after_group = after_reshuffle | beam.GroupByKey()
    assert_that(after_group, equal_to(expected_merged_windows),
                label='after_group', reify_windows=True)
    pipeline.run()

  def test_reshuffle_global_window(self):
    pipeline = TestPipeline()
    data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)]
    expected_data = [(1, [1, 2, 4]), (2, [1, 2]), (3, [1])]
    before_reshuffle = (pipeline
                        | beam.Create(data)
                        | beam.WindowInto(GlobalWindows())
                        | beam.GroupByKey())
    assert_that(before_reshuffle, equal_to(expected_data),
                label='before_reshuffle')
    after_reshuffle = before_reshuffle | beam.Reshuffle()
    assert_that(after_reshuffle, equal_to(expected_data),
                label='after_reshuffle')
    pipeline.run()

  def test_reshuffle_sliding_window(self):
    pipeline = TestPipeline()
    data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)]
    window_size = 2
    expected_data = [(1, [1, 2, 4]), (2, [1, 2]), (3, [1])] * window_size
    before_reshuffle = (pipeline
                        | beam.Create(data)
                        | beam.WindowInto(SlidingWindows(
                            size=window_size, period=1))
                        | beam.GroupByKey())
    assert_that(before_reshuffle, equal_to(expected_data),
                label='before_reshuffle')
    after_reshuffle = before_reshuffle | beam.Reshuffle()
    # If Reshuffle applies the sliding window function a second time there
    # should be extra values for each key.
    assert_that(after_reshuffle, equal_to(expected_data),
                label='after_reshuffle')
    pipeline.run()

  def test_reshuffle_streaming_global_window(self):
    options = PipelineOptions()
    options.view_as(StandardOptions).streaming = True
    pipeline = TestPipeline(options=options)
    data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)]
    expected_data = [(1, [1, 2, 4]), (2, [1, 2]), (3, [1])]
    before_reshuffle = (pipeline
                        | beam.Create(data)
                        | beam.WindowInto(GlobalWindows())
                        | beam.GroupByKey())
    assert_that(before_reshuffle, equal_to(expected_data),
                label='before_reshuffle')
    after_reshuffle = before_reshuffle | beam.Reshuffle()
    assert_that(after_reshuffle, equal_to(expected_data),
                label='after_reshuffle')
    pipeline.run()
if __name__ == '__main__':