blob: 3f2f9cbafa46694c903f487f1fd4f3562bf89c07 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.experiments.yarn.client
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
import java.util.Random
import java.util.zip.{ZipEntry, ZipOutputStream}
import akka.actor.ActorSystem
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
import org.apache.gearpump.cluster.TestUtil
import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, GetActiveConfig}
import org.apache.gearpump.experiments.yarn.glue.Records._
import org.apache.gearpump.experiments.yarn.glue.{FileSystem, YarnClient, YarnConfig}
import org.apache.gearpump.util.FileUtils
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.Try
class LaunchClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
implicit var system: ActorSystem = null
val rand = new Random()
private def randomArray(size: Int): Array[Byte] = {
val array = new Array[Byte](size)
rand.nextBytes(array)
array
}
val appId = ApplicationId.newInstance(0L, 0)
val akka = ConfigFactory.parseString(
"""
|gearpump {
| yarn {
| client {
| package -path = "/user/gearpump/gearpump.zip"
| }
|
| applicationmaster {
| ## Memory of YarnAppMaster
| command = "$JAVA_HOME/bin/java -Xmx512m"
| memory = "512"
| vcores = "1"
| queue = "default"
| }
|
| master {
| ## Memory of master daemon
| command = "$JAVA_HOME/bin/java -Xmx512m"
| containers = "2"
| memory = "512"
| vcores = "1"
| }
|
| worker {
| ## memory of worker daemon
| command = "$JAVA_HOME/bin/java -Xmx512m"
| containers = "4"
| ## This also contains all memory for child executors.
| memory = "4096"
| vcores = "1"
| }
| services {
| enabled = true
| }
| }
|}
""".stripMargin).withFallback(TestUtil.DEFAULT_CONFIG)
override def beforeAll(): Unit = {
system = ActorSystem(getClass.getSimpleName, akka)
}
override def afterAll(): Unit = {
system.terminate()
Await.result(system.whenTerminated, Duration.Inf)
}
it should "reject non-zip files" in {
val yarnConfig = mock(classOf[YarnConfig])
val yarnClient = mock(classOf[YarnClient])
val fs = mock(classOf[FileSystem])
val appMasterResolver = mock(classOf[AppMasterResolver])
val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver)
val packagePath = "gearpump.zip2"
assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
}
it should "reject if we cannot find the package file on HDFS" in {
val yarnConfig = mock(classOf[YarnConfig])
val yarnClient = mock(classOf[YarnClient])
val fs = mock(classOf[FileSystem])
val appMasterResolver = mock(classOf[AppMasterResolver])
val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver)
val packagePath = "gearpump.zip"
when(fs.exists(anyString)).thenReturn(false)
assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
}
it should "throw when package exists on HDFS, but the file is corrupted" in {
val yarnConfig = mock(classOf[YarnConfig])
val yarnClient = mock(classOf[YarnClient])
val fs = mock(classOf[FileSystem])
val appMasterResolver = mock(classOf[AppMasterResolver])
val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver)
val packagePath = "gearpump.zip"
when(fs.exists(anyString)).thenReturn(true)
val content = new ByteArrayInputStream(randomArray(10))
when(fs.open(anyString)).thenReturn(content)
assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
content.close()
}
it should "throw when the HDFS package version is not consistent with local version" in {
val yarnConfig = mock(classOf[YarnConfig])
val yarnClient = mock(classOf[YarnClient])
val fs = mock(classOf[FileSystem])
val appMasterResolver = mock(classOf[AppMasterResolver])
val version = "gearpump-0.2"
val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system,
appMasterResolver, version)
val packagePath = "gearpump.zip"
when(fs.exists(anyString)).thenReturn(true)
val oldVesion = "gearpump-0.1"
when(fs.open(anyString)).thenReturn(zipInputStream(oldVesion))
assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
}
it should "upload config file to HDFS when submitting" in {
val yarnConfig = mock(classOf[YarnConfig])
val yarnClient = mock(classOf[YarnClient])
val fs = mock(classOf[FileSystem])
val appMasterResolver = mock(classOf[AppMasterResolver])
val version = "gearpump-0.2"
val launcher = new LaunchCluster(akka, yarnConfig, yarnClient,
fs, system, appMasterResolver, version)
val packagePath = "gearpump.zip"
val out = mock(classOf[OutputStream])
when(fs.exists(anyString)).thenReturn(true)
when(fs.create(anyString)).thenReturn(out)
when(fs.getHomeDirectory).thenReturn("/root")
when(fs.open(anyString)).thenReturn(zipInputStream(version))
val report = mock(classOf[ApplicationReport])
when(yarnClient.awaitApplication(any[ApplicationId], anyLong())).thenReturn(report)
when(report.getApplicationId).thenReturn(appId)
when(yarnClient.createApplication).thenReturn(appId)
assert(appId == launcher.submit("gearpump", packagePath))
// 3 Config files are uploaded to HDFS, one is akka.conf,
// one is yarn-site.xml, one is log4j.properties.
verify(fs, times(3)).create(anyString)
verify(out, times(3)).close()
// val workerResources = ArgumentCaptor.forClass(classOf[List[Resource]])
// scalastyle:off line.size.limit
val expectedCommand = "$JAVA_HOME/bin/java -Xmx512m -noverify -cp conf:pack/gearpump-0.2/conf:pack/gearpump-0.2/dashboard:pack/gearpump-0.2/lib/*:pack/gearpump-0.2/lib/daemon/*:pack/gearpump-0.2/lib/services/*:pack/gearpump-0.2/lib/yarn/*:$CLASSPATH -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.2 -Dgearpump.binary-version-with-scala-version=gearpump-0.2 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname={{NM_HOST}} org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster -conf /root/.gearpump_application_0_0000/conf/ -package gearpump.zip 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
// scalastyle:on line.size.limit
verify(yarnClient).submit("gearpump", appId, expectedCommand,
Resource.newInstance(512, 1), "default",
"gearpump.zip", "/root/.gearpump_application_0_0000/conf/")
}
it should "save active config from Gearpump cluster" in {
val yarnConfig = mock(classOf[YarnConfig])
val yarnClient = mock(classOf[YarnClient])
val fs = mock(classOf[FileSystem])
val appMasterResolver = mock(classOf[AppMasterResolver])
val appMaster = TestProbe()
val version = "gearpump-0.2"
val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system,
appMasterResolver, version)
val outputPath = java.io.File.createTempFile("LaunchClusterSpec", ".conf")
when(appMasterResolver.resolve(any[ApplicationId], anyInt)).thenReturn(appMaster.ref)
val fileFuture = launcher.saveConfig(appId, outputPath.getPath)
appMaster.expectMsgType[GetActiveConfig]
appMaster.reply(ActiveConfig(ConfigFactory.empty()))
import scala.concurrent.duration._
val file = Await.result(fileFuture, 30.seconds).asInstanceOf[java.io.File]
assert(!FileUtils.read(file).isEmpty)
file.delete()
}
private def zipInputStream(version: String): InputStream = {
val bytes = new ByteArrayOutputStream(1000)
val zipOut = new ZipOutputStream(bytes)
// Not available on BufferedOutputStream
zipOut.putNextEntry(new ZipEntry(s"$version/README.md"))
zipOut.write("README".getBytes())
// Not available on BufferedOutputStream
zipOut.closeEntry()
zipOut.close()
new ByteArrayInputStream(bytes.toByteArray)
}
}