blob: f85bb566b78937c133b457ec1e177859001ac272 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.job.yarn
import java.net.ConnectException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationReport, FinalApplicationStatus, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.samza.SamzaException
import org.apache.samza.config.{JobConfig, MapConfig, YarnConfig}
import org.apache.samza.job.ApplicationStatus
import org.apache.samza.webapp.ApplicationMasterRestClient
import org.junit.Assert.{assertEquals, assertNotNull}
import org.mockito.Matchers.any
import org.mockito.Mockito._
import org.scalatest.FunSuite
import org.scalatest.mockito.MockitoSugar
class TestClientHelper extends FunSuite {
import MockitoSugar._
val hadoopConfig = mock[Configuration]
val mockAmClient = mock[ApplicationMasterRestClient]
val clientHelper = new ClientHelper(hadoopConfig) {
override def createYarnClient() = {
mock[YarnClient]
}
override def createAmClient(applicationReport: ApplicationReport) = {
mockAmClient
}
}
test("test allContainersRunning is false when there are ConnectExceptions") {
val exceptionThrowingClientHelper = new ClientHelper(hadoopConfig) {
override def createYarnClient() = {
mock[YarnClient]
}
override def createAmClient(applicationReport: ApplicationReport) = {
val amClient = mock[ApplicationMasterRestClient]
when(amClient.getMetrics).thenThrow(new ConnectException())
amClient
}
}
val allContainersRunning = exceptionThrowingClientHelper.allContainersRunning(null)
assertEquals(false, allContainersRunning)
}
test("test validateJobConfig") {
import collection.JavaConverters._
var config = new MapConfig()
intercept[SamzaException] {
clientHelper.validateJobConfig(config)
}
config = new MapConfig(Map(JobConfig.JOB_SECURITY_MANAGER_FACTORY -> "some value").asJava)
clientHelper.validateJobConfig(config)
}
test("test prepareJobConfig") {
val jobContext = new JobContext
jobContext.setAppStagingDir(new Path("/user/temp/.samzaStaging/app_123"))
clientHelper.jobContext = jobContext
val ret = clientHelper.getSecurityYarnConfig
assert(ret.size == 2)
assert(ret.get(YarnConfig.YARN_JOB_STAGING_DIRECTORY) == Some("/user/temp/.samzaStaging/app_123"))
assert(ret.get(YarnConfig.YARN_CREDENTIALS_FILE) == Some("/user/temp/.samzaStaging/app_123/credentials"))
}
test("test setupAMLocalResources") {
val applicationId = mock[ApplicationId]
when(applicationId.toString).thenReturn("application_123")
val jobContext = new JobContext
jobContext.setAppId(applicationId)
clientHelper.jobContext = jobContext
val mockFs = mock[FileSystem]
val fileStatus = new FileStatus(0, false, 0, 0, System.currentTimeMillis(), null)
when(mockFs.getHomeDirectory).thenReturn(new Path("/user/test"))
when(mockFs.getFileStatus(any[Path])).thenReturn(fileStatus)
when(mockFs.mkdirs(any[Path])).thenReturn(true)
doNothing().when(mockFs).copyFromLocalFile(any[Path], any[Path])
doNothing().when(mockFs).setPermission(any[Path], any[FsPermission])
val ret = clientHelper.setupAMLocalResources(mockFs, Some("some.principal"), Some("some.keytab"))
assert(ret.size == 1)
assert(ret.contains("some.keytab"))
}
test("test toAppStatus") {
val appReport = mock[ApplicationReport]
when(appReport.getYarnApplicationState).thenReturn(YarnApplicationState.FAILED)
when(appReport.getDiagnostics).thenReturn("some yarn diagnostics")
var appStatus = clientHelper.toAppStatus(appReport).get
assertEquals(appStatus, ApplicationStatus.UnsuccessfulFinish)
assertNotNull(appStatus.getThrowable)
when(appReport.getYarnApplicationState).thenReturn(YarnApplicationState.NEW)
appStatus = clientHelper.toAppStatus(appReport).get
assertEquals(appStatus, ApplicationStatus.New)
when(appReport.getYarnApplicationState).thenReturn(YarnApplicationState.FINISHED)
when(appReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.FAILED)
appStatus = clientHelper.toAppStatus(appReport).get
assertEquals(appStatus, ApplicationStatus.UnsuccessfulFinish)
when(appReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.KILLED)
appStatus = clientHelper.toAppStatus(appReport).get
assertEquals(appStatus, ApplicationStatus.UnsuccessfulFinish)
when(appReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
appStatus = clientHelper.toAppStatus(appReport).get
assertEquals(appStatus, ApplicationStatus.SuccessfulFinish)
val appMasterMetrics = new java.util.HashMap[String, Object]()
val metrics = new java.util.HashMap[String, java.util.Map[String, Object]]()
metrics.put(classOf[SamzaAppMasterMetrics].getCanonicalName(), appMasterMetrics)
appMasterMetrics.put("needed-containers", "1")
when(mockAmClient.getMetrics).thenReturn(metrics)
when(appReport.getYarnApplicationState).thenReturn(YarnApplicationState.RUNNING)
appStatus = clientHelper.toAppStatus(appReport).get
assertEquals(appStatus, ApplicationStatus.New) // Should not be RUNNING if there are still needed containers
appMasterMetrics.put("needed-containers", "0")
when(mockAmClient.getMetrics).thenReturn(metrics)
when(appReport.getYarnApplicationState).thenReturn(YarnApplicationState.RUNNING)
appStatus = clientHelper.toAppStatus(appReport).get
assertEquals(appStatus, ApplicationStatus.Running) // Should not be RUNNING if there are still needed containers
}
}