blob: 0645673ce32e9b054a4449af57cd49e45ba46759 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.direct.portable;
import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.direct.ParDoMultiOverrideFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
 * Tests for the {@link ReferenceRunner}.
 *
 * <p>Every test follows the same shape: build a {@link Pipeline}, attach {@link PAssert}
 * expectations to its output, apply the transform overrides the test needs, and then hand the
 * proto form of the pipeline to an in-process {@link ReferenceRunner}.
 *
 * <p>NOTE(review): the class is {@link Serializable} presumably because the anonymous {@link DoFn}
 * subclasses declared inside the test methods hold a reference to the enclosing test instance —
 * confirm before removing the interface.
 */
@RunWith(JUnit4.class)
public class ReferenceRunnerTest implements Serializable {

  /**
   * Runs a multi-output {@link ParDo} followed by a windowed {@link GroupByKey}.
   *
   * <p>For each input {@code e} in {@code {1, 2, 3}} the first ParDo emits {@code ("foo", e)}
   * exactly {@code e} times on the {@code food} tag, at timestamps 0h, 1h, ..., (e-1)h, and emits
   * {@code e} once on the {@code originals} tag. With five-minute fixed windows those hourly
   * timestamps land in three distinct windows, so the grouped output is expected to be {@code {1,
   * 2, 3}} (0h), {@code {2, 3}} (1h) and {@code {3}} (2h).
   */
  @Test
  public void pipelineExecution() throws Exception {
    Pipeline p = Pipeline.create();
    TupleTag<KV<String, Integer>> food = new TupleTag<>();
    TupleTag<Integer> originals = new TupleTag<Integer>() {};

    PCollectionTuple parDoOutputs =
        p.apply(Create.of(1, 2, 3))
            .apply(
                ParDo.of(
                        new DoFn<Integer, KV<String, Integer>>() {
                          @ProcessElement
                          public void process(@Element Integer e, MultiOutputReceiver r) {
                            // One timestamped copy per hour in [0, e).
                            for (int i = 0; i < e; i++) {
                              r.get(food)
                                  .outputWithTimestamp(
                                      KV.of("foo", e),
                                      new Instant(0).plus(Duration.standardHours(i)));
                            }
                            r.get(originals).output(e);
                          }
                        })
                    .withOutputTags(food, TupleTagList.of(originals)));

    FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(5L));
    PCollection<KV<String, Set<Integer>>> grouped =
        parDoOutputs
            .get(food)
            .apply(Window.into(windowFn))
            .apply(GroupByKey.create())
            .apply(
                ParDo.of(
                    new DoFn<KV<String, Iterable<Integer>>, KV<String, Set<Integer>>>() {
                      @ProcessElement
                      public void process(
                          @Element KV<String, Iterable<Integer>> e,
                          OutputReceiver<KV<String, Set<Integer>>> r) {
                        // Copy into a set so the assertion below compares by value equality.
                        r.output(KV.of(e.getKey(), ImmutableSet.copyOf(e.getValue())));
                      }
                    }));

    PAssert.that(grouped)
        .containsInAnyOrder(
            KV.of("foo", ImmutableSet.of(1, 2, 3)),
            KV.of("foo", ImmutableSet.of(2, 3)),
            KV.of("foo", ImmutableSet.of(3)));

    p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride()));
    executeInProcess(p);
  }

  /**
   * Groups three values that share the key {@code 42} after a {@link Reshuffle#viaRandomKey()} and
   * asserts that exactly one group is produced, containing {@code 0}, {@code 1} and {@code 2}.
   */
  @Test
  public void testGBK() throws Exception {
    Pipeline p = Pipeline.create();

    PAssert.that(
            p.apply(Create.of(KV.of(42, 0), KV.of(42, 1), KV.of(42, 2)))
                // Will create one bundle for each value, since direct runner uses 1 bundle per key
                .apply(Reshuffle.viaRandomKey())
                // Multiple bundles will emit values onto the same key 42.
                // They must be processed sequentially rather than in parallel, since
                // the trigger firing code expects to receive values sequentially for a key.
                .apply(GroupByKey.create()))
        .satisfies(
            input -> {
              KV<Integer, Iterable<Integer>> kv = Iterables.getOnlyElement(input);
              assertEquals(42, kv.getKey().intValue());
              assertThat(kv.getValue(), containsInAnyOrder(0, 1, 2));
              return null;
            });

    p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride()));
    executeInProcess(p);
  }

  /**
   * Splittable {@link DoFn} that pairs an input string with every offset in the claimed part of its
   * restriction, i.e. emits {@code (element, i)} for each successfully claimed offset {@code i}.
   */
  static class PairStringWithIndexToLength extends DoFn<String, KV<String, Integer>> {

    /**
     * Claims offsets starting at the restriction's lower bound and outputs one pair per claimed
     * offset.
     *
     * @return {@code resume()} right after an output whenever the per-call iteration counter is a
     *     multiple of three, otherwise {@code stop()} once {@code tryClaim} fails
     */
    @ProcessElement
    public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
      for (long i = tracker.currentRestriction().getFrom(), numIterations = 0;
          tracker.tryClaim(i);
          ++i, ++numIterations) {
        c.output(KV.of(c.element(), (int) i));
        // numIterations starts at 0 on every call, so this already holds after the first claimed
        // offset of each invocation.
        if (numIterations % 3 == 0) {
          return resume();
        }
      }
      return stop();
    }

    /** Returns the restriction {@code [0, element.length())} covering every index of the string. */
    @GetInitialRestriction
    public OffsetRange getInitialRange(String element) {
      return new OffsetRange(0, element.length());
    }

    /**
     * Splits {@code range} at its midpoint into two adjacent sub-ranges and emits both.
     *
     * @param element the element the restriction belongs to (unused)
     * @param range the restriction to split
     * @param receiver receives the lower half first, then the upper half
     */
    @SplitRestriction
    public void splitRange(
        String element, OffsetRange range, OutputReceiver<OffsetRange> receiver) {
      long from = range.getFrom();
      long to = range.getTo();
      // Overflow-safe midpoint: for 0 <= from <= to this equals (from + to) / 2, but no
      // intermediate value exceeds `to`, whereas from + to can wrap around for large offsets.
      long middle = from + (to - from) / 2;
      receiver.output(new OffsetRange(from, middle));
      receiver.output(new OffsetRange(middle, to));
    }
  }

  /**
   * Applies {@link PairStringWithIndexToLength} to {@code "a"}, {@code "bb"} and {@code "ccccc"}
   * and expects one {@code (string, index)} pair for every index of every string.
   *
   * <p>Currently disabled through {@link Ignore}; see the referenced issue in the annotation.
   */
  @Test
  @Ignore("TODO: BEAM-3743")
  public void testSDF() throws Exception {
    Pipeline p = Pipeline.create();

    PCollection<KV<String, Integer>> res =
        p.apply(Create.of("a", "bb", "ccccc"))
            .apply(ParDo.of(new PairStringWithIndexToLength()))
            .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));

    PAssert.that(res)
        .containsInAnyOrder(
            Arrays.asList(
                KV.of("a", 0),
                KV.of("bb", 0),
                KV.of("bb", 1),
                KV.of("ccccc", 0),
                KV.of("ccccc", 1),
                KV.of("ccccc", 2),
                KV.of("ccccc", 3),
                KV.of("ccccc", 4)));

    // In addition to the read override, replace splittable ParDos with the direct runner's
    // multi-output ParDo override.
    p.replaceAll(
        Arrays.asList(
            JavaReadViaImpulse.boundedOverride(),
            PTransformOverride.of(
                PTransformMatchers.splittableParDo(), new ParDoMultiOverrideFactory())));
    executeInProcess(p);
  }

  /**
   * Translates {@code pipeline} and a default set of pipeline options to their proto forms and
   * executes them with an in-process {@link ReferenceRunner}.
   *
   * <p>Callers must apply any transform overrides to {@code pipeline} before calling this method,
   * because the pipeline is translated immediately.
   *
   * @param pipeline the fully constructed pipeline to run
   * @throws Exception if translation or execution fails
   */
  private static void executeInProcess(Pipeline pipeline) throws Exception {
    ReferenceRunner runner =
        ReferenceRunner.forInProcessPipeline(
            PipelineTranslation.toProto(pipeline),
            PipelineOptionsTranslation.toProto(PipelineOptionsFactory.create()));
    runner.execute();
  }
}