blob: ca2cd2539a8ca96fa71287a4b155d380d2799a88 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pytype: skip-file
import unittest
import hamcrest as hc
import apache_beam as beam
from apache_beam.io.restriction_trackers import OffsetRange
from apache_beam.io.restriction_trackers import OffsetRestrictionTracker
from apache_beam.io.watermark_estimators import ManualWatermarkEstimator
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.common import DoFnSignature
from apache_beam.runners.common import PerWindowInvoker
from apache_beam.runners.common import merge_common_environments
from apache_beam.runners.portability.expansion_service_test import FibTransform
from apache_beam.runners.sdf_utils import SplitResultPrimary
from apache_beam.runners.sdf_utils import SplitResultResidual
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms import trigger
from apache_beam.transforms import window
from apache_beam.transforms.core import DoFn
from apache_beam.transforms.core import RestrictionProvider
from apache_beam.transforms.window import IntervalWindow
from apache_beam.utils.timestamp import Timestamp
from apache_beam.utils.windowed_value import WindowedValue
class DoFnSignatureTest(unittest.TestCase):
def test_dofn_validate_process_error(self):
class MyDoFn(DoFn):
def process(self, element, w1=DoFn.WindowParam, w2=DoFn.WindowParam):
pass
with self.assertRaises(ValueError):
DoFnSignature(MyDoFn())
def test_dofn_get_defaults(self):
class MyDoFn(DoFn):
def process(self, element, w=DoFn.WindowParam):
pass
signature = DoFnSignature(MyDoFn())
self.assertEqual(signature.process_method.defaults, [DoFn.WindowParam])
@unittest.skip('BEAM-5878')
def test_dofn_get_defaults_kwonly(self):
class MyDoFn(DoFn):
def process(self, element, *, w=DoFn.WindowParam):
pass
signature = DoFnSignature(MyDoFn())
self.assertEqual(signature.process_method.defaults, [DoFn.WindowParam])
def test_dofn_validate_start_bundle_error(self):
class MyDoFn(DoFn):
def process(self, element):
pass
def start_bundle(self, w1=DoFn.WindowParam):
pass
with self.assertRaises(ValueError):
DoFnSignature(MyDoFn())
def test_dofn_validate_finish_bundle_error(self):
class MyDoFn(DoFn):
def process(self, element):
pass
def finish_bundle(self, w1=DoFn.WindowParam):
pass
with self.assertRaises(ValueError):
DoFnSignature(MyDoFn())
def test_unbounded_element_process_fn(self):
class UnboundedDoFn(DoFn):
@DoFn.unbounded_per_element()
def process(self, element):
pass
class BoundedDoFn(DoFn):
def process(self, element):
pass
signature = DoFnSignature(UnboundedDoFn())
self.assertTrue(signature.is_unbounded_per_element())
signature = DoFnSignature(BoundedDoFn())
self.assertFalse(signature.is_unbounded_per_element())
class DoFnProcessTest(unittest.TestCase):
# pylint: disable=expression-not-assigned
all_records = None
def setUp(self):
DoFnProcessTest.all_records = []
def record_dofn(self):
class RecordDoFn(DoFn):
def process(self, element):
DoFnProcessTest.all_records.append(element)
return RecordDoFn()
def test_dofn_process_keyparam(self):
class DoFnProcessWithKeyparam(DoFn):
def process(self, element, mykey=DoFn.KeyParam):
yield "{key}-verify".format(key=mykey)
pipeline_options = PipelineOptions()
with TestPipeline(options=pipeline_options) as p:
test_stream = (TestStream().advance_watermark_to(10).add_elements([1, 2]))
(
p
| test_stream
| beam.Map(lambda x: (x, "some-value"))
| "window_into" >> beam.WindowInto(
window.FixedWindows(5),
accumulation_mode=trigger.AccumulationMode.DISCARDING)
| beam.ParDo(DoFnProcessWithKeyparam())
| beam.ParDo(self.record_dofn()))
self.assertEqual(['1-verify', '2-verify'],
sorted(DoFnProcessTest.all_records))
def test_dofn_process_keyparam_error_no_key(self):
class DoFnProcessWithKeyparam(DoFn):
def process(self, element, mykey=DoFn.KeyParam):
yield "{key}-verify".format(key=mykey)
pipeline_options = PipelineOptions()
with self.assertRaises(ValueError),\
TestPipeline(options=pipeline_options) as p:
test_stream = (TestStream().advance_watermark_to(10).add_elements([1, 2]))
(p | test_stream | beam.ParDo(DoFnProcessWithKeyparam()))
def test_pardo_with_unbounded_per_element_dofn(self):
class UnboundedDoFn(beam.DoFn):
@beam.DoFn.unbounded_per_element()
def process(self, element):
pass
class BoundedDoFn(beam.DoFn):
def process(self, element):
pass
with TestPipeline() as p:
source = p | beam.Impulse()
unbounded_pcoll = source | beam.ParDo(UnboundedDoFn())
bounded_pcoll = source | beam.ParDo(BoundedDoFn())
self.assertEqual(unbounded_pcoll.is_bounded, False)
self.assertEqual(bounded_pcoll.is_bounded, True)
class TestOffsetRestrictionProvider(RestrictionProvider):
def restriction_size(self, element, restriction):
return restriction.size()
class PerWindowInvokerSplitTest(unittest.TestCase):
def setUp(self):
self.window1 = IntervalWindow(0, 10)
self.window2 = IntervalWindow(10, 20)
self.window3 = IntervalWindow(20, 30)
self.windowed_value = WindowedValue(
'a', 57, (self.window1, self.window2, self.window3))
self.restriction = OffsetRange(0, 100)
self.watermark_estimator_state = Timestamp(21)
self.restriction_provider = TestOffsetRestrictionProvider()
self.watermark_estimator = ManualWatermarkEstimator(Timestamp(42))
self.maxDiff = None
def create_split_in_window(self, offset_index, windows):
return (
SplitResultPrimary(
primary_value=WindowedValue(((
'a',
(OffsetRange(0, offset_index), self.watermark_estimator_state)),
offset_index),
57,
windows)),
SplitResultResidual(
residual_value=WindowedValue(((
'a',
(
OffsetRange(offset_index, 100),
self.watermark_estimator.get_estimator_state())),
100 - offset_index),
57,
windows),
current_watermark=self.watermark_estimator.current_watermark(),
deferred_timestamp=None))
def create_split_across_windows(self, primary_windows, residual_windows):
primary = SplitResultPrimary(
primary_value=WindowedValue(
(('a', (OffsetRange(0, 100), self.watermark_estimator_state)), 100),
57,
primary_windows)) if primary_windows else None
residual = SplitResultResidual(
residual_value=WindowedValue(
(('a', (OffsetRange(0, 100), self.watermark_estimator_state)), 100),
57,
residual_windows),
current_watermark=None,
deferred_timestamp=None) if residual_windows else None
return primary, residual
def test_non_window_observing_checkpoint(self):
# test checkpoint
restriction_tracker = OffsetRestrictionTracker(self.restriction)
restriction_tracker.try_claim(30)
(primaries, residuals, stop_index) = PerWindowInvoker._try_split(
0.0,
None,
None,
self.windowed_value,
self.restriction,
self.watermark_estimator_state,
self.restriction_provider,
restriction_tracker,
self.watermark_estimator)
expected_primary_split, expected_residual_split = (
self.create_split_in_window(31, self.windowed_value.windows))
self.assertEqual([expected_primary_split], primaries)
self.assertEqual([expected_residual_split], residuals)
# We don't expect the stop index to be set for non window observing splits
self.assertIsNone(stop_index)
def test_non_window_observing_split(self):
restriction_tracker = OffsetRestrictionTracker(self.restriction)
restriction_tracker.try_claim(30)
(primaries, residuals, stop_index) = PerWindowInvoker._try_split(
0.1,
None,
None,
self.windowed_value,
self.restriction,
self.watermark_estimator_state,
self.restriction_provider,
restriction_tracker,
self.watermark_estimator)
expected_primary_split, expected_residual_split = (
self.create_split_in_window(37, self.windowed_value.windows))
self.assertEqual([expected_primary_split], primaries)
self.assertEqual([expected_residual_split], residuals)
# We don't expect the stop index to be set for non window observing splits
self.assertIsNone(stop_index)
def test_non_window_observing_split_when_restriction_is_done(self):
restriction_tracker = OffsetRestrictionTracker(self.restriction)
restriction_tracker.try_claim(100)
self.assertIsNone(
PerWindowInvoker._try_split(
0.1,
None,
None,
self.windowed_value,
self.restriction,
self.watermark_estimator_state,
self.restriction_provider,
restriction_tracker,
self.watermark_estimator))
def test_window_observing_checkpoint_on_first_window(self):
restriction_tracker = OffsetRestrictionTracker(self.restriction)
restriction_tracker.try_claim(30)
(primaries, residuals, stop_index) = PerWindowInvoker._try_split(
0.0,
0,
3,
self.windowed_value,
self.restriction,
self.watermark_estimator_state,
self.restriction_provider,
restriction_tracker,
self.watermark_estimator)
expected_primary_split, expected_residual_split = (
self.create_split_in_window(31, (self.window1, )))
_, expected_residual_windows = (
self.create_split_across_windows(None, (self.window2, self.window3,)))
hc.assert_that(primaries, hc.contains_inanyorder(expected_primary_split))
hc.assert_that(
residuals,
hc.contains_inanyorder(
expected_residual_split,
expected_residual_windows,
))
self.assertEqual(stop_index, 1)
def test_window_observing_checkpoint_on_first_window_after_prior_split(self):
restriction_tracker = OffsetRestrictionTracker(self.restriction)
restriction_tracker.try_claim(30)
(primaries, residuals, stop_index) = PerWindowInvoker._try_split(
0.0,
0,
2, # stop index < len(windows) representing a prior split had occurred
self.windowed_value,
self.restriction,
self.watermark_estimator_state,
self.restriction_provider,
restriction_tracker,
self.watermark_estimator)
expected_primary_split, expected_residual_split = (
self.create_split_in_window(31, (self.window1, )))
_, expected_residual_windows = (
self.create_split_across_windows(None, (self.window2, )))
hc.assert_that(primaries, hc.contains_inanyorder(expected_primary_split))
hc.assert_that(
residuals,
hc.contains_inanyorder(
expected_residual_split,
expected_residual_windows,
))
self.assertEqual(stop_index, 1)
def test_window_observing_split_on_first_window(self):
restriction_tracker = OffsetRestrictionTracker(self.restriction)
restriction_tracker.try_claim(30)
(primaries, residuals, stop_index) = PerWindowInvoker._try_split(
0.2,
0,
3,
self.windowed_value,
self.restriction,
self.watermark_estimator_state,
self.restriction_provider,
restriction_tracker,
self.watermark_estimator)
# 20% of 2.7 windows = 20% of 270 offset left = 54 offset
# 30 + 54 = 84 split offset
expected_primary_split, expected_residual_split = (
self.create_split_in_window(84, (self.window1, )))
_, expected_residual_windows = (
self.create_split_across_windows(None, (self.window2, self.window3, )))
hc.assert_that(primaries, hc.contains_inanyorder(expected_primary_split))
hc.assert_that(
residuals,
hc.contains_inanyorder(
expected_residual_split,
expected_residual_windows,
))
self.assertEqual(stop_index, 1)
def test_window_observing_split_on_middle_window(self):
restriction_tracker = OffsetRestrictionTracker(self.restriction)
restriction_tracker.try_claim(30)
(primaries, residuals, stop_index) = PerWindowInvoker._try_split(
0.2,
1,
3,
self.windowed_value,
self.restriction,
self.watermark_estimator_state,
self.restriction_provider,
restriction_tracker,
self.watermark_estimator)
# 20% of 1.7 windows = 20% of 170 offset left = 34 offset
# 30 + 34 = 64 split offset
expected_primary_split, expected_residual_split = (
self.create_split_in_window(64, (self.window2, )))
expected_primary_windows, expected_residual_windows = (
self.create_split_across_windows((self.window1, ), (self.window3, )))
hc.assert_that(
primaries,
hc.contains_inanyorder(
expected_primary_split,
expected_primary_windows,
))
hc.assert_that(
residuals,
hc.contains_inanyorder(
expected_residual_split,
expected_residual_windows,
))
self.assertEqual(stop_index, 2)
def test_window_observing_split_on_last_window(self):
restriction_tracker = OffsetRestrictionTracker(self.restriction)
restriction_tracker.try_claim(30)
(primaries, residuals, stop_index) = PerWindowInvoker._try_split(
0.2,
2,
3,
self.windowed_value,
self.restriction,
self.watermark_estimator_state,
self.restriction_provider,
restriction_tracker,
self.watermark_estimator)
# 20% of 0.7 windows = 20% of 70 offset left = 14 offset
# 30 + 14 = 44 split offset
expected_primary_split, expected_residual_split = (
self.create_split_in_window(44, (self.window3, )))
expected_primary_windows, _ = (
self.create_split_across_windows((self.window1, self.window2, ), None))
hc.assert_that(
primaries,
hc.contains_inanyorder(
expected_primary_split,
expected_primary_windows,
))
hc.assert_that(residuals, hc.contains_inanyorder(expected_residual_split, ))
self.assertEqual(stop_index, 3)
def test_window_observing_split_on_first_window_fallback(self):
restriction_tracker = OffsetRestrictionTracker(self.restriction)
restriction_tracker.try_claim(100)
# We assume that we can't split this fully claimed restriction
self.assertIsNone(restriction_tracker.try_split(0))
(primaries, residuals, stop_index) = PerWindowInvoker._try_split(
0.0,
0,
3,
self.windowed_value,
self.restriction,
self.watermark_estimator_state,
self.restriction_provider,
restriction_tracker,
self.watermark_estimator)
expected_primary_windows, expected_residual_windows = (
self.create_split_across_windows(
(self.window1, ), (self.window2, self.window3, )))
hc.assert_that(
primaries, hc.contains_inanyorder(
expected_primary_windows,
))
hc.assert_that(
residuals, hc.contains_inanyorder(
expected_residual_windows,
))
self.assertEqual(stop_index, 1)
def test_window_observing_split_on_middle_window_fallback(self):
restriction_tracker = OffsetRestrictionTracker(self.restriction)
restriction_tracker.try_claim(100)
# We assume that we can't split this fully claimed restriction
self.assertIsNone(restriction_tracker.try_split(0))
(primaries, residuals, stop_index) = PerWindowInvoker._try_split(
0.0,
1,
3,
self.windowed_value,
self.restriction,
self.watermark_estimator_state,
self.restriction_provider,
restriction_tracker,
self.watermark_estimator)
expected_primary_windows, expected_residual_windows = (
self.create_split_across_windows(
(self.window1, self.window2, ), (self.window3, )))
hc.assert_that(
primaries, hc.contains_inanyorder(
expected_primary_windows,
))
hc.assert_that(
residuals, hc.contains_inanyorder(
expected_residual_windows,
))
self.assertEqual(stop_index, 2)
def test_window_observing_split_on_last_window_when_split_not_possible(self):
restriction_tracker = OffsetRestrictionTracker(self.restriction)
restriction_tracker.try_claim(100)
# We assume that we can't split this fully claimed restriction
self.assertIsNone(restriction_tracker.try_split(0))
self.assertIsNone(
PerWindowInvoker._try_split(
0.0,
2,
3,
self.windowed_value,
self.restriction,
self.watermark_estimator_state,
self.restriction_provider,
restriction_tracker,
self.watermark_estimator))
def test_window_observing_split_on_window_boundary_round_up(self):
restriction_tracker = OffsetRestrictionTracker(self.restriction)
restriction_tracker.try_claim(30)
(primaries, residuals, stop_index) = PerWindowInvoker._try_split(
0.6,
0,
3,
self.windowed_value,
self.restriction,
self.watermark_estimator_state,
self.restriction_provider,
restriction_tracker,
self.watermark_estimator)
# 60% of 2.7 windows = 60% of 270 offset left = 162 offset
# 30 + 162 = 192 offset --> round to end of window 2
expected_primary_windows, expected_residual_windows = (
self.create_split_across_windows(
(self.window1, self.window2, ), (self.window3, )))
hc.assert_that(
primaries, hc.contains_inanyorder(
expected_primary_windows,
))
hc.assert_that(
residuals, hc.contains_inanyorder(
expected_residual_windows,
))
self.assertEqual(stop_index, 2)
def test_window_observing_split_on_window_boundary_round_down(self):
restriction_tracker = OffsetRestrictionTracker(self.restriction)
restriction_tracker.try_claim(30)
(primaries, residuals, stop_index) = PerWindowInvoker._try_split(
0.3,
0,
3,
self.windowed_value,
self.restriction,
self.watermark_estimator_state,
self.restriction_provider,
restriction_tracker,
self.watermark_estimator)
# 30% of 2.7 windows = 30% of 270 offset left = 81 offset
# 30 + 81 = 111 offset --> round to end of window 1
expected_primary_windows, expected_residual_windows = (
self.create_split_across_windows(
(self.window1, ), (self.window2, self.window3, )))
hc.assert_that(
primaries, hc.contains_inanyorder(
expected_primary_windows,
))
hc.assert_that(
residuals, hc.contains_inanyorder(
expected_residual_windows,
))
self.assertEqual(stop_index, 1)
def test_window_observing_split_on_window_boundary_round_down_on_last_window(
self):
restriction_tracker = OffsetRestrictionTracker(self.restriction)
restriction_tracker.try_claim(30)
(primaries, residuals, stop_index) = PerWindowInvoker._try_split(
0.9,
0,
3,
self.windowed_value,
self.restriction,
self.watermark_estimator_state,
self.restriction_provider,
restriction_tracker,
self.watermark_estimator)
# 90% of 2.7 windows = 90% of 270 offset left = 243 offset
# 30 + 243 = 273 offset --> prefer a split so round to end of window 2
# instead of no split
expected_primary_windows, expected_residual_windows = (
self.create_split_across_windows(
(self.window1, self.window2, ), (self.window3, )))
hc.assert_that(
primaries, hc.contains_inanyorder(
expected_primary_windows,
))
hc.assert_that(
residuals, hc.contains_inanyorder(
expected_residual_windows,
))
self.assertEqual(stop_index, 2)
class UtilitiesTest(unittest.TestCase):
def test_equal_environments_merged(self):
pipeline_proto = merge_common_environments(
beam_runner_api_pb2.Pipeline(
components=beam_runner_api_pb2.Components(
environments={
'a1': beam_runner_api_pb2.Environment(urn='A'),
'a2': beam_runner_api_pb2.Environment(urn='A'),
'b1': beam_runner_api_pb2.Environment(
urn='B', payload=b'x'),
'b2': beam_runner_api_pb2.Environment(
urn='B', payload=b'x'),
'b3': beam_runner_api_pb2.Environment(
urn='B', payload=b'y'),
},
transforms={
't1': beam_runner_api_pb2.PTransform(
unique_name='t1', environment_id='a1'),
't2': beam_runner_api_pb2.PTransform(
unique_name='t2', environment_id='a2'),
},
windowing_strategies={
'w1': beam_runner_api_pb2.WindowingStrategy(
environment_id='b1'),
'w2': beam_runner_api_pb2.WindowingStrategy(
environment_id='b2'),
})))
self.assertEqual(len(pipeline_proto.components.environments), 3)
self.assertTrue(('a1' in pipeline_proto.components.environments)
^ ('a2' in pipeline_proto.components.environments))
self.assertTrue(('b1' in pipeline_proto.components.environments)
^ ('b2' in pipeline_proto.components.environments))
self.assertEqual(
len(
set(
t.environment_id
for t in pipeline_proto.components.transforms.values())),
1)
self.assertEqual(
len(
set(
w.environment_id for w in
pipeline_proto.components.windowing_strategies.values())),
1)
def test_external_merged(self):
p = beam.Pipeline()
# This transform recursively creates several external environments.
_ = p | FibTransform(4)
pipeline_proto = p.to_runner_api()
# All our external environments are equal and consolidated.
# We also have a placeholder "default" environment that has not been
# resolved do anything concrete yet.
self.assertEqual(len(pipeline_proto.components.environments), 2)
if __name__ == '__main__':
unittest.main()