blob: 7f4ebda054b9669df379dd5daec21d0b0e329c10 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import java.io.Serializable;
import java.util.Collections;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.DefaultFilenamePolicy;
import org.apache.beam.sdk.io.DynamicFileDestinations;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.LocalResources;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for {@link PTransformMatcher}. */
@RunWith(JUnit4.class)
public class PTransformMatchersTest implements Serializable {
@Rule
public transient TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
/**
* Gets the {@link AppliedPTransform} that has a created {@code PCollection<KV<String, Integer>>}
* as input.
*/
private AppliedPTransform<?, ?, ?> getAppliedTransform(PTransform pardo) {
PCollection<KV<String, Integer>> input =
PCollection.createPrimitiveOutputInternal(
p,
WindowingStrategy.globalDefault(),
IsBounded.BOUNDED,
KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
input.setName("dummy input");
PCollection<Integer> output =
PCollection.createPrimitiveOutputInternal(
p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
output.setName("dummy output");
return AppliedPTransform.of("pardo", input.expand(), output.expand(), pardo, p);
}
@Test
public void classEqualToMatchesSameClass() {
PTransformMatcher matcher = PTransformMatchers.classEqualTo(ParDo.SingleOutput.class);
AppliedPTransform<?, ?, ?> application =
getAppliedTransform(
ParDo.of(
new DoFn<KV<String, Integer>, Integer>() {
@ProcessElement
public void doStuff(ProcessContext ctxt) {}
}));
assertThat(matcher.matches(application), is(true));
}
@Test
public void classEqualToDoesNotMatchSubclass() {
class MyPTransform extends PTransform<PCollection<KV<String, Integer>>, PCollection<Integer>> {
@Override
public PCollection<Integer> expand(PCollection<KV<String, Integer>> input) {
return PCollection.createPrimitiveOutputInternal(
input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), VarIntCoder.of());
}
}
PTransformMatcher matcher = PTransformMatchers.classEqualTo(MyPTransform.class);
MyPTransform subclass = new MyPTransform() {};
assertThat(subclass.getClass(), not(Matchers.<Class<?>>equalTo(MyPTransform.class)));
assertThat(subclass, instanceOf(MyPTransform.class));
AppliedPTransform<?, ?, ?> application = getAppliedTransform(subclass);
assertThat(matcher.matches(application), is(false));
}
@Test
public void classEqualToDoesNotMatchUnrelatedClass() {
PTransformMatcher matcher = PTransformMatchers.classEqualTo(ParDo.SingleOutput.class);
AppliedPTransform<?, ?, ?> application =
getAppliedTransform(Window.<KV<String, Integer>>into(new GlobalWindows()));
assertThat(matcher.matches(application), is(false));
}
private DoFn<KV<String, Integer>, Integer> doFn =
new DoFn<KV<String, Integer>, Integer>() {
@ProcessElement
public void simpleProcess(ProcessContext ctxt) {
ctxt.output(ctxt.element().getValue() + 1);
}
};
private abstract static class SomeTracker extends RestrictionTracker<Void, Void> {}
private DoFn<KV<String, Integer>, Integer> splittableDoFn =
new DoFn<KV<String, Integer>, Integer>() {
@ProcessElement
public void processElement(
ProcessContext context, RestrictionTracker<Void, Void> tracker) {}
@GetInitialRestriction
public Void getInitialRestriction(KV<String, Integer> element) {
return null;
}
@NewTracker
public SomeTracker newTracker(Void restriction) {
return null;
}
};
private DoFn<KV<String, Integer>, Integer> doFnWithState =
new DoFn<KV<String, Integer>, Integer>() {
private final String stateId = "mystate";
@StateId(stateId)
private final StateSpec<ValueState<Integer>> intState = StateSpecs.value(VarIntCoder.of());
@ProcessElement
public void processElement(ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
c.output(currentValue);
state.write(currentValue + 1);
}
};
private DoFn<KV<String, Integer>, Integer> doFnWithTimers =
new DoFn<KV<String, Integer>, Integer>() {
private final String timerId = "myTimer";
@TimerId(timerId)
private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@ProcessElement
public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
timer.offset(Duration.standardSeconds(1)).setRelative();
context.output(3);
}
@OnTimer(timerId)
public void onTimer(OnTimerContext context) {
context.output(42);
}
};
/** Demonstrates that a {@link ParDo.SingleOutput} does not match any ParDo matcher. */
@Test
public void parDoSingle() {
AppliedPTransform<?, ?, ?> parDoApplication = getAppliedTransform(ParDo.of(doFn));
assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
}
@Test
public void parDoSingleSplittable() {
AppliedPTransform<?, ?, ?> parDoApplication = getAppliedTransform(ParDo.of(splittableDoFn));
assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(true));
assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
}
@Test
public void parDoSingleWithState() {
AppliedPTransform<?, ?, ?> parDoApplication = getAppliedTransform(ParDo.of(doFnWithState));
assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(true));
assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
}
@Test
public void parDoSingleWithTimers() {
AppliedPTransform<?, ?, ?> parDoApplication = getAppliedTransform(ParDo.of(doFnWithTimers));
assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(true));
assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
}
@Test
public void parDoMulti() {
AppliedPTransform<?, ?, ?> parDoApplication =
getAppliedTransform(ParDo.of(doFn).withOutputTags(new TupleTag<>(), TupleTagList.empty()));
assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
}
@Test
public void parDoMultiSplittable() {
AppliedPTransform<?, ?, ?> parDoApplication =
getAppliedTransform(
ParDo.of(splittableDoFn).withOutputTags(new TupleTag<>(), TupleTagList.empty()));
assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(true));
assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
}
@Test
public void parDoSplittable() {
AppliedPTransform<?, ?, ?> parDoApplication =
getAppliedTransform(
ParDo.of(splittableDoFn).withOutputTags(new TupleTag<>(), TupleTagList.empty()));
assertThat(PTransformMatchers.splittableParDo().matches(parDoApplication), is(true));
assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
}
@Test
public void parDoMultiWithState() {
AppliedPTransform<?, ?, ?> parDoApplication =
getAppliedTransform(
ParDo.of(doFnWithState).withOutputTags(new TupleTag<>(), TupleTagList.empty()));
assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(true));
assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
}
@Test
public void parDoWithState() {
AppliedPTransform<?, ?, ?> statefulApplication =
getAppliedTransform(
ParDo.of(doFnWithState).withOutputTags(new TupleTag<>(), TupleTagList.empty()));
assertThat(PTransformMatchers.stateOrTimerParDo().matches(statefulApplication), is(true));
AppliedPTransform<?, ?, ?> splittableApplication =
getAppliedTransform(
ParDo.of(splittableDoFn).withOutputTags(new TupleTag<>(), TupleTagList.empty()));
assertThat(PTransformMatchers.stateOrTimerParDo().matches(splittableApplication), is(false));
}
@Test
public void parDoMultiWithTimers() {
AppliedPTransform<?, ?, ?> parDoApplication =
getAppliedTransform(
ParDo.of(doFnWithTimers).withOutputTags(new TupleTag<>(), TupleTagList.empty()));
assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(true));
assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
}
@Test
public void parDoRequiresStableInput() {
DoFn<Object, Object> doFnRSI =
new DoFn<Object, Object>() {
@RequiresStableInput
@ProcessElement
public void process(ProcessContext ctxt) {}
};
AppliedPTransform<?, ?, ?> single = getAppliedTransform(ParDo.of(doFn));
AppliedPTransform<?, ?, ?> singleRSI = getAppliedTransform(ParDo.of(doFnRSI));
AppliedPTransform<?, ?, ?> multi =
getAppliedTransform(ParDo.of(doFn).withOutputTags(new TupleTag<>(), TupleTagList.empty()));
AppliedPTransform<?, ?, ?> multiRSI =
getAppliedTransform(
ParDo.of(doFnRSI).withOutputTags(new TupleTag<>(), TupleTagList.empty()));
assertThat(PTransformMatchers.requiresStableInputParDoSingle().matches(single), is(false));
assertThat(PTransformMatchers.requiresStableInputParDoSingle().matches(singleRSI), is(true));
assertThat(PTransformMatchers.requiresStableInputParDoSingle().matches(multi), is(false));
assertThat(PTransformMatchers.requiresStableInputParDoSingle().matches(multiRSI), is(false));
assertThat(PTransformMatchers.requiresStableInputParDoMulti().matches(single), is(false));
assertThat(PTransformMatchers.requiresStableInputParDoMulti().matches(singleRSI), is(false));
assertThat(PTransformMatchers.requiresStableInputParDoMulti().matches(multi), is(false));
assertThat(PTransformMatchers.requiresStableInputParDoMulti().matches(multiRSI), is(true));
}
@Test
public void parDoWithFnTypeWithMatchingType() {
DoFn<Object, Object> fn =
new DoFn<Object, Object>() {
@ProcessElement
public void process(ProcessContext ctxt) {}
};
AppliedPTransform<?, ?, ?> parDoSingle = getAppliedTransform(ParDo.of(fn));
AppliedPTransform<?, ?, ?> parDoMulti =
getAppliedTransform(ParDo.of(fn).withOutputTags(new TupleTag<>(), TupleTagList.empty()));
PTransformMatcher matcher = PTransformMatchers.parDoWithFnType(fn.getClass());
assertThat(matcher.matches(parDoSingle), is(true));
assertThat(matcher.matches(parDoMulti), is(true));
}
@Test
public void parDoWithFnTypeWithNoMatch() {
DoFn<Object, Object> fn =
new DoFn<Object, Object>() {
@ProcessElement
public void process(ProcessContext ctxt) {}
};
AppliedPTransform<?, ?, ?> parDoSingle = getAppliedTransform(ParDo.of(fn));
AppliedPTransform<?, ?, ?> parDoMulti =
getAppliedTransform(ParDo.of(fn).withOutputTags(new TupleTag<>(), TupleTagList.empty()));
PTransformMatcher matcher = PTransformMatchers.parDoWithFnType(doFnWithState.getClass());
assertThat(matcher.matches(parDoSingle), is(false));
assertThat(matcher.matches(parDoMulti), is(false));
}
@Test
public void parDoWithFnTypeNotParDo() {
AppliedPTransform<?, ?, ?> notParDo = getAppliedTransform(Create.empty(VoidCoder.of()));
PTransformMatcher matcher = PTransformMatchers.parDoWithFnType(doFnWithState.getClass());
assertThat(matcher.matches(notParDo), is(false));
}
@Test
public void createViewWithViewFn() {
PCollection<Integer> input = p.apply(Create.of(1));
PCollectionView<Iterable<Integer>> view = input.apply(View.asIterable());
ViewFn<?, ?> viewFn = view.getViewFn();
CreatePCollectionView<?, ?> createView = CreatePCollectionView.of(view);
PTransformMatcher matcher = PTransformMatchers.createViewWithViewFn(viewFn.getClass());
assertThat(matcher.matches(getAppliedTransform(createView)), is(true));
}
@Test
public void createViewWithViewFnDifferentViewFn() {
PCollection<Integer> input = p.apply(Create.of(1));
PCollectionView<Iterable<Integer>> view = input.apply(View.asIterable());
// Purposely create a subclass to get a different class then what was expected.
ViewFn<?, ?> viewFn = new PCollectionViews.IterableViewFn() {};
CreatePCollectionView<?, ?> createView = CreatePCollectionView.of(view);
PTransformMatcher matcher = PTransformMatchers.createViewWithViewFn(viewFn.getClass());
assertThat(matcher.matches(getAppliedTransform(createView)), is(false));
}
@Test
public void createViewWithViewFnNotCreatePCollectionView() {
PCollection<Integer> input = p.apply(Create.of(1));
PCollectionView<Iterable<Integer>> view = input.apply(View.asIterable());
PTransformMatcher matcher =
PTransformMatchers.createViewWithViewFn(view.getViewFn().getClass());
assertThat(matcher.matches(getAppliedTransform(View.asIterable())), is(false));
}
@Test
public void emptyFlattenWithEmptyFlatten() {
AppliedPTransform application =
AppliedPTransform.of(
"EmptyFlatten",
Collections.emptyMap(),
Collections.singletonMap(
new TupleTag<Integer>(),
PCollection.createPrimitiveOutputInternal(
p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
Flatten.pCollections(),
p);
assertThat(PTransformMatchers.emptyFlatten().matches(application), is(true));
}
@Test
public void emptyFlattenWithNonEmptyFlatten() {
AppliedPTransform application =
AppliedPTransform.of(
"Flatten",
Collections.singletonMap(
new TupleTag<Integer>(),
PCollection.createPrimitiveOutputInternal(
p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
Collections.singletonMap(
new TupleTag<Integer>(),
PCollection.createPrimitiveOutputInternal(
p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
Flatten.pCollections(),
p);
assertThat(PTransformMatchers.emptyFlatten().matches(application), is(false));
}
@Test
public void emptyFlattenWithNonFlatten() {
AppliedPTransform application =
AppliedPTransform
.<PCollection<Iterable<Integer>>, PCollection<Integer>, Flatten.Iterables<Integer>>of(
"EmptyFlatten",
Collections.emptyMap(),
Collections.singletonMap(
new TupleTag<Integer>(),
PCollection.createPrimitiveOutputInternal(
p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
/* This isn't actually possible to construct, but for the sake of example */
Flatten.iterables(),
p);
assertThat(PTransformMatchers.emptyFlatten().matches(application), is(false));
}
@Test
public void flattenWithDuplicateInputsWithoutDuplicates() {
AppliedPTransform application =
AppliedPTransform.of(
"Flatten",
Collections.singletonMap(
new TupleTag<Integer>(),
PCollection.createPrimitiveOutputInternal(
p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
Collections.singletonMap(
new TupleTag<Integer>(),
PCollection.createPrimitiveOutputInternal(
p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
Flatten.pCollections(),
p);
assertThat(PTransformMatchers.flattenWithDuplicateInputs().matches(application), is(false));
}
@Test
public void flattenWithDuplicateInputsWithDuplicates() {
PCollection<Integer> duplicate =
PCollection.createPrimitiveOutputInternal(
p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
AppliedPTransform application =
AppliedPTransform.of(
"Flatten",
ImmutableMap.<TupleTag<?>, PValue>builder()
.put(new TupleTag<Integer>(), duplicate)
.put(new TupleTag<Integer>(), duplicate)
.build(),
Collections.singletonMap(
new TupleTag<Integer>(),
PCollection.createPrimitiveOutputInternal(
p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
Flatten.pCollections(),
p);
assertThat(PTransformMatchers.flattenWithDuplicateInputs().matches(application), is(true));
}
@Test
public void flattenWithDuplicateInputsNonFlatten() {
AppliedPTransform application =
AppliedPTransform
.<PCollection<Iterable<Integer>>, PCollection<Integer>, Flatten.Iterables<Integer>>of(
"EmptyFlatten",
Collections.emptyMap(),
Collections.singletonMap(
new TupleTag<Integer>(),
PCollection.createPrimitiveOutputInternal(
p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
/* This isn't actually possible to construct, but for the sake of example */
Flatten.iterables(),
p);
assertThat(PTransformMatchers.flattenWithDuplicateInputs().matches(application), is(false));
}
@Test
public void writeWithRunnerDeterminedSharding() {
ResourceId outputDirectory = LocalResources.fromString("/foo/bar", true /* isDirectory */);
FilenamePolicy policy =
DefaultFilenamePolicy.fromStandardParameters(
StaticValueProvider.of(outputDirectory),
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE,
"",
false);
WriteFiles<Integer, Void, Integer> write =
WriteFiles.to(
new FileBasedSink<Integer, Void, Integer>(
StaticValueProvider.of(outputDirectory), DynamicFileDestinations.constant(policy)) {
@Override
public WriteOperation<Void, Integer> createWriteOperation() {
return null;
}
});
assertThat(
PTransformMatchers.writeWithRunnerDeterminedSharding().matches(appliedWrite(write)),
is(true));
WriteFiles<Integer, Void, Integer> withStaticSharding = write.withNumShards(3);
assertThat(
PTransformMatchers.writeWithRunnerDeterminedSharding()
.matches(appliedWrite(withStaticSharding)),
is(false));
WriteFiles<Integer, Void, Integer> withCustomSharding =
write.withSharding(Sum.integersGlobally().asSingletonView());
assertThat(
PTransformMatchers.writeWithRunnerDeterminedSharding()
.matches(appliedWrite(withCustomSharding)),
is(false));
}
private AppliedPTransform<?, ?, ?> appliedWrite(WriteFiles<Integer, Void, Integer> write) {
return AppliedPTransform.of(
"WriteFiles", Collections.emptyMap(), Collections.emptyMap(), write, p);
}
private static class FakeFilenamePolicy extends FilenamePolicy {
@Override
public ResourceId windowedFilename(
int shardNumber,
int numShards,
BoundedWindow window,
PaneInfo paneInfo,
FileBasedSink.OutputFileHints outputFileHints) {
throw new UnsupportedOperationException("should not be called");
}
@Nullable
@Override
public ResourceId unwindowedFilename(
int shardNumber, int numShards, FileBasedSink.OutputFileHints outputFileHints) {
throw new UnsupportedOperationException("should not be called");
}
}
}