/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.heron.streamlet.scala.impl

import java.util.{Map => JMap}
import java.util.{HashMap => JHashMap}

import scala.collection.JavaConverters

import org.apache.heron.api.grouping.StreamGrouping
import org.apache.heron.streamlet.{
  IStreamletOperator,
  JoinType,
  KeyValue,
  KeyedWindow,
  SerializablePredicate,
  Streamlet => JavaStreamlet,
  WindowConfig
}
import org.apache.heron.streamlet.impl.{StreamletImpl => JavaStreamletImpl}
import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet

import org.apache.heron.streamlet.scala.{
  SerializableTransformer,
  Sink,
  Streamlet
}
import org.apache.heron.streamlet.scala.converter.ScalaToJavaConverter._

/**
 * Companion helpers for moving between the Scala Streamlet wrapper and the
 * Java Streamlet it delegates to.
 */
object StreamletImpl {

  /**
   * Wraps a Java Streamlet in a new Scala [[StreamletImpl]].
   *
   * @param javaStreamlet the Java Streamlet to wrap
   * @return a Scala Streamlet backed by `javaStreamlet`
   */
  def fromJavaStreamlet[R](javaStreamlet: JavaStreamlet[R]): Streamlet[R] = {
    val wrapped = new StreamletImpl[R](javaStreamlet)
    wrapped
  }

  /**
   * Extracts the Java Streamlet held by a Scala Streamlet.
   *
   * The argument is cast to [[StreamletImpl]], so passing any other
   * Streamlet implementation fails with a ClassCastException.
   *
   * @param streamlet the Scala Streamlet to unwrap
   * @return the Java Streamlet wrapped by `streamlet`
   */
  def toJavaStreamlet[R](streamlet: Streamlet[R]): JavaStreamlet[R] = {
    val scalaImpl = streamlet.asInstanceOf[StreamletImpl[R]]
    scalaImpl.javaStreamlet
  }
}

/**
 * Scala implementation of the Streamlet API that wraps the Java Streamlet API.
 * User-defined Scala functions passed to its methods are converted to the corresponding
 * Java functional interfaces, and each resulting Java Streamlet is wrapped back into a
 * Scala Streamlet.
 */
class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R])
    extends Streamlet[R] {

  import StreamletImpl._

  /**
    * Sets the name of the Streamlet.
    *
    * @param sName The name given by the user for this Streamlet
    * @return Returns back the Streamlet with changed name
    */
  override def setName(sName: String): Streamlet[R] =
    fromJavaStreamlet[R](javaStreamlet.setName(sName))

  /**
    * Gets the name of the Streamlet.
    *
    * @return Returns the name of the Streamlet
    */
  override def getName(): String = javaStreamlet.getName

  /**
    * Sets the number of partitions of the streamlet
    *
    * @param numPartitions The user assigned number of partitions
    * @return Returns back the Streamlet with changed number of partitions
    */
  override def setNumPartitions(numPartitions: Int): Streamlet[R] =
    fromJavaStreamlet[R](javaStreamlet.setNumPartitions(numPartitions))

  /**
    * Gets the number of partitions of this Streamlet.
    *
    * @return the number of partitions of this Streamlet
    */
  override def getNumPartitions(): Int = javaStreamlet.getNumPartitions

  /**
    * Set the id of the stream to be used by the children nodes.
    * Usage (assuming source is a Streamlet object with two output streams: stream1 and stream2):
    * {{{
    *   source.withStream("stream1").filter(...).log();
    *   source.withStream("stream2").filter(...).log();
    * }}}
    *
    * @param streamId The specified stream id
    * @return Returns back the Streamlet with changed stream id
    */
  override def withStream(streamId: String): Streamlet[R] =
    fromJavaStreamlet[R](javaStreamlet.withStream(streamId))

  /**
    * Gets the stream id of this Streamlet.
    *
    * @return the stream id of this Streamlet
    */
  override def getStreamId(): String = javaStreamlet.getStreamId

  /**
    * Return a new Streamlet by applying mapFn to each element of this Streamlet
    *
    * @param mapFn The Map Function that should be applied to each element
    * @tparam T The element type of the resulting Streamlet
    * @return Streamlet wrapping the result of the Java map operation
    */
  override def map[T](mapFn: R => T): Streamlet[T] = {
    val serializableFunction = toSerializableFunction[R, T](mapFn)
    val newJavaStreamlet = javaStreamlet.map[T](serializableFunction)
    fromJavaStreamlet[T](newJavaStreamlet)
  }

  /**
    * Return a new Streamlet by applying flatMapFn to each element of this Streamlet and
    * flattening the result
    *
    * @param flatMapFn The FlatMap Function that should be applied to each element
    * @tparam T The element type of the resulting Streamlet
    * @return Streamlet wrapping the result of the Java flatMap operation
    */
  override def flatMap[T](flatMapFn: R => Iterable[_ <: T]): Streamlet[T] = {
    val serializableFunction =
      toSerializableFunctionWithIterable[R, T](flatMapFn)
    val newJavaStreamlet = javaStreamlet.flatMap[T](serializableFunction)
    fromJavaStreamlet[T](newJavaStreamlet)
  }

  /**
    * Return a new Streamlet by applying the filterFn on each element of this streamlet
    * and including only those elements that satisfy the filterFn
    *
    * @param filterFn The filter Function that should be applied to each element
    * @return Streamlet wrapping the result of the Java filter operation
    */
  override def filter(filterFn: R => Boolean): Streamlet[R] = {
    val serializablePredicate = toSerializablePredicate[R](filterFn)
    val newJavaStreamlet = javaStreamlet.filter(serializablePredicate)
    fromJavaStreamlet[R](newJavaStreamlet)
  }

  /**
    * Same as filter(filterFn).setNumPartitions(nPartitions) where filterFn is identity
    *
    * @param numPartitions The number of partitions passed to the Java repartition operation
    * @return Streamlet wrapping the result of the Java repartition operation
    */
  override def repartition(numPartitions: Int): Streamlet[R] = {
    val newJavaStreamlet = javaStreamlet.repartition(numPartitions)
    fromJavaStreamlet[R](newJavaStreamlet)
  }

  /**
    * A more generalized version of repartition where a user can determine which partitions
    * any particular tuple should go to. For each element of the current streamlet, the user
    * supplied partitionFn is invoked passing in the element as the first argument. The second
    * argument is the number of partitions of the downstream streamlet. The partitionFn should
    * return 0 or more unique numbers between 0 and n partitions to indicate which partitions
    * this element should be routed to.
    *
    * @param numPartitions The number of partitions passed to the Java repartition operation
    * @param partitionFn   The function choosing the target partitions for an element
    * @return Streamlet wrapping the result of the Java repartition operation
    */
  override def repartition(numPartitions: Int,
                           partitionFn: (R, Int) => Seq[Int]): Streamlet[R] = {
    val partitionFunction = toSerializableBiFunctionWithSeq[R](partitionFn)
    val newJavaStreamlet =
      javaStreamlet.repartition(numPartitions, partitionFunction)
    fromJavaStreamlet[R](newJavaStreamlet)
  }

  /**
    * Clones the current Streamlet. It returns a sequence of numClones Streamlets where each
    * Streamlet contains all the tuples of the current Streamlet
    *
    * @param numClones The number of clones to clone
    * @return The cloned Streamlets, each wrapped as a Scala Streamlet, as an immutable sequence
    */
  override def clone(numClones: Int): Seq[Streamlet[R]] = {
    val javaClonedStreamlets = javaStreamlet.clone(numClones)
    JavaConverters
      .asScalaBufferConverter(javaClonedStreamlets)
      .asScala
      .map(clonedStreamlet => fromJavaStreamlet[R](clonedStreamlet))
      // Materialize an immutable List so the mutable Buffer view over the
      // Java list is not handed out to callers.
      .toList
  }

  /**
    * Return a new Streamlet by inner joining 'this' streamlet with 'other' streamlet.
    * The join is done over elements accumulated over a time window defined by windowCfg.
    * The elements are compared using the thisKeyExtractor for this streamlet with the
    * otherKeyExtractor for the other streamlet. On each matching pair, the joinFunction is applied.
    *
    * @param other             The Streamlet that we are joining with.
    * @param thisKeyExtractor  The function applied to a tuple of this streamlet to get the key
    * @param otherKeyExtractor The function applied to a tuple of the other streamlet to get the key
    * @param windowCfg         This is a specification of what kind of windowing strategy you like to
    *                          have. Typical windowing strategies are sliding windows and tumbling windows
    * @param joinFunction      The join function that needs to be applied
    * @tparam K The key type
    * @tparam S The element type of the other streamlet
    * @tparam T The result type of the join function
    * @return Streamlet wrapping the result of the Java join operation
    */
  override def join[K, S, T](
      other: Streamlet[S],
      thisKeyExtractor: R => K,
      otherKeyExtractor: S => K,
      windowCfg: WindowConfig,
      joinFunction: (R, S) => T): Streamlet[KeyValue[KeyedWindow[K], T]] = {
    val javaOtherStreamlet = toJavaStreamlet[S](other)
    val javaThisKeyExtractor = toSerializableFunction[R, K](thisKeyExtractor)
    val javaOtherKeyExtractor = toSerializableFunction[S, K](otherKeyExtractor)
    val javaJoinFunction = toSerializableBiFunction[R, S, T](joinFunction)

    val newJavaStreamlet = javaStreamlet.join[K, S, T](javaOtherStreamlet,
                                                       javaThisKeyExtractor,
                                                       javaOtherKeyExtractor,
                                                       windowCfg,
                                                       javaJoinFunction)
    fromJavaStreamlet[KeyValue[KeyedWindow[K], T]](newJavaStreamlet)
  }

  /**
    * Return a new KVStreamlet by joining 'this' streamlet with 'other' streamlet. The type of joining
    * is declared by the joinType parameter.
    * The join is done over elements accumulated over a time window defined by windowCfg.
    * The elements are compared using the thisKeyExtractor for this streamlet with the
    * otherKeyExtractor for the other streamlet. On each matching pair, the joinFunction is applied.
    * Types of joins: [[org.apache.heron.streamlet.JoinType]]
    *
    * @param other             The Streamlet that we are joining with.
    * @param thisKeyExtractor  The function applied to a tuple of this streamlet to get the key
    * @param otherKeyExtractor The function applied to a tuple of the other streamlet to get the key
    * @param windowCfg         This is a specification of what kind of windowing strategy you like to
    *                          have. Typical windowing strategies are sliding windows and tumbling windows
    * @param joinType          Type of Join. Options: [[org.apache.heron.streamlet.JoinType]]
    * @param joinFunction      The join function that needs to be applied
    * @tparam K The key type
    * @tparam S The element type of the other streamlet
    * @tparam T The result type of the join function
    * @return Streamlet wrapping the result of the Java join operation
    */
  override def join[K, S, T](
      other: Streamlet[S],
      thisKeyExtractor: R => K,
      otherKeyExtractor: S => K,
      windowCfg: WindowConfig,
      joinType: JoinType,
      joinFunction: (R, S) => T): Streamlet[KeyValue[KeyedWindow[K], T]] = {
    val javaOtherStreamlet = toJavaStreamlet[S](other)
    val javaThisKeyExtractor = toSerializableFunction[R, K](thisKeyExtractor)
    val javaOtherKeyExtractor = toSerializableFunction[S, K](otherKeyExtractor)
    val javaJoinFunction = toSerializableBiFunction[R, S, T](joinFunction)

    val newJavaStreamlet = javaStreamlet.join[K, S, T](javaOtherStreamlet,
                                                       javaThisKeyExtractor,
                                                       javaOtherKeyExtractor,
                                                       windowCfg,
                                                       joinType,
                                                       javaJoinFunction)
    fromJavaStreamlet[KeyValue[KeyedWindow[K], T]](newJavaStreamlet)
  }

  /**
    * Return a new Streamlet accumulating tuples of this streamlet over a Window defined by
    * windowCfg and applying reduceFn on those tuples.
    *
    * @param keyExtractor   The function applied to a tuple of this streamlet to get the key
    * @param valueExtractor The function applied to a tuple of this streamlet to extract the value
    *                       to be reduced on
    * @param windowCfg      This is a specification of what kind of windowing strategy you like to have.
    *                       Typical windowing strategies are sliding windows and tumbling windows
    * @param reduceFn       The reduce function that you want to apply to all the values of a key.
    * @tparam K The key type
    * @tparam V The value type being reduced
    * @return Streamlet wrapping the result of the Java reduceByKeyAndWindow operation
    */
  override def reduceByKeyAndWindow[K, V](
      keyExtractor: R => K,
      valueExtractor: R => V,
      windowCfg: WindowConfig,
      reduceFn: (V, V) => V): Streamlet[KeyValue[KeyedWindow[K], V]] = {
    val javaKeyExtractor = toSerializableFunction[R, K](keyExtractor)
    val javaValueExtractor = toSerializableFunction[R, V](valueExtractor)
    val javaReduceFunction = toSerializableBinaryOperator[V](reduceFn)

    val newJavaStreamlet = javaStreamlet.reduceByKeyAndWindow[K, V](
      javaKeyExtractor,
      javaValueExtractor,
      windowCfg,
      javaReduceFunction)
    fromJavaStreamlet[KeyValue[KeyedWindow[K], V]](newJavaStreamlet)
  }

  /**
    * Return a new Streamlet accumulating tuples of this streamlet over a Window defined by
    * windowCfg and applying reduceFn on those tuples. For each window, the value identity is used
    * as an initial value. All the matching tuples are reduced using reduceFn starting from this
    * initial value.
    *
    * @param keyExtractor The function applied to a tuple of this streamlet to get the key
    * @param windowCfg    This is a specification of what kind of windowing strategy you like to have.
    *                     Typical windowing strategies are sliding windows and tumbling windows
    * @param identity     The identity element is both the initial value inside the reduction window
    *                     and the default result if there are no elements in the window
    * @param reduceFn     The reduce function takes two parameters: a partial result of the reduction
    *                     and the next element of the stream. It returns a new partial result.
    * @tparam K The key type
    * @tparam T The type of the partial and final reduction result
    * @return Streamlet wrapping the result of the Java reduceByKeyAndWindow operation
    */
  override def reduceByKeyAndWindow[K, T](
      keyExtractor: R => K,
      windowCfg: WindowConfig,
      identity: T,
      reduceFn: (T, R) => T): Streamlet[KeyValue[KeyedWindow[K], T]] = {
    val javaKeyExtractor = toSerializableFunction[R, K](keyExtractor)
    val javaReduceFunction = toSerializableBiFunction[T, R, T](reduceFn)

    val newJavaStreamlet = javaStreamlet.reduceByKeyAndWindow[K, T](
      javaKeyExtractor,
      windowCfg,
      identity,
      javaReduceFunction)
    fromJavaStreamlet[KeyValue[KeyedWindow[K], T]](newJavaStreamlet)
  }

  /**
    * Returns a new Streamlet that is the union of this and the 'other' streamlet. Essentially
    * the new streamlet will contain tuples belonging to both Streamlets
    *
    * @param other The Streamlet to union with this one
    * @return Streamlet wrapping the result of the Java union operation
    */
  override def union(other: Streamlet[_ <: R]): Streamlet[R] = {
    val newJavaStreamlet = javaStreamlet.union(toJavaStreamlet(other))
    fromJavaStreamlet(newJavaStreamlet)
  }

  /**
    * Returns a new Streamlet by applying the transformFunction on each element of this streamlet.
    * Before starting to cycle the transformFunction over the Streamlet, the open function is called.
    * This allows the transform Function to do any kind of initialization/loading, etc.
    *
    * @param serializableTransformer The transformation function to be applied
    * @tparam T The return type of the transform
    * @return Streamlet containing the output of the transformFunction
    */
  override def transform[T](
      serializableTransformer: SerializableTransformer[R, _ <: T])
    : Streamlet[T] = {
    val javaSerializableTransformer =
      toSerializableTransformer[R, T](serializableTransformer)
    val newJavaStreamlet =
      javaStreamlet.transform[T](javaSerializableTransformer)
    fromJavaStreamlet(newJavaStreamlet)
  }

  /**
    * Returns a new Streamlet by applying the operator on each element of this streamlet.
    *
    * @param operator The operator to be applied
    * @tparam T The return type of the transform
    * @return Streamlet containing the output of the operation
    */
  override def applyOperator[T](operator: IStreamletOperator[R, T]): Streamlet[T] = {
    val newJavaStreamlet = javaStreamlet.applyOperator[T](operator)
    fromJavaStreamlet(newJavaStreamlet)
  }

  /**
    * Returns a new Streamlet by applying the operator on each element of this streamlet.
    *
    * @param operator The operator to be applied
    * @param grouper  The grouper to be applied with the operator
    * @tparam T The return type of the transform
    * @return Streamlet containing the output of the operation
    */
  override def applyOperator[T](operator: IStreamletOperator[R, T],
                                grouper: StreamGrouping): Streamlet[T] = {
    val newJavaStreamlet = javaStreamlet.applyOperator[T](operator, grouper)
    fromJavaStreamlet(newJavaStreamlet)
  }

  /**
    * Returns multiple streams by splitting incoming stream.
    * Note that there could be 0 or multiple target stream ids
    *
    * @param splitFns The Split Functions, keyed by stream id, that test if the tuple should be
    *                 emitted into each stream
    * @return Streamlet wrapping the result of the Java split operation
    */
  override def split(splitFns: Map[String, R => Boolean]): Streamlet[R] = {
    // Copy the Scala map into a java.util.HashMap, converting every Scala
    // predicate to the SerializablePredicate expected by the Java API.
    val javaSerializablePredicates: JMap[String, SerializablePredicate[R]] = new JHashMap()
    splitFns.foreach { case (key, func) =>
      javaSerializablePredicates.put(key, toSerializablePredicate[R](func))
    }
    val newJavaStreamlet = javaStreamlet.split(javaSerializablePredicates)
    fromJavaStreamlet[R](newJavaStreamlet)
  }

  /**
    * Logs every element of the streamlet using String.valueOf function
    * This is one of the sink functions in the sense that this operation returns void
    */
  override def log(): Unit = javaStreamlet.log()

  /**
    * Applies the consumer function to every element of the stream
    * This function does not return anything.
    *
    * @param consumer The user supplied consumer function that is invoked for each element
    *                 of this streamlet.
    */
  override def consume(consumer: R => Unit): Unit = {
    val serializableConsumer = toSerializableConsumer[R](consumer)
    javaStreamlet.consume(serializableConsumer)
  }

  /**
    * Applies the sink's put function to every element of the stream
    * This function does not return anything.
    *
    * @param sink The Sink whose put method consumes each element
    *             of this streamlet.
    */
  override def toSink(sink: Sink[R]): Unit = {
    val javaSink = toJavaSink[R](sink)
    javaStreamlet.toSink(javaSink)
  }

  /**
    * Gets all the children of this streamlet.
    * Children of a streamlet are streamlets that are resulting from transformations of elements of
    * this and potentially other streamlets.
    *
    * The wrapped Java Streamlet is cast to the Java StreamletImpl to reach its children, so this
    * fails with a ClassCastException for any other Java Streamlet implementation.
    *
    * @return The kid streamlets
    */
  private[impl] def getChildren: List[JavaStreamletImpl[_]] = {
    val javaChildren =
      javaStreamlet
        .asInstanceOf[JavaStreamletImpl[_]]
        .getChildren
    // Explicit JavaConverters instead of the deprecated implicit JavaConversions.
    JavaConverters
      .asScalaBufferConverter(javaChildren)
      .asScala
      .toList
  }
}
