/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.transforms;

import static org.apache.beam.sdk.testing.CombineFnTester.testCombineFn;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasNamespace;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesSideInputs;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.CombineTest.SharedTestBase.TestCombineFn.Accumulator;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.Context;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link Combine} transforms. */
public class CombineTest implements Serializable {
  // This test is Serializable, just so that it's easy to have
  // anonymous inner classes inside the non-static test methods.

  /** Base class to share setup/teardown and helpers. */
  public abstract static class SharedTestBase {
    @Rule public final transient TestPipeline pipeline = TestPipeline.create();

    protected void runTestSimpleCombine(
        List<KV<String, Integer>> table, int globalSum, List<KV<String, String>> perKeyCombines) {
      PCollection<KV<String, Integer>> input = createInput(pipeline, table);

      PCollection<Integer> sum =
          input.apply(Values.create()).apply(Combine.globally(new SumInts()));

      PCollection<KV<String, String>> sumPerKey = input.apply(Combine.perKey(new TestCombineFn()));

      PAssert.that(sum).containsInAnyOrder(globalSum);
      PAssert.that(sumPerKey).containsInAnyOrder(perKeyCombines);

      pipeline.run();
    }

    @SuppressWarnings("unchecked")
    protected void runTestBasicCombine(
        List<KV<String, Integer>> table,
        Set<Integer> globalUnique,
        List<KV<String, Set<Integer>>> perKeyUnique) {
      PCollection<KV<String, Integer>> input = createInput(pipeline, table);

      PCollection<Set<Integer>> unique =
          input.apply(Values.create()).apply(Combine.globally(new UniqueInts()));

      PCollection<KV<String, Set<Integer>>> uniquePerKey =
          input.apply(Combine.perKey(new UniqueInts()));

      PAssert.that(unique).containsInAnyOrder(globalUnique);
      PAssert.that(uniquePerKey).containsInAnyOrder(perKeyUnique);

      pipeline.run();
    }

    protected void runTestSimpleCombineWithContext(
        List<KV<String, Integer>> table,
        int globalSum,
        List<KV<String, String>> perKeyCombines,
        String[] globallyCombines) {
      PCollection<KV<String, Integer>> perKeyInput = createInput(pipeline, table);
      PCollection<Integer> globallyInput = perKeyInput.apply(Values.create());

      PCollection<Integer> sum = globallyInput.apply("Sum", Combine.globally(new SumInts()));

      PCollectionView<Integer> globallySumView = sum.apply(View.asSingleton());

      PCollection<KV<String, String>> combinePerKey =
          perKeyInput.apply(
              Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(globallySumView))
                  .withSideInputs(globallySumView));

      PCollection<String> combineGlobally =
          globallyInput.apply(
              Combine.globally(new TestCombineFnWithContext(globallySumView))
                  .withoutDefaults()
                  .withSideInputs(globallySumView));

      PAssert.that(sum).containsInAnyOrder(globalSum);
      PAssert.that(combinePerKey).containsInAnyOrder(perKeyCombines);
      PAssert.that(combineGlobally).containsInAnyOrder(globallyCombines);

      pipeline.run();
    }

    protected void runTestAccumulatingCombine(
        List<KV<String, Integer>> table, Double globalMean, List<KV<String, Double>> perKeyMeans) {
      PCollection<KV<String, Integer>> input = createInput(pipeline, table);

      PCollection<Double> mean =
          input.apply(Values.create()).apply(Combine.globally(new MeanInts()));

      PCollection<KV<String, Double>> meanPerKey = input.apply(Combine.perKey(new MeanInts()));

      PAssert.that(mean).containsInAnyOrder(globalMean);
      PAssert.that(meanPerKey).containsInAnyOrder(perKeyMeans);

      pipeline.run();
    }

    ////////////////////////////////////////////////////////////////////////////
    // Test classes, for different kinds of combining fns.

    /** Another example AccumulatingCombineFn. */
    public static class TestCounter
        extends Combine.AccumulatingCombineFn<Integer, TestCounter.Counter, Iterable<Long>> {

      /** An accumulator that observes its merges and outputs. */
      public static class Counter
          implements Combine.AccumulatingCombineFn.Accumulator<Integer, Counter, Iterable<Long>>,
              Serializable {

        public long sum = 0;
        public long inputs = 0;
        public long merges = 0;
        public long outputs = 0;

        public Counter(long sum, long inputs, long merges, long outputs) {
          this.sum = sum;
          this.inputs = inputs;
          this.merges = merges;
          this.outputs = outputs;
        }

        @Override
        public void addInput(Integer element) {
          checkState(merges == 0);
          checkState(outputs == 0);

          inputs++;
          sum += element;
        }

        @Override
        public void mergeAccumulator(Counter accumulator) {
          checkState(outputs == 0);
          assertEquals(0, accumulator.outputs);

          merges += accumulator.merges + 1;
          inputs += accumulator.inputs;
          sum += accumulator.sum;
        }

        @Override
        public Iterable<Long> extractOutput() {
          checkState(outputs == 0);

          return Arrays.asList(sum, inputs, merges, outputs);
        }

        @Override
        public int hashCode() {
          return (int) (sum * 17 + inputs * 31 + merges * 43 + outputs * 181);
        }

        @Override
        public boolean equals(Object otherObj) {
          if (otherObj instanceof Counter) {
            Counter other = (Counter) otherObj;
            return (sum == other.sum
                && inputs == other.inputs
                && merges == other.merges
                && outputs == other.outputs);
          }
          return false;
        }

        @Override
        public String toString() {
          return sum + ":" + inputs + ":" + merges + ":" + outputs;
        }
      }

      @Override
      public Counter createAccumulator() {
        return new Counter(0, 0, 0, 0);
      }

      @Override
      public Coder<Counter> getAccumulatorCoder(CoderRegistry registry, Coder<Integer> inputCoder) {
        // This is a *very* inefficient encoding to send over the wire, but suffices
        // for tests.
        return SerializableCoder.of(Counter.class);
      }
    }

    /**
     * A {@link CombineFn} that results in a sorted list of all characters occurring in the key and
     * the decimal representations of each value.
     */
    public static class TestCombineFn
        extends CombineFn<Integer, TestCombineFn.Accumulator, String> {

      // Not serializable.
      static class Accumulator {
        final String seed;
        String value;

        public Accumulator(String seed, String value) {
          this.seed = seed;
          this.value = value;
        }

        public static Coder<Accumulator> getCoder() {
          return new AtomicCoder<Accumulator>() {
            @Override
            public void encode(Accumulator accumulator, OutputStream outStream) throws IOException {
              StringUtf8Coder.of().encode(accumulator.seed, outStream);
              StringUtf8Coder.of().encode(accumulator.value, outStream);
            }

            @Override
            public Accumulator decode(InputStream inStream) throws IOException {
              String seed = StringUtf8Coder.of().decode(inStream);
              String value = StringUtf8Coder.of().decode(inStream);
              return new Accumulator(seed, value);
            }
          };
        }
      }

      @Override
      public Coder<Accumulator> getAccumulatorCoder(
          CoderRegistry registry, Coder<Integer> inputCoder) {
        return Accumulator.getCoder();
      }

      @Override
      public Accumulator createAccumulator() {
        return new Accumulator("", "");
      }

      @Override
      public Accumulator addInput(Accumulator accumulator, Integer value) {
        try {
          return new Accumulator(accumulator.seed, accumulator.value + String.valueOf(value));
        } finally {
          accumulator.value = "cleared in addInput";
        }
      }

      @Override
      public Accumulator mergeAccumulators(Iterable<Accumulator> accumulators) {
        Accumulator seedAccumulator = null;
        StringBuilder all = new StringBuilder();
        for (Accumulator accumulator : accumulators) {
          if (seedAccumulator == null) {
            seedAccumulator = accumulator;
          } else {
            assertEquals(
                String.format(
                    "Different seed values in accumulator: %s vs. %s",
                    seedAccumulator, accumulator),
                seedAccumulator.seed,
                accumulator.seed);
          }
          all.append(accumulator.value);
          accumulator.value = "cleared in mergeAccumulators";
        }
        return new Accumulator(checkNotNull(seedAccumulator).seed, all.toString());
      }

      @Override
      public String extractOutput(Accumulator accumulator) {
        char[] chars = accumulator.value.toCharArray();
        Arrays.sort(chars);
        return new String(chars);
      }
    }

    /**
     * A {@link CombineFnWithContext} that produces a sorted list of all characters occurring in the
     * key and the decimal representations of main and side inputs values.
     */
    public static class TestCombineFnWithContext
        extends CombineFnWithContext<Integer, Accumulator, String> {
      private final PCollectionView<Integer> view;

      public TestCombineFnWithContext(PCollectionView<Integer> view) {
        this.view = view;
      }

      @Override
      public Coder<TestCombineFn.Accumulator> getAccumulatorCoder(
          CoderRegistry registry, Coder<Integer> inputCoder) {
        return TestCombineFn.Accumulator.getCoder();
      }

      @Override
      public TestCombineFn.Accumulator createAccumulator(Context c) {
        Integer sideInputValue = c.sideInput(view);
        return new TestCombineFn.Accumulator(sideInputValue.toString(), "");
      }

      @Override
      public TestCombineFn.Accumulator addInput(
          TestCombineFn.Accumulator accumulator, Integer value, Context c) {
        try {
          assertThat(
              "Not expecting view contents to change",
              accumulator.seed,
              Matchers.equalTo(Integer.toString(c.sideInput(view))));
          return new TestCombineFn.Accumulator(
              accumulator.seed, accumulator.value + String.valueOf(value));
        } finally {
          accumulator.value = "cleared in addInput";
        }
      }

      @Override
      public TestCombineFn.Accumulator mergeAccumulators(
          Iterable<TestCombineFn.Accumulator> accumulators, Context c) {
        String sideInputValue = c.sideInput(view).toString();
        StringBuilder all = new StringBuilder();
        for (TestCombineFn.Accumulator accumulator : accumulators) {
          assertThat(
              "Accumulators should all have the same Side Input Value",
              accumulator.seed,
              Matchers.equalTo(sideInputValue));
          all.append(accumulator.value);
          accumulator.value = "cleared in mergeAccumulators";
        }
        return new TestCombineFn.Accumulator(sideInputValue, all.toString());
      }

      @Override
      public String extractOutput(TestCombineFn.Accumulator accumulator, Context c) {
        assertThat(accumulator.seed, Matchers.startsWith(c.sideInput(view).toString()));
        char[] chars = accumulator.value.toCharArray();
        Arrays.sort(chars);
        return accumulator.seed + ":" + new String(chars);
      }
    }

    /** Sample DoFn for testing combine. */
    protected static class FormatPaneInfo extends DoFn<Integer, String> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(c.element() + ": " + c.pane().isLast());
      }
    }

    protected static final SerializableFunction<String, Integer> HOT_KEY_FANOUT =
        input -> "a".equals(input) ? 3 : 0;

    protected static final SerializableFunction<String, Integer> SPLIT_HOT_KEY_FANOUT =
        input -> Math.random() < 0.5 ? 3 : 0;

    /** Sample DoFn for testing hot keys. */
    protected static class GetLast extends DoFn<Integer, Integer> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        if (c.pane().isLast()) {
          c.output(c.element());
        }
      }
    }

    /** Sample BinaryCombineFn for testing int inputs. */
    protected static final class TestProdInt extends Combine.BinaryCombineIntegerFn {
      @Override
      public int apply(int left, int right) {
        return left * right;
      }

      @Override
      public int identity() {
        return 1;
      }
    }

    /** Sample BinaryCombineFn for testing Integer inputs. */
    protected static final class TestProdObj extends Combine.BinaryCombineFn<Integer> {
      @Override
      public Integer apply(Integer left, Integer right) {
        return left * right;
      }
    }

    /** Computes the product, considering null values to be 2. */
    protected static final class NullCombiner extends Combine.BinaryCombineFn<Integer> {
      @Override
      public Integer apply(Integer left, Integer right) {
        return (left == null ? 2 : left) * (right == null ? 2 : right);
      }
    }

    /** Example SerializableFunction combiner. */
    public static class SumInts implements SerializableFunction<Iterable<Integer>, Integer> {
      @Override
      public Integer apply(Iterable<Integer> input) {
        int sum = 0;
        for (int item : input) {
          sum += item;
        }
        return sum;
      }
    }

    /** Example CombineFn. */
    public static class UniqueInts extends Combine.CombineFn<Integer, Set<Integer>, Set<Integer>> {

      @Override
      public Set<Integer> createAccumulator() {
        return new HashSet<>();
      }

      @Override
      public Set<Integer> addInput(Set<Integer> accumulator, Integer input) {
        accumulator.add(input);
        return accumulator;
      }

      @Override
      public Set<Integer> mergeAccumulators(Iterable<Set<Integer>> accumulators) {
        Set<Integer> all = new HashSet<>();
        for (Set<Integer> part : accumulators) {
          all.addAll(part);
        }
        return all;
      }

      @Override
      public Set<Integer> extractOutput(Set<Integer> accumulator) {
        return accumulator;
      }
    }

    /** Example AccumulatingCombineFn. */
    protected static class MeanInts
        extends Combine.AccumulatingCombineFn<Integer, MeanInts.CountSum, Double> {
      private static final Coder<Long> LONG_CODER = BigEndianLongCoder.of();
      private static final Coder<Double> DOUBLE_CODER = DoubleCoder.of();

      static class CountSum
          implements Combine.AccumulatingCombineFn.Accumulator<Integer, CountSum, Double> {
        long count = 0;
        double sum = 0.0;

        CountSum(long count, double sum) {
          this.count = count;
          this.sum = sum;
        }

        @Override
        public void addInput(Integer element) {
          count++;
          sum += element.doubleValue();
        }

        @Override
        public void mergeAccumulator(CountSum accumulator) {
          count += accumulator.count;
          sum += accumulator.sum;
        }

        @Override
        public Double extractOutput() {
          return count == 0 ? 0.0 : sum / count;
        }

        @Override
        public int hashCode() {
          return Objects.hash(count, sum);
        }

        @Override
        public boolean equals(Object obj) {
          if (obj == this) {
            return true;
          }
          if (!(obj instanceof CountSum)) {
            return false;
          }
          CountSum other = (CountSum) obj;
          return this.count == other.count && (Math.abs(this.sum - other.sum) < 0.1);
        }

        @Override
        public String toString() {
          return MoreObjects.toStringHelper(this).add("count", count).add("sum", sum).toString();
        }
      }

      @Override
      public CountSum createAccumulator() {
        return new CountSum(0, 0.0);
      }

      @Override
      public Coder<CountSum> getAccumulatorCoder(
          CoderRegistry registry, Coder<Integer> inputCoder) {
        return new CountSumCoder();
      }

      /** A {@link Coder} for {@link CountSum}. */
      private static class CountSumCoder extends AtomicCoder<CountSum> {
        @Override
        public void encode(CountSum value, OutputStream outStream) throws IOException {
          LONG_CODER.encode(value.count, outStream);
          DOUBLE_CODER.encode(value.sum, outStream);
        }

        @Override
        public CountSum decode(InputStream inStream) throws IOException {
          long count = LONG_CODER.decode(inStream);
          double sum = DOUBLE_CODER.decode(inStream);
          return new CountSum(count, sum);
        }

        @Override
        public void verifyDeterministic() throws NonDeterministicException {}

        @Override
        public boolean isRegisterByteSizeObserverCheap(CountSum value) {
          return true;
        }

        @Override
        public void registerByteSizeObserver(CountSum value, ElementByteSizeObserver observer)
            throws Exception {
          LONG_CODER.registerByteSizeObserver(value.count, observer);
          DOUBLE_CODER.registerByteSizeObserver(value.sum, observer);
        }
      }
    }

    protected static <T> PCollection<T> copy(PCollection<T> pc, final int n) {
      return pc.apply(
          ParDo.of(
              new DoFn<T, T>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws Exception {
                  for (int i = 0; i < n; i++) {
                    c.output(c.element());
                  }
                }
              }));
    }

    /** Class for use in testing use of Java 8 method references. */
    protected static class Summer implements Serializable {
      public int sum(Iterable<Integer> integers) {
        int sum = 0;
        for (int i : integers) {
          sum += i;
        }
        return sum;
      }

      public int add(int a, int b) {
        return a + b;
      }
    }
  }

  static final List<KV<String, Integer>> EMPTY_TABLE = Collections.emptyList();

  private static PCollection<KV<String, Integer>> createInput(
      Pipeline p, List<KV<String, Integer>> table) {
    return p.apply(
        Create.of(table).withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
  }

  /** Tests validating basic Combine transform scenarios. */
  @RunWith(JUnit4.class)
  public static class BasicTests extends SharedTestBase {
    @Test
    @Category({
      ValidatesRunner.class,
      UsesSideInputs.class,
      DataflowPortabilityApiUnsupported.class
    })
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void testSimpleCombine() {
      runTestSimpleCombine(
          Arrays.asList(KV.of("a", 1), KV.of("a", 1), KV.of("a", 4), KV.of("b", 1), KV.of("b", 13)),
          20,
          Arrays.asList(KV.of("a", "114"), KV.of("b", "113")));
    }

    @Test
    @Category({
      ValidatesRunner.class,
      UsesSideInputs.class,
      DataflowPortabilityApiUnsupported.class
    })
    public void testSimpleCombineEmpty() {
      runTestSimpleCombine(EMPTY_TABLE, 0, Collections.emptyList());
    }

    @Test
    @Category({ValidatesRunner.class, UsesSideInputs.class})
    public void testBasicCombine() {
      runTestBasicCombine(
          Arrays.asList(KV.of("a", 1), KV.of("a", 1), KV.of("a", 4), KV.of("b", 1), KV.of("b", 13)),
          ImmutableSet.of(1, 13, 4),
          Arrays.asList(
              KV.of("a", (Set<Integer>) ImmutableSet.of(1, 4)),
              KV.of("b", (Set<Integer>) ImmutableSet.of(1, 13))));
    }

    @Test
    @Category({ValidatesRunner.class, UsesSideInputs.class})
    public void testBasicCombineEmpty() {
      runTestBasicCombine(EMPTY_TABLE, ImmutableSet.of(), Collections.emptyList());
    }

    // Checks that Min, Max, Mean, Sum (operations that pass-through to Combine) have good names.
    @Test
    public void testCombinerNames() {
      Combine.PerKey<String, Integer, Integer> min = Min.integersPerKey();
      Combine.PerKey<String, Integer, Integer> max = Max.integersPerKey();
      Combine.PerKey<String, Integer, Double> mean = Mean.perKey();
      Combine.PerKey<String, Integer, Integer> sum = Sum.integersPerKey();

      assertThat(min.getName(), equalTo("Combine.perKey(MinInteger)"));
      assertThat(max.getName(), equalTo("Combine.perKey(MaxInteger)"));
      assertThat(mean.getName(), equalTo("Combine.perKey(Mean)"));
      assertThat(sum.getName(), equalTo("Combine.perKey(SumInteger)"));
    }

    @Test
    @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
    public void testHotKeyCombining() {
      PCollection<KV<String, Integer>> input =
          copy(
              createInput(
                  pipeline,
                  Arrays.asList(
                      KV.of("a", 1), KV.of("a", 1), KV.of("a", 4), KV.of("b", 1), KV.of("b", 13))),
              10);

      CombineFn<Integer, ?, Double> mean = new MeanInts();
      PCollection<KV<String, Double>> coldMean =
          input.apply(
              "ColdMean", Combine.<String, Integer, Double>perKey(mean).withHotKeyFanout(0));
      PCollection<KV<String, Double>> warmMean =
          input.apply(
              "WarmMean",
              Combine.<String, Integer, Double>perKey(mean).withHotKeyFanout(HOT_KEY_FANOUT));
      PCollection<KV<String, Double>> hotMean =
          input.apply("HotMean", Combine.<String, Integer, Double>perKey(mean).withHotKeyFanout(5));
      PCollection<KV<String, Double>> splitMean =
          input.apply(
              "SplitMean",
              Combine.<String, Integer, Double>perKey(mean).withHotKeyFanout(SPLIT_HOT_KEY_FANOUT));

      List<KV<String, Double>> expected = Arrays.asList(KV.of("a", 2.0), KV.of("b", 7.0));
      PAssert.that(coldMean).containsInAnyOrder(expected);
      PAssert.that(warmMean).containsInAnyOrder(expected);
      PAssert.that(hotMean).containsInAnyOrder(expected);
      PAssert.that(splitMean).containsInAnyOrder(expected);

      pipeline.run();
    }

    @Test
    @Category(ValidatesRunner.class)
    public void testHotKeyCombiningWithAccumulationMode() {
      PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5));

      PCollection<Integer> output =
          input
              .apply(
                  Window.<Integer>into(new GlobalWindows())
                      .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                      .accumulatingFiredPanes()
                      .withAllowedLateness(new Duration(0), ClosingBehavior.FIRE_ALWAYS))
              .apply(Sum.integersGlobally().withoutDefaults().withFanout(2))
              .apply(ParDo.of(new GetLast()));

      PAssert.that(output)
          .satisfies(
              input1 -> {
                assertThat(input1, hasItem(15));
                return null;
              });

      pipeline.run();
    }

    @Test
    @Category(NeedsRunner.class)
    public void testBinaryCombineFn() {
      PCollection<KV<String, Integer>> input =
          copy(
              createInput(
                  pipeline,
                  Arrays.asList(
                      KV.of("a", 1), KV.of("a", 1), KV.of("a", 4), KV.of("b", 1), KV.of("b", 13))),
              2);
      PCollection<KV<String, Integer>> intProduct =
          input.apply("IntProduct", Combine.perKey(new TestProdInt()));
      PCollection<KV<String, Integer>> objProduct =
          input.apply("ObjProduct", Combine.perKey(new TestProdObj()));

      List<KV<String, Integer>> expected = Arrays.asList(KV.of("a", 16), KV.of("b", 169));
      PAssert.that(intProduct).containsInAnyOrder(expected);
      PAssert.that(objProduct).containsInAnyOrder(expected);

      pipeline.run();
    }

    @Test
    public void testBinaryCombineFnWithNulls() {
      testCombineFn(new NullCombiner(), Arrays.asList(3, 3, 5), 45);
      testCombineFn(new NullCombiner(), Arrays.asList(null, 3, 5), 30);
      testCombineFn(new NullCombiner(), Arrays.asList(3, 3, null), 18);
      testCombineFn(new NullCombiner(), Arrays.asList(null, 3, null), 12);
      testCombineFn(new NullCombiner(), Arrays.asList(null, null, null), 8);
    }

    @Test
    public void testCombineGetName() {
      assertEquals("Combine.globally(SumInts)", Combine.globally(new SumInts()).getName());
      assertEquals(
          "Combine.GloballyAsSingletonView",
          Combine.globally(new SumInts()).asSingletonView().getName());
      assertEquals("Combine.perKey(Test)", Combine.perKey(new TestCombineFn()).getName());
      assertEquals(
          "Combine.perKeyWithFanout(Test)",
          Combine.perKey(new TestCombineFn()).withHotKeyFanout(10).getName());
    }

    @Test
    public void testDisplayData() {
      UniqueInts combineFn =
          new UniqueInts() {
            @Override
            public void populateDisplayData(DisplayData.Builder builder) {
              builder.add(DisplayData.item("fnMetadata", "foobar"));
            }
          };
      Combine.Globally<?, ?> combine = Combine.globally(combineFn).withFanout(1234);
      DisplayData displayData = DisplayData.from(combine);

      assertThat(displayData, hasDisplayItem("combineFn", combineFn.getClass()));
      assertThat(displayData, hasDisplayItem("emitDefaultOnEmptyInput", true));
      assertThat(displayData, hasDisplayItem("fanout", 1234));
      assertThat(displayData, includesDisplayDataFor("combineFn", combineFn));
    }

    @Test
    public void testDisplayDataForWrappedFn() {
      UniqueInts combineFn =
          new UniqueInts() {
            @Override
            public void populateDisplayData(DisplayData.Builder builder) {
              builder.add(DisplayData.item("foo", "bar"));
            }
          };
      Combine.PerKey<?, ?, ?> combine = Combine.perKey(combineFn);
      DisplayData displayData = DisplayData.from(combine);

      assertThat(displayData, hasDisplayItem("combineFn", combineFn.getClass()));
      assertThat(displayData, hasDisplayItem(hasNamespace(combineFn.getClass())));
    }

    @Test
    @Category(ValidatesRunner.class)
    public void testCombinePerKeyPrimitiveDisplayData() {
      DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();

      UniqueInts combineFn = new UniqueInts();
      PTransform<PCollection<KV<Integer, Integer>>, ? extends POutput> combine =
          Combine.perKey(combineFn);

      Set<DisplayData> displayData =
          evaluator.displayDataForPrimitiveTransforms(
              combine, KvCoder.of(VarIntCoder.of(), VarIntCoder.of()));

      assertThat(
          "Combine.perKey should include the combineFn in its primitive transform",
          displayData,
          hasItem(hasDisplayItem("combineFn", combineFn.getClass())));
    }

    @Test
    @Category(ValidatesRunner.class)
    public void testCombinePerKeyWithHotKeyFanoutPrimitiveDisplayData() {
      int hotKeyFanout = 2;
      DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();

      UniqueInts combineFn = new UniqueInts();
      PTransform<PCollection<KV<Integer, Integer>>, PCollection<KV<Integer, Set<Integer>>>>
          combine =
              Combine.<Integer, Integer, Set<Integer>>perKey(combineFn)
                  .withHotKeyFanout(hotKeyFanout);

      Set<DisplayData> displayData =
          evaluator.displayDataForPrimitiveTransforms(
              combine, KvCoder.of(VarIntCoder.of(), VarIntCoder.of()));

      assertThat(
          "Combine.perKey.withHotKeyFanout should include the combineFn in its primitive "
              + "transform",
          displayData,
          hasItem(hasDisplayItem("combineFn", combineFn.getClass())));
      assertThat(
          "Combine.perKey.withHotKeyFanout(int) should include the fanout in its primitive "
              + "transform",
          displayData,
          hasItem(hasDisplayItem("fanout", hotKeyFanout)));
    }

    /** Tests creation of a per-key {@link Combine} via a Java 8 lambda. */
    @Test
    @Category(ValidatesRunner.class)
    public void testCombinePerKeyLambda() {

      PCollection<KV<String, Integer>> output =
          pipeline
              .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4)))
              .apply(
                  Combine.perKey(
                      integers -> {
                        int sum = 0;
                        for (int i : integers) {
                          sum += i;
                        }
                        return sum;
                      }));

      PAssert.that(output).containsInAnyOrder(KV.of("a", 4), KV.of("b", 2), KV.of("c", 4));
      pipeline.run();
    }

    /** Tests creation of a per-key binary {@link Combine} via a Java 8 lambda. */
    @Test
    @Category(ValidatesRunner.class)
    public void testBinaryCombinePerKeyLambda() {

      PCollection<KV<String, Integer>> output =
          pipeline
              .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4)))
              .apply(Combine.perKey((a, b) -> a + b));

      PAssert.that(output).containsInAnyOrder(KV.of("a", 4), KV.of("b", 2), KV.of("c", 4));
      pipeline.run();
    }

    /** Tests creation of a per-key {@link Combine} via a Java 8 method reference. */
    @Test
    @Category(ValidatesRunner.class)
    public void testCombinePerKeyInstanceMethodReference() {

      PCollection<KV<String, Integer>> output =
          pipeline
              .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4)))
              .apply(Combine.perKey(new Summer()::sum));

      PAssert.that(output).containsInAnyOrder(KV.of("a", 4), KV.of("b", 2), KV.of("c", 4));
      pipeline.run();
    }

    /** Tests creation of a per-key binary {@link Combine} via a Java 8 method reference. */
    @Test
    @Category(ValidatesRunner.class)
    public void testBinaryCombinePerKeyInstanceMethodReference() {

      PCollection<KV<String, Integer>> output =
          pipeline
              .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4)))
              .apply(Combine.perKey(new Summer()::add));

      PAssert.that(output).containsInAnyOrder(KV.of("a", 4), KV.of("b", 2), KV.of("c", 4));
      pipeline.run();
    }

    /**
     * Tests that we can serialize {@link Combine.CombineFn CombineFns} constructed from a lambda.
     * Lambdas can be problematic because the {@link Class} object is synthetic and cannot be
     * deserialized.
     */
    @Test
    public void testLambdaSerialization() {
      SerializableFunction<Iterable<Object>, Object> combiner = xs -> Iterables.getFirst(xs, 0);

      boolean lambdaClassSerializationThrows;
      try {
        SerializableUtils.clone(combiner.getClass());
        lambdaClassSerializationThrows = false;
      } catch (IllegalArgumentException e) {
        // Expected
        lambdaClassSerializationThrows = true;
      }
      Assume.assumeTrue(
          "Expected lambda class serialization to fail. "
              + "If it's fixed, we can remove special behavior in Combine.",
          lambdaClassSerializationThrows);

      Combine.Globally<?, ?> combine = Combine.globally(combiner);
      SerializableUtils.clone(combine); // should not throw.
    }

    @Test
    public void testLambdaDisplayData() {
      Combine.Globally<?, ?> combine = Combine.globally(xs -> Iterables.getFirst(xs, 0));
      DisplayData displayData = DisplayData.from(combine);
      MatcherAssert.assertThat(displayData.items(), not(empty()));
    }
  }

  /** Tests validating CombineWithContext behaviors. */
  @RunWith(JUnit4.class)
  public static class CombineWithContextTests extends SharedTestBase {
    @Test
    @Category({ValidatesRunner.class, UsesSideInputs.class})
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void testSimpleCombineWithContext() {
      runTestSimpleCombineWithContext(
          Arrays.asList(KV.of("a", 1), KV.of("a", 1), KV.of("a", 4), KV.of("b", 1), KV.of("b", 13)),
          20,
          Arrays.asList(KV.of("a", "20:114"), KV.of("b", "20:113")),
          new String[] {"20:111134"});
    }

    @Test
    @Category({ValidatesRunner.class, UsesSideInputs.class})
    public void testSimpleCombineWithContextEmpty() {
      runTestSimpleCombineWithContext(EMPTY_TABLE, 0, Collections.emptyList(), new String[] {});
    }

    @Test
    public void testWithDefaultsPreservesSideInputs() {
      final PCollectionView<Integer> view =
          pipeline.apply(Create.of(1)).apply(Sum.integersGlobally().asSingletonView());

      Combine.Globally<Integer, String> combine =
          Combine.globally(new TestCombineFnWithContext(view))
              .withSideInputs(view)
              .withoutDefaults();

      assertEquals(Collections.singletonList(view), combine.getSideInputs());
    }

    @Test
    public void testWithFanoutPreservesSideInputs() {
      final PCollectionView<Integer> view =
          pipeline.apply(Create.of(1)).apply(Sum.integersGlobally().asSingletonView());

      Combine.Globally<Integer, String> combine =
          Combine.globally(new TestCombineFnWithContext(view)).withSideInputs(view).withFanout(1);

      assertEquals(Collections.singletonList(view), combine.getSideInputs());
    }
  }

  /** Tests validating windowing behaviors. */
  @RunWith(JUnit4.class)
  public static class WindowingTests extends SharedTestBase implements Serializable {
    @Test
    @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
    public void testFixedWindowsCombine() {
      PCollection<KV<String, Integer>> input =
          pipeline
              .apply(
                  Create.timestamped(
                          TimestampedValue.of(KV.of("a", 1), new Instant(0L)),
                          TimestampedValue.of(KV.of("a", 1), new Instant(1L)),
                          TimestampedValue.of(KV.of("a", 4), new Instant(6L)),
                          TimestampedValue.of(KV.of("b", 1), new Instant(7L)),
                          TimestampedValue.of(KV.of("b", 13), new Instant(8L)))
                      .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
              .apply(Window.into(FixedWindows.of(Duration.millis(2))));

      PCollection<Integer> sum =
          input.apply(Values.create()).apply(Combine.globally(new SumInts()).withoutDefaults());

      PCollection<KV<String, String>> sumPerKey = input.apply(Combine.perKey(new TestCombineFn()));

      PAssert.that(sum).containsInAnyOrder(2, 5, 13);
      PAssert.that(sumPerKey)
          .containsInAnyOrder(
              Arrays.asList(KV.of("a", "11"), KV.of("a", "4"), KV.of("b", "1"), KV.of("b", "13")));
      pipeline.run();
    }

    @Test
    @Category({ValidatesRunner.class, UsesSideInputs.class})
    public void testFixedWindowsCombineWithContext() {
      PCollection<KV<String, Integer>> perKeyInput =
          pipeline
              .apply(
                  Create.timestamped(
                          TimestampedValue.of(KV.of("a", 1), new Instant(0L)),
                          TimestampedValue.of(KV.of("a", 1), new Instant(1L)),
                          TimestampedValue.of(KV.of("a", 4), new Instant(6L)),
                          TimestampedValue.of(KV.of("b", 1), new Instant(7L)),
                          TimestampedValue.of(KV.of("b", 13), new Instant(8L)))
                      .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
              .apply(Window.into(FixedWindows.of(Duration.millis(2))));

      PCollection<Integer> globallyInput = perKeyInput.apply(Values.create());

      PCollection<Integer> sum =
          globallyInput.apply("Sum", Combine.globally(new SumInts()).withoutDefaults());

      PCollectionView<Integer> globallySumView = sum.apply(View.asSingleton());

      PCollection<KV<String, String>> combinePerKeyWithContext =
          perKeyInput.apply(
              Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(globallySumView))
                  .withSideInputs(globallySumView));

      PCollection<String> combineGloballyWithContext =
          globallyInput.apply(
              Combine.globally(new TestCombineFnWithContext(globallySumView))
                  .withoutDefaults()
                  .withSideInputs(globallySumView));

      PAssert.that(sum).containsInAnyOrder(2, 5, 13);
      PAssert.that(combinePerKeyWithContext)
          .containsInAnyOrder(
              Arrays.asList(
                  KV.of("a", "2:11"), KV.of("a", "5:4"), KV.of("b", "5:1"), KV.of("b", "13:13")));
      PAssert.that(combineGloballyWithContext).containsInAnyOrder("2:11", "5:14", "13:13");
      pipeline.run();
    }

    @Test
    @Category({ValidatesRunner.class, UsesSideInputs.class})
    public void testSlidingWindowsCombine() {
      PCollection<String> input =
          pipeline
              .apply(
                  Create.timestamped(
                      TimestampedValue.of("a", new Instant(1L)),
                      TimestampedValue.of("b", new Instant(2L)),
                      TimestampedValue.of("c", new Instant(3L))))
              .apply(Window.into(SlidingWindows.of(Duration.millis(3)).every(Duration.millis(1L))));
      PCollection<List<String>> combined =
          input.apply(
              Combine.globally(
                      new CombineFn<String, List<String>, List<String>>() {
                        @Override
                        public List<String> createAccumulator() {
                          return new ArrayList<>();
                        }

                        @Override
                        public List<String> addInput(List<String> accumulator, String input) {
                          accumulator.add(input);
                          return accumulator;
                        }

                        @Override
                        public List<String> mergeAccumulators(Iterable<List<String>> accumulators) {
                          // Mutate all of the accumulators. Instances should be used in only one
                          // place, and not
                          // reused after merging.
                          List<String> cur = createAccumulator();
                          for (List<String> accumulator : accumulators) {
                            accumulator.addAll(cur);
                            cur = accumulator;
                          }
                          return cur;
                        }

                        @Override
                        public List<String> extractOutput(List<String> accumulator) {
                          List<String> result = new ArrayList<>(accumulator);
                          Collections.sort(result);
                          return result;
                        }
                      })
                  .withoutDefaults());

      PAssert.that(combined)
          .containsInAnyOrder(
              ImmutableList.of("a"),
              ImmutableList.of("a", "b"),
              ImmutableList.of("a", "b", "c"),
              ImmutableList.of("b", "c"),
              ImmutableList.of("c"));

      pipeline.run();
    }

    @Test
    @Category({ValidatesRunner.class, UsesSideInputs.class})
    public void testSlidingWindowsCombineWithContext() {
      // [a: 1, 1], [a: 4; b: 1], [b: 13]
      PCollection<KV<String, Integer>> perKeyInput =
          pipeline
              .apply(
                  Create.timestamped(
                          TimestampedValue.of(KV.of("a", 1), new Instant(2L)),
                          TimestampedValue.of(KV.of("a", 1), new Instant(3L)),
                          TimestampedValue.of(KV.of("a", 4), new Instant(8L)),
                          TimestampedValue.of(KV.of("b", 1), new Instant(9L)),
                          TimestampedValue.of(KV.of("b", 13), new Instant(10L)))
                      .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
              .apply(Window.into(SlidingWindows.of(Duration.millis(2))));

      PCollection<Integer> globallyInput = perKeyInput.apply(Values.create());

      PCollection<Integer> sum =
          globallyInput.apply("Sum", Sum.integersGlobally().withoutDefaults());

      PCollectionView<Integer> globallySumView = sum.apply(View.asSingleton());

      PCollection<KV<String, String>> combinePerKeyWithContext =
          perKeyInput.apply(
              Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(globallySumView))
                  .withSideInputs(globallySumView));

      PCollection<String> combineGloballyWithContext =
          globallyInput.apply(
              Combine.globally(new TestCombineFnWithContext(globallySumView))
                  .withoutDefaults()
                  .withSideInputs(globallySumView));

      PAssert.that(sum).containsInAnyOrder(1, 2, 1, 4, 5, 14, 13);
      PAssert.that(combinePerKeyWithContext)
          .containsInAnyOrder(
              Arrays.asList(
                  KV.of("a", "1:1"),
                  KV.of("a", "2:11"),
                  KV.of("a", "1:1"),
                  KV.of("a", "4:4"),
                  KV.of("a", "5:4"),
                  KV.of("b", "5:1"),
                  KV.of("b", "14:113"),
                  KV.of("b", "13:13")));
      PAssert.that(combineGloballyWithContext)
          .containsInAnyOrder("1:1", "2:11", "1:1", "4:4", "5:14", "14:113", "13:13");
      pipeline.run();
    }

    @Test
    @Category({ValidatesRunner.class, UsesSideInputs.class})
    public void testGlobalCombineWithDefaultsAndTriggers() {
      PCollection<Integer> input = pipeline.apply(Create.of(1, 1));

      PCollection<String> output =
          input
              .apply(
                  Window.<Integer>into(new GlobalWindows())
                      .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                      .accumulatingFiredPanes()
                      .withAllowedLateness(new Duration(0), ClosingBehavior.FIRE_ALWAYS))
              .apply(Sum.integersGlobally())
              .apply(ParDo.of(new FormatPaneInfo()));

      // The actual elements produced are nondeterministic. Could be one, could be two.
      // But it should certainly have a final element with the correct final sum.
      PAssert.that(output)
          .satisfies(
              input1 -> {
                assertThat(input1, hasItem("2: true"));
                return null;
              });

      pipeline.run();
    }

    @Test
    @Category(ValidatesRunner.class)
    public void testSessionsCombine() {
      PCollection<KV<String, Integer>> input =
          pipeline
              .apply(
                  Create.timestamped(
                          TimestampedValue.of(KV.of("a", 1), new Instant(0L)),
                          TimestampedValue.of(KV.of("a", 1), new Instant(4L)),
                          TimestampedValue.of(KV.of("a", 4), new Instant(7L)),
                          TimestampedValue.of(KV.of("b", 1), new Instant(10L)),
                          TimestampedValue.of(KV.of("b", 13), new Instant(16L)))
                      .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
              .apply(Window.into(Sessions.withGapDuration(Duration.millis(5))));

      PCollection<Integer> sum =
          input.apply(Values.create()).apply(Combine.globally(new SumInts()).withoutDefaults());

      PCollection<KV<String, String>> sumPerKey = input.apply(Combine.perKey(new TestCombineFn()));

      PAssert.that(sum).containsInAnyOrder(7, 13);
      PAssert.that(sumPerKey)
          .containsInAnyOrder(Arrays.asList(KV.of("a", "114"), KV.of("b", "1"), KV.of("b", "13")));
      pipeline.run();
    }

    @Test
    @Category({ValidatesRunner.class, UsesSideInputs.class})
    public void testSessionsCombineWithContext() {
      PCollection<KV<String, Integer>> perKeyInput =
          pipeline.apply(
              Create.timestamped(
                      TimestampedValue.of(KV.of("a", 1), new Instant(0L)),
                      TimestampedValue.of(KV.of("a", 1), new Instant(4L)),
                      TimestampedValue.of(KV.of("a", 4), new Instant(7L)),
                      TimestampedValue.of(KV.of("b", 1), new Instant(10L)),
                      TimestampedValue.of(KV.of("b", 13), new Instant(16L)))
                  .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));

      PCollection<Integer> globallyInput = perKeyInput.apply(Values.create());

      PCollection<Integer> fixedWindowsSum =
          globallyInput
              .apply("FixedWindows", Window.into(FixedWindows.of(Duration.millis(5))))
              .apply("Sum", Combine.globally(new SumInts()).withoutDefaults());

      PCollectionView<Integer> globallyFixedWindowsView =
          fixedWindowsSum.apply(View.<Integer>asSingleton().withDefaultValue(0));

      PCollection<KV<String, String>> sessionsCombinePerKey =
          perKeyInput
              .apply(
                  "PerKey Input Sessions",
                  Window.into(Sessions.withGapDuration(Duration.millis(5))))
              .apply(
                  Combine.<String, Integer, String>perKey(
                          new TestCombineFnWithContext(globallyFixedWindowsView))
                      .withSideInputs(globallyFixedWindowsView));

      PCollection<String> sessionsCombineGlobally =
          globallyInput
              .apply(
                  "Globally Input Sessions",
                  Window.into(Sessions.withGapDuration(Duration.millis(5))))
              .apply(
                  Combine.globally(new TestCombineFnWithContext(globallyFixedWindowsView))
                      .withoutDefaults()
                      .withSideInputs(globallyFixedWindowsView));

      PAssert.that(fixedWindowsSum).containsInAnyOrder(2, 4, 1, 13);
      PAssert.that(sessionsCombinePerKey)
          .containsInAnyOrder(
              Arrays.asList(KV.of("a", "1:114"), KV.of("b", "1:1"), KV.of("b", "0:13")));
      PAssert.that(sessionsCombineGlobally).containsInAnyOrder("1:1114", "0:13");
      pipeline.run();
    }

    @Test
    @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
    public void testWindowedCombineEmpty() {
      PCollection<Double> mean =
          pipeline
              .apply(Create.empty(BigEndianIntegerCoder.of()))
              .apply(Window.into(FixedWindows.of(Duration.millis(1))))
              .apply(Combine.globally(new MeanInts()).withoutDefaults());

      PAssert.that(mean).empty();

      pipeline.run();
    }

    @Test
    @Category({ValidatesRunner.class, UsesSideInputs.class})
    public void testCombineGloballyAsSingletonView() {
      final PCollectionView<Integer> view =
          pipeline
              .apply("CreateEmptySideInput", Create.empty(BigEndianIntegerCoder.of()))
              .apply(Sum.integersGlobally().asSingletonView());

      PCollection<Integer> output =
          pipeline
              .apply("CreateVoidMainInput", Create.of((Void) null))
              .apply(
                  "OutputSideInput",
                  ParDo.of(
                          new DoFn<Void, Integer>() {
                            @ProcessElement
                            public void processElement(ProcessContext c) {
                              c.output(c.sideInput(view));
                            }
                          })
                      .withSideInputs(view));

      PAssert.thatSingleton(output).isEqualTo(0);
      pipeline.run();
    }

    @Test
    @Category({ValidatesRunner.class, UsesSideInputs.class})
    public void testWindowedCombineGloballyAsSingletonView() {
      FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(1));
      final PCollectionView<Integer> view =
          pipeline
              .apply(
                  "CreateSideInput",
                  Create.timestamped(
                      TimestampedValue.of(1, new Instant(100)),
                      TimestampedValue.of(3, new Instant(100))))
              .apply("WindowSideInput", Window.into(windowFn))
              .apply("CombineSideInput", Sum.integersGlobally().asSingletonView());

      TimestampedValue<Void> nonEmptyElement = TimestampedValue.of(null, new Instant(100));
      TimestampedValue<Void> emptyElement = TimestampedValue.atMinimumTimestamp(null);
      PCollection<Integer> output =
          pipeline
              .apply(
                  "CreateMainInput",
                  Create.timestamped(nonEmptyElement, emptyElement).withCoder(VoidCoder.of()))
              .apply("WindowMainInput", Window.into(windowFn))
              .apply(
                  "OutputSideInput",
                  ParDo.of(
                          new DoFn<Void, Integer>() {
                            @ProcessElement
                            public void processElement(ProcessContext c) {
                              c.output(c.sideInput(view));
                            }
                          })
                      .withSideInputs(view));

      PAssert.that(output).containsInAnyOrder(4, 0);
      PAssert.that(output)
          .inWindow(windowFn.assignWindow(nonEmptyElement.getTimestamp()))
          .containsInAnyOrder(4);
      PAssert.that(output)
          .inWindow(windowFn.assignWindow(emptyElement.getTimestamp()))
          .containsInAnyOrder(0);
      pipeline.run();
    }

    /** Tests creation of a global {@link Combine} via Java 8 lambda. */
    @Test
    @Category({ValidatesRunner.class, UsesSideInputs.class})
    public void testCombineGloballyLambda() {

      PCollection<Integer> output =
          pipeline
              .apply(Create.of(1, 2, 3, 4))
              .apply(
                  Combine.globally(
                      integers -> {
                        int sum = 0;
                        for (int i : integers) {
                          sum += i;
                        }
                        return sum;
                      }));

      PAssert.that(output).containsInAnyOrder(10);
      pipeline.run();
    }

    /** Tests creation of a global {@link Combine} via a Java 8 method reference. */
    @Test
    @Category({ValidatesRunner.class, UsesSideInputs.class})
    public void testCombineGloballyInstanceMethodReference() {

      PCollection<Integer> output =
          pipeline.apply(Create.of(1, 2, 3, 4)).apply(Combine.globally(new Summer()::sum));

      PAssert.that(output).containsInAnyOrder(10);
      pipeline.run();
    }
  }

  /** Tests validating accumulation scenarios. */
  @RunWith(JUnit4.class)
  public static class AccumulationTests extends SharedTestBase {
    @Test
    @Category({
      ValidatesRunner.class,
      UsesSideInputs.class,
      DataflowPortabilityApiUnsupported.class
    })
    public void testAccumulatingCombine() {
      runTestAccumulatingCombine(
          Arrays.asList(KV.of("a", 1), KV.of("a", 1), KV.of("a", 4), KV.of("b", 1), KV.of("b", 13)),
          4.0,
          Arrays.asList(KV.of("a", 2.0), KV.of("b", 7.0)));
    }

    @Test
    @Category({
      ValidatesRunner.class,
      UsesSideInputs.class,
      DataflowPortabilityApiUnsupported.class
    })
    public void testAccumulatingCombineEmpty() {
      runTestAccumulatingCombine(EMPTY_TABLE, 0.0, Collections.emptyList());
    }
  }
}
