blob: 310264d6f37c0355e4e1d0889db381adb4fcefa6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
import org.apache.avro.reflect.Nullable;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.joda.time.Instant;
import org.junit.Test;
/** Tests for {@link ReifyTimestampAndWindowsParDoFnFactory} */
public class ReifyTimestampAndWindowsParDoFnFactoryTest {
private <K, V> void verifyReifiedIsInTheSameWindows(WindowedValue<KV<K, V>> elem)
throws Exception {
ParDoFn reifyFn =
new ReifyTimestampAndWindowsParDoFnFactory()
.create(null, null, null, null, null, null, null);
SingleValueReceiver<WindowedValue<KV<K, WindowedValue<V>>>> receiver =
new SingleValueReceiver<>();
reifyFn.startBundle(receiver);
// The important thing to test that is not just a restatement of the ParDoFn is that
// it only produces one element per input
reifyFn.processElement(elem);
assertThat(receiver.reified.getValue().getKey(), equalTo(elem.getValue().getKey()));
assertThat(
receiver.reified.getValue().getValue().getValue(), equalTo(elem.getValue().getValue()));
assertThat(receiver.reified.getValue().getValue().getTimestamp(), equalTo(elem.getTimestamp()));
assertThat(receiver.reified.getValue().getValue().getWindows(), equalTo(elem.getWindows()));
assertThat(receiver.reified.getValue().getValue().getPane(), equalTo(elem.getPane()));
}
@Test
public void testSingleWindow() throws Exception {
verifyReifiedIsInTheSameWindows(
WindowedValue.of(
KV.of(42, "bizzle"),
new Instant(73),
new IntervalWindow(new Instant(5), new Instant(15)),
PaneInfo.NO_FIRING));
}
@Test
public void testMultiWindowStaysCompressed() throws Exception {
verifyReifiedIsInTheSameWindows(
WindowedValue.of(
KV.of(42, "bizzle"),
new Instant(73),
ImmutableList.of(
new IntervalWindow(new Instant(5), new Instant(15)),
new IntervalWindow(new Instant(17), new Instant(97))),
PaneInfo.NO_FIRING));
}
private static class SingleValueReceiver<T> implements Receiver {
@Nullable public T reified = null;
@Override
public void process(Object outputElem) throws Exception {
checkState(reified == null, "Element output more than once");
reified = (T) outputElem;
}
}
}