// Source blob: e698e02be22b5dc4b531ed897c00cf40de2b450a
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow;
import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.dataflow.BatchViewOverrides.TransformedMap;
import org.apache.beam.runners.dataflow.internal.IsmFormat;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
import org.apache.beam.runners.dataflow.internal.IsmFormat.MetadataKeyCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
 * Tests for {@link BatchViewOverrides}.
 *
 * <p>Each test hands hand-built input to one of the nested {@code DoFn}s through a {@link
 * DoFnTester} and checks the elements that come out.
 */
@RunWith(JUnit4.class)
public class BatchViewOverridesTest {
  /** Window built from instants 0 and 10. */
  private static final IntervalWindow WINDOW_A =
      new IntervalWindow(new Instant(0), new Instant(10));

  /** Window built from instants 10 and 20. */
  private static final IntervalWindow WINDOW_B =
      new IntervalWindow(new Instant(10), new Instant(20));

  /** Window built from instants 20 and 30. */
  private static final IntervalWindow WINDOW_C =
      new IntervalWindow(new Instant(20), new Instant(30));

  @Rule public ExpectedException thrown = ExpectedException.none();

  /**
   * Builds a {@link WindowedValue} for {@code value} with the given timestamp and window, using
   * {@link PaneInfo#NO_FIRING} as the pane.
   */
  private static <T> WindowedValue<T> noFiring(T value, long timestamp, IntervalWindow window) {
    return WindowedValue.of(value, new Instant(timestamp), window, PaneInfo.NO_FIRING);
  }

  /** Pairs {@code window} with a {@link #noFiring} value that lives in that same window. */
  private static <T> KV<IntervalWindow, WindowedValue<T>> windowKeyed(
      T value, long timestamp, IntervalWindow window) {
    return KV.of(window, noFiring(value, timestamp, window));
  }

  /** Creates the ISM record coder shared by the map-like and metadata tests. */
  private static IsmRecordCoder<WindowedValue<Long>> newIsmCoder(
      Coder<Long> keyCoder, Coder<IntervalWindow> windowCoder) {
    return IsmRecordCoder.of(
        1,
        2,
        ImmutableList.of(
            MetadataKeyCoder.of(keyCoder), IntervalWindow.getCoder(), BigEndianLongCoder.of()),
        FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder));
  }

  /** Hashes the key components (metadata key, {@code window}) with {@code ismCoder}. */
  private static int metadataHash(
      IsmRecordCoder<WindowedValue<Long>> ismCoder, IntervalWindow window) throws Exception {
    return ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), window));
  }

  /** Builds {@code KV(metadataHash(window), KV(window, value))}. */
  private static KV<Integer, KV<IntervalWindow, Long>> metadataEntry(
      IsmRecordCoder<WindowedValue<Long>> ismCoder, IntervalWindow window, long value)
      throws Exception {
    return KV.of(metadataHash(ismCoder, window), KV.of(window, value));
  }

  /**
   * Builds a metadata {@link IsmRecord} keyed by (metadata key, {@code window}, {@code index})
   * whose payload is {@code payload} encoded with {@link VarLongCoder}.
   */
  private static IsmRecord<WindowedValue<Long>> metaRecord(
      IntervalWindow window, long index, long payload) throws Exception {
    return IsmRecord.<WindowedValue<Long>>meta(
        ImmutableList.of(IsmFormat.getMetadataKey(), window, index),
        CoderUtils.encodeToByteArray(VarLongCoder.of(), payload));
  }

  /** Creates a tester around the singleton-per-window DoFn configured for the global window. */
  private static DoFnTester<
          KV<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>,
          IsmRecord<WindowedValue<String>>>
      newSingletonTester() {
    return DoFnTester.of(
        new BatchViewOverrides.BatchViewAsSingleton.IsmRecordForSingularValuePerWindowDoFn<
            String, GlobalWindow>(GlobalWindow.Coder.INSTANCE));
  }

  /** Creates a tester around {@code ToIsmRecordForMapLikeDoFn} with the given configuration. */
  private static DoFnTester<
          KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>,
          IsmRecord<WindowedValue<Long>>>
      newMapLikeTester(
          TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag,
          TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag,
          Coder<IntervalWindow> windowCoder,
          Coder<Long> keyCoder,
          IsmRecordCoder<WindowedValue<Long>> ismCoder,
          boolean uniqueKeysExpected) {
    return DoFnTester.of(
        new BatchViewOverrides.BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<>(
            outputForSizeTag,
            outputForEntrySetTag,
            windowCoder,
            keyCoder,
            ismCoder,
            uniqueKeysExpected));
  }

  /** A single value in the global window is emitted as one record keyed by that window. */
  @Test
  public void testBatchViewAsSingletonToIsmRecord() throws Exception {
    Iterable<KV<GlobalWindow, WindowedValue<String>>> onlyValue =
        ImmutableList.of(KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("a")));

    List<IsmRecord<WindowedValue<String>>> produced =
        newSingletonTester().processBundle(ImmutableList.of(KV.of(0, onlyValue)));

    assertThat(
        produced,
        contains(IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE), valueInGlobalWindow("a"))));
  }

  /** Two values for the same window must be rejected with an {@link IllegalStateException}. */
  @Test
  public void testBatchViewAsSingletonToIsmRecordWithMultipleValuesThrowsException()
      throws Exception {
    DoFnTester<
            KV<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>,
            IsmRecord<WindowedValue<String>>>
        doFnTester = newSingletonTester();

    thrown.expect(IllegalStateException.class);
    thrown.expectMessage("found for singleton within window");

    Iterable<KV<GlobalWindow, WindowedValue<String>>> twoValues =
        ImmutableList.of(
            KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("a")),
            KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("b")));
    doFnTester.processBundle(ImmutableList.of(KV.of(0, twoValues)));
  }

  /** Elements in the global window are keyed by (window, running index) in processing order. */
  @Test
  public void testBatchViewAsListToIsmRecordForGlobalWindow() throws Exception {
    DoFnTester<String, IsmRecord<WindowedValue<String>>> doFnTester =
        DoFnTester.of(
            new BatchViewOverrides.BatchViewAsList.ToIsmRecordForGlobalWindowDoFn<String>());

    List<IsmRecord<WindowedValue<String>>> produced =
        doFnTester.processBundle(ImmutableList.of("a", "b", "c"));

    // The order of the output elements is important relative to processing order
    assertThat(
        produced,
        contains(
            IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 0L), valueInGlobalWindow("a")),
            IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 1L), valueInGlobalWindow("b")),
            IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 2L), valueInGlobalWindow("c"))));
  }

  /** For interval windows the expected index restarts at zero whenever the window changes. */
  @Test
  public void testBatchViewAsListToIsmRecordForNonGlobalWindow() throws Exception {
    DoFnTester<
            KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<Long>>>>,
            IsmRecord<WindowedValue<Long>>>
        doFnTester =
            DoFnTester.of(
                new BatchViewOverrides.BatchViewAsList.ToIsmRecordForNonGlobalWindowDoFn<
                    Long, IntervalWindow>(IntervalWindow.getCoder()));

    Iterable<KV<IntervalWindow, WindowedValue<Long>>> firstShard =
        ImmutableList.of(
            windowKeyed(110L, 1, WINDOW_A),
            windowKeyed(111L, 3, WINDOW_A),
            windowKeyed(112L, 4, WINDOW_A),
            windowKeyed(120L, 12, WINDOW_B),
            windowKeyed(121L, 14, WINDOW_B));
    Iterable<KV<IntervalWindow, WindowedValue<Long>>> secondShard =
        ImmutableList.of(windowKeyed(210L, 25, WINDOW_C));
    Iterable<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<Long>>>>> inputElements =
        ImmutableList.of(KV.of(1, firstShard), KV.of(2, secondShard));

    // The order of the output elements is important relative to processing order
    assertThat(
        doFnTester.processBundle(inputElements),
        contains(
            IsmRecord.of(ImmutableList.of(WINDOW_A, 0L), noFiring(110L, 1, WINDOW_A)),
            IsmRecord.of(ImmutableList.of(WINDOW_A, 1L), noFiring(111L, 3, WINDOW_A)),
            IsmRecord.of(ImmutableList.of(WINDOW_A, 2L), noFiring(112L, 4, WINDOW_A)),
            IsmRecord.of(ImmutableList.of(WINDOW_B, 0L), noFiring(120L, 12, WINDOW_B)),
            IsmRecord.of(ImmutableList.of(WINDOW_B, 1L), noFiring(121L, 14, WINDOW_B)),
            IsmRecord.of(ImmutableList.of(WINDOW_C, 0L), noFiring(210L, 25, WINDOW_C))));
  }

  /**
   * Checks the main output of the map-like DoFn plus its two tagged outputs: the per-window count
   * of unique keys and the per-window list of unique keys.
   */
  @Test
  public void testToIsmRecordForMapLikeDoFn() throws Exception {
    TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>();
    TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>();
    Coder<Long> keyCoder = VarLongCoder.of();
    Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
    IsmRecordCoder<WindowedValue<Long>> ismCoder = newIsmCoder(keyCoder, windowCoder);
    DoFnTester<
            KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>,
            IsmRecord<WindowedValue<Long>>>
        doFnTester =
            newMapLikeTester(
                outputForSizeTag,
                outputForEntrySetTag,
                windowCoder,
                keyCoder,
                ismCoder,
                false /* unique keys */);

    Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>> firstShard =
        ImmutableList.of(
            KV.of(KV.of(1L, WINDOW_A), noFiring(110L, 1, WINDOW_A)),
            // same window same key as to previous
            KV.of(KV.of(1L, WINDOW_A), noFiring(111L, 2, WINDOW_A)),
            // same window different key as to previous
            KV.of(KV.of(2L, WINDOW_A), noFiring(120L, 3, WINDOW_A)),
            // different window same key as to previous
            KV.of(KV.of(2L, WINDOW_B), noFiring(210L, 11, WINDOW_B)),
            // different window and different key as to previous
            KV.of(KV.of(3L, WINDOW_B), noFiring(220L, 12, WINDOW_B)));
    // different shard
    Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>> secondShard =
        ImmutableList.of(KV.of(KV.of(4L, WINDOW_C), noFiring(330L, 21, WINDOW_C)));
    Iterable<KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>>
        inputElements = ImmutableList.of(KV.of(1, firstShard), KV.of(2, secondShard));

    // The order of the output elements is important relative to processing order
    assertThat(
        doFnTester.processBundle(inputElements),
        contains(
            IsmRecord.of(ImmutableList.of(1L, WINDOW_A, 0L), noFiring(110L, 1, WINDOW_A)),
            IsmRecord.of(ImmutableList.of(1L, WINDOW_A, 1L), noFiring(111L, 2, WINDOW_A)),
            IsmRecord.of(ImmutableList.of(2L, WINDOW_A, 0L), noFiring(120L, 3, WINDOW_A)),
            IsmRecord.of(ImmutableList.of(2L, WINDOW_B, 0L), noFiring(210L, 11, WINDOW_B)),
            IsmRecord.of(ImmutableList.of(3L, WINDOW_B, 0L), noFiring(220L, 12, WINDOW_B)),
            IsmRecord.of(ImmutableList.of(4L, WINDOW_C, 0L), noFiring(330L, 21, WINDOW_C))));

    // Verify the number of unique keys per window.
    assertThat(
        doFnTester.takeOutputElements(outputForSizeTag),
        contains(
            metadataEntry(ismCoder, WINDOW_A, 2L),
            metadataEntry(ismCoder, WINDOW_B, 2L),
            metadataEntry(ismCoder, WINDOW_C, 1L)));

    // Verify the output for the unique keys.
    assertThat(
        doFnTester.takeOutputElements(outputForEntrySetTag),
        contains(
            metadataEntry(ismCoder, WINDOW_A, 1L),
            metadataEntry(ismCoder, WINDOW_A, 2L),
            metadataEntry(ismCoder, WINDOW_B, 2L),
            metadataEntry(ismCoder, WINDOW_B, 3L),
            metadataEntry(ismCoder, WINDOW_C, 4L)));
  }

  /**
   * With the unique-keys flag set, a repeated key within one window must be rejected with an
   * {@link IllegalStateException}.
   */
  @Test
  public void testToIsmRecordForMapLikeDoFnWithoutUniqueKeysThrowsException() throws Exception {
    TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>();
    TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>();
    Coder<Long> keyCoder = VarLongCoder.of();
    Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
    IsmRecordCoder<WindowedValue<Long>> ismCoder = newIsmCoder(keyCoder, windowCoder);
    DoFnTester<
            KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>,
            IsmRecord<WindowedValue<Long>>>
        doFnTester =
            newMapLikeTester(
                outputForSizeTag,
                outputForEntrySetTag,
                windowCoder,
                keyCoder,
                ismCoder,
                true /* unique keys */);

    Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>> duplicateKeyShard =
        ImmutableList.of(
            KV.of(KV.of(1L, WINDOW_A), noFiring(110L, 1, WINDOW_A)),
            // same window same key as to previous
            KV.of(KV.of(1L, WINDOW_A), noFiring(111L, 2, WINDOW_A)));
    Iterable<KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>>
        inputElements = ImmutableList.of(KV.of(1, duplicateKeyShard));

    thrown.expect(IllegalStateException.class);
    thrown.expectMessage("Unique keys are expected but found key");
    doFnTester.processBundle(inputElements);
  }

  /** Sizes that share a window are expected to be summed into one metadata record at index 0. */
  @Test
  public void testToIsmMetadataRecordForSizeDoFn() throws Exception {
    Coder<Long> keyCoder = VarLongCoder.of();
    Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
    IsmRecordCoder<WindowedValue<Long>> ismCoder = newIsmCoder(keyCoder, windowCoder);
    DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, Long>>>, IsmRecord<WindowedValue<Long>>>
        doFnTester =
            DoFnTester.of(
                new BatchViewOverrides.BatchViewAsMultimap.ToIsmMetadataRecordForSizeDoFn<
                    Long, Long, IntervalWindow>(windowCoder));

    Iterable<KV<IntervalWindow, Long>> firstGroup =
        ImmutableList.of(KV.of(WINDOW_A, 2L), KV.of(WINDOW_A, 3L), KV.of(WINDOW_B, 7L));
    Iterable<KV<IntervalWindow, Long>> secondGroup = ImmutableList.of(KV.of(WINDOW_C, 9L));
    Iterable<KV<Integer, Iterable<KV<IntervalWindow, Long>>>> inputElements =
        ImmutableList.of(
            KV.of(1, firstGroup), KV.of(metadataHash(ismCoder, WINDOW_B), secondGroup));

    // The order of the output elements is important relative to processing order
    assertThat(
        doFnTester.processBundle(inputElements),
        contains(
            metaRecord(WINDOW_A, 0L, 5L),
            metaRecord(WINDOW_B, 0L, 7L),
            metaRecord(WINDOW_C, 0L, 9L)));
  }

  /** Keys are expected as metadata records whose index starts at 1 within each window. */
  @Test
  public void testToIsmMetadataRecordForKeyDoFn() throws Exception {
    Coder<Long> keyCoder = VarLongCoder.of();
    Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
    IsmRecordCoder<WindowedValue<Long>> ismCoder = newIsmCoder(keyCoder, windowCoder);
    DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, Long>>>, IsmRecord<WindowedValue<Long>>>
        doFnTester =
            DoFnTester.of(
                new BatchViewOverrides.BatchViewAsMultimap.ToIsmMetadataRecordForKeyDoFn<
                    Long, Long, IntervalWindow>(keyCoder, windowCoder));

    Iterable<KV<IntervalWindow, Long>> firstGroup =
        ImmutableList.of(
            KV.of(WINDOW_A, 2L),
            // same window as previous
            KV.of(WINDOW_A, 3L),
            // different window as previous
            KV.of(WINDOW_B, 3L));
    Iterable<KV<IntervalWindow, Long>> secondGroup = ImmutableList.of(KV.of(WINDOW_C, 3L));
    Iterable<KV<Integer, Iterable<KV<IntervalWindow, Long>>>> inputElements =
        ImmutableList.of(
            KV.of(1, firstGroup), KV.of(metadataHash(ismCoder, WINDOW_B), secondGroup));

    // The order of the output elements is important relative to processing order
    assertThat(
        doFnTester.processBundle(inputElements),
        contains(
            metaRecord(WINDOW_A, 1L, 2L),
            metaRecord(WINDOW_A, 2L, 3L),
            metaRecord(WINDOW_B, 1L, 3L),
            metaRecord(WINDOW_C, 1L, 3L)));
  }

  /** One map per window is expected, holding that window's key/value pairs. */
  @Test
  public void testToMapDoFn() throws Exception {
    Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
    DoFnTester<
            KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>,
            IsmRecord<WindowedValue<TransformedMap<Long, WindowedValue<Long>, Long>>>>
        doFnTester =
            DoFnTester.of(
                new BatchViewOverrides.BatchViewAsMap.ToMapDoFn<Long, Long, IntervalWindow>(
                    windowCoder));

    Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>> firstShard =
        ImmutableList.of(
            windowKeyed(KV.of(1L, 11L), 3, WINDOW_A),
            windowKeyed(KV.of(2L, 21L), 7, WINDOW_A),
            windowKeyed(KV.of(2L, 21L), 13, WINDOW_B),
            windowKeyed(KV.of(3L, 31L), 15, WINDOW_B));
    Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>> secondShard =
        ImmutableList.of(windowKeyed(KV.of(4L, 41L), 25, WINDOW_C));
    Iterable<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>>
        inputElements = ImmutableList.of(KV.of(1, firstShard), KV.of(2, secondShard));

    // The order of the output elements is important relative to processing order
    List<IsmRecord<WindowedValue<TransformedMap<Long, WindowedValue<Long>, Long>>>> output =
        doFnTester.processBundle(inputElements);
    assertEquals(3, output.size());

    Map<Long, Long> firstMap = output.get(0).getValue().getValue();
    assertEquals(2, firstMap.size());
    assertEquals(ImmutableMap.of(1L, 11L, 2L, 21L), firstMap);

    Map<Long, Long> secondMap = output.get(1).getValue().getValue();
    assertEquals(2, secondMap.size());
    assertEquals(ImmutableMap.of(2L, 21L, 3L, 31L), secondMap);

    Map<Long, Long> thirdMap = output.get(2).getValue().getValue();
    assertEquals(1, thirdMap.size());
    assertEquals(ImmutableMap.of(4L, 41L), thirdMap);
  }

  /** One multimap per window is expected, with duplicate key/value pairs preserved. */
  @Test
  public void testToMultimapDoFn() throws Exception {
    Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
    DoFnTester<
            KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>,
            IsmRecord<
                WindowedValue<TransformedMap<Long, Iterable<WindowedValue<Long>>, Iterable<Long>>>>>
        doFnTester =
            DoFnTester.of(
                new BatchViewOverrides.BatchViewAsMultimap.ToMultimapDoFn<
                    Long, Long, IntervalWindow>(windowCoder));

    Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>> firstShard =
        ImmutableList.of(
            windowKeyed(KV.of(1L, 11L), 3, WINDOW_A),
            // [BEAM-5184] Specifically test with a duplicate value to ensure that
            // duplicate key/values are not lost.
            windowKeyed(KV.of(1L, 11L), 3, WINDOW_A),
            windowKeyed(KV.of(1L, 12L), 5, WINDOW_A),
            windowKeyed(KV.of(2L, 21L), 7, WINDOW_A),
            windowKeyed(KV.of(2L, 21L), 13, WINDOW_B),
            windowKeyed(KV.of(3L, 31L), 15, WINDOW_B));
    Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>> secondShard =
        ImmutableList.of(windowKeyed(KV.of(4L, 41L), 25, WINDOW_C));
    Iterable<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>>
        inputElements = ImmutableList.of(KV.of(1, firstShard), KV.of(2, secondShard));

    // The order of the output elements is important relative to processing order
    List<
            IsmRecord<
                WindowedValue<TransformedMap<Long, Iterable<WindowedValue<Long>>, Iterable<Long>>>>>
        output = doFnTester.processBundle(inputElements);
    assertEquals(3, output.size());

    Map<Long, Iterable<Long>> firstMap = output.get(0).getValue().getValue();
    assertEquals(2, firstMap.size());
    assertThat(firstMap.get(1L), containsInAnyOrder(11L, 11L, 12L));
    assertThat(firstMap.get(2L), containsInAnyOrder(21L));

    Map<Long, Iterable<Long>> secondMap = output.get(1).getValue().getValue();
    assertEquals(2, secondMap.size());
    assertThat(secondMap.get(2L), containsInAnyOrder(21L));
    assertThat(secondMap.get(3L), containsInAnyOrder(31L));

    Map<Long, Iterable<Long>> thirdMap = output.get(2).getValue().getValue();
    assertEquals(1, thirdMap.size());
    assertThat(thirdMap.get(4L), containsInAnyOrder(41L));
  }
}