/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.experiments.yarn.client

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
import java.util.Random
import java.util.zip.{ZipEntry, ZipOutputStream}

import akka.actor.ActorSystem
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
import org.apache.gearpump.cluster.TestUtil
import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, GetActiveConfig}
import org.apache.gearpump.experiments.yarn.glue.Records._
import org.apache.gearpump.experiments.yarn.glue.{FileSystem, YarnClient, YarnConfig}
import org.apache.gearpump.util.FileUtils
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.Try
/**
 * Unit tests for [[LaunchCluster]].
 *
 * All YARN/HDFS collaborators (`YarnConfig`, `YarnClient`, `FileSystem`,
 * `AppMasterResolver`) are Mockito mocks, so no real cluster is contacted.
 * A single `ActorSystem` is shared by every test case and is created/terminated
 * in `beforeAll`/`afterAll`.
 */
class LaunchClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
  // Assigned in beforeAll(); stays null if beforeAll() never ran or failed.
  implicit var system: ActorSystem = null

  val rand = new Random()

  /** Returns a new byte array of the given size filled with random content. */
  private def randomArray(size: Int): Array[Byte] = {
    val array = new Array[Byte](size)
    rand.nextBytes(array)
    array
  }

  val appId = ApplicationId.newInstance(0L, 0)

  // Test configuration, layered on top of TestUtil.DEFAULT_CONFIG.
  // The memory/vcores/queue/command values of the "applicationmaster" section are
  // what the "upload config file" test expects to see passed to YarnClient.submit.
  val akka = ConfigFactory.parseString(

    """
      |gearpump {
      |  yarn {
      |    client {
      |      package-path = "/user/gearpump/gearpump.zip"
      |    }
      |
      |    applicationmaster {
      |      ## Memory of YarnAppMaster
      |        command = "$JAVA_HOME/bin/java -Xmx512m"
      |      memory = "512"
      |      vcores = "1"
      |      queue = "default"
      |    }
      |
      |    master {
      |      ## Memory of master daemon
      |      command = "$JAVA_HOME/bin/java  -Xmx512m"
      |      containers = "2"
      |      memory = "512"
      |      vcores = "1"
      |    }
      |
      |    worker {
      |      ## memory of worker daemon
      |      command = "$JAVA_HOME/bin/java  -Xmx512m"
      |      containers = "4"
      |      ## This also contains all memory for child executors.
      |      memory = "4096"
      |      vcores = "1"
      |    }
      |    services {
      |      enabled = true
      |    }
      |  }
      |}
    """.stripMargin).withFallback(TestUtil.DEFAULT_CONFIG)

  /** Starts the actor system shared by all test cases. */
  override def beforeAll(): Unit = {
    system = ActorSystem(getClass.getSimpleName, akka)
  }

  /** Terminates the shared actor system and blocks until termination completes. */
  override def afterAll(): Unit = {
    // Guard against a null system so that a failure in beforeAll() is not
    // followed by a NullPointerException here.
    Option(system).foreach { actorSystem =>
      actorSystem.terminate()
      Await.result(actorSystem.whenTerminated, Duration.Inf)
    }
  }

  it should "reject non-zip files" in {
    val yarnConfig = mock(classOf[YarnConfig])
    val yarnClient = mock(classOf[YarnClient])
    val fs = mock(classOf[FileSystem])
    val appMasterResolver = mock(classOf[AppMasterResolver])

    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver)
    // The ".zip2" suffix makes this path not end with ".zip".
    val packagePath = "gearpump.zip2"
    assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
  }

  it should "reject if we cannot find the package file on HDFS" in {
    val yarnConfig = mock(classOf[YarnConfig])
    val yarnClient = mock(classOf[YarnClient])
    val fs = mock(classOf[FileSystem])
    val appMasterResolver = mock(classOf[AppMasterResolver])

    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver)
    val packagePath = "gearpump.zip"
    // The mocked file system reports every path as missing.
    when(fs.exists(anyString)).thenReturn(false)
    assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
  }

  it should "throw when package exists on HDFS, but the file is corrupted" in {
    val yarnConfig = mock(classOf[YarnConfig])
    val yarnClient = mock(classOf[YarnClient])
    val fs = mock(classOf[FileSystem])
    val appMasterResolver = mock(classOf[AppMasterResolver])

    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver)
    val packagePath = "gearpump.zip"
    when(fs.exists(anyString)).thenReturn(true)

    // Ten random bytes stand in for a corrupted package file.
    val content = new ByteArrayInputStream(randomArray(10))
    when(fs.open(anyString)).thenReturn(content)
    assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
    content.close()
  }

  it should "throw when the HDFS package version is not consistent with local version" in {
    val yarnConfig = mock(classOf[YarnConfig])
    val yarnClient = mock(classOf[YarnClient])
    val fs = mock(classOf[FileSystem])
    val appMasterResolver = mock(classOf[AppMasterResolver])

    val version = "gearpump-0.2"
    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system,
      appMasterResolver, version)
    val packagePath = "gearpump.zip"
    when(fs.exists(anyString)).thenReturn(true)

    // The package served by the mocked file system is built for a different
    // version than the one the launcher was constructed with.
    val oldVersion = "gearpump-0.1"
    when(fs.open(anyString)).thenReturn(zipInputStream(oldVersion))
    assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
  }

  it should "upload config file to HDFS when submitting" in {
    val yarnConfig = mock(classOf[YarnConfig])
    val yarnClient = mock(classOf[YarnClient])
    val fs = mock(classOf[FileSystem])
    val appMasterResolver = mock(classOf[AppMasterResolver])

    val version = "gearpump-0.2"
    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient,
      fs, system, appMasterResolver, version)
    val packagePath = "gearpump.zip"

    val out = mock(classOf[OutputStream])
    when(fs.exists(anyString)).thenReturn(true)
    when(fs.create(anyString)).thenReturn(out)
    when(fs.getHomeDirectory).thenReturn("/root")

    // The package on the mocked HDFS matches the launcher's version.
    when(fs.open(anyString)).thenReturn(zipInputStream(version))

    val report = mock(classOf[ApplicationReport])
    when(yarnClient.awaitApplication(any[ApplicationId], anyLong())).thenReturn(report)

    when(report.getApplicationId).thenReturn(appId)
    when(yarnClient.createApplication).thenReturn(appId)
    assert(appId == launcher.submit("gearpump", packagePath))

    // 3 Config files are uploaded to HDFS, one is akka.conf,
    // one is yarn-site.xml, one is log4j.properties.
    verify(fs, times(3)).create(anyString)
    verify(out, times(3)).close()

    // scalastyle:off line.size.limit
    val expectedCommand = "$JAVA_HOME/bin/java -Xmx512m -noverify -cp conf:pack/gearpump-0.2/conf:pack/gearpump-0.2/dashboard:pack/gearpump-0.2/lib/*:pack/gearpump-0.2/lib/daemon/*:pack/gearpump-0.2/lib/services/*:pack/gearpump-0.2/lib/yarn/*:$CLASSPATH -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.2 -Dgearpump.binary-version-with-scala-version=gearpump-0.2 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname={{NM_HOST}} org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster  -conf /root/.gearpump_application_0_0000/conf/ -package gearpump.zip 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
    // scalastyle:on line.size.limit
    verify(yarnClient).submit("gearpump", appId, expectedCommand,
      Resource.newInstance(512, 1), "default",
      "gearpump.zip", "/root/.gearpump_application_0_0000/conf/")
  }

  it should "save active config from Gearpump cluster" in {
    val yarnConfig = mock(classOf[YarnConfig])
    val yarnClient = mock(classOf[YarnClient])
    val fs = mock(classOf[FileSystem])
    val appMasterResolver = mock(classOf[AppMasterResolver])
    val appMaster = TestProbe()

    val version = "gearpump-0.2"
    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system,
      appMasterResolver, version)
    val outputPath = java.io.File.createTempFile("LaunchClusterSpec", ".conf")

    try {
      // The resolver hands back the test probe, which then plays the AppMaster role.
      when(appMasterResolver.resolve(any[ApplicationId], anyInt)).thenReturn(appMaster.ref)
      val fileFuture = launcher.saveConfig(appId, outputPath.getPath)
      appMaster.expectMsgType[GetActiveConfig]
      appMaster.reply(ActiveConfig(ConfigFactory.empty()))

      import scala.concurrent.duration._
      val file = Await.result(fileFuture, 30.seconds).asInstanceOf[java.io.File]

      assert(!FileUtils.read(file).isEmpty)
      file.delete()
    } finally {
      // Always remove the temp file, even when the future times out or the
      // assertion fails; a no-op if it was already deleted above.
      outputPath.delete()
    }
  }

  /**
   * Builds an in-memory zip archive holding a single entry `<version>/README.md`
   * and returns a stream over its bytes.
   *
   * NOTE(review): presumably LaunchCluster derives the package version from the
   * top-level directory name inside the archive - confirm against LaunchCluster.
   *
   * @param version name of the top-level directory inside the archive
   * @return an input stream positioned at the start of the zip content
   */
  private def zipInputStream(version: String): InputStream = {
    val bytes = new ByteArrayOutputStream(1000)
    val zipOut = new ZipOutputStream(bytes)

    // Start the single entry of the archive.
    zipOut.putNextEntry(new ZipEntry(s"$version/README.md"))
    zipOut.write("README".getBytes(java.nio.charset.StandardCharsets.UTF_8))
    // Finish the entry, then close to flush the zip central directory into `bytes`.
    zipOut.closeEntry()
    zipOut.close()
    new ByteArrayInputStream(bytes.toByteArray)
  }
}