/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.core.construction;

import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;

import java.io.Serializable;
import java.util.Collections;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.DefaultFilenamePolicy;
import org.apache.beam.sdk.io.DynamicFileDestinations;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.LocalResources;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link PTransformMatcher}. */
@RunWith(JUnit4.class)
public class PTransformMatchersTest implements Serializable {
  @Rule
  public transient TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);

  /**
   * Gets the {@link AppliedPTransform} that has a created {@code PCollection<KV<String, Integer>>}
   * as input.
   */
  private AppliedPTransform<?, ?, ?> getAppliedTransform(PTransform pardo) {
    PCollection<KV<String, Integer>> input =
        PCollection.createPrimitiveOutputInternal(
            p,
            WindowingStrategy.globalDefault(),
            IsBounded.BOUNDED,
            KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
    input.setName("dummy input");

    PCollection<Integer> output =
        PCollection.createPrimitiveOutputInternal(
            p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
    output.setName("dummy output");

    return AppliedPTransform.of("pardo", input.expand(), output.expand(), pardo, p);
  }

  @Test
  public void classEqualToMatchesSameClass() {
    PTransformMatcher matcher = PTransformMatchers.classEqualTo(ParDo.SingleOutput.class);
    AppliedPTransform<?, ?, ?> application =
        getAppliedTransform(
            ParDo.of(
                new DoFn<KV<String, Integer>, Integer>() {
                  @ProcessElement
                  public void doStuff(ProcessContext ctxt) {}
                }));

    assertThat(matcher.matches(application), is(true));
  }

  @Test
  public void classEqualToDoesNotMatchSubclass() {
    class MyPTransform extends PTransform<PCollection<KV<String, Integer>>, PCollection<Integer>> {
      @Override
      public PCollection<Integer> expand(PCollection<KV<String, Integer>> input) {
        return PCollection.createPrimitiveOutputInternal(
            input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), VarIntCoder.of());
      }
    }

    PTransformMatcher matcher = PTransformMatchers.classEqualTo(MyPTransform.class);
    MyPTransform subclass = new MyPTransform() {};

    assertThat(subclass.getClass(), not(Matchers.<Class<?>>equalTo(MyPTransform.class)));
    assertThat(subclass, instanceOf(MyPTransform.class));

    AppliedPTransform<?, ?, ?> application = getAppliedTransform(subclass);

    assertThat(matcher.matches(application), is(false));
  }

  @Test
  public void classEqualToDoesNotMatchUnrelatedClass() {
    PTransformMatcher matcher = PTransformMatchers.classEqualTo(ParDo.SingleOutput.class);
    AppliedPTransform<?, ?, ?> application =
        getAppliedTransform(Window.<KV<String, Integer>>into(new GlobalWindows()));

    assertThat(matcher.matches(application), is(false));
  }

  private DoFn<KV<String, Integer>, Integer> doFn =
      new DoFn<KV<String, Integer>, Integer>() {
        @ProcessElement
        public void simpleProcess(ProcessContext ctxt) {
          ctxt.output(ctxt.element().getValue() + 1);
        }
      };

  private abstract static class SomeTracker extends RestrictionTracker<Void, Void> {}

  private DoFn<KV<String, Integer>, Integer> splittableDoFn =
      new DoFn<KV<String, Integer>, Integer>() {
        @ProcessElement
        public void processElement(
            ProcessContext context, RestrictionTracker<Void, Void> tracker) {}

        @GetInitialRestriction
        public Void getInitialRestriction(KV<String, Integer> element) {
          return null;
        }

        @NewTracker
        public SomeTracker newTracker(Void restriction) {
          return null;
        }
      };
  private DoFn<KV<String, Integer>, Integer> doFnWithState =
      new DoFn<KV<String, Integer>, Integer>() {
        private final String stateId = "mystate";

        @StateId(stateId)
        private final StateSpec<ValueState<Integer>> intState = StateSpecs.value(VarIntCoder.of());

        @ProcessElement
        public void processElement(ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
          Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
          c.output(currentValue);
          state.write(currentValue + 1);
        }
      };
  private DoFn<KV<String, Integer>, Integer> doFnWithTimers =
      new DoFn<KV<String, Integer>, Integer>() {
        private final String timerId = "myTimer";

        @TimerId(timerId)
        private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        @ProcessElement
        public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
          timer.offset(Duration.standardSeconds(1)).setRelative();
          context.output(3);
        }

        @OnTimer(timerId)
        public void onTimer(OnTimerContext context) {
          context.output(42);
        }
      };

  /** Demonstrates that a {@link ParDo.SingleOutput} does not match any ParDo matcher. */
  @Test
  public void parDoSingle() {
    AppliedPTransform<?, ?, ?> parDoApplication = getAppliedTransform(ParDo.of(doFn));

    assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
  }

  @Test
  public void parDoSingleSplittable() {
    AppliedPTransform<?, ?, ?> parDoApplication = getAppliedTransform(ParDo.of(splittableDoFn));
    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(true));

    assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
  }

  @Test
  public void parDoSingleWithState() {
    AppliedPTransform<?, ?, ?> parDoApplication = getAppliedTransform(ParDo.of(doFnWithState));
    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(true));

    assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
  }

  @Test
  public void parDoSingleWithTimers() {
    AppliedPTransform<?, ?, ?> parDoApplication = getAppliedTransform(ParDo.of(doFnWithTimers));
    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(true));

    assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
  }

  @Test
  public void parDoMulti() {
    AppliedPTransform<?, ?, ?> parDoApplication =
        getAppliedTransform(ParDo.of(doFn).withOutputTags(new TupleTag<>(), TupleTagList.empty()));

    assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
  }

  @Test
  public void parDoMultiSplittable() {
    AppliedPTransform<?, ?, ?> parDoApplication =
        getAppliedTransform(
            ParDo.of(splittableDoFn).withOutputTags(new TupleTag<>(), TupleTagList.empty()));
    assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(true));

    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
  }

  @Test
  public void parDoSplittable() {
    AppliedPTransform<?, ?, ?> parDoApplication =
        getAppliedTransform(
            ParDo.of(splittableDoFn).withOutputTags(new TupleTag<>(), TupleTagList.empty()));
    assertThat(PTransformMatchers.splittableParDo().matches(parDoApplication), is(true));

    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
  }

  @Test
  public void parDoMultiWithState() {
    AppliedPTransform<?, ?, ?> parDoApplication =
        getAppliedTransform(
            ParDo.of(doFnWithState).withOutputTags(new TupleTag<>(), TupleTagList.empty()));
    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(true));

    assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
  }

  @Test
  public void parDoWithState() {
    AppliedPTransform<?, ?, ?> statefulApplication =
        getAppliedTransform(
            ParDo.of(doFnWithState).withOutputTags(new TupleTag<>(), TupleTagList.empty()));
    assertThat(PTransformMatchers.stateOrTimerParDo().matches(statefulApplication), is(true));

    AppliedPTransform<?, ?, ?> splittableApplication =
        getAppliedTransform(
            ParDo.of(splittableDoFn).withOutputTags(new TupleTag<>(), TupleTagList.empty()));
    assertThat(PTransformMatchers.stateOrTimerParDo().matches(splittableApplication), is(false));
  }

  @Test
  public void parDoMultiWithTimers() {
    AppliedPTransform<?, ?, ?> parDoApplication =
        getAppliedTransform(
            ParDo.of(doFnWithTimers).withOutputTags(new TupleTag<>(), TupleTagList.empty()));
    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(true));

    assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
  }

  @Test
  public void parDoRequiresStableInput() {
    DoFn<Object, Object> doFnRSI =
        new DoFn<Object, Object>() {
          @RequiresStableInput
          @ProcessElement
          public void process(ProcessContext ctxt) {}
        };

    AppliedPTransform<?, ?, ?> single = getAppliedTransform(ParDo.of(doFn));
    AppliedPTransform<?, ?, ?> singleRSI = getAppliedTransform(ParDo.of(doFnRSI));
    AppliedPTransform<?, ?, ?> multi =
        getAppliedTransform(ParDo.of(doFn).withOutputTags(new TupleTag<>(), TupleTagList.empty()));
    AppliedPTransform<?, ?, ?> multiRSI =
        getAppliedTransform(
            ParDo.of(doFnRSI).withOutputTags(new TupleTag<>(), TupleTagList.empty()));

    assertThat(PTransformMatchers.requiresStableInputParDoSingle().matches(single), is(false));
    assertThat(PTransformMatchers.requiresStableInputParDoSingle().matches(singleRSI), is(true));
    assertThat(PTransformMatchers.requiresStableInputParDoSingle().matches(multi), is(false));
    assertThat(PTransformMatchers.requiresStableInputParDoSingle().matches(multiRSI), is(false));
    assertThat(PTransformMatchers.requiresStableInputParDoMulti().matches(single), is(false));
    assertThat(PTransformMatchers.requiresStableInputParDoMulti().matches(singleRSI), is(false));
    assertThat(PTransformMatchers.requiresStableInputParDoMulti().matches(multi), is(false));
    assertThat(PTransformMatchers.requiresStableInputParDoMulti().matches(multiRSI), is(true));
  }

  @Test
  public void parDoWithFnTypeWithMatchingType() {
    DoFn<Object, Object> fn =
        new DoFn<Object, Object>() {
          @ProcessElement
          public void process(ProcessContext ctxt) {}
        };
    AppliedPTransform<?, ?, ?> parDoSingle = getAppliedTransform(ParDo.of(fn));
    AppliedPTransform<?, ?, ?> parDoMulti =
        getAppliedTransform(ParDo.of(fn).withOutputTags(new TupleTag<>(), TupleTagList.empty()));

    PTransformMatcher matcher = PTransformMatchers.parDoWithFnType(fn.getClass());
    assertThat(matcher.matches(parDoSingle), is(true));
    assertThat(matcher.matches(parDoMulti), is(true));
  }

  @Test
  public void parDoWithFnTypeWithNoMatch() {
    DoFn<Object, Object> fn =
        new DoFn<Object, Object>() {
          @ProcessElement
          public void process(ProcessContext ctxt) {}
        };
    AppliedPTransform<?, ?, ?> parDoSingle = getAppliedTransform(ParDo.of(fn));
    AppliedPTransform<?, ?, ?> parDoMulti =
        getAppliedTransform(ParDo.of(fn).withOutputTags(new TupleTag<>(), TupleTagList.empty()));

    PTransformMatcher matcher = PTransformMatchers.parDoWithFnType(doFnWithState.getClass());
    assertThat(matcher.matches(parDoSingle), is(false));
    assertThat(matcher.matches(parDoMulti), is(false));
  }

  @Test
  public void parDoWithFnTypeNotParDo() {
    AppliedPTransform<?, ?, ?> notParDo = getAppliedTransform(Create.empty(VoidCoder.of()));
    PTransformMatcher matcher = PTransformMatchers.parDoWithFnType(doFnWithState.getClass());
    assertThat(matcher.matches(notParDo), is(false));
  }

  @Test
  public void createViewWithViewFn() {
    PCollection<Integer> input = p.apply(Create.of(1));
    PCollectionView<Iterable<Integer>> view = input.apply(View.asIterable());
    ViewFn<?, ?> viewFn = view.getViewFn();
    CreatePCollectionView<?, ?> createView = CreatePCollectionView.of(view);

    PTransformMatcher matcher = PTransformMatchers.createViewWithViewFn(viewFn.getClass());
    assertThat(matcher.matches(getAppliedTransform(createView)), is(true));
  }

  @Test
  public void createViewWithViewFnDifferentViewFn() {
    PCollection<Integer> input = p.apply(Create.of(1));
    PCollectionView<Iterable<Integer>> view = input.apply(View.asIterable());

    // Purposely create a subclass to get a different class then what was expected.
    ViewFn<?, ?> viewFn = new PCollectionViews.IterableViewFn() {};
    CreatePCollectionView<?, ?> createView = CreatePCollectionView.of(view);

    PTransformMatcher matcher = PTransformMatchers.createViewWithViewFn(viewFn.getClass());
    assertThat(matcher.matches(getAppliedTransform(createView)), is(false));
  }

  @Test
  public void createViewWithViewFnNotCreatePCollectionView() {
    PCollection<Integer> input = p.apply(Create.of(1));
    PCollectionView<Iterable<Integer>> view = input.apply(View.asIterable());
    PTransformMatcher matcher =
        PTransformMatchers.createViewWithViewFn(view.getViewFn().getClass());
    assertThat(matcher.matches(getAppliedTransform(View.asIterable())), is(false));
  }

  @Test
  public void emptyFlattenWithEmptyFlatten() {
    AppliedPTransform application =
        AppliedPTransform.of(
            "EmptyFlatten",
            Collections.emptyMap(),
            Collections.singletonMap(
                new TupleTag<Integer>(),
                PCollection.createPrimitiveOutputInternal(
                    p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
            Flatten.pCollections(),
            p);

    assertThat(PTransformMatchers.emptyFlatten().matches(application), is(true));
  }

  @Test
  public void emptyFlattenWithNonEmptyFlatten() {
    AppliedPTransform application =
        AppliedPTransform.of(
            "Flatten",
            Collections.singletonMap(
                new TupleTag<Integer>(),
                PCollection.createPrimitiveOutputInternal(
                    p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
            Collections.singletonMap(
                new TupleTag<Integer>(),
                PCollection.createPrimitiveOutputInternal(
                    p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
            Flatten.pCollections(),
            p);

    assertThat(PTransformMatchers.emptyFlatten().matches(application), is(false));
  }

  @Test
  public void emptyFlattenWithNonFlatten() {
    AppliedPTransform application =
        AppliedPTransform
            .<PCollection<Iterable<Integer>>, PCollection<Integer>, Flatten.Iterables<Integer>>of(
                "EmptyFlatten",
                Collections.emptyMap(),
                Collections.singletonMap(
                    new TupleTag<Integer>(),
                    PCollection.createPrimitiveOutputInternal(
                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
                /* This isn't actually possible to construct, but for the sake of example */
                Flatten.iterables(),
                p);

    assertThat(PTransformMatchers.emptyFlatten().matches(application), is(false));
  }

  @Test
  public void flattenWithDuplicateInputsWithoutDuplicates() {
    AppliedPTransform application =
        AppliedPTransform.of(
            "Flatten",
            Collections.singletonMap(
                new TupleTag<Integer>(),
                PCollection.createPrimitiveOutputInternal(
                    p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
            Collections.singletonMap(
                new TupleTag<Integer>(),
                PCollection.createPrimitiveOutputInternal(
                    p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
            Flatten.pCollections(),
            p);

    assertThat(PTransformMatchers.flattenWithDuplicateInputs().matches(application), is(false));
  }

  @Test
  public void flattenWithDuplicateInputsWithDuplicates() {
    PCollection<Integer> duplicate =
        PCollection.createPrimitiveOutputInternal(
            p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
    AppliedPTransform application =
        AppliedPTransform.of(
            "Flatten",
            ImmutableMap.<TupleTag<?>, PValue>builder()
                .put(new TupleTag<Integer>(), duplicate)
                .put(new TupleTag<Integer>(), duplicate)
                .build(),
            Collections.singletonMap(
                new TupleTag<Integer>(),
                PCollection.createPrimitiveOutputInternal(
                    p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
            Flatten.pCollections(),
            p);

    assertThat(PTransformMatchers.flattenWithDuplicateInputs().matches(application), is(true));
  }

  @Test
  public void flattenWithDuplicateInputsNonFlatten() {
    AppliedPTransform application =
        AppliedPTransform
            .<PCollection<Iterable<Integer>>, PCollection<Integer>, Flatten.Iterables<Integer>>of(
                "EmptyFlatten",
                Collections.emptyMap(),
                Collections.singletonMap(
                    new TupleTag<Integer>(),
                    PCollection.createPrimitiveOutputInternal(
                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
                /* This isn't actually possible to construct, but for the sake of example */
                Flatten.iterables(),
                p);

    assertThat(PTransformMatchers.flattenWithDuplicateInputs().matches(application), is(false));
  }

  @Test
  public void writeWithRunnerDeterminedSharding() {
    ResourceId outputDirectory = LocalResources.fromString("/foo/bar", true /* isDirectory */);
    FilenamePolicy policy =
        DefaultFilenamePolicy.fromStandardParameters(
            StaticValueProvider.of(outputDirectory),
            DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE,
            "",
            false);
    WriteFiles<Integer, Void, Integer> write =
        WriteFiles.to(
            new FileBasedSink<Integer, Void, Integer>(
                StaticValueProvider.of(outputDirectory), DynamicFileDestinations.constant(policy)) {
              @Override
              public WriteOperation<Void, Integer> createWriteOperation() {
                return null;
              }
            });
    assertThat(
        PTransformMatchers.writeWithRunnerDeterminedSharding().matches(appliedWrite(write)),
        is(true));

    WriteFiles<Integer, Void, Integer> withStaticSharding = write.withNumShards(3);
    assertThat(
        PTransformMatchers.writeWithRunnerDeterminedSharding()
            .matches(appliedWrite(withStaticSharding)),
        is(false));

    WriteFiles<Integer, Void, Integer> withCustomSharding =
        write.withSharding(Sum.integersGlobally().asSingletonView());
    assertThat(
        PTransformMatchers.writeWithRunnerDeterminedSharding()
            .matches(appliedWrite(withCustomSharding)),
        is(false));
  }

  private AppliedPTransform<?, ?, ?> appliedWrite(WriteFiles<Integer, Void, Integer> write) {
    return AppliedPTransform.of(
        "WriteFiles", Collections.emptyMap(), Collections.emptyMap(), write, p);
  }

  private static class FakeFilenamePolicy extends FilenamePolicy {
    @Override
    public ResourceId windowedFilename(
        int shardNumber,
        int numShards,
        BoundedWindow window,
        PaneInfo paneInfo,
        FileBasedSink.OutputFileHints outputFileHints) {
      throw new UnsupportedOperationException("should not be called");
    }

    @Nullable
    @Override
    public ResourceId unwindowedFilename(
        int shardNumber, int numShards, FileBasedSink.OutputFileHints outputFileHints) {
      throw new UnsupportedOperationException("should not be called");
    }
  }
}
