/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.nexmark;

import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.nexmark.model.Auction;
import org.apache.beam.sdk.nexmark.model.AuctionBid;
import org.apache.beam.sdk.nexmark.model.AuctionCount;
import org.apache.beam.sdk.nexmark.model.AuctionPrice;
import org.apache.beam.sdk.nexmark.model.Bid;
import org.apache.beam.sdk.nexmark.model.BidsPerSession;
import org.apache.beam.sdk.nexmark.model.CategoryPrice;
import org.apache.beam.sdk.nexmark.model.Done;
import org.apache.beam.sdk.nexmark.model.Event;
import org.apache.beam.sdk.nexmark.model.IdNameReserve;
import org.apache.beam.sdk.nexmark.model.KnownSize;
import org.apache.beam.sdk.nexmark.model.NameCityStateId;
import org.apache.beam.sdk.nexmark.model.Person;
import org.apache.beam.sdk.nexmark.model.SellerPrice;
import org.apache.beam.sdk.nexmark.sources.BoundedEventSource;
import org.apache.beam.sdk.nexmark.sources.UnboundedEventSource;
import org.apache.beam.sdk.nexmark.sources.generator.Generator;
import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.hash.Hashing;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Odd's 'n Ends used throughout queries and driver. */
public class NexmarkUtils {
  private static final Logger LOG = LoggerFactory.getLogger(NexmarkUtils.class);

  /** Mapper for (de)serializing JSON. */
  public static final ObjectMapper MAPPER = new ObjectMapper();

  /** Possible sources for events. */
  public enum SourceType {
    /** Produce events directly. */
    DIRECT,
    /** Read events from an Avro file. */
    AVRO,
    /** Read from a PubSub topic. It will be fed the same synthetic events by this pipeline. */
    PUBSUB,
    /**
     * Read events from a Kafka topic. It will be fed the same synthetic events by this pipeline.
     */
    KAFKA
  }

  /** Possible sinks for query results. */
  public enum SinkType {
    /** Discard all results. */
    COUNT_ONLY,
    /** Discard all results after converting them to strings. */
    DEVNULL,
    /** Write to a PubSub topic. It will be drained by this pipeline. */
    PUBSUB,
    /** Write to a Kafka topic. It will be drained by this pipeline. */
    KAFKA,
    /** Write to a text file. Only works in batch mode. */
    TEXT,
    /** Write raw Events to Avro. Only works in batch mode. */
    AVRO,
    /** Write raw Events to BigQuery. */
    BIGQUERY
  }

  /** Pub/sub mode to run in. */
  public enum PubSubMode {
    /** Publish events to pub/sub, but don't run the query. */
    PUBLISH_ONLY,
    /** Consume events from pub/sub and run the query, but don't publish. */
    SUBSCRIBE_ONLY,
    /** Both publish and consume, but as separate jobs. */
    COMBINED
  }

  /** Possible side input sources. */
  public enum SideInputType {
    /** Produce the side input via {@link Create}. */
    DIRECT,
    /** Read side input from CSV files. */
    CSV
  }

  /** Coder strategies. */
  public enum CoderStrategy {
    /** Hand-written. */
    HAND,
    /** Avro. */
    AVRO,
    /** Java serialization. */
    JAVA
  }

  /** How to determine resource names. */
  public enum ResourceNameMode {
    /** Names are used as provided. */
    VERBATIM,
    /** Names are suffixed with the query being run. */
    QUERY,
    /** Names are suffixed with the query being run and a random number. */
    QUERY_AND_SALT,
    /** Names are suffixed with the runner being used and a the mode (streaming/batch). */
    QUERY_RUNNER_AND_MODE
  }

  /** Return a query name with query language (if applicable). */
  static String fullQueryName(String queryLanguage, String query) {
    return queryLanguage != null ? query + "_" + queryLanguage : query;
  }

  /** Return a BigQuery table spec. */
  static String tableSpec(NexmarkOptions options, String queryName, long now, String version) {
    return String.format(
        "%s:%s.%s",
        options.getProject(),
        options.getBigQueryDataset(),
        tableName(options, queryName, now, version));
  }

  /** Return a BigQuery table name. */
  static String tableName(NexmarkOptions options, String queryName, long now, String version) {
    String baseTableName = options.getBigQueryTable();
    if (Strings.isNullOrEmpty(baseTableName)) {
      throw new RuntimeException("Missing --bigQueryTable");
    }

    switch (options.getResourceNameMode()) {
      case VERBATIM:
        return String.format("%s_%s", baseTableName, version);
      case QUERY:
        return String.format("%s_%s_%s", baseTableName, queryName, version);
      case QUERY_AND_SALT:
        return String.format("%s_%s_%s_%d", baseTableName, queryName, version, now);
      case QUERY_RUNNER_AND_MODE:
        String runnerName = options.getRunner().getSimpleName();
        boolean isStreaming = options.isStreaming();

        String tableName =
            String.format(
                "%s_%s_%s_%s", baseTableName, queryName, runnerName, processingMode(isStreaming));

        return version != null ? String.format("%s_%s", tableName, version) : tableName;
    }
    throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
  }

  private static String processingMode(boolean isStreaming) {
    return isStreaming ? "streaming" : "batch";
  }

  /** Units for rates. */
  public enum RateUnit {
    PER_SECOND(1_000_000L),
    PER_MINUTE(60_000_000L);

    RateUnit(long usPerUnit) {
      this.usPerUnit = usPerUnit;
    }

    /** Number of microseconds per unit. */
    private final long usPerUnit;

    /** Number of microseconds between events at given rate. */
    public long rateToPeriodUs(long rate) {
      return (usPerUnit + rate / 2) / rate;
    }
  }

  /** Shape of event rate. */
  public enum RateShape {
    SQUARE,
    SINE;

    /** Number of steps used to approximate sine wave. */
    private static final int N = 10;

    /**
     * Return inter-event delay, in microseconds, for each generator to follow in order to achieve
     * {@code rate} at {@code unit} using {@code numGenerators}.
     */
    public long interEventDelayUs(int rate, RateUnit unit, int numGenerators) {
      return unit.rateToPeriodUs(rate) * numGenerators;
    }

    /**
     * Return array of successive inter-event delays, in microseconds, for each generator to follow
     * in order to achieve this shape with {@code firstRate/nextRate} at {@code unit} using {@code
     * numGenerators}.
     */
    public long[] interEventDelayUs(int firstRate, int nextRate, RateUnit unit, int numGenerators) {
      if (firstRate == nextRate) {
        long[] interEventDelayUs = new long[1];
        interEventDelayUs[0] = unit.rateToPeriodUs(firstRate) * numGenerators;
        return interEventDelayUs;
      }

      switch (this) {
        case SQUARE:
          {
            long[] interEventDelayUs = new long[2];
            interEventDelayUs[0] = unit.rateToPeriodUs(firstRate) * numGenerators;
            interEventDelayUs[1] = unit.rateToPeriodUs(nextRate) * numGenerators;
            return interEventDelayUs;
          }
        case SINE:
          {
            double mid = (firstRate + nextRate) / 2.0;
            double amp = (firstRate - nextRate) / 2.0; // may be -ve
            long[] interEventDelayUs = new long[N];
            for (int i = 0; i < N; i++) {
              double r = (2.0 * Math.PI * i) / N;
              double rate = mid + amp * Math.cos(r);
              interEventDelayUs[i] = unit.rateToPeriodUs(Math.round(rate)) * numGenerators;
            }
            return interEventDelayUs;
          }
      }
      throw new RuntimeException(); // switch should be exhaustive
    }

    /**
     * Return delay between steps, in seconds, for result of {@link #interEventDelayUs}, so as to
     * cycle through the entire sequence every {@code ratePeriodSec}.
     */
    public int stepLengthSec(int ratePeriodSec) {
      int n = 0;
      switch (this) {
        case SQUARE:
          n = 2;
          break;
        case SINE:
          n = N;
          break;
      }
      return (ratePeriodSec + n - 1) / n;
    }
  }

  /** Set to true to capture all info messages. The logging level flags don't currently work. */
  private static final boolean LOG_INFO = false;

  /**
   * Set to true to log directly to stdout. If run using Google Dataflow, you can watch the results
   * in real-time with: tail -f /var/log/dataflow/streaming-harness/harness-stdout.log
   */
  private static final boolean LOG_TO_CONSOLE = false;

  /** Log info message. */
  public static void info(String format, Object... args) {
    if (LOG_INFO) {
      LOG.info(String.format(format, args));
      if (LOG_TO_CONSOLE) {
        System.out.println(String.format(format, args));
      }
    }
  }

  /** Log message to console. For client side only. */
  public static void console(String format, Object... args) {
    System.out.printf("%s %s%n", Instant.now(), String.format(format, args));
  }

  /** Label to use for timestamps on pub/sub messages. */
  public static final String PUBSUB_TIMESTAMP = "timestamp";

  /** Label to use for windmill ids on pub/sub messages. */
  public static final String PUBSUB_ID = "id";

  /** All events will be given a timestamp relative to this time (ms since epoch). */
  private static final long BASE_TIME = Instant.parse("2015-07-15T00:00:00.000Z").getMillis();

  /**
   * Instants guaranteed to be strictly before and after all event timestamps, and which won't be
   * subject to underflow/overflow.
   */
  public static final Instant BEGINNING_OF_TIME = new Instant(0).plus(Duration.standardDays(365));

  public static final Instant END_OF_TIME =
      BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(365));

  /** Setup pipeline with codes and some other options. */
  public static void setupPipeline(CoderStrategy coderStrategy, Pipeline p) {
    CoderRegistry registry = p.getCoderRegistry();
    switch (coderStrategy) {
      case HAND:
        registry.registerCoderForClass(Auction.class, Auction.CODER);
        registry.registerCoderForClass(AuctionBid.class, AuctionBid.CODER);
        registry.registerCoderForClass(AuctionCount.class, AuctionCount.CODER);
        registry.registerCoderForClass(AuctionPrice.class, AuctionPrice.CODER);
        registry.registerCoderForClass(Bid.class, Bid.CODER);
        registry.registerCoderForClass(CategoryPrice.class, CategoryPrice.CODER);
        registry.registerCoderForClass(Event.class, Event.CODER);
        registry.registerCoderForClass(IdNameReserve.class, IdNameReserve.CODER);
        registry.registerCoderForClass(NameCityStateId.class, NameCityStateId.CODER);
        registry.registerCoderForClass(Person.class, Person.CODER);
        registry.registerCoderForClass(SellerPrice.class, SellerPrice.CODER);
        registry.registerCoderForClass(Done.class, Done.CODER);
        registry.registerCoderForClass(BidsPerSession.class, BidsPerSession.CODER);
        break;
      case AVRO:
        registry.registerCoderProvider(AvroCoder.getCoderProvider());
        break;
      case JAVA:
        registry.registerCoderProvider(SerializableCoder.getCoderProvider());
        break;
    }
  }

  /** Return a generator config to match the given {@code options}. */
  private static GeneratorConfig standardGeneratorConfig(NexmarkConfiguration configuration) {
    return new GeneratorConfig(
        configuration,
        configuration.useWallclockEventTime ? System.currentTimeMillis() : BASE_TIME,
        0,
        configuration.numEvents,
        0);
  }

  /** Return an iterator of events using the 'standard' generator config. */
  public static Iterator<TimestampedValue<Event>> standardEventIterator(
      NexmarkConfiguration configuration) {
    return new Generator(standardGeneratorConfig(configuration));
  }

  /** Return a transform which yields a finite number of synthesized events generated as a batch. */
  public static PTransform<PBegin, PCollection<Event>> batchEventsSource(
      NexmarkConfiguration configuration) {
    return Read.from(
        new BoundedEventSource(
            standardGeneratorConfig(configuration), configuration.numEventGenerators));
  }

  /**
   * Return a transform which yields a finite number of synthesized events generated on-the-fly in
   * real time.
   */
  public static PTransform<PBegin, PCollection<Event>> streamEventsSource(
      NexmarkConfiguration configuration) {
    return Read.from(
        new UnboundedEventSource(
            NexmarkUtils.standardGeneratorConfig(configuration),
            configuration.numEventGenerators,
            configuration.watermarkHoldbackSec,
            configuration.isRateLimited));
  }

  /** Return a transform to pass-through events, but count them as they go by. */
  public static ParDo.SingleOutput<Event, Event> snoop(final String name) {
    return ParDo.of(
        new DoFn<Event, Event>() {
          final Counter eventCounter = Metrics.counter(name, "events");
          final Counter newPersonCounter = Metrics.counter(name, "newPersons");
          final Counter newAuctionCounter = Metrics.counter(name, "newAuctions");
          final Counter bidCounter = Metrics.counter(name, "bids");
          final Counter endOfStreamCounter = Metrics.counter(name, "endOfStream");

          @ProcessElement
          public void processElement(ProcessContext c) {
            eventCounter.inc();
            if (c.element().newPerson != null) {
              newPersonCounter.inc();
            } else if (c.element().newAuction != null) {
              newAuctionCounter.inc();
            } else if (c.element().bid != null) {
              bidCounter.inc();
            } else {
              endOfStreamCounter.inc();
            }
            info("%s snooping element %s", name, c.element());
            c.output(c.element());
          }
        });
  }

  /** Return a transform to count and discard each element. */
  public static <T> ParDo.SingleOutput<T, Void> devNull(final String name) {
    return ParDo.of(
        new DoFn<T, Void>() {
          final Counter discardedCounterMetric = Metrics.counter(name, "discarded");

          @ProcessElement
          public void processElement(ProcessContext c) {
            discardedCounterMetric.inc();
          }
        });
  }

  /** Return a transform to log each element, passing it through unchanged. */
  public static <T> ParDo.SingleOutput<T, T> log(final String name) {
    return ParDo.of(
        new DoFn<T, T>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            LOG.info("%s: %s", name, c.element());
            c.output(c.element());
          }
        });
  }

  /** Return a transform to format each element as a string. */
  public static <T> ParDo.SingleOutput<T, String> format(final String name) {
    return ParDo.of(
        new DoFn<T, String>() {
          final Counter recordCounterMetric = Metrics.counter(name, "records");

          @ProcessElement
          public void processElement(ProcessContext c) {
            recordCounterMetric.inc();
            c.output(c.element().toString());
          }
        });
  }

  /** Return a transform to make explicit the timestamp of each element. */
  public static <T> ParDo.SingleOutput<T, TimestampedValue<T>> stamp(String name) {
    return ParDo.of(
        new DoFn<T, TimestampedValue<T>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.output(TimestampedValue.of(c.element(), c.timestamp()));
          }
        });
  }

  /** Return a transform to reduce a stream to a single, order-invariant long hash. */
  public static <T> PTransform<PCollection<T>, PCollection<Long>> hash(
      final long numEvents, String name) {
    return new PTransform<PCollection<T>, PCollection<Long>>(name) {
      @Override
      public PCollection<Long> expand(PCollection<T> input) {
        return input
            .apply(
                Window.<T>into(new GlobalWindows())
                    .triggering(AfterPane.elementCountAtLeast((int) numEvents))
                    .withAllowedLateness(Duration.standardDays(1))
                    .discardingFiredPanes())
            .apply(
                name + ".Hash",
                ParDo.of(
                    new DoFn<T, Long>() {
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        long hash =
                            Hashing.murmur3_128()
                                .newHasher()
                                .putLong(c.timestamp().getMillis())
                                .putString(c.element().toString(), StandardCharsets.UTF_8)
                                .hash()
                                .asLong();
                        c.output(hash);
                      }
                    }))
            .apply(
                Combine.globally(
                    new Combine.BinaryCombineFn<Long>() {
                      @Override
                      public Long apply(Long left, Long right) {
                        return left ^ right;
                      }
                    }));
      }
    };
  }

  private static final long MASK = (1L << 16) - 1L;
  private static final long HASH = 0x243F6A8885A308D3L;
  private static final long INIT_PLAINTEXT = 50000L;

  /** Return a transform to keep the CPU busy for given milliseconds on every record. */
  public static <T> ParDo.SingleOutput<T, T> cpuDelay(String name, final long delayMs) {
    return ParDo.of(
        new DoFn<T, T>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            long now = System.currentTimeMillis();
            long end = now + delayMs;
            while (now < end) {
              // Find plaintext which hashes to HASH in lowest MASK bits.
              // Values chosen to roughly take 1ms on typical workstation.
              long p = INIT_PLAINTEXT;
              while (true) {
                long t = Hashing.murmur3_128().hashLong(p).asLong();
                if ((t & MASK) == (HASH & MASK)) {
                  break;
                }
                p++;
              }
              now = System.currentTimeMillis();
            }
            c.output(c.element());
          }
        });
  }

  private static final int MAX_BUFFER_SIZE = 1 << 24;

  private static class DiskBusyTransform<T> extends PTransform<PCollection<T>, PCollection<T>> {

    private long bytes;

    private DiskBusyTransform(long bytes) {
      this.bytes = bytes;
    }

    @Override
    public PCollection<T> expand(PCollection<T> input) {
      // Add dummy key to be able to use State API
      PCollection<KV<Integer, T>> kvCollection =
          input.apply(
              "diskBusy.keyElements",
              ParDo.of(
                  new DoFn<T, KV<Integer, T>>() {

                    @ProcessElement
                    public void processElement(ProcessContext context) {
                      context.output(KV.of(0, context.element()));
                    }
                  }));
      // Apply actual transform that generates disk IO using state API
      return kvCollection.apply(
          "diskBusy.generateIO",
          ParDo.of(
              new DoFn<KV<Integer, T>, T>() {

                private static final String DISK_BUSY = "diskBusy";

                @StateId(DISK_BUSY)
                private final StateSpec<ValueState<byte[]>> spec =
                    StateSpecs.value(ByteArrayCoder.of());

                @ProcessElement
                public void processElement(
                    ProcessContext c, @StateId(DISK_BUSY) ValueState<byte[]> state) {
                  long remain = bytes;
                  long now = System.currentTimeMillis();
                  while (remain > 0) {
                    long thisBytes = Math.min(remain, MAX_BUFFER_SIZE);
                    remain -= thisBytes;
                    byte[] arr = new byte[(int) thisBytes];
                    for (int i = 0; i < thisBytes; i++) {
                      arr[i] = (byte) now;
                    }
                    state.write(arr);
                    now = System.currentTimeMillis();
                  }
                  c.output(c.element().getValue());
                }
              }));
    }
  }

  /** Return a transform to write given number of bytes to durable store on every record. */
  public static <T> PTransform<PCollection<T>, PCollection<T>> diskBusy(final long bytes) {
    return new DiskBusyTransform<>(bytes);
  }

  /** Return a transform to cast each element to {@link KnownSize}. */
  private static <T extends KnownSize> ParDo.SingleOutput<T, KnownSize> castToKnownSize() {
    return ParDo.of(
        new DoFn<T, KnownSize>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.output(c.element());
          }
        });
  }

  private static class GenerateSideInputData
      extends PTransform<PBegin, PCollection<KV<Long, String>>> {

    private final NexmarkConfiguration config;

    private GenerateSideInputData(NexmarkConfiguration config) {
      this.config = config;
    }

    @Override
    public PCollection<KV<Long, String>> expand(PBegin input) {
      return input
          .apply(GenerateSequence.from(0).to(config.sideInputRowCount))
          .apply(
              MapElements.via(
                  new SimpleFunction<Long, KV<Long, String>>() {
                    @Override
                    public KV<Long, String> apply(Long input) {
                      return KV.of(input, String.valueOf(input));
                    }
                  }));
    }
  }

  /**
   * Write data to be read as a side input.
   *
   * <p>Contains pairs of a number and its string representation to model lookups of some enrichment
   * data by id.
   *
   * <p>Generated data covers the range {@code [0, sideInputRowCount)} so lookup joins on any
   * desired id field can be modeled by looking up {@code id % sideInputRowCount}.
   */
  public static PCollection<KV<Long, String>> prepareSideInput(
      Pipeline queryPipeline, NexmarkConfiguration config) {

    checkArgument(
        config.sideInputRowCount > 0, "Side input required but sideInputRowCount is not >0");

    PTransform<PBegin, PCollection<KV<Long, String>>> generateSideInputData =
        new GenerateSideInputData(config);

    switch (config.sideInputType) {
      case DIRECT:
        return queryPipeline.apply(generateSideInputData);
      case CSV:
        checkArgument(
            config.sideInputUrl != null,
            "Side input type %s requires a URL but sideInputUrl not specified",
            SideInputType.CSV.toString());

        checkArgument(
            config.sideInputNumShards > 0,
            "Side input type %s requires explicit numShards but sideInputNumShards not specified",
            SideInputType.CSV.toString());

        Pipeline tempPipeline = Pipeline.create();
        tempPipeline
            .apply(generateSideInputData)
            .apply(
                MapElements.via(
                    new SimpleFunction<KV<Long, String>, String>(
                        kv -> String.format("%s,%s", kv.getKey(), kv.getValue())) {}))
            .apply(TextIO.write().withNumShards(config.sideInputNumShards).to(config.sideInputUrl));
        tempPipeline.run().waitUntilFinish();

        return queryPipeline
            .apply(TextIO.read().from(config.sideInputUrl + "*"))
            .apply(
                MapElements.via(
                    new SimpleFunction<String, KV<Long, String>>(
                        line -> {
                          List<String> cols = ImmutableList.copyOf(Splitter.on(",").split(line));
                          return KV.of(Long.valueOf(cols.get(0)), cols.get(1));
                        }) {}));
      default:
        throw new IllegalArgumentException(
            String.format("Unknown type of side input requested: %s", config.sideInputType));
    }
  }

  /** Frees any resources used to make the side input available. */
  public static void cleanUpSideInput(NexmarkConfiguration config) throws IOException {
    switch (config.sideInputType) {
      case DIRECT:
        break;
      case CSV:
        FileSystems.delete(
            FileSystems.match(config.sideInputUrl + "*").metadata().stream()
                .map(metadata -> metadata.resourceId())
                .collect(Collectors.toList()));
        break;
      default:
        throw new IllegalArgumentException(
            String.format(
                "Unknown type of %s clean up requested", SideInputType.class.getSimpleName()));
    }
  }

  /**
   * A coder for instances of {@code T} cast up to {@link KnownSize}.
   *
   * @param <T> True type of object.
   */
  private static class CastingCoder<T extends KnownSize> extends CustomCoder<KnownSize> {
    private final Coder<T> trueCoder;

    public CastingCoder(Coder<T> trueCoder) {
      this.trueCoder = trueCoder;
    }

    @Override
    public void encode(KnownSize value, OutputStream outStream) throws CoderException, IOException {
      @SuppressWarnings("unchecked")
      T typedValue = (T) value;
      trueCoder.encode(typedValue, outStream);
    }

    @Override
    public KnownSize decode(InputStream inStream) throws CoderException, IOException {
      return trueCoder.decode(inStream);
    }
  }

  /** Return a coder for {@code KnownSize} that are known to be exactly of type {@code T}. */
  private static <T extends KnownSize> Coder<KnownSize> makeCastingCoder(Coder<T> trueCoder) {
    return new CastingCoder<>(trueCoder);
  }

  /** Return {@code elements} as {@code KnownSize}s. */
  public static <T extends KnownSize> PCollection<KnownSize> castToKnownSize(
      final String name, PCollection<T> elements) {
    return elements
        .apply(name + ".Forget", castToKnownSize())
        .setCoder(makeCastingCoder(elements.getCoder()));
  }

  // Do not instantiate.
  private NexmarkUtils() {}
}
