blob: 674a779cc2bf2bb37f4b910e1d3b2750a62632ee [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
/**
 * Base class to implement storage of ResourceManager state.
 * Takes care of asynchronous notifications and interfacing with YARN objects.
 * Real store implementations need to derive from it and implement blocking
 * store and load methods to actually store and load the state.
 */
@Private
@Unstable
public abstract class RMStateStore {

  public static final Log LOG = LogFactory.getLog(RMStateStore.class);

  /**
   * State of an application attempt: its id and its master container.
   */
  public static class ApplicationAttemptState {
    final ApplicationAttemptId attemptId;
    final Container masterContainer;

    public ApplicationAttemptState(ApplicationAttemptId attemptId,
        Container masterContainer) {
      this.attemptId = attemptId;
      this.masterContainer = masterContainer;
    }

    public Container getMasterContainer() {
      return masterContainer;
    }

    public ApplicationAttemptId getAttemptId() {
      return attemptId;
    }
  }

  /**
   * State of an application: its submission context, submit time and the
   * states of its attempts, keyed by attempt id.
   */
  public static class ApplicationState {
    final ApplicationSubmissionContext context;
    final long submitTime;
    Map<ApplicationAttemptId, ApplicationAttemptState> attempts =
        new HashMap<ApplicationAttemptId, ApplicationAttemptState>();

    ApplicationState(long submitTime, ApplicationSubmissionContext context) {
      this.submitTime = submitTime;
      this.context = context;
    }

    public ApplicationId getAppId() {
      return context.getApplicationId();
    }

    public long getSubmitTime() {
      return submitTime;
    }

    public int getAttemptCount() {
      return attempts.size();
    }

    public ApplicationSubmissionContext getApplicationSubmissionContext() {
      return context;
    }

    public ApplicationAttemptState getAttempt(ApplicationAttemptId attemptId) {
      return attempts.get(attemptId);
    }
  }

  /**
   * State of the ResourceManager: the state of every stored application,
   * keyed by application id.
   */
  public static class RMState {
    Map<ApplicationId, ApplicationState> appState =
        new HashMap<ApplicationId, ApplicationState>();

    public Map<ApplicationId, ApplicationState> getApplicationState() {
      return appState;
    }
  }

  private Dispatcher rmDispatcher;

  /**
   * Dispatcher used to send state operation completion events to
   * ResourceManager services.
   */
  public void setDispatcher(Dispatcher dispatcher) {
    this.rmDispatcher = dispatcher;
  }

  // Internal dispatcher on which store events (STORE_APP_ATTEMPT, REMOVE_APP)
  // are handled asynchronously; created and started in init().
  AsyncDispatcher dispatcher;

  /**
   * Creates and starts the internal event dispatcher, registers the handler
   * for {@link RMStateStoreEventType} events, and then calls
   * {@link #initInternal(Configuration)}.
   */
  public synchronized void init(Configuration conf) throws Exception {
    // create async handler
    dispatcher = new AsyncDispatcher();
    dispatcher.init(conf);
    dispatcher.register(RMStateStoreEventType.class,
        new ForwardingEventHandler());
    dispatcher.start();
    initInternal(conf);
  }

  /**
   * Derived classes initialize themselves using this method.
   * The base class is initialized and the event dispatcher is ready to use at
   * this point.
   */
  protected abstract void initInternal(Configuration conf) throws Exception;

  /**
   * Closes the store. {@link #closeInternal()} is called first, then the
   * internal event dispatcher is stopped. The dispatcher is stopped even if
   * {@code closeInternal()} throws.
   *
   * <p>The dispatcher is deliberately stopped <em>outside</em> this object's
   * monitor. Its event thread delivers events to {@code handleStoreEvent},
   * which is {@code synchronized} on this store. Stopping the dispatcher while
   * holding the monitor could wait on a thread that is itself waiting for the
   * monitor. NOTE(review): this relies on AsyncDispatcher.stop() waiting for
   * its event thread to exit.
   */
  public void close() throws Exception {
    AsyncDispatcher toStop;
    synchronized (this) {
      // read under the monitor for visibility of the field written in init()
      toStop = dispatcher;
    }
    try {
      synchronized (this) {
        closeInternal();
      }
    } finally {
      if (toStop != null) {
        toStop.stop();
      }
    }
  }

  /**
   * Derived classes close themselves using this method.
   * The base class will be closed and the event dispatcher will be shutdown
   * after this.
   */
  protected abstract void closeInternal() throws Exception;

  /**
   * Blocking API.
   * The derived class must recover state from the store and return a new
   * RMState object populated with that state.
   * This must not be called on the dispatcher thread.
   */
  public abstract RMState loadState() throws Exception;

  /**
   * Blocking API.
   * ResourceManager services use this to store the application's state.
   * This must not be called on the dispatcher thread.
   */
  public synchronized void storeApplication(RMApp app) throws Exception {
    ApplicationSubmissionContext context = app
        .getApplicationSubmissionContext();
    assert context instanceof ApplicationSubmissionContextPBImpl;

    ApplicationStateDataPBImpl appStateData = new ApplicationStateDataPBImpl();
    appStateData.setSubmitTime(app.getSubmitTime());
    appStateData.setApplicationSubmissionContext(context);

    LOG.info("Storing info for app: " + context.getApplicationId());
    storeApplicationState(app.getApplicationId().toString(), appStateData);
  }

  /**
   * Blocking API.
   * Derived classes must implement this method to store the state of an
   * application.
   */
  protected abstract void storeApplicationState(String appId,
      ApplicationStateDataPBImpl appStateData)
      throws Exception;

  /**
   * Non-blocking API.
   * ResourceManager services call this to store state on an application
   * attempt. The store itself happens asynchronously on the store's internal
   * dispatcher. An RMAppAttemptStoredEvent is sent on completion to notify
   * the RMAppAttempt.
   */
  @SuppressWarnings("unchecked")
  public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) {
    ApplicationAttemptState attemptState = new ApplicationAttemptState(
        appAttempt.getAppAttemptId(), appAttempt.getMasterContainer());
    dispatcher.getEventHandler().handle(
        new RMStateStoreAppAttemptEvent(attemptState));
  }

  /**
   * Blocking API.
   * Derived classes must implement this method to store the state of an
   * application attempt.
   */
  protected abstract void storeApplicationAttemptState(String attemptId,
      ApplicationAttemptStateDataPBImpl attemptStateData)
      throws Exception;

  /**
   * Non-blocking API.
   * ResourceManager services call this to remove an application, together
   * with all of its attempts, from the state store. The removal happens
   * asynchronously on the store's internal dispatcher.
   * There is no notification of completion for this operation.
   */
  public synchronized void removeApplication(RMApp app) {
    ApplicationState appState = new ApplicationState(
        app.getSubmitTime(), app.getApplicationSubmissionContext());
    for (RMAppAttempt appAttempt : app.getAppAttempts().values()) {
      ApplicationAttemptState attemptState = new ApplicationAttemptState(
          appAttempt.getAppAttemptId(), appAttempt.getMasterContainer());
      appState.attempts.put(attemptState.getAttemptId(), attemptState);
    }

    removeApplication(appState);
  }

  /**
   * Non-blocking API.
   * Queues removal of the given application state on the store's internal
   * dispatcher.
   */
  @SuppressWarnings("unchecked")
  public synchronized void removeApplication(ApplicationState appState) {
    dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState));
  }

  /**
   * Blocking API.
   * Derived classes must implement this method to remove the state of an
   * application and its attempts.
   */
  protected abstract void removeApplicationState(ApplicationState appState)
      throws Exception;

  // Dispatcher related code

  /**
   * Handles a store event delivered by the internal dispatcher through
   * {@link ForwardingEventHandler}. Store failures are logged rather than
   * propagated. For STORE_APP_ATTEMPT, completion (with or without an
   * exception) is always reported through
   * {@link #notifyDoneStoringApplicationAttempt}.
   */
  private synchronized void handleStoreEvent(RMStateStoreEvent event) {
    switch (event.getType()) {
      case STORE_APP_ATTEMPT:
        {
          ApplicationAttemptState attemptState =
              ((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
          Exception storedException = null;
          ApplicationAttemptStateDataPBImpl attemptStateData =
              new ApplicationAttemptStateDataPBImpl();
          attemptStateData.setAttemptId(attemptState.getAttemptId());
          attemptStateData.setMasterContainer(attemptState.getMasterContainer());

          LOG.info("Storing info for attempt: " + attemptState.getAttemptId());
          try {
            storeApplicationAttemptState(attemptState.getAttemptId().toString(),
                attemptStateData);
          } catch (Exception e) {
            LOG.error("Error storing appAttempt: "
                + attemptState.getAttemptId(), e);
            storedException = e;
          } finally {
            notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
                storedException);
          }
        }
        break;
      case REMOVE_APP:
        {
          ApplicationState appState =
              ((RMStateStoreRemoveAppEvent) event).getAppState();
          ApplicationId appId = appState.getAppId();

          LOG.info("Removing info for app: " + appId);
          try {
            removeApplicationState(appState);
          } catch (Exception e) {
            LOG.error("Error removing app: " + appId, e);
          }
        }
        break;
      default:
        LOG.error("Unknown RMStateStoreEvent type: " + event.getType());
    }
  }

  /**
   * Reports that storing an application attempt has finished. It sends an
   * {@link RMAppAttemptStoredEvent} through the dispatcher given to
   * {@link #setDispatcher(Dispatcher)}.
   * @param attemptId attempt whose state was stored
   * @param storedException exception raised while storing, or null if the
   *        store succeeded
   */
  @SuppressWarnings("unchecked")
  private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId,
      Exception storedException) {
    rmDispatcher.getEventHandler().handle(
        new RMAppAttemptStoredEvent(attemptId, storedException));
  }

  /**
   * EventHandler implementation which forwards events to this RMStateStore.
   * This hides the EventHandler methods of the store from its public interface.
   */
  private final class ForwardingEventHandler
      implements EventHandler<RMStateStoreEvent> {

    @Override
    public void handle(RMStateStoreEvent event) {
      handleStoreEvent(event);
    }
  }
}