blob: 01606bc8c76c452afaf8331209786ccbcc4b88af [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import javax.crypto.SecretKey;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
@Private
@Unstable
/**
* Base class to implement storage of ResourceManager state.
* Takes care of asynchronous notifications and interfacing with YARN objects.
* Real store implementations need to derive from it and implement blocking
* store and load methods to actually store and load the state.
*/
public abstract class RMStateStore extends AbstractService {
// constants for RM App state and RMDTSecretManagerState.
@VisibleForTesting
public static final String RM_APP_ROOT = "RMAppRoot";
protected static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
protected static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
"RMDelegationTokensRoot";
protected static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
"RMDTSequenceNumber_";
protected static final String AMRMTOKEN_SECRET_MANAGER_ROOT =
"AMRMTokenSecretManagerRoot";
protected static final String RESERVATION_SYSTEM_ROOT =
"ReservationSystemRoot";
protected static final String VERSION_NODE = "RMVersionNode";
protected static final String EPOCH_NODE = "EpochNode";
protected long baseEpoch;
private long epochRange;
protected ResourceManager resourceManager;
private final ReadLock readLock;
private final WriteLock writeLock;
public static final Log LOG = LogFactory.getLog(RMStateStore.class);
/**
* The enum defines state of RMStateStore.
*/
public enum RMStateStoreState {
ACTIVE,
FENCED
};
private static final StateMachineFactory<RMStateStore,
RMStateStoreState,
RMStateStoreEventType,
RMStateStoreEvent>
stateMachineFactory = new StateMachineFactory<RMStateStore,
RMStateStoreState,
RMStateStoreEventType,
RMStateStoreEvent>(
RMStateStoreState.ACTIVE)
.addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.STORE_APP, new StoreAppTransition())
.addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition())
.addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition())
.addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.STORE_APP_ATTEMPT,
new StoreAppAttemptTransition())
.addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.UPDATE_APP_ATTEMPT,
new UpdateAppAttemptTransition())
.addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.REMOVE_APP_ATTEMPT,
new RemoveAppAttemptTransition())
.addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.STORE_MASTERKEY,
new StoreRMDTMasterKeyTransition())
.addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.REMOVE_MASTERKEY,
new RemoveRMDTMasterKeyTransition())
.addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.STORE_DELEGATION_TOKEN,
new StoreRMDTTransition())
.addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.REMOVE_DELEGATION_TOKEN,
new RemoveRMDTTransition())
.addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.UPDATE_DELEGATION_TOKEN,
new UpdateRMDTTransition())
.addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.UPDATE_AMRM_TOKEN,
new StoreOrUpdateAMRMTokenTransition())
.addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.STORE_RESERVATION,
new StoreReservationAllocationTransition())
.addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.REMOVE_RESERVATION,
new RemoveReservationAllocationTransition())
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED,
RMStateStoreEventType.FENCED)
.addTransition(RMStateStoreState.FENCED, RMStateStoreState.FENCED,
EnumSet.of(
RMStateStoreEventType.STORE_APP,
RMStateStoreEventType.UPDATE_APP,
RMStateStoreEventType.REMOVE_APP,
RMStateStoreEventType.STORE_APP_ATTEMPT,
RMStateStoreEventType.UPDATE_APP_ATTEMPT,
RMStateStoreEventType.FENCED,
RMStateStoreEventType.STORE_MASTERKEY,
RMStateStoreEventType.REMOVE_MASTERKEY,
RMStateStoreEventType.STORE_DELEGATION_TOKEN,
RMStateStoreEventType.REMOVE_DELEGATION_TOKEN,
RMStateStoreEventType.UPDATE_DELEGATION_TOKEN,
RMStateStoreEventType.UPDATE_AMRM_TOKEN,
RMStateStoreEventType.STORE_RESERVATION,
RMStateStoreEventType.REMOVE_RESERVATION));
private final StateMachine<RMStateStoreState,
RMStateStoreEventType,
RMStateStoreEvent> stateMachine;
private static class StoreAppTransition
implements MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override
public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreAppEvent)) {
// should never happen
LOG.error("Illegal event type: " + event.getClass());
return RMStateStoreState.ACTIVE;
}
boolean isFenced = false;
ApplicationStateData appState =
((RMStateStoreAppEvent) event).getAppState();
ApplicationId appId =
appState.getApplicationSubmissionContext().getApplicationId();
LOG.info("Storing info for app: " + appId);
try {
store.storeApplicationStateInternal(appId, appState);
store.notifyApplication(
new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED));
} catch (Exception e) {
LOG.error("Error storing app: " + appId, e);
if (e instanceof StoreLimitException) {
store.notifyApplication(
new RMAppEvent(appId, RMAppEventType.APP_SAVE_FAILED,
e.getMessage()));
} else {
isFenced = store.notifyStoreOperationFailedInternal(e);
}
}
return finalState(isFenced);
};
}
private static class UpdateAppTransition implements
MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override
public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateUpdateAppEvent)) {
// should never happen
LOG.error("Illegal event type: " + event.getClass());
return RMStateStoreState.ACTIVE;
}
boolean isFenced = false;
ApplicationStateData appState =
((RMStateUpdateAppEvent) event).getAppState();
SettableFuture<Object> result =
((RMStateUpdateAppEvent) event).getResult();
ApplicationId appId =
appState.getApplicationSubmissionContext().getApplicationId();
LOG.info("Updating info for app: " + appId);
try {
if (isAppStateFinal(appState)) {
pruneAppState(appState);
}
store.updateApplicationStateInternal(appId, appState);
if (((RMStateUpdateAppEvent) event).isNotifyApplication()) {
store.notifyApplication(new RMAppEvent(appId,
RMAppEventType.APP_UPDATE_SAVED));
}
if (result != null) {
result.set(null);
}
} catch (Exception e) {
String msg = "Error updating app: " + appId;
LOG.error(msg, e);
isFenced = store.notifyStoreOperationFailedInternal(e);
if (result != null) {
result.setException(new YarnException(msg, e));
}
}
return finalState(isFenced);
}
private boolean isAppStateFinal(ApplicationStateData appState) {
RMAppState state = appState.getState();
return state == RMAppState.FINISHED || state == RMAppState.FAILED ||
state == RMAppState.KILLED;
}
private void pruneAppState(ApplicationStateData appState) {
ApplicationSubmissionContext srcCtx =
appState.getApplicationSubmissionContext();
ApplicationSubmissionContextPBImpl context =
new ApplicationSubmissionContextPBImpl();
// most fields in the ApplicationSubmissionContext are not needed,
// but the following few need to be present for recovery to succeed
context.setApplicationId(srcCtx.getApplicationId());
context.setResource(srcCtx.getResource());
context.setQueue(srcCtx.getQueue());
context.setAMContainerResourceRequests(
srcCtx.getAMContainerResourceRequests());
context.setApplicationName(srcCtx.getApplicationName());
context.setPriority(srcCtx.getPriority());
context.setApplicationTags(srcCtx.getApplicationTags());
context.setApplicationType(srcCtx.getApplicationType());
context.setUnmanagedAM(srcCtx.getUnmanagedAM());
context.setNodeLabelExpression(srcCtx.getNodeLabelExpression());
ContainerLaunchContextPBImpl amContainerSpec =
new ContainerLaunchContextPBImpl();
amContainerSpec.setApplicationACLs(
srcCtx.getAMContainerSpec().getApplicationACLs());
context.setAMContainerSpec(amContainerSpec);
appState.setApplicationSubmissionContext(context);
}
}
private static class RemoveAppTransition implements
MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override
public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreRemoveAppEvent)) {
// should never happen
LOG.error("Illegal event type: " + event.getClass());
return RMStateStoreState.ACTIVE;
}
boolean isFenced = false;
ApplicationStateData appState =
((RMStateStoreRemoveAppEvent) event).getAppState();
ApplicationId appId =
appState.getApplicationSubmissionContext().getApplicationId();
LOG.info("Removing info for app: " + appId);
try {
store.removeApplicationStateInternal(appState);
} catch (Exception e) {
LOG.error("Error removing app: " + appId, e);
isFenced = store.notifyStoreOperationFailedInternal(e);
}
return finalState(isFenced);
};
}
private static class StoreAppAttemptTransition implements
MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override
public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreAppAttemptEvent)) {
// should never happen
LOG.error("Illegal event type: " + event.getClass());
return RMStateStoreState.ACTIVE;
}
boolean isFenced = false;
ApplicationAttemptStateData attemptState =
((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
}
store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
attemptState);
store.notifyApplicationAttempt(new RMAppAttemptEvent
(attemptState.getAttemptId(),
RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
} catch (Exception e) {
LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
isFenced = store.notifyStoreOperationFailedInternal(e);
}
return finalState(isFenced);
};
}
private static class UpdateAppAttemptTransition implements
MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override
public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateUpdateAppAttemptEvent)) {
// should never happen
LOG.error("Illegal event type: " + event.getClass());
return RMStateStoreState.ACTIVE;
}
boolean isFenced = false;
ApplicationAttemptStateData attemptState =
((RMStateUpdateAppAttemptEvent) event).getAppAttemptState();
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Updating info for attempt: " + attemptState.getAttemptId());
}
store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
attemptState);
store.notifyApplicationAttempt(new RMAppAttemptEvent
(attemptState.getAttemptId(),
RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
} catch (Exception e) {
LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e);
isFenced = store.notifyStoreOperationFailedInternal(e);
}
return finalState(isFenced);
};
}
private static class StoreRMDTTransition implements
MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override
public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreRMDTEvent)) {
// should never happen
LOG.error("Illegal event type: " + event.getClass());
return RMStateStoreState.ACTIVE;
}
boolean isFenced = false;
RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
try {
LOG.info("Storing RMDelegationToken and SequenceNumber");
store.storeRMDelegationTokenState(
dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate());
} catch (Exception e) {
LOG.error("Error While Storing RMDelegationToken and SequenceNumber ",
e);
isFenced = store.notifyStoreOperationFailedInternal(e);
}
return finalState(isFenced);
}
}
private static class RemoveRMDTTransition implements
MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override
public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreRMDTEvent)) {
// should never happen
LOG.error("Illegal event type: " + event.getClass());
return RMStateStoreState.ACTIVE;
}
boolean isFenced = false;
RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
try {
LOG.info("Removing RMDelegationToken and SequenceNumber");
store.removeRMDelegationTokenState(dtEvent.getRmDTIdentifier());
} catch (Exception e) {
LOG.error("Error While Removing RMDelegationToken and SequenceNumber ",
e);
isFenced = store.notifyStoreOperationFailedInternal(e);
}
return finalState(isFenced);
}
}
private static class UpdateRMDTTransition implements
MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override
public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreRMDTEvent)) {
// should never happen
LOG.error("Illegal event type: " + event.getClass());
return RMStateStoreState.ACTIVE;
}
boolean isFenced = false;
RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
try {
LOG.info("Updating RMDelegationToken and SequenceNumber");
store.updateRMDelegationTokenState(
dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate());
} catch (Exception e) {
LOG.error("Error While Updating RMDelegationToken and SequenceNumber ",
e);
isFenced = store.notifyStoreOperationFailedInternal(e);
}
return finalState(isFenced);
}
}
private static class StoreRMDTMasterKeyTransition implements
MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override
public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreRMDTMasterKeyEvent)) {
// should never happen
LOG.error("Illegal event type: " + event.getClass());
return RMStateStoreState.ACTIVE;
}
boolean isFenced = false;
RMStateStoreRMDTMasterKeyEvent dtEvent =
(RMStateStoreRMDTMasterKeyEvent) event;
try {
LOG.info("Storing RMDTMasterKey.");
store.storeRMDTMasterKeyState(dtEvent.getDelegationKey());
} catch (Exception e) {
LOG.error("Error While Storing RMDTMasterKey.", e);
isFenced = store.notifyStoreOperationFailedInternal(e);
}
return finalState(isFenced);
}
}
private static class RemoveRMDTMasterKeyTransition implements
MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override
public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreRMDTMasterKeyEvent)) {
// should never happen
LOG.error("Illegal event type: " + event.getClass());
return RMStateStoreState.ACTIVE;
}
boolean isFenced = false;
RMStateStoreRMDTMasterKeyEvent dtEvent =
(RMStateStoreRMDTMasterKeyEvent) event;
try {
LOG.info("Removing RMDTMasterKey.");
store.removeRMDTMasterKeyState(dtEvent.getDelegationKey());
} catch (Exception e) {
LOG.error("Error While Removing RMDTMasterKey.", e);
isFenced = store.notifyStoreOperationFailedInternal(e);
}
return finalState(isFenced);
}
}
private static class StoreOrUpdateAMRMTokenTransition implements
MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override
public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreAMRMTokenEvent)) {
// should never happen
LOG.error("Illegal event type: " + event.getClass());
return RMStateStoreState.ACTIVE;
}
RMStateStoreAMRMTokenEvent amrmEvent = (RMStateStoreAMRMTokenEvent) event;
boolean isFenced = false;
try {
LOG.info("Updating AMRMToken");
store.storeOrUpdateAMRMTokenSecretManagerState(
amrmEvent.getAmrmTokenSecretManagerState(), amrmEvent.isUpdate());
} catch (Exception e) {
LOG.error("Error storing info for AMRMTokenSecretManager", e);
isFenced = store.notifyStoreOperationFailedInternal(e);
}
return finalState(isFenced);
}
}
private static class StoreReservationAllocationTransition implements
MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override
public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreStoreReservationEvent)) {
// should never happen
LOG.error("Illegal event type: " + event.getClass());
return RMStateStoreState.ACTIVE;
}
boolean isFenced = false;
RMStateStoreStoreReservationEvent reservationEvent =
(RMStateStoreStoreReservationEvent) event;
try {
LOG.info("Storing reservation allocation." + reservationEvent
.getReservationIdName());
store.storeReservationState(
reservationEvent.getReservationAllocation(),
reservationEvent.getPlanName(),
reservationEvent.getReservationIdName());
} catch (Exception e) {
LOG.error("Error while storing reservation allocation.", e);
isFenced = store.notifyStoreOperationFailedInternal(e);
}
return finalState(isFenced);
}
}
private static class RemoveReservationAllocationTransition implements
MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override
public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreStoreReservationEvent)) {
// should never happen
LOG.error("Illegal event type: " + event.getClass());
return RMStateStoreState.ACTIVE;
}
boolean isFenced = false;
RMStateStoreStoreReservationEvent reservationEvent =
(RMStateStoreStoreReservationEvent) event;
try {
LOG.info("Removing reservation allocation." + reservationEvent
.getReservationIdName());
store.removeReservationState(
reservationEvent.getPlanName(),
reservationEvent.getReservationIdName());
} catch (Exception e) {
LOG.error("Error while removing reservation allocation.", e);
isFenced = store.notifyStoreOperationFailedInternal(e);
}
return finalState(isFenced);
}
}
private static RMStateStoreState finalState(boolean isFenced) {
return isFenced ? RMStateStoreState.FENCED : RMStateStoreState.ACTIVE;
}
private static class RemoveAppAttemptTransition implements
MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override
public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreRemoveAppAttemptEvent)) {
// should never happen
LOG.error("Illegal event type: " + event.getClass());
return RMStateStoreState.ACTIVE;
}
boolean isFenced = false;
ApplicationAttemptId attemptId =
((RMStateStoreRemoveAppAttemptEvent) event).getApplicationAttemptId();
ApplicationId appId = attemptId.getApplicationId();
LOG.info("Removing attempt " + attemptId + " from app: " + appId);
try {
store.removeApplicationAttemptInternal(attemptId);
} catch (Exception e) {
LOG.error("Error removing attempt: " + attemptId, e);
isFenced = store.notifyStoreOperationFailedInternal(e);
}
return finalState(isFenced);
}
}
public RMStateStore() {
super(RMStateStore.class.getName());
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
stateMachine = stateMachineFactory.make(this);
}
public static class RMDTSecretManagerState {
// DTIdentifier -> renewDate
Map<RMDelegationTokenIdentifier, Long> delegationTokenState =
new HashMap<RMDelegationTokenIdentifier, Long>();
Set<DelegationKey> masterKeyState =
new HashSet<DelegationKey>();
int dtSequenceNumber = 0;
public Map<RMDelegationTokenIdentifier, Long> getTokenState() {
return delegationTokenState;
}
public Set<DelegationKey> getMasterKeyState() {
return masterKeyState;
}
public int getDTSequenceNumber() {
return dtSequenceNumber;
}
}
/**
* State of the ResourceManager
*/
public static class RMState {
Map<ApplicationId, ApplicationStateData> appState =
new TreeMap<ApplicationId, ApplicationStateData>();
RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState();
AMRMTokenSecretManagerState amrmTokenSecretManagerState = null;
private Map<String, Map<ReservationId, ReservationAllocationStateProto>>
reservationState = new TreeMap<>();
public Map<ApplicationId, ApplicationStateData> getApplicationState() {
return appState;
}
public RMDTSecretManagerState getRMDTSecretManagerState() {
return rmSecretManagerState;
}
public AMRMTokenSecretManagerState getAMRMTokenSecretManagerState() {
return amrmTokenSecretManagerState;
}
public Map<String, Map<ReservationId, ReservationAllocationStateProto>>
getReservationState() {
return reservationState;
}
}
private Dispatcher rmDispatcher;
/**
* Dispatcher used to send state operation completion events to
* ResourceManager services
*/
public void setRMDispatcher(Dispatcher dispatcher) {
this.rmDispatcher = dispatcher;
}
AsyncDispatcher dispatcher;
@SuppressWarnings("rawtypes")
@VisibleForTesting
protected EventHandler rmStateStoreEventHandler;
@Override
protected void serviceInit(Configuration conf) throws Exception{
// create async handler
dispatcher = new AsyncDispatcher("RM StateStore dispatcher");
dispatcher.init(conf);
rmStateStoreEventHandler = new ForwardingEventHandler();
dispatcher.register(RMStateStoreEventType.class,
rmStateStoreEventHandler);
dispatcher.setDrainEventsOnStop();
// read the base epoch value from conf
baseEpoch = conf.getLong(YarnConfiguration.RM_EPOCH,
YarnConfiguration.DEFAULT_RM_EPOCH);
epochRange = conf.getLong(YarnConfiguration.RM_EPOCH_RANGE,
YarnConfiguration.DEFAULT_RM_EPOCH_RANGE);
initInternal(conf);
}
@Override
protected void serviceStart() throws Exception {
dispatcher.start();
startInternal();
}
/**
* Derived classes initialize themselves using this method.
*/
protected abstract void initInternal(Configuration conf) throws Exception;
/**
* Derived classes start themselves using this method.
* The base class is started and the event dispatcher is ready to use at
* this point
*/
protected abstract void startInternal() throws Exception;
@Override
protected void serviceStop() throws Exception {
dispatcher.stop();
closeInternal();
}
/**
* Derived classes close themselves using this method.
* The base class will be closed and the event dispatcher will be shutdown
* after this
*/
protected abstract void closeInternal() throws Exception;
/**
* 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
* 2) Any incompatible change of state-store is a major upgrade, and any
* compatible change of state-store is a minor upgrade.
* 3) If theres's no version, treat it as CURRENT_VERSION_INFO.
* 4) Within a minor upgrade, say 1.1 to 1.2:
* overwrite the version info and proceed as normal.
* 5) Within a major upgrade, say 1.2 to 2.0:
* throw exception and indicate user to use a separate upgrade tool to
* upgrade RM state.
*/
public void checkVersion() throws Exception {
Version loadedVersion = loadVersion();
LOG.info("Loaded RM state version info " + loadedVersion);
if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
return;
}
// if there is no version info, treat it as CURRENT_VERSION_INFO;
if (loadedVersion == null) {
loadedVersion = getCurrentVersion();
}
if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
LOG.info("Storing RM state version info " + getCurrentVersion());
storeVersion();
} else {
throw new RMStateVersionIncompatibleException(
"Expecting RM state version " + getCurrentVersion()
+ ", but loading version " + loadedVersion);
}
}
/**
* Derived class use this method to load the version information from state
* store.
*/
protected abstract Version loadVersion() throws Exception;
/**
* Derived class use this method to store the version information.
*/
protected abstract void storeVersion() throws Exception;
/**
* Get the current version of the underlying state store.
*/
protected abstract Version getCurrentVersion();
/**
* Get the current epoch of RM and increment the value.
*/
public abstract long getAndIncrementEpoch() throws Exception;
/**
* Compute the next epoch value by incrementing by one.
* Wraps around if the epoch range is exceeded so that
* when federation is enabled epoch collisions can be avoided.
*/
protected long nextEpoch(long epoch){
long epochVal = epoch - baseEpoch + 1;
if (epochRange > 0) {
epochVal %= epochRange;
}
return epochVal + baseEpoch;
}
/**
* Blocking API
* The derived class must recover state from the store and return a new
* RMState object populated with that state
* This must not be called on the dispatcher thread
*/
public abstract RMState loadState() throws Exception;
/**
* Non-Blocking API
* ResourceManager services use this to store the application's state
* This does not block the dispatcher threads
* RMAppStoredEvent will be sent on completion to notify the RMApp
*/
@SuppressWarnings("unchecked")
public void storeNewApplication(RMApp app) {
ApplicationSubmissionContext context = app
.getApplicationSubmissionContext();
assert context instanceof ApplicationSubmissionContextPBImpl;
ApplicationStateData appState =
ApplicationStateData.newInstance(app.getSubmitTime(),
app.getStartTime(), context, app.getUser(), app.getCallerContext());
appState.setApplicationTimeouts(app.getApplicationTimeouts());
getRMStateStoreEventHandler().handle(new RMStateStoreAppEvent(appState));
}
@SuppressWarnings("unchecked")
public void updateApplicationState(ApplicationStateData appState) {
getRMStateStoreEventHandler().handle(new RMStateUpdateAppEvent(appState));
}
@SuppressWarnings("unchecked")
public void updateApplicationState(ApplicationStateData appState,
boolean notifyApp) {
getRMStateStoreEventHandler().handle(new RMStateUpdateAppEvent(appState,
notifyApp));
}
public void updateApplicationStateSynchronously(ApplicationStateData appState,
boolean notifyApp, SettableFuture<Object> resultFuture) {
handleStoreEvent(
new RMStateUpdateAppEvent(appState, notifyApp, resultFuture));
}
public void updateFencedState() {
handleStoreEvent(new RMStateStoreEvent(RMStateStoreEventType.FENCED));
}
/**
* Blocking API
* Derived classes must implement this method to store the state of an
* application.
*/
protected abstract void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateData appStateData) throws Exception;
protected abstract void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateData appStateData) throws Exception;
@SuppressWarnings("unchecked")
/**
* Non-blocking API
* ResourceManager services call this to store state on an application attempt
* This does not block the dispatcher threads
* RMAppAttemptStoredEvent will be sent on completion to notify the RMAppAttempt
*/
public void storeNewApplicationAttempt(RMAppAttempt appAttempt) {
Credentials credentials = getCredentialsFromAppAttempt(appAttempt);
RMAppAttemptMetrics attempMetrics = appAttempt.getRMAppAttemptMetrics();
AggregateAppResourceUsage resUsage =
attempMetrics.getAggregateAppResourceUsage();
ApplicationAttemptStateData attemptState =
ApplicationAttemptStateData.newInstance(
appAttempt.getAppAttemptId(),
appAttempt.getMasterContainer(),
credentials, appAttempt.getStartTime(),
resUsage.getResourceUsageSecondsMap(),
attempMetrics.getPreemptedResourceSecondsMap());
getRMStateStoreEventHandler().handle(
new RMStateStoreAppAttemptEvent(attemptState));
}
@SuppressWarnings("unchecked")
public void updateApplicationAttemptState(
ApplicationAttemptStateData attemptState) {
getRMStateStoreEventHandler().handle(
new RMStateUpdateAppAttemptEvent(attemptState));
}
/**
* Blocking API
* Derived classes must implement this method to store the state of an
* application attempt
*/
protected abstract void storeApplicationAttemptStateInternal(
ApplicationAttemptId attemptId,
ApplicationAttemptStateData attemptStateData) throws Exception;
protected abstract void updateApplicationAttemptStateInternal(
ApplicationAttemptId attemptId,
ApplicationAttemptStateData attemptStateData) throws Exception;
/**
* RMDTSecretManager call this to store the state of a delegation token
* and sequence number
*/
public void storeRMDelegationToken(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) {
handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, renewDate,
RMStateStoreEventType.STORE_DELEGATION_TOKEN));
}
/**
* Blocking API
* Derived classes must implement this method to store the state of
* RMDelegationToken and sequence number
*/
protected abstract void storeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
throws Exception;
/**
* RMDTSecretManager call this to remove the state of a delegation token
*/
public void removeRMDelegationToken(
RMDelegationTokenIdentifier rmDTIdentifier) {
handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, null,
RMStateStoreEventType.REMOVE_DELEGATION_TOKEN));
}
/**
* Blocking API
* Derived classes must implement this method to remove the state of RMDelegationToken
*/
protected abstract void removeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception;
/**
* RMDTSecretManager call this to update the state of a delegation token
* and sequence number
*/
public void updateRMDelegationToken(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) {
handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, renewDate,
RMStateStoreEventType.UPDATE_DELEGATION_TOKEN));
}
/**
* Blocking API
* Derived classes must implement this method to update the state of
* RMDelegationToken and sequence number
*/
protected abstract void updateRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
throws Exception;
/**
* RMDTSecretManager call this to store the state of a master key
*/
public void storeRMDTMasterKey(DelegationKey delegationKey) {
handleStoreEvent(new RMStateStoreRMDTMasterKeyEvent(delegationKey,
RMStateStoreEventType.STORE_MASTERKEY));
}
/**
* Blocking API
* Derived classes must implement this method to store the state of
* DelegationToken Master Key
*/
protected abstract void storeRMDTMasterKeyState(DelegationKey delegationKey)
throws Exception;
/**
* RMDTSecretManager call this to remove the state of a master key
*/
public void removeRMDTMasterKey(DelegationKey delegationKey) {
handleStoreEvent(new RMStateStoreRMDTMasterKeyEvent(delegationKey,
RMStateStoreEventType.REMOVE_MASTERKEY));
}
/**
* Blocking Apis to maintain reservation state.
*/
public void storeNewReservation(
ReservationAllocationStateProto reservationAllocation, String planName,
String reservationIdName) {
handleStoreEvent(new RMStateStoreStoreReservationEvent(
reservationAllocation, RMStateStoreEventType.STORE_RESERVATION,
planName, reservationIdName));
}
public void removeReservation(String planName, String reservationIdName) {
handleStoreEvent(new RMStateStoreStoreReservationEvent(
null, RMStateStoreEventType.REMOVE_RESERVATION,
planName, reservationIdName));
}
/**
* Blocking API
* Derived classes must implement this method to store the state of
* a reservation allocation.
*/
protected abstract void storeReservationState(
ReservationAllocationStateProto reservationAllocation, String planName,
String reservationIdName) throws Exception;
/**
* Blocking API
* Derived classes must implement this method to remove the state of
* a reservation allocation.
*/
protected abstract void removeReservationState(String planName,
String reservationIdName) throws Exception;
/**
* Blocking API
* Derived classes must implement this method to remove the state of
* DelegationToken Master Key
*/
protected abstract void removeRMDTMasterKeyState(DelegationKey delegationKey)
throws Exception;
/**
* Blocking API Derived classes must implement this method to store or update
* the state of AMRMToken Master Key
*/
protected abstract void storeOrUpdateAMRMTokenSecretManagerState(
AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate)
throws Exception;
/**
* Store or Update state of AMRMToken Master Key
*/
public void storeOrUpdateAMRMTokenSecretManager(
AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate) {
handleStoreEvent(new RMStateStoreAMRMTokenEvent(
amrmTokenSecretManagerState, isUpdate,
RMStateStoreEventType.UPDATE_AMRM_TOKEN));
}
/**
* Non-blocking API
* ResourceManager services call this to remove an application from the state
* store
* This does not block the dispatcher threads
* There is no notification of completion for this operation.
*/
@SuppressWarnings("unchecked")
public void removeApplication(RMApp app) {
ApplicationStateData appState =
ApplicationStateData.newInstance(app.getSubmitTime(),
app.getStartTime(), app.getApplicationSubmissionContext(),
app.getUser(), app.getCallerContext());
for(RMAppAttempt appAttempt : app.getAppAttempts().values()) {
appState.attempts.put(appAttempt.getAppAttemptId(), null);
}
getRMStateStoreEventHandler().handle(
new RMStateStoreRemoveAppEvent(appState));
}
/**
* Blocking API
* Derived classes must implement this method to remove the state of an
* application and its attempts
*/
protected abstract void removeApplicationStateInternal(
ApplicationStateData appState) throws Exception;
/**
* Non-blocking API
* ResourceManager services call this to remove an attempt from the state
* store
* This does not block the dispatcher threads
* There is no notification of completion for this operation.
*/
@SuppressWarnings("unchecked")
public synchronized void removeApplicationAttempt(
ApplicationAttemptId applicationAttemptId) {
getRMStateStoreEventHandler().handle(
new RMStateStoreRemoveAppAttemptEvent(applicationAttemptId));
}
/**
* Blocking API
* Derived classes must implement this method to remove the state of specified
* attempt.
*/
protected abstract void removeApplicationAttemptInternal(
ApplicationAttemptId attemptId) throws Exception;
// TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See
// YARN-1779
public static final Text AM_RM_TOKEN_SERVICE = new Text(
"AM_RM_TOKEN_SERVICE");
public static final Text AM_CLIENT_TOKEN_MASTER_KEY_NAME =
new Text("YARN_CLIENT_TOKEN_MASTER_KEY");
public Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) {
Credentials credentials = new Credentials();
SecretKey clientTokenMasterKey =
appAttempt.getClientTokenMasterKey();
if(clientTokenMasterKey != null){
credentials.addSecretKey(AM_CLIENT_TOKEN_MASTER_KEY_NAME,
clientTokenMasterKey.getEncoded());
}
return credentials;
}
@VisibleForTesting
protected boolean isFencedState() {
return (RMStateStoreState.FENCED == getRMStateStoreState());
}
// Dispatcher related code
protected void handleStoreEvent(RMStateStoreEvent event) {
this.writeLock.lock();
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing event of type " + event.getType());
}
final RMStateStoreState oldState = getRMStateStoreState();
this.stateMachine.doTransition(event.getType(), event);
if (oldState != getRMStateStoreState()) {
LOG.info("RMStateStore state change from " + oldState + " to "
+ getRMStateStoreState());
}
} catch (InvalidStateTransitionException e) {
LOG.error("Can't handle this event at current state", e);
} finally {
this.writeLock.unlock();
}
}
/**
* This method is called to notify the ResourceManager that the store
* operation has failed.
* @param failureCause the exception due to which the operation failed
*/
protected void notifyStoreOperationFailed(Exception failureCause) {
if (isFencedState()) {
return;
}
if (notifyStoreOperationFailedInternal(failureCause)) {
updateFencedState();
}
}
@SuppressWarnings("unchecked")
private boolean notifyStoreOperationFailedInternal(
Exception failureCause) {
boolean isFenced = false;
LOG.error("State store operation failed ", failureCause);
if (HAUtil.isHAEnabled(getConfig())) {
rmDispatcher.getEventHandler().handle(
new RMFatalEvent(RMFatalEventType.STATE_STORE_FENCED,
failureCause));
isFenced = true;
} else {
rmDispatcher.getEventHandler().handle(
new RMFatalEvent(RMFatalEventType.STATE_STORE_OP_FAILED,
failureCause));
}
return isFenced;
}
@SuppressWarnings("unchecked")
/**
* This method is called to notify the application that
* new application is stored or updated in state store
* @param event App event containing the app id and event type
*/
private void notifyApplication(RMAppEvent event) {
rmDispatcher.getEventHandler().handle(event);
}
@SuppressWarnings("unchecked")
/**
* This method is called to notify the application attempt
* that new attempt is stored or updated in state store
* @param event App attempt event containing the app attempt
* id and event type
*/
private void notifyApplicationAttempt(RMAppAttemptEvent event) {
rmDispatcher.getEventHandler().handle(event);
}
/**
* EventHandler implementation which forward events to the FSRMStateStore
* This hides the EventHandle methods of the store from its public interface
*/
private final class ForwardingEventHandler
implements EventHandler<RMStateStoreEvent> {
@Override
public void handle(RMStateStoreEvent event) {
handleStoreEvent(event);
}
}
/**
* Derived classes must implement this method to delete the state store
* @throws Exception
*/
public abstract void deleteStore() throws Exception;
/**
* Derived classes must implement this method to remove application from the
* state store
*
* @throws Exception
*/
public abstract void removeApplication(ApplicationId removeAppId)
throws Exception;
public void setResourceManager(ResourceManager rm) {
this.resourceManager = rm;
}
public RMStateStoreState getRMStateStoreState() {
this.readLock.lock();
try {
return this.stateMachine.getCurrentState();
} finally {
this.readLock.unlock();
}
}
@SuppressWarnings("rawtypes")
protected EventHandler getRMStateStoreEventHandler() {
return dispatcher.getEventHandler();
}
}