blob: 3c45d833c42698c6184195ab959546fb30be4ff2 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Unit tests for the windowing classes."""
from __future__ import absolute_import
from __future__ import division
import unittest
from builtins import range
from nose.plugins.attrib import attr
import apache_beam as beam
from apache_beam.runners import pipeline_context
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms import CombinePerKey
from apache_beam.transforms import Create
from apache_beam.transforms import FlatMapTuple
from apache_beam.transforms import GroupByKey
from apache_beam.transforms import Map
from apache_beam.transforms import MapTuple
from apache_beam.transforms import WindowInto
from apache_beam.transforms import combiners
from apache_beam.transforms import core
from apache_beam.transforms.core import Windowing
from apache_beam.transforms.trigger import AccumulationMode
from apache_beam.transforms.trigger import AfterCount
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.window import GlobalWindow
from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms.window import IntervalWindow
from apache_beam.transforms.window import Sessions
from apache_beam.transforms.window import SlidingWindows
from apache_beam.transforms.window import TimestampCombiner
from apache_beam.transforms.window import TimestampedValue
from apache_beam.transforms.window import WindowedValue
from apache_beam.transforms.window import WindowFn
from apache_beam.utils.timestamp import MAX_TIMESTAMP
from apache_beam.utils.timestamp import MIN_TIMESTAMP
def context(element, timestamp):
return WindowFn.AssignContext(timestamp, element)
class ReifyWindowsFn(core.DoFn):
def process(self, element, window=core.DoFn.WindowParam):
key, values = element
yield "%s @ %s" % (key, window), values
reify_windows = core.ParDo(ReifyWindowsFn())
class WindowTest(unittest.TestCase):
def test_timestamped_value_cmp(self):
self.assertEqual(TimestampedValue('a', 2), TimestampedValue('a', 2))
self.assertEqual(TimestampedValue('a', 2), TimestampedValue('a', 2.0))
self.assertNotEqual(TimestampedValue('a', 2), TimestampedValue('a', 2.1))
self.assertNotEqual(TimestampedValue('a', 2), TimestampedValue('b', 2))
def test_global_window(self):
self.assertEqual(GlobalWindow(), GlobalWindow())
self.assertNotEqual(GlobalWindow(),
IntervalWindow(MIN_TIMESTAMP, MAX_TIMESTAMP))
self.assertNotEqual(IntervalWindow(MIN_TIMESTAMP, MAX_TIMESTAMP),
GlobalWindow())
self.assertTrue(GlobalWindow().max_timestamp() < MAX_TIMESTAMP)
def test_fixed_windows(self):
# Test windows with offset: 2, 7, 12, 17, ...
windowfn = FixedWindows(size=5, offset=2)
self.assertEqual([IntervalWindow(7, 12)],
windowfn.assign(context('v', 7)))
self.assertEqual([IntervalWindow(7, 12)],
windowfn.assign(context('v', 11)))
self.assertEqual([IntervalWindow(12, 17)],
windowfn.assign(context('v', 12)))
# Test windows without offset: 0, 5, 10, 15, ...
windowfn = FixedWindows(size=5)
self.assertEqual([IntervalWindow(5, 10)],
windowfn.assign(context('v', 5)))
self.assertEqual([IntervalWindow(5, 10)],
windowfn.assign(context('v', 9)))
self.assertEqual([IntervalWindow(10, 15)],
windowfn.assign(context('v', 10)))
# Test windows with offset out of range.
windowfn = FixedWindows(size=5, offset=12)
self.assertEqual([IntervalWindow(7, 12)],
windowfn.assign(context('v', 11)))
def test_sliding_windows_assignment(self):
windowfn = SlidingWindows(size=15, period=5, offset=2)
expected = [IntervalWindow(7, 22),
IntervalWindow(2, 17),
IntervalWindow(-3, 12)]
self.assertEqual(expected, windowfn.assign(context('v', 7)))
self.assertEqual(expected, windowfn.assign(context('v', 8)))
self.assertEqual(expected, windowfn.assign(context('v', 11)))
def test_sliding_windows_assignment_fraction(self):
windowfn = SlidingWindows(size=3.5, period=2.5, offset=1.5)
self.assertEqual([IntervalWindow(1.5, 5.0), IntervalWindow(-1.0, 2.5)],
windowfn.assign(context('v', 1.7)))
self.assertEqual([IntervalWindow(1.5, 5.0)],
windowfn.assign(context('v', 3)))
def test_sliding_windows_assignment_fraction_large_offset(self):
windowfn = SlidingWindows(size=3.5, period=2.5, offset=4.0)
self.assertEqual([IntervalWindow(1.5, 5.0), IntervalWindow(-1.0, 2.5)],
windowfn.assign(context('v', 1.7)))
self.assertEqual([IntervalWindow(4.0, 7.5), IntervalWindow(1.5, 5.0)],
windowfn.assign(context('v', 4.5)))
def test_sessions_merging(self):
windowfn = Sessions(10)
def merge(*timestamps):
windows = [windowfn.assign(context(None, t)) for t in timestamps]
running = set()
class TestMergeContext(WindowFn.MergeContext):
def __init__(self):
super(TestMergeContext, self).__init__(running)
def merge(self, to_be_merged, merge_result):
for w in to_be_merged:
if w in running:
running.remove(w)
running.add(merge_result)
for ws in windows:
running.update(ws)
windowfn.merge(TestMergeContext())
windowfn.merge(TestMergeContext())
return sorted(running)
self.assertEqual([IntervalWindow(2, 12)], merge(2))
self.assertEqual([IntervalWindow(2, 12), IntervalWindow(19, 29)],
merge(2, 19))
self.assertEqual([IntervalWindow(2, 19)], merge(2, 9))
self.assertEqual([IntervalWindow(2, 19)], merge(9, 2))
self.assertEqual([IntervalWindow(2, 19), IntervalWindow(19, 29)],
merge(2, 9, 19))
self.assertEqual([IntervalWindow(2, 19), IntervalWindow(19, 29)],
merge(19, 9, 2))
self.assertEqual([IntervalWindow(2, 25)], merge(2, 15, 10))
def timestamped_key_values(self, pipeline, key, *timestamps):
return (pipeline | 'start' >> Create(timestamps)
| Map(lambda x: WindowedValue((key, x), x, [GlobalWindow()])))
def test_sliding_windows(self):
with TestPipeline() as p:
pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3)
result = (pcoll
| 'w' >> WindowInto(SlidingWindows(period=2, size=4))
| GroupByKey()
| reify_windows)
expected = [('key @ [-2.0, 2.0)', [1]),
('key @ [0.0, 4.0)', [1, 2, 3]),
('key @ [2.0, 6.0)', [2, 3])]
assert_that(result, equal_to(expected))
def test_sessions(self):
with TestPipeline() as p:
pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3, 20, 35, 27)
sort_values = Map(lambda k_vs: (k_vs[0], sorted(k_vs[1])))
result = (pcoll
| 'w' >> WindowInto(Sessions(10))
| GroupByKey()
| sort_values
| reify_windows)
expected = [('key @ [1.0, 13.0)', [1, 2, 3]),
('key @ [20.0, 45.0)', [20, 27, 35])]
assert_that(result, equal_to(expected))
def test_timestamped_value(self):
with TestPipeline() as p:
result = (p
| 'start' >> Create([(k, k) for k in range(10)])
| Map(lambda x_t: TimestampedValue(x_t[0], x_t[1]))
| 'w' >> WindowInto(FixedWindows(5))
| Map(lambda v: ('key', v))
| GroupByKey())
assert_that(result, equal_to([('key', [0, 1, 2, 3, 4]),
('key', [5, 6, 7, 8, 9])]))
def test_rewindow(self):
with TestPipeline() as p:
result = (p
| Create([(k, k) for k in range(10)])
| Map(lambda x_t1: TimestampedValue(x_t1[0], x_t1[1]))
| 'window' >> WindowInto(SlidingWindows(period=2, size=6))
# Per the model, each element is now duplicated across
# three windows. Rewindowing must preserve this duplication.
| 'rewindow' >> WindowInto(FixedWindows(5))
| 'rewindow2' >> WindowInto(FixedWindows(5))
| Map(lambda v: ('key', v))
| GroupByKey())
assert_that(result, equal_to([('key', sorted([0, 1, 2, 3, 4] * 3)),
('key', sorted([5, 6, 7, 8, 9] * 3))]))
def test_rewindow_regroup(self):
with TestPipeline() as p:
grouped = (p
| Create(range(5))
| Map(lambda t: TimestampedValue(('key', t), t))
| 'window' >> WindowInto(FixedWindows(5, offset=3))
| GroupByKey()
| MapTuple(lambda k, vs: (k, sorted(vs))))
# Both of these group-and-ungroup sequences should be idempotent.
regrouped1 = (grouped
| 'w1' >> WindowInto(FixedWindows(5, offset=3))
| 'g1' >> GroupByKey()
| FlatMapTuple(lambda k, vs: [(k, v) for v in vs]))
regrouped2 = (grouped
| FlatMapTuple(lambda k, vs: [(k, v) for v in vs])
| 'w2' >> WindowInto(FixedWindows(5, offset=3))
| 'g2' >> GroupByKey()
| MapTuple(lambda k, vs: (k, sorted(vs))))
with_windows = Map(lambda e, w=beam.DoFn.WindowParam: (e, w))
expected = [(('key', [0, 1, 2]), IntervalWindow(-2, 3)),
(('key', [3, 4]), IntervalWindow(3, 8))]
assert_that(grouped | 'ww' >> with_windows, equal_to(expected))
assert_that(
regrouped1 | 'ww1' >> with_windows, equal_to(expected), label='r1')
assert_that(
regrouped2 | 'ww2' >> with_windows, equal_to(expected), label='r2')
def test_timestamped_with_combiners(self):
with TestPipeline() as p:
result = (p
# Create some initial test values.
| 'start' >> Create([(k, k) for k in range(10)])
# The purpose of the WindowInto transform is to establish a
# FixedWindows windowing function for the PCollection.
# It does not bucket elements into windows since the timestamps
# from Create are not spaced 5 ms apart and very likely they all
# fall into the same window.
| 'w' >> WindowInto(FixedWindows(5))
# Generate timestamped values using the values as timestamps.
# Now there are values 5 ms apart and since Map propagates the
# windowing function from input to output the output PCollection
# will have elements falling into different 5ms windows.
| Map(lambda x_t2: TimestampedValue(x_t2[0], x_t2[1]))
# We add a 'key' to each value representing the index of the
# window. This is important since there is no guarantee of
# order for the elements of a PCollection.
| Map(lambda v: (v // 5, v)))
# Sum all elements associated with a key and window. Although it
# is called CombinePerKey it is really CombinePerKeyAndWindow the
# same way GroupByKey is really GroupByKeyAndWindow.
sum_per_window = result | CombinePerKey(sum)
# Compute mean per key and window.
mean_per_window = result | combiners.Mean.PerKey()
assert_that(sum_per_window, equal_to([(0, 10), (1, 35)]),
label='assert:sum')
assert_that(mean_per_window, equal_to([(0, 2.0), (1, 7.0)]),
label='assert:mean')
@attr('ValidatesRunner')
def test_window_assignment_idempotency(self):
with TestPipeline() as p:
pcoll = self.timestamped_key_values(p, 'key', 0, 2, 4)
result = (pcoll
| 'window' >> WindowInto(FixedWindows(2))
| 'same window' >> WindowInto(FixedWindows(2))
| 'same window again' >> WindowInto(FixedWindows(2))
| GroupByKey())
assert_that(result, equal_to([('key', [0]),
('key', [2]),
('key', [4])]))
@attr('ValidatesRunner')
def test_window_assignment_through_multiple_gbk_idempotency(self):
with TestPipeline() as p:
pcoll = self.timestamped_key_values(p, 'key', 0, 2, 4)
result = (pcoll
| 'window' >> WindowInto(FixedWindows(2))
| 'gbk' >> GroupByKey()
| 'same window' >> WindowInto(FixedWindows(2))
| 'another gbk' >> GroupByKey()
| 'same window again' >> WindowInto(FixedWindows(2))
| 'gbk again' >> GroupByKey())
assert_that(result, equal_to([('key', [[[0]]]),
('key', [[[2]]]),
('key', [[[4]]])]))
class RunnerApiTest(unittest.TestCase):
def test_windowfn_encoding(self):
for window_fn in (GlobalWindows(),
FixedWindows(37),
SlidingWindows(2, 389),
Sessions(5077)):
context = pipeline_context.PipelineContext()
self.assertEqual(
window_fn,
WindowFn.from_runner_api(window_fn.to_runner_api(context), context))
def test_windowing_encoding(self):
for windowing in (
Windowing(GlobalWindows()),
Windowing(FixedWindows(1, 3), AfterCount(6),
accumulation_mode=AccumulationMode.ACCUMULATING),
Windowing(SlidingWindows(10, 15, 21), AfterCount(28),
timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST,
accumulation_mode=AccumulationMode.DISCARDING)):
context = pipeline_context.PipelineContext()
self.assertEqual(
windowing,
Windowing.from_runner_api(windowing.to_runner_api(context), context))
if __name__ == '__main__':
unittest.main()