blob: ee947aed464e8789706ec3c46596e56b48d00f75 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.job.yarn
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.fs.{FileStatus, Path, FileSystem}
import org.apache.hadoop.yarn.api.records.ApplicationId
import org.apache.hadoop.yarn.api.records.ApplicationReport
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
import org.apache.hadoop.yarn.api.records.YarnApplicationState
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.samza.SamzaException
import org.apache.samza.config.{MapConfig, JobConfig, YarnConfig}
import org.apache.samza.job.ApplicationStatus
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNotNull
import org.mockito.Mockito._
import org.mockito.Matchers.any
import org.scalatest.FunSuite
import org.scalatest.mockito.MockitoSugar
class TestClientHelper extends FunSuite {
import MockitoSugar._
val hadoopConfig = mock[Configuration]
val clientHelper = new ClientHelper(hadoopConfig) {
override def createYarnClient() = {
mock[YarnClient]
}
}
test("test validateJobConfig") {
import collection.JavaConverters._
var config = new MapConfig()
intercept[SamzaException] {
clientHelper.validateJobConfig(config)
}
config = new MapConfig(Map(JobConfig.JOB_SECURITY_MANAGER_FACTORY -> "some value").asJava)
clientHelper.validateJobConfig(config)
}
test("test prepareJobConfig") {
val jobContext = new JobContext
jobContext.setAppStagingDir(new Path("/user/temp/.samzaStaging/app_123"))
clientHelper.jobContext = jobContext
val ret = clientHelper.getSecurityYarnConfig
assert(ret.size == 2)
assert(ret.get(YarnConfig.YARN_JOB_STAGING_DIRECTORY) == Some("/user/temp/.samzaStaging/app_123"))
assert(ret.get(YarnConfig.YARN_CREDENTIALS_FILE) == Some("/user/temp/.samzaStaging/app_123/credentials"))
}
test("test setupAMLocalResources") {
val applicationId = mock[ApplicationId]
when(applicationId.toString).thenReturn("application_123")
val jobContext = new JobContext
jobContext.setAppId(applicationId)
clientHelper.jobContext = jobContext
val mockFs = mock[FileSystem]
val fileStatus = new FileStatus(0, false, 0, 0, System.currentTimeMillis(), null)
when(mockFs.getHomeDirectory).thenReturn(new Path("/user/test"))
when(mockFs.getFileStatus(any[Path])).thenReturn(fileStatus)
when(mockFs.mkdirs(any[Path])).thenReturn(true)
doNothing().when(mockFs).copyFromLocalFile(any[Path], any[Path])
doNothing().when(mockFs).setPermission(any[Path], any[FsPermission])
val ret = clientHelper.setupAMLocalResources(mockFs, Some("some.principal"), Some("some.keytab"))
assert(ret.size == 1)
assert(ret.contains("some.keytab"))
}
test("test toAppStatus") {
val appReport = mock[ApplicationReport]
when(appReport.getYarnApplicationState).thenReturn(YarnApplicationState.FAILED)
when(appReport.getDiagnostics).thenReturn("some yarn diagnostics")
var appStatus = clientHelper.toAppStatus(appReport).get
assertEquals(appStatus, ApplicationStatus.UnsuccessfulFinish)
assertNotNull(appStatus.getThrowable)
when(appReport.getYarnApplicationState).thenReturn(YarnApplicationState.NEW)
appStatus = clientHelper.toAppStatus(appReport).get
assertEquals(appStatus, ApplicationStatus.New)
when(appReport.getYarnApplicationState).thenReturn(YarnApplicationState.FINISHED)
when(appReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.FAILED)
appStatus = clientHelper.toAppStatus(appReport).get
assertEquals(appStatus, ApplicationStatus.UnsuccessfulFinish)
when(appReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.KILLED)
appStatus = clientHelper.toAppStatus(appReport).get
assertEquals(appStatus, ApplicationStatus.UnsuccessfulFinish)
when(appReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
appStatus = clientHelper.toAppStatus(appReport).get
assertEquals(appStatus, ApplicationStatus.SuccessfulFinish)
}
}