blob: b6ec53dacef009b84ad12aed91ee68d6cfb93c4b [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pytype: skip-file
"""Unit tests for deduplicate transform by using TestStream."""
import unittest
import pytest
import apache_beam as beam
from apache_beam.coders import coders
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.testing.util import equal_to_per_window
from apache_beam.transforms import deduplicate
from apache_beam.transforms import window
from apache_beam.utils.timestamp import Duration
from apache_beam.utils.timestamp import Timestamp
# TestStream is only supported in streaming pipeline. The Deduplicate transform
# also requires Timer support. Sickbaying this testsuite until dataflow runner
# supports both TestStream and user timer.
@pytest.mark.no_sickbay_batch
@pytest.mark.no_sickbay_streaming
@pytest.mark.it_validatesrunner
class DeduplicateTest(unittest.TestCase):
def __init__(self, *args, **kwargs):
self.runner = None
self.options = None
super(DeduplicateTest, self).__init__(*args, **kwargs)
def set_runner(self, runner):
self.runner = runner
def set_options(self, options):
self.options = options
def create_pipeline(self):
if self.runner and self.options:
return TestPipeline(runner=self.runner, options=self.options)
elif self.runner:
return TestPipeline(runner=self.runner)
elif self.options:
return TestPipeline(options=self.options)
else:
return TestPipeline()
def test_deduplication_in_different_windows(self):
with self.create_pipeline() as p:
test_stream = (
TestStream(
coder=coders.StrUtf8Coder()).advance_watermark_to(0).add_elements(
[
window.TimestampedValue('k1', 0),
window.TimestampedValue('k2', 10),
window.TimestampedValue('k3', 20),
window.TimestampedValue('k1', 30),
window.TimestampedValue('k2', 40),
window.TimestampedValue('k3', 50),
window.TimestampedValue('k4', 60),
window.TimestampedValue('k5', 70),
window.TimestampedValue('k6', 80)
]).advance_watermark_to_infinity())
res = (
p
| test_stream
| beam.WindowInto(window.FixedWindows(30))
| deduplicate.Deduplicate(processing_time_duration=10 * 60)
| beam.Map(lambda e, ts=beam.DoFn.TimestampParam: (e, ts)))
# Deduplication should happen per window.
expect_unique_keys_per_window = {
window.IntervalWindow(0, 30): [('k1', Timestamp(0)),
('k2', Timestamp(10)),
('k3', Timestamp(20))],
window.IntervalWindow(30, 60): [('k1', Timestamp(30)),
('k2', Timestamp(40)),
('k3', Timestamp(50))],
window.IntervalWindow(60, 90): [('k4', Timestamp(60)),
('k5', Timestamp(70)),
('k6', Timestamp(80))],
}
assert_that(
res,
equal_to_per_window(expect_unique_keys_per_window),
use_global_window=False,
label='assert per window')
@unittest.skip('TestStream not yet supported')
def test_deduplication_with_event_time(self):
deduplicate_duration = 60
with self.create_pipeline() as p:
test_stream = (
TestStream(coder=coders.StrUtf8Coder()).with_output_types(
str).advance_watermark_to(0).add_elements([
window.TimestampedValue('k1', 0),
window.TimestampedValue('k2', 20),
window.TimestampedValue('k3', 30)
]).advance_watermark_to(30).add_elements([
window.TimestampedValue('k1', 40),
window.TimestampedValue('k2', 50),
window.TimestampedValue('k3', 60)
]).advance_watermark_to(deduplicate_duration).add_elements([
window.TimestampedValue('k1', 70)
]).advance_watermark_to_infinity())
res = (
p
| test_stream
| deduplicate.Deduplicate(
event_time_duration=Duration(deduplicate_duration))
| beam.Map(lambda e, ts=beam.DoFn.TimestampParam: (e, ts)))
assert_that(
res,
equal_to([('k1', Timestamp(0)), ('k2', Timestamp(20)),
('k3', Timestamp(30)), ('k1', Timestamp(70))]))
@unittest.skip('TestStream not yet supported')
def test_deduplication_with_processing_time(self):
deduplicate_duration = 60
with self.create_pipeline() as p:
test_stream = (
TestStream(coder=coders.StrUtf8Coder()).with_output_types(
str).advance_watermark_to(0).add_elements([
window.TimestampedValue('k1', 0),
window.TimestampedValue('k2', 20),
window.TimestampedValue('k3', 30)
]).advance_processing_time(30).add_elements([
window.TimestampedValue('k1', 40),
window.TimestampedValue('k2', 50),
window.TimestampedValue('k3', 60)
]).advance_processing_time(deduplicate_duration).add_elements([
window.TimestampedValue('k1', 70)
]).advance_watermark_to_infinity())
res = (
p
| test_stream
| deduplicate.Deduplicate(
processing_time_duration=Duration(deduplicate_duration))
| beam.Map(lambda e, ts=beam.DoFn.TimestampParam: (e, ts)))
assert_that(
res,
equal_to([('k1', Timestamp(0)), ('k2', Timestamp(20)),
('k3', Timestamp(30)), ('k1', Timestamp(70))]))
if __name__ == '__main__':
unittest.main()