blob: f952806f7d72192f67de194269aba8a7d99929dd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.async.impl;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@Private
@Unstable
public class NMClientAsyncImpl extends NMClientAsync {
private static final Log LOG = LogFactory.getLog(NMClientAsyncImpl.class);
protected static final int INITIAL_THREAD_POOL_SIZE = 10;
protected ThreadPoolExecutor threadPool;
protected int maxThreadPoolSize;
protected Thread eventDispatcherThread;
protected AtomicBoolean stopped = new AtomicBoolean(false);
protected BlockingQueue<ContainerEvent> events =
new LinkedBlockingQueue<ContainerEvent>();
protected ConcurrentMap<ContainerId, StatefulContainer> containers =
new ConcurrentHashMap<ContainerId, StatefulContainer>();
public NMClientAsyncImpl(AbstractCallbackHandler callbackHandler) {
this(NMClientAsync.class.getName(), callbackHandler);
}
public NMClientAsyncImpl(
String name, AbstractCallbackHandler callbackHandler) {
this(name, new NMClientImpl(), callbackHandler);
}
@Private
@VisibleForTesting
protected NMClientAsyncImpl(String name, NMClient client,
AbstractCallbackHandler callbackHandler) {
super(name, client, callbackHandler);
this.client = client;
this.callbackHandler = callbackHandler;
}
/**
* @deprecated Use {@link
* #NMClientAsyncImpl(NMClientAsync.AbstractCallbackHandler)}
* instead.
*/
@Deprecated
public NMClientAsyncImpl(CallbackHandler callbackHandler) {
this(NMClientAsync.class.getName(), callbackHandler);
}
/**
* @deprecated Use {@link #NMClientAsyncImpl(String,
* NMClientAsync.AbstractCallbackHandler)} instead.
*/
@Deprecated
public NMClientAsyncImpl(String name, CallbackHandler callbackHandler) {
this(name, new NMClientImpl(), callbackHandler);
}
@Private
@VisibleForTesting
@Deprecated
protected NMClientAsyncImpl(String name, NMClient client,
CallbackHandler callbackHandler) {
super(name, client, callbackHandler);
this.client = client;
this.callbackHandler = callbackHandler;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.maxThreadPoolSize = conf.getInt(
YarnConfiguration.NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE,
YarnConfiguration.DEFAULT_NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE);
LOG.info("Upper bound of the thread pool size is " + maxThreadPoolSize);
client.init(conf);
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
client.start();
ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
this.getClass().getName() + " #%d").setDaemon(true).build();
// Start with a default core-pool size and change it dynamically.
int initSize = Math.min(INITIAL_THREAD_POOL_SIZE, maxThreadPoolSize);
threadPool = new ThreadPoolExecutor(initSize, Integer.MAX_VALUE, 1,
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
eventDispatcherThread = new Thread() {
@Override
public void run() {
ContainerEvent event = null;
Set<String> allNodes = new HashSet<String>();
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
event = events.take();
} catch (InterruptedException e) {
if (!stopped.get()) {
LOG.error("Returning, thread interrupted", e);
}
return;
}
allNodes.add(event.getNodeId().toString());
int threadPoolSize = threadPool.getCorePoolSize();
// We can increase the pool size only if haven't reached the maximum
// limit yet.
if (threadPoolSize != maxThreadPoolSize) {
// nodes where containers will run at *this* point of time. This is
// *not* the cluster size and doesn't need to be.
int nodeNum = allNodes.size();
int idealThreadPoolSize = Math.min(maxThreadPoolSize, nodeNum);
if (threadPoolSize < idealThreadPoolSize) {
// Bump up the pool size to idealThreadPoolSize +
// INITIAL_POOL_SIZE, the later is just a buffer so we are not
// always increasing the pool-size
int newThreadPoolSize = Math.min(maxThreadPoolSize,
idealThreadPoolSize + INITIAL_THREAD_POOL_SIZE);
LOG.info("Set NMClientAsync thread pool size to " +
newThreadPoolSize + " as the number of nodes to talk to is "
+ nodeNum);
threadPool.setCorePoolSize(newThreadPoolSize);
}
}
// the events from the queue are handled in parallel with a thread
// pool
threadPool.execute(getContainerEventProcessor(event));
// TODO: Group launching of multiple containers to a single
// NodeManager into a single connection
}
}
};
eventDispatcherThread.setName("Container Event Dispatcher");
eventDispatcherThread.setDaemon(false);
eventDispatcherThread.start();
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
if (stopped.getAndSet(true)) {
// return if already stopped
return;
}
if (eventDispatcherThread != null) {
eventDispatcherThread.interrupt();
try {
eventDispatcherThread.join();
} catch (InterruptedException e) {
LOG.error("The thread of " + eventDispatcherThread.getName() +
" didn't finish normally.", e);
}
}
if (threadPool != null) {
threadPool.shutdownNow();
}
if (client != null) {
// If NMClientImpl doesn't stop running containers, the states doesn't
// need to be cleared.
if (!(client instanceof NMClientImpl) ||
((NMClientImpl) client).getCleanupRunningContainers().get()) {
if (containers != null) {
containers.clear();
}
}
client.stop();
}
super.serviceStop();
}
public void startContainerAsync(
Container container, ContainerLaunchContext containerLaunchContext) {
if (containers.putIfAbsent(container.getId(),
new StatefulContainer(this, container.getId())) != null) {
callbackHandler.onStartContainerError(container.getId(),
RPCUtil.getRemoteException("Container " + container.getId() +
" is already started or scheduled to start"));
}
try {
events.put(new StartContainerEvent(container, containerLaunchContext));
} catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event of starting Container " +
container.getId());
callbackHandler.onStartContainerError(container.getId(), e);
}
}
@Deprecated
public void increaseContainerResourceAsync(Container container) {
if (!(callbackHandler instanceof AbstractCallbackHandler)) {
LOG.error("Callback handler does not implement container resource "
+ "increase callback methods");
return;
}
AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
if (containers.get(container.getId()) == null) {
handler.onIncreaseContainerResourceError(
container.getId(),
RPCUtil.getRemoteException(
"Container " + container.getId() +
" is neither started nor scheduled to start"));
}
try {
events.put(new UpdateContainerResourceEvent(container, true));
} catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event of increasing resource of "
+ "Container " + container.getId());
handler.onIncreaseContainerResourceError(container.getId(), e);
}
}
@Override
public void updateContainerResourceAsync(Container container) {
if (!(callbackHandler instanceof AbstractCallbackHandler)) {
LOG.error("Callback handler does not implement container resource "
+ "increase callback methods");
return;
}
AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
if (containers.get(container.getId()) == null) {
handler.onUpdateContainerResourceError(
container.getId(),
RPCUtil.getRemoteException(
"Container " + container.getId() +
" is neither started nor scheduled to start"));
}
try {
events.put(new UpdateContainerResourceEvent(container, false));
} catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event of increasing resource of "
+ "Container " + container.getId());
handler.onUpdateContainerResourceError(container.getId(), e);
}
}
@Override
public void reInitializeContainerAsync(ContainerId containerId,
ContainerLaunchContext containerLaunchContex, boolean autoCommit){
if (!(callbackHandler instanceof AbstractCallbackHandler)) {
LOG.error("Callback handler does not implement container re-initialize "
+ "callback methods");
return;
}
AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
if (containers.get(containerId) == null) {
handler.onContainerReInitializeError(
containerId, RPCUtil.getRemoteException(
"Container " + containerId + " is not started"));
}
try {
events.put(new ReInitializeContainerEvevnt(containerId,
client.getNodeIdOfStartedContainer(containerId),
containerLaunchContex, autoCommit));
} catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event of re-initializing of "
+ "Container " + containerId);
handler.onContainerReInitializeError(containerId, e);
}
}
@Override
public void restartContainerAsync(ContainerId containerId){
if (!(callbackHandler instanceof AbstractCallbackHandler)) {
LOG.error("Callback handler does not implement container restart "
+ "callback methods");
return;
}
AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
if (containers.get(containerId) == null) {
handler.onContainerRestartError(
containerId, RPCUtil.getRemoteException(
"Container " + containerId + " is not started"));
}
try {
events.put(new ContainerEvent(containerId,
client.getNodeIdOfStartedContainer(containerId),
null, ContainerEventType.RESTART_CONTAINER));
} catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event of restart of "
+ "Container " + containerId);
handler.onContainerRestartError(containerId, e);
}
}
@Override
public void rollbackLastReInitializationAsync(ContainerId containerId){
if (!(callbackHandler instanceof AbstractCallbackHandler)) {
LOG.error("Callback handler does not implement container rollback "
+ "callback methods");
return;
}
AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
if (containers.get(containerId) == null) {
handler.onRollbackLastReInitializationError(
containerId, RPCUtil.getRemoteException(
"Container " + containerId + " is not started"));
}
try {
events.put(new ContainerEvent(containerId,
client.getNodeIdOfStartedContainer(containerId),
null, ContainerEventType.ROLLBACK_LAST_REINIT));
} catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event Rollback re-initialization"
+ " of Container " + containerId);
handler.onRollbackLastReInitializationError(containerId, e);
}
}
@Override
public void commitLastReInitializationAsync(ContainerId containerId){
if (!(callbackHandler instanceof AbstractCallbackHandler)) {
LOG.error("Callback handler does not implement container commit last " +
"re-initialization callback methods");
return;
}
AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
if (containers.get(containerId) == null) {
handler.onCommitLastReInitializationError(
containerId, RPCUtil.getRemoteException(
"Container " + containerId + " is not started"));
}
try {
events.put(new ContainerEvent(containerId,
client.getNodeIdOfStartedContainer(containerId),
null, ContainerEventType.COMMIT_LAST_REINT));
} catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event Commit re-initialization"
+ " of Container " + containerId);
handler.onCommitLastReInitializationError(containerId, e);
}
}
public void stopContainerAsync(ContainerId containerId, NodeId nodeId) {
if (containers.get(containerId) == null) {
callbackHandler.onStopContainerError(containerId,
RPCUtil.getRemoteException("Container " + containerId +
" is neither started nor scheduled to start"));
}
try {
events.put(new ContainerEvent(containerId, nodeId, null,
ContainerEventType.STOP_CONTAINER));
} catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event of stopping Container " +
containerId);
callbackHandler.onStopContainerError(containerId, e);
}
}
public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId) {
try {
events.put(new ContainerEvent(containerId, nodeId, null,
ContainerEventType.QUERY_CONTAINER));
} catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event of querying the status" +
" of Container " + containerId);
callbackHandler.onGetContainerStatusError(containerId, e);
}
}
protected static enum ContainerState {
PREP, FAILED, RUNNING, DONE,
}
protected boolean isCompletelyDone(StatefulContainer container) {
return container.getState() == ContainerState.DONE ||
container.getState() == ContainerState.FAILED;
}
protected ContainerEventProcessor getContainerEventProcessor(
ContainerEvent event) {
return new ContainerEventProcessor(event);
}
/**
* The type of the event of interacting with a container
*/
protected static enum ContainerEventType {
START_CONTAINER,
STOP_CONTAINER,
QUERY_CONTAINER,
UPDATE_CONTAINER_RESOURCE,
REINITIALIZE_CONTAINER,
RESTART_CONTAINER,
ROLLBACK_LAST_REINIT,
COMMIT_LAST_REINT
}
protected static class ContainerEvent
extends AbstractEvent<ContainerEventType>{
private ContainerId containerId;
private NodeId nodeId;
private Token containerToken;
public ContainerEvent(ContainerId containerId, NodeId nodeId,
Token containerToken, ContainerEventType type) {
super(type);
this.containerId = containerId;
this.nodeId = nodeId;
this.containerToken = containerToken;
}
public ContainerId getContainerId() {
return containerId;
}
public NodeId getNodeId() {
return nodeId;
}
public Token getContainerToken() {
return containerToken;
}
}
protected static class StartContainerEvent extends ContainerEvent {
private Container container;
private ContainerLaunchContext containerLaunchContext;
public StartContainerEvent(Container container,
ContainerLaunchContext containerLaunchContext) {
super(container.getId(), container.getNodeId(),
container.getContainerToken(), ContainerEventType.START_CONTAINER);
this.container = container;
this.containerLaunchContext = containerLaunchContext;
}
public Container getContainer() {
return container;
}
public ContainerLaunchContext getContainerLaunchContext() {
return containerLaunchContext;
}
}
protected static class ReInitializeContainerEvevnt extends ContainerEvent {
private ContainerLaunchContext containerLaunchContext;
private boolean autoCommit;
public ReInitializeContainerEvevnt(ContainerId containerId, NodeId nodeId,
ContainerLaunchContext containerLaunchContext, boolean autoCommit) {
super(containerId, nodeId, null,
ContainerEventType.REINITIALIZE_CONTAINER);
this.containerLaunchContext = containerLaunchContext;
this.autoCommit = autoCommit;
}
public ContainerLaunchContext getContainerLaunchContext() {
return containerLaunchContext;
}
public boolean isAutoCommit() {
return autoCommit;
}
}
protected static class UpdateContainerResourceEvent extends ContainerEvent {
private Container container;
private boolean isIncreaseEvent;
// UpdateContainerResourceEvent constructor takes in a
// flag to support callback API's calling through the deprecated
// increaseContainerResource
public UpdateContainerResourceEvent(Container container,
boolean isIncreaseEvent) {
super(container.getId(), container.getNodeId(),
container.getContainerToken(),
ContainerEventType.UPDATE_CONTAINER_RESOURCE);
this.container = container;
this.isIncreaseEvent = isIncreaseEvent;
}
public Container getContainer() {
return container;
}
}
protected static class StatefulContainer implements
EventHandler<ContainerEvent> {
protected final static StateMachineFactory<StatefulContainer,
ContainerState, ContainerEventType, ContainerEvent> stateMachineFactory
= new StateMachineFactory<StatefulContainer, ContainerState,
ContainerEventType, ContainerEvent>(ContainerState.PREP)
// Transitions from PREP state
.addTransition(ContainerState.PREP,
EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
ContainerEventType.START_CONTAINER,
new StartContainerTransition())
.addTransition(ContainerState.PREP, ContainerState.DONE,
ContainerEventType.STOP_CONTAINER, new OutOfOrderTransition())
// Transitions from RUNNING state
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
ContainerEventType.UPDATE_CONTAINER_RESOURCE,
new UpdateContainerResourceTransition())
// Transitions for Container Upgrade
.addTransition(ContainerState.RUNNING,
EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
ContainerEventType.REINITIALIZE_CONTAINER,
new ReInitializeContainerTransition())
.addTransition(ContainerState.RUNNING,
EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
ContainerEventType.RESTART_CONTAINER,
new ReInitializeContainerTransition())
.addTransition(ContainerState.RUNNING,
EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
ContainerEventType.ROLLBACK_LAST_REINIT,
new ReInitializeContainerTransition())
.addTransition(ContainerState.RUNNING,
EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
ContainerEventType.COMMIT_LAST_REINT,
new ReInitializeContainerTransition())
.addTransition(ContainerState.RUNNING,
EnumSet.of(ContainerState.DONE, ContainerState.FAILED),
ContainerEventType.STOP_CONTAINER,
new StopContainerTransition())
// Transition from DONE state
.addTransition(ContainerState.DONE, ContainerState.DONE,
EnumSet.of(ContainerEventType.START_CONTAINER,
ContainerEventType.STOP_CONTAINER,
ContainerEventType.UPDATE_CONTAINER_RESOURCE))
// Transition from FAILED state
.addTransition(ContainerState.FAILED, ContainerState.FAILED,
EnumSet.of(ContainerEventType.START_CONTAINER,
ContainerEventType.STOP_CONTAINER,
ContainerEventType.REINITIALIZE_CONTAINER,
ContainerEventType.RESTART_CONTAINER,
ContainerEventType.COMMIT_LAST_REINT,
ContainerEventType.ROLLBACK_LAST_REINIT,
ContainerEventType.UPDATE_CONTAINER_RESOURCE));
protected static class StartContainerTransition implements
MultipleArcTransition<StatefulContainer, ContainerEvent,
ContainerState> {
@Override
public ContainerState transition(
StatefulContainer container, ContainerEvent event) {
ContainerId containerId = event.getContainerId();
try {
StartContainerEvent scEvent = null;
if (event instanceof StartContainerEvent) {
scEvent = (StartContainerEvent) event;
}
assert scEvent != null;
Map<String, ByteBuffer> allServiceResponse =
container.nmClientAsync.getClient().startContainer(
scEvent.getContainer(), scEvent.getContainerLaunchContext());
try {
container.nmClientAsync.getCallbackHandler().onContainerStarted(
containerId, allServiceResponse);
} catch (Throwable thr) {
// Don't process user created unchecked exception
LOG.info("Unchecked exception is thrown from onContainerStarted for "
+ "Container " + containerId, thr);
}
return ContainerState.RUNNING;
} catch (YarnException e) {
return onExceptionRaised(container, event, e);
} catch (IOException e) {
return onExceptionRaised(container, event, e);
} catch (Throwable t) {
return onExceptionRaised(container, event, t);
}
}
private ContainerState onExceptionRaised(StatefulContainer container,
ContainerEvent event, Throwable t) {
try {
container.nmClientAsync.getCallbackHandler().onStartContainerError(
event.getContainerId(), t);
} catch (Throwable thr) {
// Don't process user created unchecked exception
LOG.info(
"Unchecked exception is thrown from onStartContainerError for " +
"Container " + event.getContainerId(), thr);
}
return ContainerState.FAILED;
}
}
protected static class UpdateContainerResourceTransition implements
SingleArcTransition<StatefulContainer, ContainerEvent> {
@SuppressWarnings("deprecation")
@Override
public void transition(
StatefulContainer container, ContainerEvent event) {
boolean isIncreaseEvent = false;
if (!(container.nmClientAsync.getCallbackHandler()
instanceof AbstractCallbackHandler)) {
LOG.error("Callback handler does not implement container resource "
+ "update callback methods");
return;
}
AbstractCallbackHandler handler =
(AbstractCallbackHandler) container.nmClientAsync
.getCallbackHandler();
try {
if (!(event instanceof UpdateContainerResourceEvent)) {
throw new AssertionError("Unexpected event type. Expecting:"
+ "UpdateContainerResourceEvent. Got:" + event);
}
UpdateContainerResourceEvent updateEvent =
(UpdateContainerResourceEvent) event;
container.nmClientAsync.getClient().updateContainerResource(
updateEvent.getContainer());
isIncreaseEvent = updateEvent.isIncreaseEvent;
try {
//If isIncreaseEvent is set, set the appropriate callbacks
//for backward compatibility
if (isIncreaseEvent) {
handler.onContainerResourceIncreased(updateEvent.getContainerId(),
updateEvent.getContainer().getResource());
} else {
handler.onContainerResourceUpdated(updateEvent.getContainerId(),
updateEvent.getContainer().getResource());
}
} catch (Throwable thr) {
// Don't process user created unchecked exception
LOG.info("Unchecked exception is thrown from "
+ "onContainerResourceUpdated for Container "
+ event.getContainerId(), thr);
}
} catch (Exception e) {
try {
if (isIncreaseEvent) {
handler
.onIncreaseContainerResourceError(event.getContainerId(), e);
} else {
handler.onUpdateContainerResourceError(event.getContainerId(), e);
}
} catch (Throwable thr) {
// Don't process user created unchecked exception
LOG.info("Unchecked exception is thrown from "
+ "onUpdateContainerResourceError for Container "
+ event.getContainerId(), thr);
}
}
}
}
protected static class ReInitializeContainerTransition implements
MultipleArcTransition<StatefulContainer, ContainerEvent,
ContainerState> {
@Override
public ContainerState transition(StatefulContainer container,
ContainerEvent containerEvent) {
ContainerId containerId = containerEvent.getContainerId();
AbstractCallbackHandler handler = (AbstractCallbackHandler) container
.nmClientAsync.getCallbackHandler();
Throwable handlerError = null;
try {
switch(containerEvent.getType()) {
case REINITIALIZE_CONTAINER:
if (!(containerEvent instanceof ReInitializeContainerEvevnt)) {
LOG.error("Unexpected Event.. [" +containerEvent.getType() + "]");
return ContainerState.FAILED;
}
ReInitializeContainerEvevnt rEvent =
(ReInitializeContainerEvevnt)containerEvent;
container.nmClientAsync.getClient().reInitializeContainer(
containerId, rEvent.getContainerLaunchContext(),
rEvent.isAutoCommit());
try {
handler.onContainerReInitialize(containerId);
} catch (Throwable tr) {
handlerError = tr;
}
break;
case RESTART_CONTAINER:
container.nmClientAsync.getClient().restartContainer(containerId);
try {
handler.onContainerRestart(containerId);
} catch (Throwable tr) {
handlerError = tr;
}
break;
case ROLLBACK_LAST_REINIT:
container.nmClientAsync.getClient()
.rollbackLastReInitialization(containerId);
try {
handler.onRollbackLastReInitialization(containerId);
} catch (Throwable tr) {
handlerError = tr;
}
break;
case COMMIT_LAST_REINT:
container.nmClientAsync.getClient()
.commitLastReInitialization(containerId);
try {
handler.onCommitLastReInitialization(containerId);
} catch (Throwable tr) {
handlerError = tr;
}
break;
default:
LOG.warn("Event of type [" + containerEvent.getType() + "] not" +
" expected here..");
break;
}
if (handlerError != null) {
LOG.info("Unchecked exception is thrown in handler for event ["
+ containerEvent.getType() + "] for Container "
+ containerId, handlerError);
}
return ContainerState.RUNNING;
} catch (Throwable t) {
switch(containerEvent.getType()) {
case REINITIALIZE_CONTAINER:
try {
handler.onContainerReInitializeError(containerId, t);
} catch (Throwable tr) {
handlerError = tr;
}
break;
case RESTART_CONTAINER:
try {
handler.onContainerRestartError(containerId, t);
} catch (Throwable tr) {
handlerError = tr;
}
break;
case ROLLBACK_LAST_REINIT:
try {
handler.onRollbackLastReInitializationError(containerId, t);
} catch (Throwable tr) {
handlerError = tr;
}
break;
case COMMIT_LAST_REINT:
try {
handler.onCommitLastReInitializationError(containerId, t);
} catch (Throwable tr) {
handlerError = tr;
}
break;
default:
LOG.warn("Event of type [" + containerEvent.getType() + "] not" +
" expected here..");
break;
}
if (handlerError != null) {
LOG.info("Unchecked exception is thrown in handler for event ["
+ containerEvent.getType() + "] for Container "
+ containerId, handlerError);
}
}
return ContainerState.FAILED;
}
}
protected static class StopContainerTransition implements
MultipleArcTransition<StatefulContainer, ContainerEvent,
ContainerState> {
@Override
public ContainerState transition(
StatefulContainer container, ContainerEvent event) {
ContainerId containerId = event.getContainerId();
try {
container.nmClientAsync.getClient().stopContainer(
containerId, event.getNodeId());
try {
container.nmClientAsync.getCallbackHandler().onContainerStopped(
event.getContainerId());
} catch (Throwable thr) {
// Don't process user created unchecked exception
LOG.info("Unchecked exception is thrown from onContainerStopped for "
+ "Container " + event.getContainerId(), thr);
}
return ContainerState.DONE;
} catch (YarnException e) {
return onExceptionRaised(container, event, e);
} catch (IOException e) {
return onExceptionRaised(container, event, e);
} catch (Throwable t) {
return onExceptionRaised(container, event, t);
}
}
private ContainerState onExceptionRaised(StatefulContainer container,
ContainerEvent event, Throwable t) {
try {
container.nmClientAsync.getCallbackHandler().onStopContainerError(
event.getContainerId(), t);
} catch (Throwable thr) {
// Don't process user created unchecked exception
LOG.info("Unchecked exception is thrown from onStopContainerError for "
+ "Container " + event.getContainerId(), thr);
}
return ContainerState.FAILED;
}
}
protected static class OutOfOrderTransition implements
SingleArcTransition<StatefulContainer, ContainerEvent> {
protected static final String STOP_BEFORE_START_ERROR_MSG =
"Container was killed before it was launched";
@Override
public void transition(StatefulContainer container, ContainerEvent event) {
try {
container.nmClientAsync.getCallbackHandler().onStartContainerError(
event.getContainerId(),
RPCUtil.getRemoteException(STOP_BEFORE_START_ERROR_MSG));
} catch (Throwable thr) {
// Don't process user created unchecked exception
LOG.info(
"Unchecked exception is thrown from onStartContainerError for " +
"Container " + event.getContainerId(), thr);
}
}
}
private final NMClientAsync nmClientAsync;
private final ContainerId containerId;
private final StateMachine<ContainerState, ContainerEventType,
ContainerEvent> stateMachine;
private final ReadLock readLock;
private final WriteLock writeLock;
public StatefulContainer(NMClientAsync client, ContainerId containerId) {
this.nmClientAsync = client;
this.containerId = containerId;
stateMachine = stateMachineFactory.make(this);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
}
@Override
public void handle(ContainerEvent event) {
writeLock.lock();
try {
try {
this.stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitionException e) {
LOG.error("Can't handle this event at current state", e);
}
} finally {
writeLock.unlock();
}
}
public ContainerId getContainerId() {
return containerId;
}
public ContainerState getState() {
readLock.lock();
try {
return stateMachine.getCurrentState();
} finally {
readLock.unlock();
}
}
}
protected class ContainerEventProcessor implements Runnable {
protected ContainerEvent event;
public ContainerEventProcessor(ContainerEvent event) {
this.event = event;
}
@Override
public void run() {
ContainerId containerId = event.getContainerId();
LOG.info("Processing Event " + event + " for Container " + containerId);
if (event.getType() == ContainerEventType.QUERY_CONTAINER) {
try {
ContainerStatus containerStatus = client.getContainerStatus(
containerId, event.getNodeId());
try {
callbackHandler.onContainerStatusReceived(
containerId, containerStatus);
} catch (Throwable thr) {
// Don't process user created unchecked exception
LOG.info(
"Unchecked exception is thrown from onContainerStatusReceived" +
" for Container " + event.getContainerId(), thr);
}
} catch (YarnException e) {
onExceptionRaised(containerId, e);
} catch (IOException e) {
onExceptionRaised(containerId, e);
} catch (Throwable t) {
onExceptionRaised(containerId, t);
}
} else {
StatefulContainer container = containers.get(containerId);
if (container == null) {
LOG.info("Container " + containerId + " is already stopped or failed");
} else {
container.handle(event);
if (isCompletelyDone(container)) {
containers.remove(containerId);
}
}
}
}
private void onExceptionRaised(ContainerId containerId, Throwable t) {
try {
callbackHandler.onGetContainerStatusError(containerId, t);
} catch (Throwable thr) {
// Don't process user created unchecked exception
LOG.info("Unchecked exception is thrown from onGetContainerStatusError" +
" for Container " + containerId, thr);
}
}
}
}