blob: 5c0dfac4b70a617353e1e5e3efe2b53341e28a80 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.job.yarn
import java.io.IOException
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
import org.apache.hadoop.yarn.exceptions.{InvalidApplicationMasterRequestException, YarnException}
import org.apache.samza.SamzaException
import org.apache.samza.clustermanager.SamzaApplicationState
import SamzaApplicationState.SamzaAppStatus
import org.apache.samza.util.Logging
/**
 * Manages the lifecycle of the YARN application master (AM) with respect to the ResourceManager (RM).
 *
 *  - [[onInit]] registers the AM with the RM and checks that the requested per-container memory and
 *    CPU do not exceed the maximum capability the RM reports in its registration response.
 *  - [[onReboot]] throws a [[SamzaException]] when the RM signals a reboot, so that the AM is rebooted.
 *  - [[onShutdown]] unregisters the AM from the RM, unless the final Samza status is UNDEFINED.
 *
 * This class is used in the refactored code path, as called by run-jc.sh.
 *
 * @param containerMem  memory requested per container; compared against the RM's reported maximum
 *                      (`Resource.getMemory`, presumably MB — confirm)
 * @param containerCpu  virtual cores requested per container; compared against the RM's reported maximum
 * @param samzaAppState shared application state; marked FAILED and unhealthy if the request is unsatisfiable
 * @param state         YARN-side AM state supplying the host, RPC URL and tracking URL used for registration
 * @param amClient      async AM-RM client used to register and unregister the AM
 */
class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, samzaAppState: SamzaApplicationState, state: YarnAppState, amClient: AMRMClientAsync[ContainerRequest]) extends Logging {

  /** False once [[onInit]] determines the RM cannot satisfy the requested container resources. */
  var validResourceRequest: Boolean = true

  /** Diagnostic message passed to the RM on unregister; null unless set by [[onInit]] (or externally). */
  var shutdownMessage: String = null

  // Not read or written within this class; presumably assigned by callers — confirm.
  var webApp: SamzaYarnAppMasterService = null

  /**
   * Registers the AM with the RM and validates the container resource request.
   *
   * Registration advertises `state.nodeHost`, the port of `state.rpcUrl`, and `host:port` of
   * `state.trackingUrl` as the tracking URL. If the requested memory or CPU exceeds the maximum the
   * RM reports, this records [[shutdownMessage]], clears [[validResourceRequest]] (so
   * [[shouldShutdown]] becomes true), and marks `samzaAppState` as FAILED and unhealthy.
   */
  def onInit(): Unit = {
    val host = state.nodeHost
    val trackingUrl = s"$host:${state.trackingUrl.getPort}"
    val response = amClient.registerApplicationMaster(host, state.rpcUrl.getPort, trackingUrl)

    // Validate that the YARN cluster can handle our container resource requirements.
    val maxCapability = response.getMaximumResourceCapability
    val maxMem = maxCapability.getMemory
    val maxCpu = maxCapability.getVirtualCores
    info(s"Got AM register response. The YARN RM supports container requests with max-mem: $maxMem, max-cpu: $maxCpu")

    if (containerMem > maxMem || containerCpu > maxCpu) {
      shutdownMessage = s"The YARN cluster is unable to run your job due to unsatisfiable resource requirements. You asked for mem: $containerMem, and cpu: $containerCpu."
      error(shutdownMessage)
      validResourceRequest = false
      samzaAppState.status = SamzaAppStatus.FAILED
      samzaAppState.jobHealthy.set(false)
    }
  }

  /**
   * Handles a reboot signal from the RM by failing loudly so the AM is rebooted.
   *
   * @throws SamzaException always
   */
  def onReboot(): Unit = {
    throw new SamzaException("Received a reboot signal from the RM, so throwing an exception to reboot the AM.")
  }

  /**
   * Unregisters the AM from the RM with the YARN status corresponding to `samzaAppStatus`.
   *
   * `samzaAppStatus` is SUCCEEDED or FAILED for conditions we catch and handle, such as container
   * failures. Every other AM failure leaves the status UNDEFINED; examples are errors in callbacks,
   * connection failures after retries, and token expirations. In the UNDEFINED case the AM is
   * deliberately NOT unregistered, so the RM can restart it, potentially on a different host.
   *
   * YarnException and IOException thrown while unregistering are logged, not rethrown, so that
   * shutdown of other components can proceed.
   *
   * @param samzaAppStatus final Samza application status
   */
  def onShutdown(samzaAppStatus: SamzaAppStatus): Unit = {
    val yarnStatus: FinalApplicationStatus = getStatus(samzaAppStatus)
    info(s"Shutting down SamzaAppStatus: $samzaAppStatus yarn status: $yarnStatus")

    if (samzaAppStatus != SamzaAppStatus.UNDEFINED) {
      info("Unregistering AM from the RM.")
      try {
        amClient.unregisterApplicationMaster(yarnStatus, shutdownMessage, null)
        info("Unregister complete.")
      } catch {
        // Matched before the generic case below so it is treated as a benign outcome.
        // Once the NM dies, the corresponding app attempt ID is removed from the RM cache so that the RM can
        // spin up a new AM and its containers. Unregistering then fails with this exception because the AM is
        // already unregistered with the RM.
        case _: InvalidApplicationMasterRequestException =>
          info("Removed application attempt from RM cache because the AM died. Unregister complete.")
        case ex @ (_: YarnException | _: IOException) =>
          error("Caught an exception while trying to unregister AM. Trying to stop other components.", ex)
      }
    } else {
      info("Not unregistering AM from the RM. This will enable RM retries")
    }
  }

  /**
   * Maps a Samza application status to the YARN final application status reported to the RM.
   *
   * @return FAILED or SUCCEEDED for the matching Samza status; UNDEFINED for anything else, including null
   */
  def getStatus(samzaAppStatus: SamzaAppStatus): FinalApplicationStatus = samzaAppStatus match {
    case SamzaAppStatus.FAILED => FinalApplicationStatus.FAILED
    case SamzaAppStatus.SUCCEEDED => FinalApplicationStatus.SUCCEEDED
    case _ => FinalApplicationStatus.UNDEFINED
  }

  /** True when [[onInit]] found the container resource request unsatisfiable by the RM. */
  def shouldShutdown: Boolean = !validResourceRequest
}