/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */


package org.apache.heron.streamlet;

import java.io.Serializable;

import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.streamlet.impl.KryoSerializer;

/**
 * Config is the way users configure the execution of the topology.
 * Things like streamlet delivery semantics, resources used, as well as
 * user-defined key/value pairs are passed on to the topology runner via
 * this class.
 */
public final class Config implements Serializable {
  private static final long serialVersionUID = 6204498077403076352L;
  private final double cpu;
  private final ByteAmount ram;
  private final DeliverySemantics deliverySemantics;
  private final Serializer serializer;
  private org.apache.heron.api.Config heronConfig;
  private static final long MB = 1024 * 1024;
  private static final long GB = 1024 * MB;

  /**
   * An enum encapsulating the delivery semantics that can be applied to Heron topologies. The
   * options are currently: at most once, at least once, or effectively once.
   */
  public enum DeliverySemantics {
    ATMOST_ONCE,
    ATLEAST_ONCE,
    EFFECTIVELY_ONCE
  }

  /**
   * An enum encapsulating the serializers that can be used for data in the topology. The options
   * are currently: the Kryo serializer or the native Java serializer.
   */
  public enum Serializer {
    JAVA,
    KRYO
  }

  private static class Defaults {
    static final boolean USE_KRYO = true;
    static final double CPU = -1.0;                             // -1 means undefined
    static final ByteAmount RAM = ByteAmount.fromBytes(-1);     // -1 means undefined
    static final DeliverySemantics SEMANTICS = DeliverySemantics.ATMOST_ONCE;
    static final Serializer SERIALIZER = Serializer.KRYO;
  }

  private Config(Builder builder) {
    serializer = builder.serializer;
    heronConfig = builder.config;
    cpu = builder.cpu;
    ram = builder.ram;
    deliverySemantics = builder.deliverySemantics;
  }

  /**
   * Sets the topology to use the default configuration: 100 megabytes of RAM per container, 1.0
   * CPUs per container, at-most-once delivery semantics, and the Kryo serializer.
   */
  public static Config defaultConfig() {
    return new Builder()
        .build();
  }

  /**
   * Returns a new {@link Builder} that can be used to create a configuration object for Streamlet
   * API topologies
   */
  public static Builder newBuilder() {
    return new Builder();
  }

  public org.apache.heron.api.Config getHeronConfig() {
    return heronConfig;
  }

  /**
   * Gets the CPU used per topology container
   * @return the per-container CPU as a double
   */
  public double getPerContainerCpu() {
    return cpu;
  }

  /**
   * Gets the RAM used per topology container as a number of bytes
   * @return the per-container RAM in bytes
   */
  public long getPerContainerRam() {
    return getPerContainerRamAsBytes();
  }

  /**
   * Gets the RAM used per topology container as a number of bytes
   * @return the per-container RAM in bytes
   */
  public long getPerContainerRamAsBytes() {
    return ram.asBytes();
  }

  /**
   * Gets the RAM used per topology container as a number of megabytes
   * @return the per-container RAM in megabytes
   */
  public long getPerContainerRamAsMegabytes() {
    return ram.asMegabytes();
  }

  /**
   * Gets the RAM used per topology container as a number of gigabytes
   * @return the per-container RAM in gigabytes
   */
  public long getPerContainerRamAsGigabytes() {
    return ram.asGigabytes();
  }

  /**
   * Gets the delivery semantics applied to the topology
   * @return the delivery semantics as an enum
   */
  public DeliverySemantics getDeliverySemantics() {
    return deliverySemantics;
  }

  /**
   * Gets the serializer used by the topology
   * @return the serializer as an enum
   */
  public Serializer getSerializer() {
    return serializer;
  }

  private static org.apache.heron.api.Config.TopologyReliabilityMode translateSemantics(
      DeliverySemantics semantics) {
    switch (semantics) {
      case ATMOST_ONCE:
        return org.apache.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE;
      case ATLEAST_ONCE:
        return org.apache.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE;
      case EFFECTIVELY_ONCE:
        return org.apache.heron.api.Config.TopologyReliabilityMode.EFFECTIVELY_ONCE;
      default:
        return org.apache.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE;
    }
  }

  public static final class Builder {
    private org.apache.heron.api.Config config;
    private double cpu;
    private ByteAmount ram;
    private DeliverySemantics deliverySemantics;
    private Serializer serializer;

    private Builder() {
      config = new org.apache.heron.api.Config();
      cpu = Defaults.CPU;
      ram = Defaults.RAM;
      deliverySemantics = Defaults.SEMANTICS;
      serializer = Serializer.KRYO;
    }

    /**
     * Sets the per-container (per-instance) CPU to be used by this topology
     * @param perContainerCpu Per-container (per-instance) CPU as a double
     */
    public Builder setPerContainerCpu(double perContainerCpu) {
      this.cpu = perContainerCpu;
      // Different packing algorithm might use different configs. Set all of them here.
      config.setContainerCpuRequested(perContainerCpu);
      config.setContainerMaxCpuHint(perContainerCpu);
      return this;
    }

    /**
     * Sets the per-container (per-instance) RAM to be used by this topology
     * @param perContainerRam Per-container (per-instance) RAM expressed as a Long.
     */
    public Builder setPerContainerRam(long perContainerRam) {
      return setPerContainerRamInBytes(perContainerRam);
    }

    /**
     * Sets the per-container (per-instance) RAM to be used by this topology as a number of bytes
     * @param perContainerRam Per-container (per-instance) RAM expressed as a Long.
     */
    public Builder setPerContainerRamInBytes(long perContainerRam) {
      this.ram = ByteAmount.fromBytes(perContainerRam);
      // Different packing algorithm might use different configs. Set all of them here.
      config.setContainerRamRequested(ram);
      config.setContainerMaxRamHint(ram);
      return this;
    }

    /**
     * Sets the per-container (per-instance) RAM to be used by this topology in megabytes
     * @param perContainerRamMB Per-container (per-instance) RAM expressed as a Long.
     */
    public Builder setPerContainerRamInMegabytes(long perContainerRamMB) {
      return setPerContainerRam(perContainerRamMB * MB);
    }

    /**
     * Sets the per-container (per-instance) RAM to be used by this topology in gigabytes
     * @param perContainerRamGB Per-container (per-instance) RAM expressed as a Long.
     */
    public Builder setPerContainerRamInGigabytes(long perContainerRamGB) {
      return setPerContainerRam(perContainerRamGB * GB);
    }

    /**
     * Sets the number of containers to run this topology
     * @param numContainers The number of containers across which to distribute this topology
     */
    public Builder setNumContainers(int numContainers) {
      config.setNumStmgrs(numContainers);
      return this;
    }

    /**
     * Sets the delivery semantics of the topology
     * @param semantics The delivery semantic to be enforced
     */
    public Builder setDeliverySemantics(DeliverySemantics semantics) {
      this.deliverySemantics = semantics;
      config.setTopologyReliabilityMode(Config.translateSemantics(semantics));
      return this;
    }

    /**
     * Sets some user-defined key/value mapping
     * @param key The user-defined key
     * @param value The user-defined value
     */
    public Builder setUserConfig(String key, Object value) {
      config.put(key, value);
      return this;
    }

    private void useKryo() {
      try {
        config.setSerializationClassName(KryoSerializer.class.getName());
      } catch (NoClassDefFoundError e) {
        throw new RuntimeException("Linking with kryo is needed because useKryoSerializer is used");
      }
    }

    /**
     * Sets the {@link Serializer} to be used by the topology (current options are {@link
     * KryoSerializer} and the native Java serializer.
     * @param topologySerializer The data serializer to use for streamlet elements in the topology.
     */
    public Builder setSerializer(Serializer topologySerializer) {
      this.serializer = topologySerializer;
      return this;
    }

    public Config build() {
      if (serializer.equals(Serializer.KRYO)) {
        useKryo();
      }
      return new Config(this);
    }
  }
}
