blob: 3fdec453f3bce83500d1fc0473034416e3efa840 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.transforms;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesSideInputs;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Tests for {@link View}. See also {@link ParDoTest}, which provides additional coverage since
* views can only be observed via {@link ParDo}.
*/
@RunWith(JUnit4.class)
@Category(UsesSideInputs.class)
public class ViewTest implements Serializable {
// This test is Serializable, just so that it's easy to have
// anonymous inner classes inside the non-static test methods.
@Rule public final transient TestPipeline pipeline = TestPipeline.create();
@Rule public transient ExpectedException thrown = ExpectedException.none();
@Test
@Category(ValidatesRunner.class)
public void testSingletonSideInput() {
final PCollectionView<Integer> view =
pipeline.apply("Create47", Create.of(47)).apply(View.asSingleton());
PCollection<Integer> output =
pipeline
.apply("Create123", Create.of(1, 2, 3))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<Integer, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.sideInput(view));
}
})
.withSideInputs(view));
PAssert.that(output).containsInAnyOrder(47, 47, 47);
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testWindowedSingletonSideInput() {
final PCollectionView<Integer> view =
pipeline
.apply(
"Create47",
Create.timestamped(
TimestampedValue.of(47, new Instant(1)),
TimestampedValue.of(48, new Instant(11))))
.apply("SideWindowInto", Window.into(FixedWindows.of(Duration.millis(10))))
.apply(View.asSingleton());
PCollection<Integer> output =
pipeline
.apply(
"Create123",
Create.timestamped(
TimestampedValue.of(1, new Instant(4)),
TimestampedValue.of(2, new Instant(8)),
TimestampedValue.of(3, new Instant(12))))
.apply("MainWindowInto", Window.into(FixedWindows.of(Duration.millis(10))))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<Integer, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.sideInput(view));
}
})
.withSideInputs(view));
PAssert.that(output).containsInAnyOrder(47, 47, 48);
pipeline.run();
}
@Test
@Category(NeedsRunner.class)
public void testEmptySingletonSideInput() throws Exception {
final PCollectionView<Integer> view =
pipeline
.apply("CreateEmptyIntegers", Create.empty(VarIntCoder.of()))
.apply(View.asSingleton());
pipeline
.apply("Create123", Create.of(1, 2, 3))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<Integer, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.sideInput(view));
}
})
.withSideInputs(view));
thrown.expect(PipelineExecutionException.class);
thrown.expectCause(isA(NoSuchElementException.class));
thrown.expectMessage("Empty");
thrown.expectMessage("PCollection");
thrown.expectMessage("singleton");
pipeline.run();
}
@Test
@Category(NeedsRunner.class)
public void testNonSingletonSideInput() throws Exception {
PCollection<Integer> oneTwoThree = pipeline.apply(Create.of(1, 2, 3));
final PCollectionView<Integer> view = oneTwoThree.apply(View.asSingleton());
oneTwoThree.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<Integer, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.sideInput(view));
}
})
.withSideInputs(view));
thrown.expect(PipelineExecutionException.class);
thrown.expectCause(isA(IllegalArgumentException.class));
thrown.expectMessage("PCollection");
thrown.expectMessage("more than one");
thrown.expectMessage("singleton");
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testListSideInput() {
final PCollectionView<List<Integer>> view =
pipeline.apply("CreateSideInput", Create.of(11, 13, 17, 23)).apply(View.asList());
PCollection<Integer> output =
pipeline
.apply("CreateMainInput", Create.of(29, 31))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<Integer, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) {
checkArgument(c.sideInput(view).size() == 4);
checkArgument(
c.sideInput(view).get(0).equals(c.sideInput(view).get(0)));
for (Integer i : c.sideInput(view)) {
c.output(i);
}
}
})
.withSideInputs(view));
PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 11, 13, 17, 23);
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testWindowedListSideInput() {
final PCollectionView<List<Integer>> view =
pipeline
.apply(
"CreateSideInput",
Create.timestamped(
TimestampedValue.of(11, new Instant(1)),
TimestampedValue.of(13, new Instant(1)),
TimestampedValue.of(17, new Instant(1)),
TimestampedValue.of(23, new Instant(1)),
TimestampedValue.of(31, new Instant(11)),
TimestampedValue.of(33, new Instant(11)),
TimestampedValue.of(37, new Instant(11)),
TimestampedValue.of(43, new Instant(11))))
.apply("SideWindowInto", Window.into(FixedWindows.of(Duration.millis(10))))
.apply(View.asList());
PCollection<Integer> output =
pipeline
.apply(
"CreateMainInput",
Create.timestamped(
TimestampedValue.of(29, new Instant(1)),
TimestampedValue.of(35, new Instant(11))))
.apply("MainWindowInto", Window.into(FixedWindows.of(Duration.millis(10))))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<Integer, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) {
checkArgument(c.sideInput(view).size() == 4);
checkArgument(
c.sideInput(view).get(0).equals(c.sideInput(view).get(0)));
for (Integer i : c.sideInput(view)) {
c.output(i);
}
}
})
.withSideInputs(view));
PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 31, 33, 37, 43);
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testEmptyListSideInput() throws Exception {
final PCollectionView<List<Integer>> view =
pipeline.apply("CreateEmptyView", Create.empty(VarIntCoder.of())).apply(View.asList());
PCollection<Integer> results =
pipeline
.apply("Create1", Create.of(1))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<Integer, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) {
assertTrue(c.sideInput(view).isEmpty());
assertFalse(c.sideInput(view).iterator().hasNext());
c.output(1);
}
})
.withSideInputs(view));
// Pass at least one value through to guarantee that DoFn executes.
PAssert.that(results).containsInAnyOrder(1);
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testListSideInputIsImmutable() {
final PCollectionView<List<Integer>> view =
pipeline.apply("CreateSideInput", Create.of(11)).apply(View.asList());
PCollection<Integer> output =
pipeline
.apply("CreateMainInput", Create.of(29))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<Integer, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) {
try {
c.sideInput(view).clear();
fail("Expected UnsupportedOperationException on clear()");
} catch (UnsupportedOperationException expected) {
}
try {
c.sideInput(view).add(4);
fail("Expected UnsupportedOperationException on add()");
} catch (UnsupportedOperationException expected) {
}
try {
c.sideInput(view).addAll(new ArrayList<>());
fail("Expected UnsupportedOperationException on addAll()");
} catch (UnsupportedOperationException expected) {
}
try {
c.sideInput(view).remove(0);
fail("Expected UnsupportedOperationException on remove()");
} catch (UnsupportedOperationException expected) {
}
for (Integer i : c.sideInput(view)) {
c.output(i);
}
}
})
.withSideInputs(view));
// Pass at least one value through to guarantee that DoFn executes.
PAssert.that(output).containsInAnyOrder(11);
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testIterableSideInput() {
final PCollectionView<Iterable<Integer>> view =
pipeline.apply("CreateSideInput", Create.of(11, 13, 17, 23)).apply(View.asIterable());
PCollection<Integer> output =
pipeline
.apply("CreateMainInput", Create.of(29, 31))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<Integer, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) {
for (Integer i : c.sideInput(view)) {
c.output(i);
}
}
})
.withSideInputs(view));
PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 11, 13, 17, 23);
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testWindowedIterableSideInput() {
final PCollectionView<Iterable<Integer>> view =
pipeline
.apply(
"CreateSideInput",
Create.timestamped(
TimestampedValue.of(11, new Instant(1)),
TimestampedValue.of(13, new Instant(1)),
TimestampedValue.of(17, new Instant(1)),
TimestampedValue.of(23, new Instant(1)),
TimestampedValue.of(31, new Instant(11)),
TimestampedValue.of(33, new Instant(11)),
TimestampedValue.of(37, new Instant(11)),
TimestampedValue.of(43, new Instant(11))))
.apply("SideWindowInto", Window.into(FixedWindows.of(Duration.millis(10))))
.apply(View.asIterable());
PCollection<Integer> output =
pipeline
.apply(
"CreateMainInput",
Create.timestamped(
TimestampedValue.of(29, new Instant(1)),
TimestampedValue.of(35, new Instant(11))))
.apply("MainWindowInto", Window.into(FixedWindows.of(Duration.millis(10))))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<Integer, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) {
for (Integer i : c.sideInput(view)) {
c.output(i);
}
}
})
.withSideInputs(view));
PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 31, 33, 37, 43);
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testEmptyIterableSideInput() throws Exception {
final PCollectionView<Iterable<Integer>> view =
pipeline.apply("CreateEmptyView", Create.empty(VarIntCoder.of())).apply(View.asIterable());
PCollection<Integer> results =
pipeline
.apply("Create1", Create.of(1))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<Integer, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) {
assertFalse(c.sideInput(view).iterator().hasNext());
c.output(1);
}
})
.withSideInputs(view));
// Pass at least one value through to guarantee that DoFn executes.
PAssert.that(results).containsInAnyOrder(1);
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testIterableSideInputIsImmutable() {
final PCollectionView<Iterable<Integer>> view =
pipeline.apply("CreateSideInput", Create.of(11)).apply(View.asIterable());
PCollection<Integer> output =
pipeline
.apply("CreateMainInput", Create.of(29))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<Integer, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) {
Iterator<Integer> iterator = c.sideInput(view).iterator();
while (iterator.hasNext()) {
try {
iterator.remove();
fail("Expected UnsupportedOperationException on remove()");
} catch (UnsupportedOperationException expected) {
}
c.output(iterator.next());
}
}
})
.withSideInputs(view));
// Pass at least one value through to guarantee that DoFn executes.
PAssert.that(output).containsInAnyOrder(11);
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testMultimapSideInput() {
final PCollectionView<Map<String, Iterable<Integer>>> view =
pipeline
.apply(
"CreateSideInput",
Create.of(KV.of("a", 1), KV.of("a", 1), KV.of("a", 2), KV.of("b", 3)))
.apply(View.asMultimap());
PCollection<KV<String, Integer>> output =
pipeline
.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<String, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
for (Integer v : c.sideInput(view).get(c.element().substring(0, 1))) {
c.output(KV.of(c.element(), v));
}
}
})
.withSideInputs(view));
PAssert.that(output)
.containsInAnyOrder(
KV.of("apple", 1),
KV.of("apple", 1),
KV.of("apple", 2),
KV.of("banana", 3),
KV.of("blackberry", 3));
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testMultimapAsEntrySetSideInput() {
final PCollectionView<Map<String, Iterable<Integer>>> view =
pipeline
.apply(
"CreateSideInput",
Create.of(KV.of("a", 1), KV.of("a", 1), KV.of("a", 2), KV.of("b", 3)))
.apply(View.asMultimap());
PCollection<KV<String, Integer>> output =
pipeline
.apply("CreateMainInput", Create.of(2 /* size */))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<Integer, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
assertEquals((int) c.element(), c.sideInput(view).size());
assertEquals((int) c.element(), c.sideInput(view).entrySet().size());
for (Entry<String, Iterable<Integer>> entry :
c.sideInput(view).entrySet()) {
for (Integer value : entry.getValue()) {
c.output(KV.of(entry.getKey(), value));
}
}
}
})
.withSideInputs(view));
PAssert.that(output)
.containsInAnyOrder(KV.of("a", 1), KV.of("a", 1), KV.of("a", 2), KV.of("b", 3));
pipeline.run();
}
private static class NonDeterministicStringCoder extends AtomicCoder<String> {
@Override
public void encode(String value, OutputStream outStream) throws CoderException, IOException {
encode(value, outStream, Coder.Context.NESTED);
}
@Override
public void encode(String value, OutputStream outStream, Coder.Context context)
throws CoderException, IOException {
StringUtf8Coder.of().encode(value, outStream, context);
}
@Override
public String decode(InputStream inStream) throws CoderException, IOException {
return decode(inStream, Coder.Context.NESTED);
}
@Override
public String decode(InputStream inStream, Coder.Context context)
throws CoderException, IOException {
return StringUtf8Coder.of().decode(inStream, context);
}
@Override
public void verifyDeterministic()
throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
throw new NonDeterministicException(this, "Test coder is not deterministic on purpose.");
}
}
@Test
@Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testMultimapSideInputWithNonDeterministicKeyCoder() {
final PCollectionView<Map<String, Iterable<Integer>>> view =
pipeline
.apply(
"CreateSideInput",
Create.of(KV.of("a", 1), KV.of("a", 1), KV.of("a", 2), KV.of("b", 3))
.withCoder(KvCoder.of(new NonDeterministicStringCoder(), VarIntCoder.of())))
.apply(View.asMultimap());
PCollection<KV<String, Integer>> output =
pipeline
.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<String, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
for (Integer v : c.sideInput(view).get(c.element().substring(0, 1))) {
c.output(KV.of(c.element(), v));
}
}
})
.withSideInputs(view));
PAssert.that(output)
.containsInAnyOrder(
KV.of("apple", 1),
KV.of("apple", 1),
KV.of("apple", 2),
KV.of("banana", 3),
KV.of("blackberry", 3));
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testWindowedMultimapSideInput() {
final PCollectionView<Map<String, Iterable<Integer>>> view =
pipeline
.apply(
"CreateSideInput",
Create.timestamped(
TimestampedValue.of(KV.of("a", 1), new Instant(1)),
TimestampedValue.of(KV.of("a", 1), new Instant(2)),
TimestampedValue.of(KV.of("a", 2), new Instant(7)),
TimestampedValue.of(KV.of("b", 3), new Instant(14))))
.apply("SideWindowInto", Window.into(FixedWindows.of(Duration.millis(10))))
.apply(View.asMultimap());
PCollection<KV<String, Integer>> output =
pipeline
.apply(
"CreateMainInput",
Create.timestamped(
TimestampedValue.of("apple", new Instant(5)),
TimestampedValue.of("banana", new Instant(13)),
TimestampedValue.of("blackberry", new Instant(16))))
.apply("MainWindowInto", Window.into(FixedWindows.of(Duration.millis(10))))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<String, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
for (Integer v : c.sideInput(view).get(c.element().substring(0, 1))) {
c.output(KV.of(c.element(), v));
}
}
})
.withSideInputs(view));
PAssert.that(output)
.containsInAnyOrder(
KV.of("apple", 1),
KV.of("apple", 1),
KV.of("apple", 2),
KV.of("banana", 3),
KV.of("blackberry", 3));
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testWindowedMultimapAsEntrySetSideInput() {
final PCollectionView<Map<String, Iterable<Integer>>> view =
pipeline
.apply(
"CreateSideInput",
Create.timestamped(
TimestampedValue.of(KV.of("a", 1), new Instant(1)),
TimestampedValue.of(KV.of("a", 1), new Instant(2)),
TimestampedValue.of(KV.of("a", 2), new Instant(7)),
TimestampedValue.of(KV.of("b", 3), new Instant(14))))
.apply("SideWindowInto", Window.into(FixedWindows.of(Duration.millis(10))))
.apply(View.asMultimap());
PCollection<KV<String, Integer>> output =
pipeline
.apply(
"CreateMainInput",
Create.timestamped(
TimestampedValue.of(1 /* size */, new Instant(5)),
TimestampedValue.of(1 /* size */, new Instant(16))))
.apply("MainWindowInto", Window.into(FixedWindows.of(Duration.millis(10))))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<Integer, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
assertEquals((int) c.element(), c.sideInput(view).size());
assertEquals((int) c.element(), c.sideInput(view).entrySet().size());
for (Entry<String, Iterable<Integer>> entry :
c.sideInput(view).entrySet()) {
for (Integer value : entry.getValue()) {
c.output(KV.of(entry.getKey(), value));
}
}
}
})
.withSideInputs(view));
PAssert.that(output)
.containsInAnyOrder(KV.of("a", 1), KV.of("a", 1), KV.of("a", 2), KV.of("b", 3));
pipeline.run();
}
@Test
@Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testWindowedMultimapSideInputWithNonDeterministicKeyCoder() {
final PCollectionView<Map<String, Iterable<Integer>>> view =
pipeline
.apply(
"CreateSideInput",
Create.timestamped(
TimestampedValue.of(KV.of("a", 1), new Instant(1)),
TimestampedValue.of(KV.of("a", 1), new Instant(2)),
TimestampedValue.of(KV.of("a", 2), new Instant(7)),
TimestampedValue.of(KV.of("b", 3), new Instant(14)))
.withCoder(KvCoder.of(new NonDeterministicStringCoder(), VarIntCoder.of())))
.apply("SideWindowInto", Window.into(FixedWindows.of(Duration.millis(10))))
.apply(View.asMultimap());
PCollection<KV<String, Integer>> output =
pipeline
.apply(
"CreateMainInput",
Create.timestamped(
TimestampedValue.of("apple", new Instant(5)),
TimestampedValue.of("banana", new Instant(13)),
TimestampedValue.of("blackberry", new Instant(16))))
.apply("MainWindowInto", Window.into(FixedWindows.of(Duration.millis(10))))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<String, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
for (Integer v : c.sideInput(view).get(c.element().substring(0, 1))) {
c.output(KV.of(c.element(), v));
}
}
})
.withSideInputs(view));
PAssert.that(output)
.containsInAnyOrder(
KV.of("apple", 1),
KV.of("apple", 1),
KV.of("apple", 2),
KV.of("banana", 3),
KV.of("blackberry", 3));
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testEmptyMultimapSideInput() throws Exception {
final PCollectionView<Map<String, Iterable<Integer>>> view =
pipeline
.apply(
"CreateEmptyView", Create.empty(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
.apply(View.asMultimap());
PCollection<Integer> results =
pipeline
.apply("Create1", Create.of(1))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<Integer, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) {
assertTrue(c.sideInput(view).isEmpty());
assertTrue(c.sideInput(view).entrySet().isEmpty());
assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
c.output(c.element());
}
})
.withSideInputs(view));
// Pass at least one value through to guarantee that DoFn executes.
PAssert.that(results).containsInAnyOrder(1);
pipeline.run();
}
@Test
@Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testEmptyMultimapSideInputWithNonDeterministicKeyCoder() throws Exception {
final PCollectionView<Map<String, Iterable<Integer>>> view =
pipeline
.apply(
"CreateEmptyView",
Create.empty(KvCoder.of(new NonDeterministicStringCoder(), VarIntCoder.of())))
.apply(View.asMultimap());
PCollection<Integer> results =
pipeline
.apply("Create1", Create.of(1))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<Integer, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) {
assertTrue(c.sideInput(view).isEmpty());
assertTrue(c.sideInput(view).entrySet().isEmpty());
assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
c.output(c.element());
}
})
.withSideInputs(view));
// Pass at least one value through to guarantee that DoFn executes.
PAssert.that(results).containsInAnyOrder(1);
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testMultimapSideInputIsImmutable() {
final PCollectionView<Map<String, Iterable<Integer>>> view =
pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1))).apply(View.asMultimap());
PCollection<KV<String, Integer>> output =
pipeline
.apply("CreateMainInput", Create.of("apple"))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<String, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
try {
c.sideInput(view).clear();
fail("Expected UnsupportedOperationException on clear()");
} catch (UnsupportedOperationException expected) {
}
try {
c.sideInput(view).put("c", ImmutableList.of(3));
fail("Expected UnsupportedOperationException on put()");
} catch (UnsupportedOperationException expected) {
}
try {
c.sideInput(view).remove("c");
fail("Expected UnsupportedOperationException on remove()");
} catch (UnsupportedOperationException expected) {
}
try {
c.sideInput(view).putAll(new HashMap<>());
fail("Expected UnsupportedOperationException on putAll()");
} catch (UnsupportedOperationException expected) {
}
for (Integer v : c.sideInput(view).get(c.element().substring(0, 1))) {
c.output(KV.of(c.element(), v));
}
}
})
.withSideInputs(view));
// Pass at least one value through to guarantee that DoFn executes.
PAssert.that(output).containsInAnyOrder(KV.of("apple", 1));
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testMapSideInput() {
final PCollectionView<Map<String, Integer>> view =
pipeline
.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("b", 3)))
.apply(View.asMap());
PCollection<KV<String, Integer>> output =
pipeline
.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<String, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(
KV.of(
c.element(),
c.sideInput(view).get(c.element().substring(0, 1))));
}
})
.withSideInputs(view));
PAssert.that(output)
.containsInAnyOrder(KV.of("apple", 1), KV.of("banana", 3), KV.of("blackberry", 3));
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testMapAsEntrySetSideInput() {
final PCollectionView<Map<String, Integer>> view =
pipeline
.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("b", 3)))
.apply(View.asMap());
PCollection<KV<String, Integer>> output =
pipeline
.apply("CreateMainInput", Create.of(2 /* size */))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<Integer, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
assertEquals((int) c.element(), c.sideInput(view).size());
assertEquals((int) c.element(), c.sideInput(view).entrySet().size());
for (Entry<String, Integer> entry : c.sideInput(view).entrySet()) {
c.output(KV.of(entry.getKey(), entry.getValue()));
}
}
})
.withSideInputs(view));
PAssert.that(output).containsInAnyOrder(KV.of("a", 1), KV.of("b", 3));
pipeline.run();
}
@Test
@Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testMapSideInputWithNonDeterministicKeyCoder() {
final PCollectionView<Map<String, Integer>> view =
pipeline
.apply(
"CreateSideInput",
Create.of(KV.of("a", 1), KV.of("b", 3))
.withCoder(KvCoder.of(new NonDeterministicStringCoder(), VarIntCoder.of())))
.apply(View.asMap());
PCollection<KV<String, Integer>> output =
pipeline
.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<String, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(
KV.of(
c.element(),
c.sideInput(view).get(c.element().substring(0, 1))));
}
})
.withSideInputs(view));
PAssert.that(output)
.containsInAnyOrder(KV.of("apple", 1), KV.of("banana", 3), KV.of("blackberry", 3));
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testWindowedMapSideInput() {
final PCollectionView<Map<String, Integer>> view =
pipeline
.apply(
"CreateSideInput",
Create.timestamped(
TimestampedValue.of(KV.of("a", 1), new Instant(1)),
TimestampedValue.of(KV.of("b", 2), new Instant(4)),
TimestampedValue.of(KV.of("b", 3), new Instant(18))))
.apply("SideWindowInto", Window.into(FixedWindows.of(Duration.millis(10))))
.apply(View.asMap());
PCollection<KV<String, Integer>> output =
pipeline
.apply(
"CreateMainInput",
Create.timestamped(
TimestampedValue.of("apple", new Instant(5)),
TimestampedValue.of("banana", new Instant(4)),
TimestampedValue.of("blackberry", new Instant(16))))
.apply("MainWindowInto", Window.into(FixedWindows.of(Duration.millis(10))))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<String, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(
KV.of(
c.element(),
c.sideInput(view).get(c.element().substring(0, 1))));
}
})
.withSideInputs(view));
PAssert.that(output)
.containsInAnyOrder(KV.of("apple", 1), KV.of("banana", 2), KV.of("blackberry", 3));
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testWindowedMapAsEntrySetSideInput() {
final PCollectionView<Map<String, Integer>> view =
pipeline
.apply(
"CreateSideInput",
Create.timestamped(
TimestampedValue.of(KV.of("a", 1), new Instant(1)),
TimestampedValue.of(KV.of("b", 2), new Instant(4)),
TimestampedValue.of(KV.of("b", 3), new Instant(18))))
.apply("SideWindowInto", Window.into(FixedWindows.of(Duration.millis(10))))
.apply(View.asMap());
PCollection<KV<String, Integer>> output =
pipeline
.apply(
"CreateMainInput",
Create.timestamped(
TimestampedValue.of(2 /* size */, new Instant(5)),
TimestampedValue.of(1 /* size */, new Instant(16))))
.apply("MainWindowInto", Window.into(FixedWindows.of(Duration.millis(10))))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<Integer, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
assertEquals((int) c.element(), c.sideInput(view).size());
assertEquals((int) c.element(), c.sideInput(view).entrySet().size());
for (Entry<String, Integer> entry : c.sideInput(view).entrySet()) {
c.output(KV.of(entry.getKey(), entry.getValue()));
}
}
})
.withSideInputs(view));
PAssert.that(output).containsInAnyOrder(KV.of("a", 1), KV.of("b", 2), KV.of("b", 3));
pipeline.run();
}
@Test
@Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testWindowedMapSideInputWithNonDeterministicKeyCoder() {
final PCollectionView<Map<String, Integer>> view =
pipeline
.apply(
"CreateSideInput",
Create.timestamped(
TimestampedValue.of(KV.of("a", 1), new Instant(1)),
TimestampedValue.of(KV.of("b", 2), new Instant(4)),
TimestampedValue.of(KV.of("b", 3), new Instant(18)))
.withCoder(KvCoder.of(new NonDeterministicStringCoder(), VarIntCoder.of())))
.apply("SideWindowInto", Window.into(FixedWindows.of(Duration.millis(10))))
.apply(View.asMap());
PCollection<KV<String, Integer>> output =
pipeline
.apply(
"CreateMainInput",
Create.timestamped(
TimestampedValue.of("apple", new Instant(5)),
TimestampedValue.of("banana", new Instant(4)),
TimestampedValue.of("blackberry", new Instant(16))))
.apply("MainWindowInto", Window.into(FixedWindows.of(Duration.millis(10))))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<String, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(
KV.of(
c.element(),
c.sideInput(view).get(c.element().substring(0, 1))));
}
})
.withSideInputs(view));
PAssert.that(output)
.containsInAnyOrder(KV.of("apple", 1), KV.of("banana", 2), KV.of("blackberry", 3));
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testEmptyMapSideInput() throws Exception {
final PCollectionView<Map<String, Integer>> view =
pipeline
.apply(
"CreateEmptyView", Create.empty(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
.apply(View.asMap());
PCollection<Integer> results =
pipeline
.apply("Create1", Create.of(1))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<Integer, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) {
assertTrue(c.sideInput(view).isEmpty());
assertTrue(c.sideInput(view).entrySet().isEmpty());
assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
c.output(c.element());
}
})
.withSideInputs(view));
// Pass at least one value through to guarantee that DoFn executes.
PAssert.that(results).containsInAnyOrder(1);
pipeline.run();
}
@Test
@Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testEmptyMapSideInputWithNonDeterministicKeyCoder() throws Exception {
final PCollectionView<Map<String, Integer>> view =
pipeline
.apply(
"CreateEmptyView",
Create.empty(KvCoder.of(new NonDeterministicStringCoder(), VarIntCoder.of())))
.apply(View.asMap());
PCollection<Integer> results =
pipeline
.apply("Create1", Create.of(1))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<Integer, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) {
assertTrue(c.sideInput(view).isEmpty());
assertTrue(c.sideInput(view).entrySet().isEmpty());
assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
c.output(c.element());
}
})
.withSideInputs(view));
// Pass at least one value through to guarantee that DoFn executes.
PAssert.that(results).containsInAnyOrder(1);
pipeline.run();
}
@Test
@Category(NeedsRunner.class)
public void testMapSideInputWithNullValuesCatchesDuplicates() {
final PCollectionView<Map<String, Integer>> view =
pipeline
.apply(
"CreateSideInput",
Create.of(KV.of("a", (Integer) null), KV.of("a", (Integer) null))
.withCoder(
KvCoder.of(StringUtf8Coder.of(), NullableCoder.of(VarIntCoder.of()))))
.apply(View.asMap());
PCollection<KV<String, Integer>> output =
pipeline
.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<String, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(
KV.of(
c.element(),
c.sideInput(view).get(c.element().substring(0, 1))));
}
})
.withSideInputs(view));
PAssert.that(output)
.containsInAnyOrder(KV.of("apple", 1), KV.of("banana", 3), KV.of("blackberry", 3));
// PipelineExecutionException is thrown with cause having a message stating that a
// duplicate is not allowed.
thrown.expectCause(
ThrowableMessageMatcher.hasMessage(Matchers.containsString("Duplicate values for a")));
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testMapSideInputIsImmutable() {
final PCollectionView<Map<String, Integer>> view =
pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1))).apply(View.asMap());
PCollection<KV<String, Integer>> output =
pipeline
.apply("CreateMainInput", Create.of("apple"))
.apply(
"OutputSideInputs",
ParDo.of(
new DoFn<String, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
try {
c.sideInput(view).clear();
fail("Expected UnsupportedOperationException on clear()");
} catch (UnsupportedOperationException expected) {
}
try {
c.sideInput(view).put("c", 3);
fail("Expected UnsupportedOperationException on put()");
} catch (UnsupportedOperationException expected) {
}
try {
c.sideInput(view).remove("c");
fail("Expected UnsupportedOperationException on remove()");
} catch (UnsupportedOperationException expected) {
}
try {
c.sideInput(view).putAll(new HashMap<>());
fail("Expected UnsupportedOperationException on putAll()");
} catch (UnsupportedOperationException expected) {
}
c.output(
KV.of(
c.element(),
c.sideInput(view).get(c.element().substring(0, 1))));
}
})
.withSideInputs(view));
// Pass at least one value through to guarantee that DoFn executes.
PAssert.that(output).containsInAnyOrder(KV.of("apple", 1));
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testCombinedMapSideInput() {
final PCollectionView<Map<String, Integer>> view =
pipeline
.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("a", 20), KV.of("b", 3)))
.apply("SumIntegers", Combine.perKey(Sum.ofIntegers()))
.apply(View.asMap());
PCollection<KV<String, Integer>> output =
pipeline
.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
.apply(
"Output",
ParDo.of(
new DoFn<String, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(
KV.of(
c.element(),
c.sideInput(view).get(c.element().substring(0, 1))));
}
})
.withSideInputs(view));
PAssert.that(output)
.containsInAnyOrder(KV.of("apple", 21), KV.of("banana", 3), KV.of("blackberry", 3));
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testWindowedSideInputFixedToFixed() {
final PCollectionView<Integer> view =
pipeline
.apply(
"CreateSideInput",
Create.timestamped(
TimestampedValue.of(1, new Instant(1)),
TimestampedValue.of(2, new Instant(11)),
TimestampedValue.of(3, new Instant(13))))
.apply("WindowSideInput", Window.into(FixedWindows.of(Duration.millis(10))))
.apply(Sum.integersGlobally().withoutDefaults())
.apply(View.asSingleton());
PCollection<String> output =
pipeline
.apply(
"CreateMainInput",
Create.timestamped(
TimestampedValue.of("A", new Instant(4)),
TimestampedValue.of("B", new Instant(15)),
TimestampedValue.of("C", new Instant(7))))
.apply("WindowMainInput", Window.into(FixedWindows.of(Duration.millis(10))))
.apply(
"OutputMainAndSideInputs",
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element() + c.sideInput(view));
}
})
.withSideInputs(view));
PAssert.that(output).containsInAnyOrder("A1", "B5", "C1");
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testWindowedSideInputFixedToGlobal() {
final PCollectionView<Integer> view =
pipeline
.apply(
"CreateSideInput",
Create.timestamped(
TimestampedValue.of(1, new Instant(1)),
TimestampedValue.of(2, new Instant(11)),
TimestampedValue.of(3, new Instant(13))))
.apply("WindowSideInput", Window.into(new GlobalWindows()))
.apply(Sum.integersGlobally())
.apply(View.asSingleton());
PCollection<String> output =
pipeline
.apply(
"CreateMainInput",
Create.timestamped(
TimestampedValue.of("A", new Instant(4)),
TimestampedValue.of("B", new Instant(15)),
TimestampedValue.of("C", new Instant(7))))
.apply("WindowMainInput", Window.into(FixedWindows.of(Duration.millis(10))))
.apply(
"OutputMainAndSideInputs",
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element() + c.sideInput(view));
}
})
.withSideInputs(view));
PAssert.that(output).containsInAnyOrder("A6", "B6", "C6");
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testWindowedSideInputFixedToFixedWithDefault() {
final PCollectionView<Integer> view =
pipeline
.apply(
"CreateSideInput",
Create.timestamped(
TimestampedValue.of(2, new Instant(11)),
TimestampedValue.of(3, new Instant(13))))
.apply("WindowSideInput", Window.into(FixedWindows.of(Duration.millis(10))))
.apply(Sum.integersGlobally().asSingletonView());
PCollection<String> output =
pipeline
.apply(
"CreateMainInput",
Create.timestamped(
TimestampedValue.of("A", new Instant(4)),
TimestampedValue.of("B", new Instant(15)),
TimestampedValue.of("C", new Instant(7))))
.apply("WindowMainInput", Window.into(FixedWindows.of(Duration.millis(10))))
.apply(
"OutputMainAndSideInputs",
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element() + c.sideInput(view));
}
})
.withSideInputs(view));
PAssert.that(output).containsInAnyOrder("A0", "B5", "C0");
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testSideInputWithNullDefault() {
final PCollectionView<Void> view =
pipeline
.apply("CreateSideInput", Create.of((Void) null).withCoder(VoidCoder.of()))
.apply(Combine.globally((Iterable<Void> input) -> null).asSingletonView());
@SuppressWarnings("ObjectToString")
PCollection<String> output =
pipeline
.apply("CreateMainInput", Create.of(""))
.apply(
"OutputMainAndSideInputs",
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element() + c.sideInput(view));
}
})
.withSideInputs(view));
PAssert.that(output).containsInAnyOrder("null");
pipeline.run();
}
@Test
@Category(ValidatesRunner.class)
public void testSideInputWithNestedIterables() {
final PCollectionView<Iterable<Integer>> view1 =
pipeline
.apply("CreateVoid1", Create.of((Void) null).withCoder(VoidCoder.of()))
.apply(
"OutputOneInteger",
ParDo.of(
new DoFn<Void, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(17);
}
}))
.apply("View1", View.asIterable());
final PCollectionView<Iterable<Iterable<Integer>>> view2 =
pipeline
.apply("CreateVoid2", Create.of((Void) null).withCoder(VoidCoder.of()))
.apply(
"OutputSideInput",
ParDo.of(
new DoFn<Void, Iterable<Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.sideInput(view1));
}
})
.withSideInputs(view1))
.apply("View2", View.asIterable());
PCollection<Integer> output =
pipeline
.apply("CreateVoid3", Create.of((Void) null).withCoder(VoidCoder.of()))
.apply(
"ReadIterableSideInput",
ParDo.of(
new DoFn<Void, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) {
for (Iterable<Integer> input : c.sideInput(view2)) {
for (Integer i : input) {
c.output(i);
}
}
}
})
.withSideInputs(view2));
PAssert.that(output).containsInAnyOrder(17);
pipeline.run();
}
@Test
public void testViewGetName() {
assertEquals("View.AsSingleton", View.<Integer>asSingleton().getName());
assertEquals("View.AsIterable", View.<Integer>asIterable().getName());
assertEquals("View.AsMap", View.<String, Integer>asMap().getName());
assertEquals("View.AsMultimap", View.<String, Integer>asMultimap().getName());
}
private void testViewUnbounded(
Pipeline pipeline,
PTransform<PCollection<KV<String, Integer>>, ? extends PCollectionView<?>> view) {
thrown.expect(IllegalStateException.class);
thrown.expectMessage("Unable to create a side-input view from input");
thrown.expectCause(
ThrowableMessageMatcher.hasMessage(Matchers.containsString("non-bounded PCollection")));
pipeline
.apply(
new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
@Override
public PCollection<KV<String, Integer>> expand(PBegin input) {
return PCollection.createPrimitiveOutputInternal(
input.getPipeline(),
WindowingStrategy.globalDefault(),
PCollection.IsBounded.UNBOUNDED,
KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
}
})
.apply(view);
}
private void testViewNonmerging(
Pipeline pipeline,
PTransform<PCollection<KV<String, Integer>>, ? extends PCollectionView<?>> view) {
thrown.expect(IllegalStateException.class);
thrown.expectMessage("Unable to create a side-input view from input");
thrown.expectCause(
ThrowableMessageMatcher.hasMessage(Matchers.containsString("Consumed by GroupByKey")));
pipeline
.apply(Create.of(KV.of("hello", 5)))
.apply(
Window.into(
new InvalidWindows<>(
"Consumed by GroupByKey", FixedWindows.of(Duration.standardHours(1)))))
.apply(view);
}
@Test
public void testViewUnboundedAsSingletonDirect() {
testViewUnbounded(pipeline, View.asSingleton());
}
@Test
public void testViewUnboundedAsIterableDirect() {
testViewUnbounded(pipeline, View.asIterable());
}
@Test
public void testViewUnboundedAsListDirect() {
testViewUnbounded(pipeline, View.asList());
}
@Test
public void testViewUnboundedAsMapDirect() {
testViewUnbounded(pipeline, View.asMap());
}
@Test
public void testViewUnboundedAsMultimapDirect() {
testViewUnbounded(pipeline, View.asMultimap());
}
@Test
public void testViewNonmergingAsSingletonDirect() {
testViewNonmerging(pipeline, View.asSingleton());
}
@Test
public void testViewNonmergingAsIterableDirect() {
testViewNonmerging(pipeline, View.asIterable());
}
@Test
public void testViewNonmergingAsListDirect() {
testViewNonmerging(pipeline, View.asList());
}
@Test
public void testViewNonmergingAsMapDirect() {
testViewNonmerging(pipeline, View.asMap());
}
@Test
public void testViewNonmergingAsMultimapDirect() {
testViewNonmerging(pipeline, View.asMultimap());
}
}