| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.spark.deploy.yarn |
| |
| import java.io.IOException |
| import java.lang.reflect.{InvocationTargetException, Modifier} |
| import java.net.{URI, URLEncoder} |
| import java.security.PrivilegedExceptionAction |
| import java.util.concurrent.{TimeoutException, TimeUnit} |
| |
| import scala.collection.mutable.HashMap |
| import scala.concurrent.Promise |
| import scala.concurrent.duration.Duration |
| import scala.util.control.NonFatal |
| |
| import org.apache.commons.lang3.{StringUtils => ComStrUtils} |
| import org.apache.hadoop.fs.{FileSystem, Path} |
| import org.apache.hadoop.security.UserGroupInformation |
| import org.apache.hadoop.util.StringUtils |
| import org.apache.hadoop.yarn.api._ |
| import org.apache.hadoop.yarn.api.records._ |
| import org.apache.hadoop.yarn.conf.YarnConfiguration |
| import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException |
| import org.apache.hadoop.yarn.util.Records |
| |
| import org.apache.spark._ |
| import org.apache.spark.deploy.{ExecutorFailureTracker, SparkHadoopUtil} |
| import org.apache.spark.deploy.history.HistoryServer |
| import org.apache.spark.deploy.security.HadoopDelegationTokenManager |
| import org.apache.spark.deploy.yarn.config._ |
| import org.apache.spark.internal.{Logging, LogKeys, MDC} |
| import org.apache.spark.internal.config._ |
| import org.apache.spark.internal.config.UI._ |
| import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances} |
| import org.apache.spark.resource.ResourceProfile |
| import org.apache.spark.rpc._ |
| import org.apache.spark.scheduler.MiscellaneousProcessDetails |
| import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend} |
| import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ |
| import org.apache.spark.util._ |
| |
| /** |
| * Common application master functionality for Spark on Yarn. |
| */ |
| private[spark] class ApplicationMaster( |
| args: ApplicationMasterArguments, |
| sparkConf: SparkConf, |
| yarnConf: YarnConfiguration) extends Logging { |
| |
| // TODO: Currently, the task-to-container mapping is computed once (in TaskSetManager), which |
| // need not be optimal as more containers become available. Might need to handle this better. |
| |
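| /** Extract this AM container's log URLs, reported to the driver in client mode. */ |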
| private def extractLogUrls: Map[String, String] = { |
| YarnContainerInfoHelper.getLogUrls(SparkHadoopUtil. |
| newConfiguration(sparkConf), None).getOrElse(Map()) |
| } |
| |
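| // The application attempt ID, derived from the container ID environment variable. This is |
| // null when the AM runs outside a YARN container (e.g. as an unmanaged AM). |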
| private val appAttemptId = |
| if (System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) != null) { |
| YarnSparkHadoopUtil.getContainerId.getApplicationAttemptId() |
| } else { |
| null |
| } |
| |
| private val isClusterMode = args.userClass != null |
| |
| private lazy val securityMgr = new SecurityManager(sparkConf) |
| |
| private var metricsSystem: Option[MetricsSystem] = None |
| |
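| // Class loader used to run the user class. In cluster mode this honors the |
| // user-classpath-first setting by using a child-first class loader. |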
| private val userClassLoader = { |
| val urls = Client.getUserClasspathUrls(sparkConf, isClusterMode) |
| |
| if (isClusterMode && Client.isUserClassPathFirst(sparkConf, isDriver = true)) { |
| new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader) |
| } else { |
| new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader) |
| } |
| } |
| |
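| // Client used to register and unregister this AM with the YARN ResourceManager and to |
| // create the container allocator. |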
| private val client = new YarnRMClient() |
| |
| private val maxNumExecutorFailures = ExecutorFailureTracker.maxNumExecutorFailures(sparkConf) |
| |
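| // Bookkeeping for the final application status, shared across the reporter thread, the |
| // user class thread, and the shutdown hook. |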
| @volatile private var exitCode = 0 |
| @volatile private var unregistered = false |
| @volatile private var finished = false |
| @volatile private var finalStatus = getDefaultFinalStatus() |
| @volatile private var finalMsg: String = "" |
| @volatile private var userClassThread: Thread = _ |
| |
| @volatile private var reporterThread: Thread = _ |
| @volatile private var allocator: YarnAllocator = _ |
| |
| // A flag to check whether this AM has registered with the ResourceManager. |
| @volatile private var registered = false |
| |
| // Lock for controlling the allocator (heartbeat) thread. |
| private val allocatorLock = new Object() |
| |
| // Steady-state heartbeat interval. We want to be reasonably responsive without causing too |
| // many requests to the RM. |
| private val heartbeatInterval = { |
| // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses. |
| val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) |
| math.max(0, math.min(expiryInterval / 2, sparkConf.get(RM_HEARTBEAT_INTERVAL))) |
| } |
| |
| // Initial wait interval before allocator poll, to allow for quicker ramp up when executors are |
| // being requested. |
| private val initialAllocationInterval = math.min(heartbeatInterval, |
| sparkConf.get(INITIAL_HEARTBEAT_INTERVAL)) |
| |
| // Next wait interval before allocator poll. |
| private var nextAllocationInterval = initialAllocationInterval |
| |
| // In cluster mode, used to tell the AM when the user's SparkContext has been initialized. |
| private val sparkContextPromise = Promise[SparkContext]() |
| |
| /** |
| * Load the list of localized files set by the client, used when launching executors. This should |
| * be called in a context where the needed credentials to access HDFS are available. |
| */ |
| private def prepareLocalResources(distCacheConf: SparkConf): Map[String, LocalResource] = { |
| logInfo("Preparing Local resources") |
| val resources = HashMap[String, LocalResource]() |
| |
| def setupDistributedCache( |
| file: String, |
| rtype: LocalResourceType, |
| timestamp: String, |
| size: String, |
| vis: String): Unit = { |
| val uri = new URI(file) |
| val amJarRsrc = Records.newRecord(classOf[LocalResource]) |
| amJarRsrc.setType(rtype) |
| amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis)) |
| amJarRsrc.setResource(URL.fromURI(uri)) |
| amJarRsrc.setTimestamp(timestamp.toLong) |
| amJarRsrc.setSize(size.toLong) |
| |
| val fileName = Option(uri.getFragment()).getOrElse(new Path(uri).getName()) |
| resources(fileName) = amJarRsrc |
| } |
| |
| val distFiles = distCacheConf.get(CACHED_FILES) |
| val fileSizes = distCacheConf.get(CACHED_FILES_SIZES) |
| val timeStamps = distCacheConf.get(CACHED_FILES_TIMESTAMPS) |
| val visibilities = distCacheConf.get(CACHED_FILES_VISIBILITIES) |
| val resTypes = distCacheConf.get(CACHED_FILES_TYPES) |
| |
| for (i <- distFiles.indices) { |
| val resType = LocalResourceType.valueOf(resTypes(i)) |
| setupDistributedCache(distFiles(i), resType, timeStamps(i).toString, fileSizes(i).toString, |
| visibilities(i)) |
| } |
| |
| // Distribute the conf archive to executors. |
| distCacheConf.get(CACHED_CONF_ARCHIVE).foreach { path => |
| val uri = new URI(path) |
| val fs = FileSystem.get(uri, yarnConf) |
| val status = fs.getFileStatus(new Path(uri)) |
| // SPARK-16080: Make sure to use the correct name for the destination when distributing the |
| // conf archive to executors. |
| val destUri = new URI(uri.getScheme(), uri.getRawSchemeSpecificPart(), |
| Client.LOCALIZED_CONF_DIR) |
| setupDistributedCache(destUri.toString(), LocalResourceType.ARCHIVE, |
| status.getModificationTime().toString, status.getLen.toString, |
| LocalResourceVisibility.PRIVATE.name()) |
| } |
| |
| resources.toMap |
| } |
| |
| final def run(): Int = { |
| try { |
| val attemptID = if (isClusterMode) { |
| // Set the web UI port to be ephemeral for YARN if not set explicitly, |
| // so we don't conflict with other Spark processes running on the same box. |
| // If set explicitly, the web UI will attempt to run on UI_PORT and retry |
| // incrementally until UI_PORT + `spark.port.maxRetries`. |
| if (System.getProperty(UI_PORT.key) == null) { |
| System.setProperty(UI_PORT.key, "0") |
| } |
| |
| // Set the master and deploy mode property to match the requested mode. |
| System.setProperty("spark.master", "yarn") |
| System.setProperty(SUBMIT_DEPLOY_MODE.key, "cluster") |
| |
| // Set this internal configuration when running in cluster mode; it is checked in |
| // SparkContext to avoid misuse of yarn-cluster mode. |
| System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString()) |
| |
| Option(appAttemptId.getAttemptId.toString) |
| } else { |
| None |
| } |
| |
| new CallerContext( |
| "APPMASTER", sparkConf.get(APP_CALLER_CONTEXT), |
| Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext() |
| |
| logInfo(log"ApplicationAttemptId: ${MDC(LogKeys.APP_ATTEMPT_ID, appAttemptId)}") |
| |
| // During shutdown, we may not be able to create a FileSystem object, so pre-create it here. |
| val stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR")) |
| val stagingDirFs = stagingDirPath.getFileSystem(yarnConf) |
| // This shutdown hook should run *after* the SparkContext is shut down. |
| val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1 |
| ShutdownHookManager.addShutdownHook(priority) { () => |
| try { |
| val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf) |
| val isLastAttempt = appAttemptId.getAttemptId() >= maxAppAttempts |
| |
| if (!finished) { |
| // The default final status of the ApplicationMaster is FAILED when it is invoked via the |
| // shutdown hook. This behavior differs from the 1.x versions. |
| // If the user application exits ahead of time by calling System.exit(N), mark the |
| // application as failed with EXIT_EARLY. For a clean shutdown, the user shouldn't call |
| // System.exit(0) to terminate the application. |
| finish(finalStatus, |
| ApplicationMaster.EXIT_EARLY, |
| "Shutdown hook called before final status was reported.") |
| } |
| |
| if (!unregistered) { |
| // We only want to unregister if we don't want the RM to retry the application. |
| if (isLastAttempt) { |
| cleanupStagingDir(stagingDirFs, stagingDirPath) |
| unregister(finalStatus, finalMsg) |
| } else if (finalStatus == FinalApplicationStatus.SUCCEEDED) { |
| // When this is not the last attempt, if unregistering fails (e.g. due to a timeout), |
| // YARN will rerun the application, so the AM should not clean the staging dir before |
| // unregistering succeeds. |
| unregister(finalStatus, finalMsg) |
| cleanupStagingDir(stagingDirFs, stagingDirPath) |
| } |
| } |
| } catch { |
| case e: Throwable => |
| logWarning("Ignoring Exception while stopping ApplicationMaster from shutdown hook", e) |
| } |
| } |
| |
| if (isClusterMode) { |
| runDriver() |
| } else { |
| runExecutorLauncher() |
| } |
| } catch { |
| case e: Exception => |
| // catch everything else if not specifically handled |
| logError("Uncaught exception: ", e) |
| finish(FinalApplicationStatus.FAILED, |
| ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION, |
| "Uncaught exception: " + StringUtils.stringifyException(e)) |
| } finally { |
| try { |
| metricsSystem.foreach { ms => |
| ms.report() |
| ms.stop() |
| } |
| } catch { |
| case e: Exception => |
| logWarning("Exception during stopping of the metric system: ", e) |
| } |
| } |
| |
| exitCode |
| } |
| |
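| /** |
| * Run an unmanaged AM, which is driven from the client process rather than launched in a |
| * YARN container (see YARN_UNMANAGED_AM). The caller provides the RPC env of the |
| * already-running driver. |
| */ |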
| def runUnmanaged( |
| clientRpcEnv: RpcEnv, |
| appAttemptId: ApplicationAttemptId, |
| stagingDir: Path, |
| cachedResourcesConf: SparkConf): Unit = { |
| try { |
| new CallerContext( |
| "APPMASTER", sparkConf.get(APP_CALLER_CONTEXT), |
| Option(appAttemptId.getApplicationId.toString), None).setCurrentContext() |
| |
| val driverRef = clientRpcEnv.setupEndpointRef( |
| RpcAddress(sparkConf.get(DRIVER_HOST_ADDRESS), |
| sparkConf.get(DRIVER_PORT)), |
| YarnSchedulerBackend.ENDPOINT_NAME) |
| // The client-mode AM doesn't listen for incoming connections, so report an invalid port. |
| registerAM(Utils.localHostNameForURI(), -1, sparkConf, |
| sparkConf.getOption("spark.driver.appUIAddress"), appAttemptId) |
| val encodedAppId = URLEncoder.encode(appAttemptId.getApplicationId.toString, "UTF-8") |
| addAmIpFilter(Some(driverRef), s"/proxy/$encodedAppId") |
| createAllocator(driverRef, sparkConf, clientRpcEnv, appAttemptId, cachedResourcesConf) |
| reporterThread.join() |
| } catch { |
| case e: Exception => |
| // catch everything else if not specifically handled |
| logError("Uncaught exception: ", e) |
| finish(FinalApplicationStatus.FAILED, |
| ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION, |
| "Uncaught exception: " + StringUtils.stringifyException(e)) |
| if (!unregistered) { |
| // It's ok to clean staging dir first because unmanaged AM can't be retried. |
| cleanupStagingDir(stagingDir) |
| unregister(finalStatus, finalMsg) |
| } |
| } finally { |
| try { |
| metricsSystem.foreach { ms => |
| ms.report() |
| ms.stop() |
| } |
| } catch { |
| case e: Exception => |
| logWarning("Exception during stopping of the metric system: ", e) |
| } |
| } |
| } |
| |
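| /** Finish and unregister an unmanaged AM, if that has not already happened. */ |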
| def stopUnmanaged(stagingDir: Path): Unit = { |
| if (!finished) { |
| finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) |
| } |
| if (!unregistered) { |
| // It's ok to clean staging dir first because unmanaged AM can't be retried. |
| cleanupStagingDir(stagingDir) |
| unregister(finalStatus, finalMsg) |
| } |
| } |
| |
| /** |
| * The default final application status. For client mode it is UNDEFINED, so that if YARN HA |
| * restarts the application it properly retries. For cluster mode it is FAILED, so that if |
| * the user application exits via System.exit before reporting a final status, the |
| * application is treated as failed. |
| */ |
| final def getDefaultFinalStatus(): FinalApplicationStatus = { |
| if (isClusterMode) { |
| FinalApplicationStatus.FAILED |
| } else { |
| FinalApplicationStatus.UNDEFINED |
| } |
| } |
| |
| /** |
| * unregister is used to completely unregister the application from the ResourceManager. |
| * This means the ResourceManager will not retry the application attempt on your behalf if |
| * a failure occurred. |
| */ |
| final def unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit = { |
| synchronized { |
| if (registered && !unregistered) { |
| logInfo(log"Unregistering ApplicationMaster with ${MDC(LogKeys.APP_STATE, status)}" + |
| Option(diagnostics).map( |
| msg => log" (diag message: ${MDC(LogKeys.MESSAGE, msg)})").getOrElse(log"")) |
| unregistered = true |
| client.unregister(status, Option(diagnostics).getOrElse("")) |
| } |
| } |
| } |
| |
| final def finish(status: FinalApplicationStatus, code: Int, msg: String = null): Unit = { |
| synchronized { |
| if (!finished) { |
| val inShutdown = ShutdownHookManager.inShutdown() |
| if (registered || !isClusterMode) { |
| exitCode = code |
| finalStatus = status |
| } else { |
| finalStatus = FinalApplicationStatus.FAILED |
| exitCode = ApplicationMaster.EXIT_SC_NOT_INITED |
| } |
| logInfo(log"Final app status: ${MDC(LogKeys.APP_STATE, finalStatus)}, " + |
| log"exitCode: ${MDC(LogKeys.EXIT_CODE, exitCode)}" + |
| Option(msg).map(msg => log", (reason: ${MDC(LogKeys.REASON, msg)})").getOrElse(log"")) |
| finalMsg = ComStrUtils.abbreviate(msg, sparkConf.get(AM_FINAL_MSG_LIMIT).toInt) |
| finished = true |
| if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) { |
| logDebug("shutting down reporter thread") |
| reporterThread.interrupt() |
| } |
| if (!inShutdown && Thread.currentThread() != userClassThread && userClassThread != null) { |
| logDebug("shutting down user thread") |
| userClassThread.interrupt() |
| } |
| } |
| } |
| } |
| |
| private def sparkContextInitialized(sc: SparkContext): Unit = { |
| sparkContextPromise.synchronized { |
| // Notify runDriver function that SparkContext is available |
| sparkContextPromise.success(sc) |
| // Pause the user class thread so that runDriver can complete its initialization. |
| sparkContextPromise.wait() |
| } |
| } |
| |
| private def resumeDriver(): Unit = { |
| // Once initialization in runDriver has completed, the user class thread has to be resumed. |
| sparkContextPromise.synchronized { |
| sparkContextPromise.notify() |
| } |
| } |
| |
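| /** Register this AM with the YARN ResourceManager, advertising its host, port, and UI. */ |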
| private def registerAM( |
| host: String, |
| port: Int, |
| _sparkConf: SparkConf, |
| uiAddress: Option[String], |
| appAttempt: ApplicationAttemptId): Unit = { |
| val appId = appAttempt.getApplicationId().toString() |
| val attemptId = appAttempt.getAttemptId().toString() |
| val historyAddress = ApplicationMaster |
| .getHistoryServerAddress(_sparkConf, yarnConf, appId, attemptId) |
| |
| client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress) |
| registered = true |
| } |
| |
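| /** |
| * Create the YARN allocator, set up the AM RPC endpoint, start the metrics system, and |
| * launch the reporter (heartbeat) thread. |
| */ |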
| private def createAllocator( |
| driverRef: RpcEndpointRef, |
| _sparkConf: SparkConf, |
| rpcEnv: RpcEnv, |
| appAttemptId: ApplicationAttemptId, |
| distCacheConf: SparkConf): Unit = { |
| // In client mode, the AM may be restarting after delegation tokens have reached their TTL. So |
| // always contact the driver to get the current set of valid tokens, so that local resources can |
| // be initialized below. |
| if (!isClusterMode) { |
| val tokens = driverRef.askSync[Array[Byte]](RetrieveDelegationTokens) |
| if (tokens != null) { |
| SparkHadoopUtil.get.addDelegationTokens(tokens, _sparkConf) |
| } |
| } |
| |
| val appId = appAttemptId.getApplicationId().toString() |
| val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port, |
| CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString |
| val localResources = prepareLocalResources(distCacheConf) |
| |
| // Before we initialize the allocator, let's log the information about how executors will |
| // be run up front, to avoid printing this out for every single executor being launched. |
| // Use placeholders for information that changes such as executor IDs. |
| logInfo { |
| val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt |
| val executorCores = _sparkConf.get(EXECUTOR_CORES) |
| val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>", |
| "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources, |
| ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) |
| dummyRunner.launchContextDebugInfo() |
| } |
| |
| allocator = client.createAllocator( |
| yarnConf, |
| _sparkConf, |
| appAttemptId, |
| driverUrl, |
| driverRef, |
| securityMgr, |
| localResources) |
| |
| // Initialize the AM endpoint *after* the allocator has been initialized. This ensures |
| // that when the driver sends an initial executor request (e.g. after an AM restart), |
| // the allocator is ready to service requests. |
| rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef)) |
| if (_sparkConf.get(SHUFFLE_SERVICE_ENABLED)) { |
| logInfo(log"Initializing service data for shuffle service using name '" + |
| log"${MDC(LogKeys.SHUFFLE_SERVICE_NAME, _sparkConf.get(SHUFFLE_SERVICE_NAME))}'") |
| } |
| allocator.allocateResources() |
| val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATION_MASTER, sparkConf) |
| val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId) |
| ms.registerSource(new ApplicationMasterSource(prefix, allocator)) |
| // do not register static sources in this case as per SPARK-25277 |
| ms.start(false) |
| metricsSystem = Some(ms) |
| reporterThread = launchReporterThread() |
| } |
| |
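| /** |
| * Cluster mode: start the user class in a separate thread, wait for it to create the |
| * SparkContext, then register the AM and create the allocator. |
| */ |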
| private def runDriver(): Unit = { |
| addAmIpFilter(None, System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)) |
| userClassThread = startUserApplication() |
| |
| // This is a bit hacky, but we need to wait until the spark.driver.port property has |
| // been set by the thread executing the user class. |
| logInfo("Waiting for spark context initialization...") |
| val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME) |
| try { |
| val sc = ThreadUtils.awaitResult(sparkContextPromise.future, |
| Duration(totalWaitTime, TimeUnit.MILLISECONDS)) |
| if (sc != null) { |
| val rpcEnv = sc.env.rpcEnv |
| |
| val userConf = sc.getConf |
| val host = userConf.get(DRIVER_HOST_ADDRESS) |
| val port = userConf.get(DRIVER_PORT) |
| registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId) |
| |
| val driverRef = rpcEnv.setupEndpointRef( |
| RpcAddress(host, port), |
| YarnSchedulerBackend.ENDPOINT_NAME) |
| createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf()) |
| } else { |
| // Sanity check; should never happen in normal operation, since sc should only be null |
| // if the user app did not create a SparkContext. |
| throw new IllegalStateException("User did not initialize spark context!") |
| } |
| resumeDriver() |
| userClassThread.join() |
| } catch { |
| case e: SparkException if e.getCause().isInstanceOf[TimeoutException] => |
| logError( |
| s"SparkContext did not initialize after waiting for $totalWaitTime ms. " + |
| "Please check earlier log output for errors. Failing the application.") |
| finish(FinalApplicationStatus.FAILED, |
| ApplicationMaster.EXIT_SC_NOT_INITED, |
| "Timed out waiting for SparkContext.") |
| } finally { |
| resumeDriver() |
| } |
| } |
| |
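| /** |
| * Client mode: register the AM, connect back to the already-running driver, and create |
| * the allocator. Blocks until the reporter thread exits. |
| */ |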
| private def runExecutorLauncher(): Unit = { |
| val hostname = Utils.localHostNameForURI() |
| val amCores = sparkConf.get(AM_CORES) |
| val rpcEnv = RpcEnv.create("sparkYarnAM", hostname, hostname, -1, sparkConf, securityMgr, |
| amCores, true) |
| |
| // The client-mode AM doesn't listen for incoming connections, so report an invalid port. |
| registerAM(hostname, -1, sparkConf, sparkConf.get(DRIVER_APP_UI_ADDRESS), appAttemptId) |
| |
| // The driver should be up and listening, so unlike cluster mode, just try to connect to it |
| // with no waiting or retrying. |
| val (driverHost, driverPort) = Utils.parseHostPort(args.userArgs(0)) |
| val driverRef = rpcEnv.setupEndpointRef( |
| RpcAddress(driverHost, driverPort), |
| YarnSchedulerBackend.ENDPOINT_NAME) |
| addAmIpFilter(Some(driverRef), |
| System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)) |
| createAllocator(driverRef, sparkConf, rpcEnv, appAttemptId, distCacheConf()) |
| |
| // In client mode the AM endpoint will stop the reporter thread. |
| reporterThread.join() |
| } |
| |
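| /** |
| * Body of the reporter thread: heartbeats the RM via allocateResources() and fails the |
| * application if too many executors fail or the reporter itself keeps throwing. |
| */ |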
| private def allocationThreadImpl(): Unit = { |
| // The number of failures in a row until the allocation thread gives up. |
| val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES) |
| var failureCount = 0 |
| while (!finished) { |
| try { |
| if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) { |
| finish(FinalApplicationStatus.FAILED, |
| ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES, |
| s"Max number of executor failures ($maxNumExecutorFailures) reached") |
| } else if (allocator.isAllNodeExcluded) { |
| finish(FinalApplicationStatus.FAILED, |
| ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES, |
| "Due to executor failures all available nodes are excluded") |
| } else { |
| logDebug("Sending progress") |
| allocator.allocateResources() |
| } |
| failureCount = 0 |
| } catch { |
| case i: InterruptedException => // do nothing |
| case e: ApplicationAttemptNotFoundException => |
| failureCount += 1 |
| logError("Exception from Reporter thread.", e) |
| finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE, |
| e.getMessage) |
| case e: Throwable => |
| failureCount += 1 |
| if (!NonFatal(e)) { |
| finish(FinalApplicationStatus.FAILED, |
| ApplicationMaster.EXIT_REPORTER_FAILURE, |
| "Fatal exception: " + StringUtils.stringifyException(e)) |
| } else if (failureCount >= reporterMaxFailures) { |
| finish(FinalApplicationStatus.FAILED, |
| ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " + |
| s"$failureCount time(s) from Reporter thread.") |
| } else { |
| logWarning(log"Reporter thread fails ${MDC(LogKeys.FAILURES, failureCount)} " + |
| log"time(s) in a row.", e) |
| } |
| } |
| try { |
| val numPendingAllocate = allocator.getNumContainersPendingAllocate |
| var sleepStartNs = 0L |
| var sleepInterval = 200L // ms |
| allocatorLock.synchronized { |
| sleepInterval = |
| if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) { |
| val currentAllocationInterval = |
| math.min(heartbeatInterval, nextAllocationInterval) |
| nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow |
| currentAllocationInterval |
| } else { |
| nextAllocationInterval = initialAllocationInterval |
| heartbeatInterval |
| } |
| sleepStartNs = System.nanoTime() |
| allocatorLock.wait(sleepInterval) |
| } |
| // Convert the elapsed sleep time to milliseconds so it is comparable with the |
| // millisecond-based intervals used below. |
| val sleepDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - sleepStartNs) |
| if (sleepDuration < sleepInterval) { |
| // log when sleep is interrupted |
| logDebug(s"Number of pending allocations is $numPendingAllocate. " + |
| s"Slept for $sleepDuration/$sleepInterval ms.") |
| // if sleep was less than the minimum interval, sleep for the rest of it |
| val toSleep = math.max(0, initialAllocationInterval - sleepDuration) |
| if (toSleep > 0) { |
| logDebug(s"Going back to sleep for $toSleep ms") |
| // use Thread.sleep instead of allocatorLock.wait. there is no need to be woken up |
| // by the methods that signal allocatorLock because this is just finishing the min |
| // sleep interval, which should happen even if this is signalled again. |
| Thread.sleep(toSleep) |
| } |
| } else { |
| logDebug(s"Number of pending allocations is $numPendingAllocate. " + |
| s"Slept for $sleepDuration/$sleepInterval ms.") |
| } |
| } catch { |
| case e: InterruptedException => // do nothing; the loop re-checks the finished flag |
| } |
| } |
| } |
| |
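| /** Start the daemon "Reporter" thread, which stops the allocator when it terminates. */ |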
| private def launchReporterThread(): Thread = { |
| val t = new Thread { |
| override def run(): Unit = { |
| try { |
| allocationThreadImpl() |
| } finally { |
| allocator.stop() |
| } |
| } |
| } |
| t.setDaemon(true) |
| t.setName("Reporter") |
| t.start() |
| logInfo(log"Started progress reporter thread with " + |
| log"(heartbeat: ${MDC(LogKeys.HEARTBEAT_INTERVAL, heartbeatInterval)}, initial allocation: " + |
| log"${MDC(LogKeys.INITIAL_HEARTBEAT_INTERVAL, initialAllocationInterval)}) intervals") |
| t |
| } |
| |
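| /** Load the distributed cache configuration written by the client, if one was provided. */ |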
| private def distCacheConf(): SparkConf = { |
| val distCacheConf = new SparkConf(false) |
| if (args.distCacheConf != null) { |
| Utils.getPropertiesFromFile(args.distCacheConf).foreach { case (k, v) => |
| distCacheConf.set(k, v) |
| } |
| } |
| distCacheConf |
| } |
| |
| /** |
| * Clean up the staging directory. |
| */ |
| private def cleanupStagingDir(stagingDirPath: Path): Unit = { |
| val stagingDirFs = stagingDirPath.getFileSystem(yarnConf) |
| cleanupStagingDir(stagingDirFs, stagingDirPath) |
| } |
| |
| private def cleanupStagingDir(fs: FileSystem, stagingDirPath: Path): Unit = { |
| try { |
| val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES) |
| if (!preserveFiles) { |
| logInfo(log"Deleting staging directory ${MDC(LogKeys.PATH, stagingDirPath)}") |
| fs.delete(stagingDirPath, true) |
| } |
| } catch { |
| case ioe: IOException => |
| logError("Failed to cleanup staging dir " + stagingDirPath, ioe) |
| } |
| } |
| |
| /** Add the Yarn IP filter that is required for properly securing the UI. */ |
| private def addAmIpFilter(driver: Option[RpcEndpointRef], proxyBase: String) = { |
| val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter" |
| val params = client.getAmIpFilterParams(yarnConf, proxyBase) |
| driver match { |
| case Some(d) => |
| d.send(AddWebUIFilter(amFilter, params, proxyBase)) |
| |
| case None => |
| System.setProperty(UI_FILTERS.key, amFilter) |
| params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) } |
| } |
| } |
| |
| /** |
| * Start the user class, which contains the Spark driver, in a separate Thread. |
| * If the main routine exits cleanly we assume the application was successful; for all |
| * other cases we assume failure. Note that if the user calls System.exit(N) before the |
| * final status is reported, the shutdown hook marks the application as failed (EXIT_EARLY). |
| * |
| * Returns the user thread that was started. |
| */ |
| private def startUserApplication(): Thread = { |
| logInfo("Starting the user application in a separate Thread") |
| |
| var userArgs = args.userArgs |
| if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) { |
| // When running pyspark, the app is run using PythonRunner. The second argument is the list |
| // of files to add to PYTHONPATH, which Client.scala already handles, so it's empty. |
| userArgs = Seq(args.primaryPyFile, "") ++ userArgs |
| } |
| if (args.primaryRFile != null && |
| (args.primaryRFile.endsWith(".R") || args.primaryRFile.endsWith(".r"))) { |
| // TODO(davies): add R dependencies here |
| } |
| |
| val mainMethod = userClassLoader.loadClass(args.userClass) |
| .getMethod("main", classOf[Array[String]]) |
| |
| val userThread = new Thread { |
| override def run(): Unit = { |
| try { |
| if (!Modifier.isStatic(mainMethod.getModifiers)) { |
| logError(s"Could not find static main method in object ${args.userClass}") |
| finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS) |
| } else { |
| mainMethod.invoke(null, userArgs.toArray) |
| finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) |
| logDebug("Done running user class") |
| } |
| } catch { |
| case e: InvocationTargetException => |
| e.getCause match { |
| case _: InterruptedException => |
| // Reporter thread can interrupt to stop user class |
| case SparkUserAppException(exitCode) => |
| val msg = log"User application exited with status " + |
| log"${MDC(LogKeys.EXIT_CODE, exitCode)}" |
| logError(msg) |
| finish(FinalApplicationStatus.FAILED, exitCode, msg.message) |
| case cause: Throwable => |
| logError("User class threw exception: ", cause) |
| finish(FinalApplicationStatus.FAILED, |
| ApplicationMaster.EXIT_EXCEPTION_USER_CLASS, |
| "User class threw exception: " + StringUtils.stringifyException(cause)) |
| } |
| sparkContextPromise.tryFailure(e.getCause()) |
| } finally { |
| // Notify the thread waiting for the SparkContext, in case the application did not |
| // instantiate one. This will do nothing when the user code instantiates a SparkContext |
| // (with the correct master), or when the user code throws an exception (due to the |
| // tryFailure above). |
| sparkContextPromise.trySuccess(null) |
| } |
| } |
| } |
| userThread.setContextClassLoader(userClassLoader) |
| userThread.setName("Driver") |
| userThread.start() |
| userThread |
| } |
| |
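| // Reset the allocation interval and wake the reporter thread so new requests are serviced |
| // promptly. |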
| private def resetAllocatorInterval(): Unit = allocatorLock.synchronized { |
| nextAllocationInterval = initialAllocationInterval |
| allocatorLock.notifyAll() |
| } |
| |
| /** |
| * An [[RpcEndpoint]] that communicates with the driver's scheduler backend. |
| */ |
| private class AMEndpoint(override val rpcEnv: RpcEnv, driver: RpcEndpointRef) |
| extends RpcEndpoint with Logging { |
| @volatile private var shutdown = false |
| @volatile private var exitCode = 0 |
| |
| private val clientModeTreatDisconnectAsFailed = |
| sparkConf.get(AM_CLIENT_MODE_TREAT_DISCONNECT_AS_FAILED) |
| |
| override def onStart(): Unit = { |
| driver.send(RegisterClusterManager(self)) |
| // If the YARN application is running in managed client mode, send the AM log info to |
| // the Spark driver. |
| if (!isClusterMode && !sparkConf.get(YARN_UNMANAGED_AM)) { |
| val hostPort = YarnContainerInfoHelper.getNodeManagerHttpAddress(None) |
| val yarnAMID = "yarn-am" |
| val info = new MiscellaneousProcessDetails(hostPort, |
| sparkConf.get(AM_CORES), extractLogUrls) |
| driver.send(MiscellaneousProcessAdded(System.currentTimeMillis(), yarnAMID, info)) |
| } |
| } |
| |
| override def receive: PartialFunction[Any, Unit] = { |
| case UpdateDelegationTokens(tokens) => |
| SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf) |
| |
| case Shutdown(code) => |
| exitCode = code |
| shutdown = true |
| allocator.setShutdown(true) |
| } |
| |
| override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { |
| case r: RequestExecutors => |
| Option(allocator) match { |
| case Some(a) => |
| if (a.requestTotalExecutorsWithPreferredLocalities( |
| r.resourceProfileToTotalExecs, |
| r.numLocalityAwareTasksPerResourceProfileId, |
| r.hostToLocalTaskCount, |
| r.excludedNodes)) { |
| resetAllocatorInterval() |
| } |
| context.reply(true) |
| |
| case None => |
| logWarning("Container allocator is not ready to request executors yet.") |
| context.reply(false) |
| } |
| |
| case KillExecutors(executorIds) => |
| logInfo(log"Driver requested to kill executor(s) " + |
| log"${MDC(LogKeys.EXECUTOR_IDS, executorIds.mkString(", "))}.") |
| Option(allocator) match { |
| case Some(a) => executorIds.foreach(a.killExecutor) |
| case None => logWarning("Container allocator is not ready to kill executors yet.") |
| } |
| context.reply(true) |
| |
| case GetExecutorLossReason(eid) => |
| Option(allocator) match { |
| case Some(a) => |
| a.enqueueGetLossReasonRequest(eid, context) |
| resetAllocatorInterval() |
| case None => |
| logWarning("Container allocator is not ready to find executor loss reasons yet.") |
| } |
| } |
| |
| override def onDisconnected(remoteAddress: RpcAddress): Unit = { |
| // In cluster mode or the unmanaged AM case, do not rely on the disassociated event to |
| // exit. This avoids potentially reporting incorrect exit codes if the driver fails. |
| if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) { |
| if (shutdown || !clientModeTreatDisconnectAsFailed) { |
| if (exitCode == 0) { |
| logInfo(log"Driver terminated or disconnected! Shutting down. " + |
| log"${MDC(LogKeys.HOST_PORT, remoteAddress)}") |
| finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) |
| } else { |
| logError(log"Driver terminated with exit code ${MDC(LogKeys.EXIT_CODE, exitCode)}! " + |
| log"Shutting down. ${MDC(LogKeys.HOST_PORT, remoteAddress)}") |
| finish(FinalApplicationStatus.FAILED, exitCode) |
| } |
| } else { |
| logError(s"Application Master lost connection with driver! Shutting down. $remoteAddress") |
| finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_DISCONNECTED) |
| } |
| } |
| } |
| } |
| |
| } |
| |
| object ApplicationMaster extends Logging { |
| |
| // Exit codes for different causes; there is no particular reason behind the values. |
| private val EXIT_SUCCESS = SparkExitCode.EXIT_SUCCESS |
| private val EXIT_UNCAUGHT_EXCEPTION = 10 |
| private val EXIT_MAX_EXECUTOR_FAILURES = SparkExitCode.EXCEED_MAX_EXECUTOR_FAILURES |
| private val EXIT_REPORTER_FAILURE = 12 |
| private val EXIT_SC_NOT_INITED = 13 |
| private val EXIT_EXCEPTION_USER_CLASS = 15 |
| private val EXIT_EARLY = 16 |
| private val EXIT_DISCONNECTED = 17 |
| |
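| // The singleton ApplicationMaster instance for this process, created in main(). |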
| private var master: ApplicationMaster = _ |
| |
| def main(args: Array[String]): Unit = { |
| SignalUtils.registerLogger(log) |
| val amArgs = new ApplicationMasterArguments(args) |
| val sparkConf = new SparkConf() |
| if (amArgs.propertiesFile != null) { |
| Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) => |
| sparkConf.set(k, v) |
| } |
| } |
| // Set system properties for each config entry. This covers two use cases: |
| // - The default configuration stored by the SparkHadoopUtil class |
| // - The user application creating a new SparkConf in cluster mode |
| // |
| // Both cases create a new SparkConf object which reads these configs from system properties. |
| sparkConf.getAll.foreach { case (k, v) => |
| sys.props(k) = v |
| } |
| |
| val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf)) |
| master = new ApplicationMaster(amArgs, sparkConf, yarnConf) |
| |
| val ugi = sparkConf.get(PRINCIPAL) match { |
| // We only need to log in with the keytab in cluster mode. In client mode, the driver |
| // handles the user keytab. |
| case Some(principal) if master.isClusterMode => |
| val originalCreds = UserGroupInformation.getCurrentUser().getCredentials() |
| SparkHadoopUtil.get.loginUserFromKeytab(principal, sparkConf.get(KEYTAB).orNull) |
| val newUGI = UserGroupInformation.getCurrentUser() |
| |
| if (master.appAttemptId == null || master.appAttemptId.getAttemptId > 1) { |
| // Re-obtain delegation tokens if this is not the first attempt, as they might be outdated |
| // by now. Add the fresh tokens on top of the original user's credentials (overwrite). |
| // Set the context class loader so that the token manager has access to jars |
| // distributed by the user. |
| Utils.withContextClassLoader(master.userClassLoader) { |
| val credentialManager = new HadoopDelegationTokenManager(sparkConf, yarnConf, null) |
| credentialManager.obtainDelegationTokens(originalCreds) |
| } |
| } |
| |
| // Transfer the original user's tokens to the new user, since they may contain needed |
| // tokens (such as those used to connect to YARN). |
| newUGI.addCredentials(originalCreds) |
| newUGI |
| |
| case _ => |
| SparkHadoopUtil.get.createSparkUser() |
| } |
| |
| ugi.doAs(new PrivilegedExceptionAction[Unit]() { |
| override def run(): Unit = System.exit(master.run()) |
| }) |
| } |
| |
| private[spark] def sparkContextInitialized(sc: SparkContext): Unit = { |
| master.sparkContextInitialized(sc) |
| } |
| |
| private[spark] def getAttemptId(): ApplicationAttemptId = { |
| master.appAttemptId |
| } |
| |
| private[spark] def getHistoryServerAddress( |
| sparkConf: SparkConf, |
| yarnConf: YarnConfiguration, |
| appId: String, |
| attemptId: String): String = { |
| sparkConf.get(HISTORY_SERVER_ADDRESS) |
| .map { text => SparkHadoopUtil.get.substituteHadoopVariables(text, yarnConf) } |
| .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}/${attemptId}" } |
| .getOrElse("") |
| } |
| } |
| |
| /** |
| * This object does not provide any special functionality. It exists so that it's easy to tell |
| * apart the client-mode AM from the cluster-mode AM when using tools such as ps or jps. |
| */ |
| object ExecutorLauncher { |
| |
| def main(args: Array[String]): Unit = { |
| ApplicationMaster.main(args) |
| } |
| |
| } |