blob: bdf17797f02a7127b730ce309d7ed9da5370bfc6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.runtime.yarn.driver;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.annotations.audience.RuntimeAuthor;
import org.apache.reef.driver.restart.DriverRuntimeRestartManager;
import org.apache.reef.driver.restart.EvaluatorRestartInfo;
import org.apache.reef.driver.restart.RestartEvaluators;
import org.apache.reef.runtime.common.driver.EvaluatorPreserver;
import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
import org.apache.reef.runtime.yarn.driver.parameters.YarnEvaluatorPreserver;
import org.apache.reef.runtime.yarn.util.YarnUtilities;
import org.apache.reef.tang.annotations.Parameter;
import javax.inject.Inject;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* The implementation of restart manager for YARN. Handles evaluator preservation as well
* as evaluator recovery on YARN.
*/
@DriverSide
@RuntimeAuthor
@Private
@Unstable
public final class YarnDriverRuntimeRestartManager implements DriverRuntimeRestartManager {
private static final Logger LOG = Logger.getLogger(YarnDriverRuntimeRestartManager.class.getName());
/**
* The default resubmission attempts number returned if:
* 1) we are not able to determine the number of application attempts based on the environment provided by YARN.
* 2) we are able to receive a list of previous containers from the Resource Manager.
*/
private static final int DEFAULT_RESTART_RESUBMISSION_ATTEMPTS = 1;
private final EvaluatorPreserver evaluatorPreserver;
private final ApplicationMasterRegistration registration;
private final REEFEventHandlers reefEventHandlers;
private final YarnContainerManager yarnContainerManager;
private final RackNameFormatter rackNameFormatter;
private Set<Container> previousContainers = null;
@Inject
private YarnDriverRuntimeRestartManager(@Parameter(YarnEvaluatorPreserver.class)
final EvaluatorPreserver evaluatorPreserver,
final REEFEventHandlers reefEventHandlers,
final ApplicationMasterRegistration registration,
final YarnContainerManager yarnContainerManager,
final RackNameFormatter rackNameFormatter) {
this.registration = registration;
this.evaluatorPreserver = evaluatorPreserver;
this.reefEventHandlers = reefEventHandlers;
this.yarnContainerManager = yarnContainerManager;
this.rackNameFormatter = rackNameFormatter;
}
/**
* Determines the number of times the Driver has been submitted based on the container ID environment
* variable provided by YARN. If that fails, determine whether the application master is a restart
* based on the number of previous containers reported by YARN. In the failure scenario, returns 1 if restart, 0
* otherwise.
* @return positive value if the application master is a restarted instance, 0 otherwise.
*/
@Override
public int getResubmissionAttempts() {
final String containerIdString = YarnUtilities.getContainerIdString();
final ApplicationAttemptId appAttemptID = YarnUtilities.getAppAttemptId(containerIdString);
if (containerIdString == null || appAttemptID == null) {
LOG.log(Level.WARNING, "Was not able to fetch application attempt, container ID is [" + containerIdString +
"] and application attempt is [" + appAttemptID + "]. Determining restart based on previous containers.");
if (this.isRestartByPreviousContainers()) {
LOG.log(Level.WARNING, "Driver is a restarted instance based on the number of previous containers. " +
"As returned by the Resource Manager. Returning default resubmission attempts " +
DEFAULT_RESTART_RESUBMISSION_ATTEMPTS + ".");
return DEFAULT_RESTART_RESUBMISSION_ATTEMPTS;
}
return 0;
}
int appAttempt = appAttemptID.getAttemptId();
LOG.log(Level.FINE, "Application attempt: " + appAttempt);
assert appAttempt > 0;
return appAttempt - 1;
}
/**
* Initializes the list of previous containers and determine whether or not this is an instance of restart
* based on information reported by the RM.
* @return true if previous containers is not empty.
*/
private boolean isRestartByPreviousContainers() {
this.initializeListOfPreviousContainers();
return !this.previousContainers.isEmpty();
}
/**
* Initializes the list of previous containers as reported by YARN.
*/
private synchronized void initializeListOfPreviousContainers() {
if (this.previousContainers == null) {
final List<Container> yarnPrevContainers =
this.registration.getRegistration().getContainersFromPreviousAttempts();
// If it's still null, create an empty list to indicate that it's not a restart.
if (yarnPrevContainers == null) {
this.previousContainers = Collections.unmodifiableSet(new HashSet<Container>());
} else {
this.previousContainers = Collections.unmodifiableSet(new HashSet<>(yarnPrevContainers));
}
yarnContainerManager.onContainersRecovered(this.previousContainers);
}
}
@Override
public void recordAllocatedEvaluator(final String id) {
this.evaluatorPreserver.recordAllocatedEvaluator(id);
}
@Override
public void recordRemovedEvaluator(final String id) {
this.evaluatorPreserver.recordRemovedEvaluator(id);
}
/**
* Used by {@link org.apache.reef.driver.restart.DriverRestartManager}.
* Gets the list of previous containers from the resource manager,
* compares that list to the YarnDriverRuntimeRestartManager's own list based on the evalutor preserver,
* and determine which evaluators are alive and which have failed during restart.
* @return a map of Evaluator ID to {@link EvaluatorRestartInfo} for evaluators that have either failed or survived
* driver restart.
*/
@Override
public RestartEvaluators getPreviousEvaluators() {
final RestartEvaluators.Builder restartEvaluatorsBuilder = RestartEvaluators.newBuilder();
this.initializeListOfPreviousContainers();
if (this.previousContainers != null && !this.previousContainers.isEmpty()) {
LOG.log(Level.INFO, "Driver restarted, with {0} previous containers", this.previousContainers.size());
final Set<String> expectedContainers = this.evaluatorPreserver.recoverEvaluators();
final int numExpectedContainers = expectedContainers.size();
final int numPreviousContainers = this.previousContainers.size();
if (numExpectedContainers > numPreviousContainers) {
// we expected more containers to be alive, some containers must have died during driver restart
LOG.log(Level.WARNING, "Expected {0} containers while only {1} are still alive",
new Object[]{numExpectedContainers, numPreviousContainers});
final Set<String> previousContainersIds = new HashSet<>();
for (final Container container : this.previousContainers) {
previousContainersIds.add(container.getId().toString());
}
for (final String expectedContainerId : expectedContainers) {
if (!previousContainersIds.contains(expectedContainerId)) {
LOG.log(Level.WARNING, "Expected container [{0}] not alive, must have failed during driver restart.",
expectedContainerId);
restartEvaluatorsBuilder.addRestartEvaluator(
EvaluatorRestartInfo.createFailedEvaluatorInfo(expectedContainerId));
}
}
}
if (numExpectedContainers < numPreviousContainers) {
// somehow we have more alive evaluators, this should not happen
throw new RuntimeException("Expected only [" + numExpectedContainers + "] containers " +
"but resource manager believe that [" + numPreviousContainers + "] are outstanding for driver.");
}
// numExpectedContainers == numPreviousContainers
for (final Container container : this.previousContainers) {
LOG.log(Level.FINE, "Previous container: [{0}]", container.toString());
if (!expectedContainers.contains(container.getId().toString())) {
throw new RuntimeException("Not expecting container " + container.getId().toString());
}
restartEvaluatorsBuilder.addRestartEvaluator(EvaluatorRestartInfo.createExpectedEvaluatorInfo(
ResourceEventImpl.newRecoveryBuilder().setIdentifier(container.getId().toString())
.setNodeId(container.getNodeId().toString()).setRackName(rackNameFormatter.getRackName(container))
.setResourceMemory(container.getResource().getMemory())
.setVirtualCores(container.getResource().getVirtualCores())
.setRuntimeName(RuntimeIdentifier.RUNTIME_NAME).build()));
}
}
return restartEvaluatorsBuilder.build();
}
/**
* Calls the appropriate handler via REEFEventHandlers, which is a runtime specific implementation
* of the YARN runtime.
* @param evaluatorIds the set of evaluator IDs of failed evaluators during restart.
*/
@Override
public void informAboutEvaluatorFailures(final Set<String> evaluatorIds) {
for (String evaluatorId : evaluatorIds) {
LOG.log(Level.WARNING, "Container [" + evaluatorId +
"] has failed during driver restart process, FailedEvaluatorHandler will be triggered, but " +
"no additional evaluator can be requested due to YARN-2433.");
// trigger a failed evaluator event
this.reefEventHandlers.onResourceStatus(ResourceStatusEventImpl.newBuilder()
.setIdentifier(evaluatorId)
.setState(State.FAILED)
.setExitCode(1)
.setDiagnostics("Container [" + evaluatorId + "] failed during driver restart process.")
.build());
}
}
}