/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.akkastream.scaladsl

import akka.stream.Attributes
import org.apache.gearpump.akkastream.module._
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.sink.DataSink
import org.apache.gearpump.streaming.source.DataSource
import org.apache.gearpump.streaming.task.Task
import org.reactivestreams.{Publisher, Subscriber}


object GearSource {

  /**
   * Builds the placeholder source shared by the bridge and processor based factories.
   *
   * It wraps a [[DummySource]] module whose attributes and shape are both named "dummy".
   */
  private def dummySource[T] =
    new Source(new DummySource[T](Attributes.name("dummy"), Source.shape("dummy")))

  /**
   * Creates a Source that is fed by out-of-band input messages.
   *
   * The stream is a dummy source followed by a [[SourceBridgeModule]] flow, and it
   * materializes to the [[Subscriber]] of that flow.
   *
   * {{{
   *                   SourceBridgeModule -> Sink
   *                          /
   *                         /
   *                        V
   *                materialize to Subscriber
   *                                   /|
   *                                  /
   *       upstream Publisher sends out-of-band messages
   * }}}
   *
   * @tparam IN  type of the messages accepted by the materialized [[Subscriber]]
   * @tparam OUT type of the elements emitted by the resulting Source
   * @return a Source whose materialized value is the bridge's [[Subscriber]]
   */
  def bridge[IN, OUT]: Source[OUT, Subscriber[IN]] = {
    val bridgeFlow = new Flow[IN, OUT, Subscriber[IN]](new SourceBridgeModule[IN, OUT]())
    // Keep.right: expose the flow's materialized value, not the dummy source's.
    dummySource[IN].viaMat(bridgeFlow)(Keep.right)
  }

  /**
   * Creates a Source backed by a Gearpump [[DataSource]].
   *
   * {{{
   *    SourceTaskModule -> downstream Sink
   * }}}
   *
   * @param source the Gearpump data source, wrapped in a [[SourceTaskModule]] together
   *               with an empty [[UserConfig]]
   * @tparam OUT type of the elements emitted by the resulting Source
   */
  def from[OUT](source: DataSource): Source[OUT, Unit] =
    new Source[OUT, Unit](SourceTaskModule(source, UserConfig.empty))

  /**
   * Creates a Source backed by a Gearpump [[org.apache.gearpump.streaming.Processor]].
   *
   * The stream is a dummy source followed by the flow returned by [[Processor]].
   *
   * {{{
   *    ProcessorModule -> downstream Sink
   * }}}
   *
   * @param processor the [[Task]] class implementing the processor
   * @param conf      the configuration passed along with the processor class
   * @tparam OUT type of the elements emitted by the resulting Source
   */
  def from[OUT](processor: Class[_ <: Task], conf: UserConfig): Source[OUT, Unit] = {
    val processorFlow = Processor[Unit, OUT](processor, conf)
    // Keep.right: the materialized value comes from the processor flow.
    dummySource[Unit].viaMat(processorFlow)(Keep.right)
  }
}

object GearSink {

  /**
   * Builds the placeholder sink shared by the bridge and processor based factories.
   *
   * It wraps a [[DummySink]] module whose attributes and shape are both named "dummy".
   */
  private def dummySink[T] =
    new Sink(new DummySink[T](Attributes.name("dummy"), Sink.shape("dummy")))

  /**
   * Creates a Sink that forwards messages to an out-of-band channel.
   *
   * The stream is a [[SinkBridgeModule]] flow followed by a dummy sink, and it
   * materializes to the [[Publisher]] of that flow.
   *
   * {{{
   *   Source ->   SinkBridgeModule
   *                    \
   *                     \|
   *         materialize to Publisher
   *                              \
   *                               \
   *                                \|
   *       send out-of-band messages to downstream Subscriber
   * }}}
   *
   * @tparam IN  type of the elements accepted by the resulting Sink
   * @tparam OUT type of the messages emitted by the materialized [[Publisher]]
   * @return a Sink whose materialized value is the bridge's [[Publisher]]
   */
  def bridge[IN, OUT]: Sink[IN, Publisher[OUT]] = {
    val bridgeFlow = new Flow[IN, OUT, Publisher[OUT]](new SinkBridgeModule[IN, OUT]())
    bridgeFlow.to(dummySink[OUT])
  }

  /**
   * Creates a Sink backed by a Gearpump [[DataSink]].
   *
   * {{{
   *    Upstream Source -> SinkTaskModule
   * }}}
   *
   * @param sink the Gearpump data sink, wrapped in a [[SinkTaskModule]] together with
   *             an empty [[UserConfig]]
   * @tparam IN type of the elements accepted by the resulting Sink
   */
  def to[IN](sink: DataSink): Sink[IN, Unit] =
    new Sink[IN, Unit](new SinkTaskModule(sink, UserConfig.empty))

  /**
   * Creates a Sink backed by a Gearpump [[org.apache.gearpump.streaming.Processor]].
   *
   * The stream is the flow returned by [[Processor]] followed by a dummy sink.
   *
   * {{{
   *    Upstream Source -> ProcessorModule
   * }}}
   *
   * @param processor the [[Task]] class implementing the processor
   * @param conf      the configuration passed along with the processor class
   * @tparam IN type of the elements accepted by the resulting Sink
   */
  def to[IN](processor: Class[_ <: Task], conf: UserConfig): Sink[IN, Unit] = {
    val processorFlow = Processor[IN, Unit](processor, conf)
    processorFlow.to(dummySink[Unit])
  }
}

/**
 * GroupBy divides the main input stream into a set of sub-streams.
 * It is a work-around to bypass the limitation of the official API Flow.groupBy.
 *
 * For example, a word count can be sketched like this (illustrative only; `sink` is
 * assumed to be defined elsewhere):
 *
 * {{{
 * case class KV(key: String, value: String)
 * case class Count(key: String, count: Int)
 *
 * val flow = GroupBy[KV, String](_.key).map { kv =>
 *   Count(kv.key, 1)
 * }.fold(Count(null, 0)) { (base, add) =>
 *   Count(add.key, base.count + add.count)
 * }.log("count of current key")
 *   .flatten()
 *   .to(sink)
 * }}}
 *
 * map and fold transform the data on every sub-stream: if there are 10 groups, there
 * are 10 sub-streams, and each sub-stream gets its own map and fold.
 *
 * flatten collects all sub-streams back into the main stream.
 *
 * sink only operates on the main stream.
 */
object GroupBy {

  /**
   * Creates a Flow backed by a [[GroupByModule]].
   *
   * @param groupBy function that extracts the group of an element
   * @tparam T     element type, both consumed and emitted by the flow
   * @tparam Group type of the grouping key
   */
  def apply[T, Group](groupBy: T => Group): Flow[T, T, Unit] = {
    val module = new GroupByModule(groupBy)
    new Flow[T, T, Unit](module)
  }
}

/**
 * Aggregates the data of a stream.
 *
 * {{{
 * val flow = Reduce((a: Int, b: Int) => a + b)
 * }}}
 */
object Reduce {

  /**
   * Creates a Flow backed by a [[ReduceModule]].
   *
   * @param reduce function that combines two elements into one
   * @tparam T element type, both consumed and emitted by the flow
   */
  def apply[T](reduce: (T, T) => T): Flow[T, T, Unit] = {
    val module = new ReduceModule(reduce)
    new Flow[T, T, Unit](module)
  }
}


/**
 * Creates a Flow from a Gearpump Processor class and its configuration.
 */
object Processor {

  /**
   * Creates a Flow backed by a [[ProcessorModule]].
   *
   * @param processor the [[Task]] class implementing the processor
   * @param conf      the configuration handed to the module together with the class
   * @tparam In  type of the elements consumed by the flow
   * @tparam Out type of the elements emitted by the flow
   */
  def apply[In, Out](processor: Class[_ <: Task], conf: UserConfig): Flow[In, Out, Unit] = {
    val module = new ProcessorModule[In, Out, Unit](processor, conf)
    new Flow[In, Out, Unit](module)
  }
}

object Implicits {

  /**
   * Helper methods that add grouping, reducing and Gearpump processor stages to a
   * [[Source]].
   */
  implicit class SourceOps[T, Mat](source: Source[T, Mat]) {

    // TODO It is named groupBy2 to avoid a conflict with the built-in
    // groupBy. Eventually, we think the built-in groupBy should
    // be replaced with this implementation.
    /**
     * Appends a [[GroupBy]] stage to the source.
     *
     * @param groupBy function that extracts the group of an element
     */
    def groupBy2[Group](groupBy: T => Group): Source[T, Mat] =
      source.via(GroupBy[T, Group](groupBy))

    /**
     * Appends a [[Reduce]] stage to the source.
     *
     * @param reduce function that combines two elements into one
     */
    def reduce(reduce: (T, T) => T): Source[T, Mat] =
      source.via(Reduce[T](reduce))

    /**
     * Appends a [[Processor]] stage to the source.
     *
     * @param processor the [[Task]] class implementing the processor
     * @param conf      the configuration passed along with the processor class
     * @tparam R type of the elements emitted by the processor stage
     */
    def process[R](processor: Class[_ <: Task], conf: UserConfig): Source[R, Mat] =
      source.via(Processor[T, R](processor, conf))
  }

  /**
   * Helper methods that add grouping, reducing and Gearpump processor stages to a
   * [[Flow]].
   */
  implicit class FlowOps[IN, OUT, Mat](flow: Flow[IN, OUT, Mat]) {

    /**
     * Appends a [[GroupBy]] stage to the flow.
     *
     * @param groupBy function that extracts the group of an element
     */
    def groupBy2[Group](groupBy: OUT => Group): Flow[IN, OUT, Mat] =
      flow.via(GroupBy[OUT, Group](groupBy))

    /**
     * Appends a [[Reduce]] stage to the flow.
     *
     * @param reduce function that combines two elements into one
     */
    def reduce(reduce: (OUT, OUT) => OUT): Flow[IN, OUT, Mat] =
      flow.via(Reduce[OUT](reduce))

    /**
     * Appends a [[Processor]] stage to the flow.
     *
     * @param processor the [[Task]] class implementing the processor
     * @param conf      the configuration passed along with the processor class
     * @tparam R type of the elements emitted by the processor stage
     */
    def process[R](processor: Class[_ <: Task], conf: UserConfig): Flow[IN, R, Mat] =
      flow.via(Processor[OUT, R](processor, conf))
  }

  /**
   * Helper methods that add groupByKey and sumOnValue to a [[Source]] of key/value pairs.
   */
  implicit class KVSourceOps[K, V, Mat](source: Source[(K, V), Mat]) {

    /**
     * Groups the key/value pairs by their key (the first tuple component).
     *
     * @return the source with a [[GroupBy]] stage appended
     */
    def groupByKey: Source[(K, V), Mat] =
      source.via(GroupBy[(K, V), K](getTupleKey[K, V]))

    /**
     * Sums up the values of the key/value pairs.
     *
     * Before doing this, you need to call groupByKey to group the same keys together;
     * otherwise the sum is computed no matter what the current key is.
     *
     * @param numeric the [[Numeric]] instance used to add two values
     * @return the source with a [[Reduce]] stage appended
     */
    def sumOnValue(implicit numeric: Numeric[V]): Source[(K, V), Mat] =
      source.via(Reduce[(K, V)](sumByKey[K, V](numeric)))
  }

  /**
   * Helper methods that add groupByKey and sumOnValue to a [[Flow]] whose input and
   * output are both key/value pairs.
   */
  implicit class KVFlowOps[K, V, Mat](flow: Flow[(K, V), (K, V), Mat]) {

    /**
     * Groups the key/value pairs by their key (the first tuple component).
     *
     * @return the flow with a [[GroupBy]] stage appended
     */
    def groupByKey: Flow[(K, V), (K, V), Mat] =
      flow.via(GroupBy[(K, V), K](getTupleKey[K, V]))

    /**
     * Sums up the values of the key/value pairs.
     *
     * Before doing this, you need to call groupByKey to group the same keys together;
     * otherwise the sum is computed no matter what the current key is.
     *
     * @param numeric the [[Numeric]] instance used to add two values
     * @return the flow with a [[Reduce]] stage appended
     */
    def sumOnValue(implicit numeric: Numeric[V]): Flow[(K, V), (K, V), Mat] =
      flow.via(Reduce[(K, V)](sumByKey[K, V](numeric)))
  }

  /** Returns the key, i.e. the first component, of a key/value pair. */
  private def getTupleKey[K, V](tuple: (K, V)): K = tuple._1

  /**
   * Builds the reduce function used by sumOnValue: the result keeps the key of the
   * first pair and adds the two values with the given [[Numeric]].
   */
  private def sumByKey[K, V](numeric: Numeric[V]): ((K, V), (K, V)) => (K, V) =
    (left, right) => (left._1, numeric.plus(left._2, right._2))
}
