/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.streaming.appmaster

import akka.actor.{ExtendedActorSystem, Actor, ActorRef, Stash}
import akka.serialization.JavaSerializer
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.partitioner.PartitionerDescription
import org.apache.gearpump.streaming._
import org.apache.gearpump.streaming.appmaster.DagManager._
import org.apache.gearpump.streaming.storage.AppDataStore
import org.apache.gearpump.streaming.task.Subscriber
import org.apache.gearpump.util.{Graph, LogUtil}
import org.slf4j.Logger

import scala.concurrent.Future

/**
 * Handles dag modification and other stuff related with DAG
 *
 * DagManager maintains multiple version of DAGs. For each version, the DAG is immutable.
 * For operations like modifying a processor, it will create a new version of DAG.
 */
class DagManager(appId: Int, userConfig: UserConfig, store: AppDataStore, dag: Option[DAG])
  extends Actor with Stash {

  import context.dispatcher
  private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
  private val NOT_INITIALIZED = -1

  private var dags = List.empty[DAG]
  private var maxProcessorId = -1
  private implicit val system = context.system

  private var watchers = List.empty[ActorRef]
  private val serializer = new JavaSerializer(system.asInstanceOf[ExtendedActorSystem])

  override def receive: Receive = null

  override def preStart(): Unit = {
    LOG.info("Initializing Dag Service, get stored Dag ....")
    store.get(StreamApplication.DAG).asInstanceOf[Future[Array[Byte]]].foreach{ bytes =>
      if (bytes != null) {
        val storedDag = serializer.fromBinary(bytes).asInstanceOf[DAG]
        dags :+= storedDag
      } else {
        dags :+= dag.getOrElse(DAG(userConfig.getValue[Graph[ProcessorDescription,
          PartitionerDescription]](StreamApplication.DAG).get))
      }
      maxProcessorId = {
        val keys = dags.head.processors.keys
        if (keys.isEmpty) {
          0
        } else {
          keys.max
        }
      }
      self ! DagInitiated
    }
    context.become(waitForDagInitiate)
  }

  def waitForDagInitiate: Receive = {
    case DagInitiated =>
      unstashAll()
      context.become(dagService)
    case _ =>
      stash()
  }

  private def nextProcessorId: ProcessorId = {
    maxProcessorId += 1
    maxProcessorId
  }

  private def taskLaunchData(dag: DAG, processorId: Int, context: AnyRef): TaskLaunchData = {
    val processorDescription = dag.processors(processorId)
    val subscribers = Subscriber.of(processorId, dag)
    TaskLaunchData(processorDescription, subscribers, context)
  }

  def dagService: Receive = {
    case GetLatestDAG =>
      // Get the latest version of DAG.
      sender ! LatestDAG(dags.last)
    case GetTaskLaunchData(version, processorId, launchContext) =>
      // Task information like Processor class, downstream subscriber processors and etc.
      dags.find(_.version == version).foreach { dag =>
        LOG.info(s"Get task launcher data for processor: $processorId, dagVersion: $version")
        sender ! taskLaunchData(dag, processorId, launchContext)
      }
    case ReplaceProcessor(oldProcessorId, inputNewProcessor, inheritConfig) =>
      // Replace a processor with new implementation. The upstream processors and downstream
      // processors are NOT changed.
      var newProcessor = inputNewProcessor.copy(id = nextProcessorId)
      if (inputNewProcessor.jar == null) {
        val oldJar = dags.last.processors.get(oldProcessorId).get
        newProcessor = newProcessor.copy(jar = oldJar.jar)
      }

      if (inheritConfig) {
        val oldConf = dags.last.processors.get(oldProcessorId).get.taskConf
        newProcessor = newProcessor.copy(taskConf = oldConf)
      }

      if (dags.length > 1) {
        sender ! DAGOperationFailed(
          "We are in the process of handling previous dynamic dag change")
      } else {
        val oldDAG = dags.last
        val newVersion = oldDAG.version + 1
        val newDAG = replaceDAG(oldDAG, oldProcessorId, newProcessor, newVersion)
        dags :+= newDAG

        LOG.info(s"ReplaceProcessor old: $oldProcessorId, new: $newProcessor")
        LOG.info(s"new DAG: $newDAG")
        watchers.foreach(_ ! LatestDAG(newDAG))
        sender ! DAGOperationSuccess
      }

    case WatchChange(watcher) =>
      // Checks whether there are modifications for this DAG.
      if (!this.watchers.contains(watcher)) {
        this.watchers :+= watcher
      }

    case NewDAGDeployed(dagVersion) =>
      // Means dynamic Dag transition completed, and the new DAG version has been successfully
      // deployed. The obsolete dag versions will be removed.
      if (dagVersion != NOT_INITIALIZED) {
        dags = dags.filter(_.version == dagVersion)
        store.put(StreamApplication.DAG, serializer.toBinary(dags.last))
      }
  }

  private def replaceDAG(
      dag: DAG, oldProcessorId: ProcessorId, newProcessor: ProcessorDescription, newVersion: Int)
    : DAG = {
    val oldProcessorLife = LifeTime(dag.processors(oldProcessorId).life.birth,
      newProcessor.life.birth)

    val newProcessorMap = dag.processors ++
      Map(oldProcessorId -> dag.processors(oldProcessorId).copy(life = oldProcessorLife),
        newProcessor.id -> newProcessor)

    val newGraph = dag.graph.subGraph(oldProcessorId).
      replaceVertex(oldProcessorId, newProcessor.id).addGraph(dag.graph)
    new DAG(newVersion, newProcessorMap, newGraph)
  }
}

object DagManager {
  case object DagInitiated

  case class WatchChange(watcher: ActorRef)

  case object GetLatestDAG
  case class LatestDAG(dag: DAG)

  case class GetTaskLaunchData(dagVersion: Int, processorId: Int, context: AnyRef = null)
  case class TaskLaunchData(processorDescription : ProcessorDescription,
      subscribers: List[Subscriber], context: AnyRef = null)

  sealed trait DAGOperation

  case class ReplaceProcessor(oldProcessorId: ProcessorId,
      newProcessorDescription: ProcessorDescription, inheritConf: Boolean) extends DAGOperation

  sealed trait DAGOperationResult
  case object DAGOperationSuccess extends DAGOperationResult
  case class DAGOperationFailed(reason: String) extends DAGOperationResult

  case class NewDAGDeployed(dagVersion: Int)
}