blob: 28d20c8d433ac4e543ca78f9418fe31d34e83dde [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.async;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import com.google.common.annotations.VisibleForTesting;
/**
 * <code>AMRMClientAsync</code> handles communication with the ResourceManager
 * and provides asynchronous updates on events such as container allocations and
 * completions. It contains a thread that sends periodic heartbeats to the
 * ResourceManager.
 *
 * It should be used by implementing a CallbackHandler:
 * <pre>
 * {@code
 * class MyCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler {
 *   public void onContainersAllocated(List<Container> containers) {
 *     [run tasks on the containers]
 *   }
 *
 *   public void onContainersResourceChanged(List<Container> containers) {
 *     [determine if the resource allocation of the containers has been
 *      increased in the ResourceManager, and if so, inform the NodeManagers
 *      to increase the resource monitor/enforcement on the containers]
 *   }
 *
 *   public void onContainersCompleted(List<ContainerStatus> statuses) {
 *     [update progress, check whether app is done]
 *   }
 *
 *   public void onNodesUpdated(List<NodeReport> updated) {}
 *
 *   public void onShutdownRequest() {}
 *
 *   public float getProgress() {
 *     [return the current progress of the application]
 *   }
 *
 *   public void onError(Throwable e) {
 *     [handle the error; calling stop() on the client is recommended]
 *   }
 * }
 * }
 * </pre>
 *
 * The client's lifecycle should be managed similarly to the following:
 *
 * <pre>
 * {@code
 * AMRMClientAsync asyncClient =
 *     AMRMClientAsync.createAMRMClientAsync(1000, new MyCallbackHandler());
 * asyncClient.init(conf);
 * asyncClient.start();
 * RegisterApplicationMasterResponse response = asyncClient
 *     .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
 *         appMasterTrackingUrl);
 * asyncClient.addContainerRequest(containerRequest);
 * [... wait for application to complete]
 * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
 * asyncClient.stop();
 * }
 * </pre>
 */
@Public
@Stable
public abstract class AMRMClientAsync<T extends ContainerRequest>
    extends AbstractService {
  private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);

  /** Synchronous client that the delegating methods of this class call into. */
  protected final AMRMClient<T> client;

  /** Callback handler supplied at construction time. */
  protected final CallbackHandler handler;

  /**
   * Heartbeat interval in milliseconds. Held in an {@link AtomicInteger} so
   * that updates made through {@link #setHeartbeatInterval(int)} are visible
   * to other threads.
   */
  protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger();

  /**
   * <p>Create a new instance of AMRMClientAsync.</p>
   *
   * @param intervalMs heartbeat interval in milliseconds between AM and RM
   * @param callbackHandler callback handler that processes responses from
   *                        the <code>ResourceManager</code>
   * @param <T> the type of container request handled by the client
   * @return a new asynchronous client
   */
  public static <T extends ContainerRequest> AMRMClientAsync<T>
      createAMRMClientAsync(
      int intervalMs, AbstractCallbackHandler callbackHandler) {
    return new AMRMClientAsyncImpl<T>(intervalMs, callbackHandler);
  }

  /**
   * <p>Create a new instance of AMRMClientAsync.</p>
   *
   * @param client the AMRMClient instance
   * @param intervalMs heartbeat interval in milliseconds between AM and RM
   * @param callbackHandler callback handler that processes responses from
   *                        the <code>ResourceManager</code>
   * @param <T> the type of container request handled by the client
   * @return a new asynchronous client wrapping {@code client}
   */
  public static <T extends ContainerRequest> AMRMClientAsync<T>
      createAMRMClientAsync(
      AMRMClient<T> client, int intervalMs,
      AbstractCallbackHandler callbackHandler) {
    return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler);
  }

  /**
   * Creates an instance backed by a newly constructed {@link AMRMClientImpl}.
   *
   * @param intervalMs heartbeat interval in milliseconds between AM and RM
   * @param callbackHandler callback handler that processes responses from
   *                        the <code>ResourceManager</code>
   */
  protected AMRMClientAsync(
      int intervalMs, AbstractCallbackHandler callbackHandler) {
    this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
  }

  /**
   * Creates an instance backed by the given synchronous client.
   *
   * @param client the AMRMClient instance to delegate to
   * @param intervalMs heartbeat interval in milliseconds between AM and RM
   * @param callbackHandler callback handler that processes responses from
   *                        the <code>ResourceManager</code>
   */
  @Private
  @VisibleForTesting
  protected AMRMClientAsync(AMRMClient<T> client, int intervalMs,
      AbstractCallbackHandler callbackHandler) {
    super(AMRMClientAsync.class.getName());
    this.client = client;
    this.heartbeatIntervalMs.set(intervalMs);
    this.handler = callbackHandler;
  }

  /**
   * Create a new instance of AMRMClientAsync using the legacy callback
   * interface.
   *
   * @param intervalMs heartbeat interval in milliseconds between AM and RM
   * @param callbackHandler callback handler that processes responses from
   *                        the <code>ResourceManager</code>
   * @param <T> the type of container request handled by the client
   * @return a new asynchronous client
   * @deprecated Use {@link #createAMRMClientAsync(int,
   *             AMRMClientAsync.AbstractCallbackHandler)} instead.
   */
  @Deprecated
  public static <T extends ContainerRequest> AMRMClientAsync<T>
      createAMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
    return new AMRMClientAsyncImpl<T>(intervalMs, callbackHandler);
  }

  /**
   * Create a new instance of AMRMClientAsync using the legacy callback
   * interface.
   *
   * @param client the AMRMClient instance
   * @param intervalMs heartbeat interval in milliseconds between AM and RM
   * @param callbackHandler callback handler that processes responses from
   *                        the <code>ResourceManager</code>
   * @param <T> the type of container request handled by the client
   * @return a new asynchronous client wrapping {@code client}
   * @deprecated Use {@link #createAMRMClientAsync(AMRMClient,
   *             int, AMRMClientAsync.AbstractCallbackHandler)} instead.
   */
  @Deprecated
  public static <T extends ContainerRequest> AMRMClientAsync<T>
      createAMRMClientAsync(AMRMClient<T> client, int intervalMs,
          CallbackHandler callbackHandler) {
    return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler);
  }

  /**
   * Creates an instance backed by a newly constructed {@link AMRMClientImpl},
   * using the legacy callback interface.
   *
   * @param intervalMs heartbeat interval in milliseconds between AM and RM
   * @param callbackHandler legacy callback handler
   * @deprecated Use {@link #AMRMClientAsync(int,
   *             AMRMClientAsync.AbstractCallbackHandler)} instead.
   */
  @Deprecated
  protected AMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
    this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
  }

  /**
   * Creates an instance backed by the given synchronous client, using the
   * legacy callback interface.
   *
   * @param client the AMRMClient instance to delegate to
   * @param intervalMs heartbeat interval in milliseconds between AM and RM
   * @param callbackHandler legacy callback handler
   * @deprecated Use {@link #AMRMClientAsync(AMRMClient, int,
   *             AMRMClientAsync.AbstractCallbackHandler)} instead.
   */
  @Private
  @VisibleForTesting
  @Deprecated
  protected AMRMClientAsync(AMRMClient<T> client, int intervalMs,
      CallbackHandler callbackHandler) {
    super(AMRMClientAsync.class.getName());
    this.client = client;
    this.heartbeatIntervalMs.set(intervalMs);
    this.handler = callbackHandler;
  }

  /**
   * Changes the heartbeat interval.
   *
   * @param interval the new heartbeat interval in milliseconds
   */
  public void setHeartbeatInterval(int interval) {
    heartbeatIntervalMs.set(interval);
  }

  /**
   * Returns all matching ContainerRequests that match the given Priority,
   * ResourceName and Capability.
   *
   * @param priority Priority.
   * @param resourceName Location.
   * @param capability Capability.
   * @return All matching ContainerRequests
   */
  public abstract List<? extends Collection<T>> getMatchingRequests(
                                                   Priority priority,
                                                   String resourceName,
                                                   Resource capability);

  /**
   * Returns all matching ContainerRequests that match the given Priority,
   * ResourceName, ExecutionType and Capability. Delegates to the underlying
   * {@link AMRMClient}.
   *
   * @param priority Priority.
   * @param resourceName Location.
   * @param executionType ExecutionType.
   * @param capability Capability.
   * @return All matching ContainerRequests
   */
  public List<? extends Collection<T>> getMatchingRequests(
      Priority priority, String resourceName, ExecutionType executionType,
      Resource capability) {
    return client.getMatchingRequests(priority, resourceName,
        executionType, capability);
  }

  /**
   * Registers this application master with the resource manager. On successful
   * registration, starts the heartbeating thread.
   *
   * @param appHostName host name of the application master
   * @param appHostPort port of the application master
   * @param appTrackingUrl tracking URL of the application master
   * @return the ResourceManager's response to the registration
   * @throws YarnException if the registration fails
   * @throws IOException if an I/O error occurs
   */
  public abstract RegisterApplicationMasterResponse registerApplicationMaster(
      String appHostName, int appHostPort, String appTrackingUrl)
      throws YarnException, IOException;

  /**
   * Unregister the application master. This must be called in the end.
   *
   * @param appStatus Success/Failure status of the master
   * @param appMessage Diagnostics message on failure
   * @param appTrackingUrl New URL to get master info
   * @throws YarnException if the unregistration fails
   * @throws IOException if an I/O error occurs
   */
  public abstract void unregisterApplicationMaster(
      FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl)
      throws YarnException, IOException;

  /**
   * Request containers for resources before calling <code>allocate</code>.
   *
   * @param req Resource request
   */
  public abstract void addContainerRequest(T req);

  /**
   * Remove a previous container request. The previous container request may
   * have already been sent to the ResourceManager, so even after the remove
   * request the app must be prepared to receive an allocation for the previous
   * request.
   *
   * @param req Resource request
   */
  public abstract void removeContainerRequest(T req);

  /**
   * Request container resource change before calling <code>allocate</code>.
   * Any previous pending resource change request of the same container will be
   * removed.
   *
   * Application that calls this method is expected to maintain the
   * <code>Container</code>s that are returned from previous successful
   * allocations or resource changes. By passing in the existing container and a
   * target resource capability to this method, the application requests the
   * ResourceManager to change the existing resource allocation to the target
   * resource allocation.
   *
   * @param container The container returned from the last successful resource
   *                  allocation or resource change
   * @param capability The target resource capability of the container
   */
  public abstract void requestContainerResourceChange(
      Container container, Resource capability);

  /**
   * Release containers assigned by the Resource Manager. If the app cannot use
   * the container or wants to give up the container then it can release them.
   * The app needs to make new requests for the released resource capability if
   * it still needs it, e.g. if it released non-local resources.
   *
   * @param containerId the id of the container to release
   */
  public abstract void releaseAssignedContainer(ContainerId containerId);

  /**
   * Get the currently available resources in the cluster.
   * A valid value is available after a call to allocate has been made.
   *
   * @return Currently available resources
   */
  public abstract Resource getAvailableResources();

  /**
   * Get the current number of nodes in the cluster.
   * A valid value is available after a call to allocate has been made.
   *
   * @return Current number of nodes in the cluster
   */
  public abstract int getClusterNodeCount();

  /**
   * Register a TimelineClient with the underlying AMRMClient.
   *
   * @param timelineClient the timeline client to register
   */
  public void registerTimelineClient(TimelineClient timelineClient) {
    client.registerTimelineClient(timelineClient);
  }

  /**
   * Get the registered timeline client from the underlying AMRMClient.
   * (The misspelling "Timeine" in the method name is part of the public API
   * and is kept for compatibility.)
   *
   * @return the registered timeline client
   */
  public TimelineClient getRegisteredTimeineClient() {
    return client.getRegisteredTimeineClient();
  }

  /**
   * Update application's blacklist with addition or removal resources.
   *
   * @param blacklistAdditions list of resources which should be added to the
   *        application blacklist
   * @param blacklistRemovals list of resources which should be removed from the
   *        application blacklist
   */
  public abstract void updateBlacklist(List<String> blacklistAdditions,
      List<String> blacklistRemovals);

  /**
   * Polls <code>check</code> every 1000 ms until it returns true.
   * See also {@link #waitFor(com.google.common.base.Supplier, int)}
   * and {@link #waitFor(com.google.common.base.Supplier, int, int)}
   *
   * @param check the condition for which it should wait; must not return null
   * @throws InterruptedException if the thread is interrupted while sleeping
   *         between checks
   */
  public void waitFor(Supplier<Boolean> check) throws InterruptedException {
    waitFor(check, 1000);
  }

  /**
   * Polls <code>check</code> every <code>checkEveryMillis</code> ms until it
   * returns true, logging a liveness message on every iteration.
   * See also {@link #waitFor(com.google.common.base.Supplier, int, int)}
   *
   * @param check user defined checker; must not return null
   * @param checkEveryMillis interval to call <code>check</code>
   * @throws InterruptedException if the thread is interrupted while sleeping
   *         between checks
   */
  public void waitFor(Supplier<Boolean> check, int checkEveryMillis)
      throws InterruptedException {
    waitFor(check, checkEveryMillis, 1);
  }

  /**
   * Polls <code>check</code> every <code>checkEveryMillis</code> ms until it
   * returns true. In the main loop, this method logs the message
   * "Waiting in main loop." once every <code>logInterval</code> failed checks
   * to confirm the thread is alive.
   *
   * @param check user defined checker; must not be null and must not return
   *        null (the result is unboxed to a primitive boolean)
   * @param checkEveryMillis interval in milliseconds between calls to
   *        <code>check</code>; must be non-negative
   * @param logInterval number of failed checks between liveness log messages;
   *        must be non-negative (0 behaves like 1, logging on every iteration)
   * @throws NullPointerException if <code>check</code> is null
   * @throws IllegalArgumentException if <code>checkEveryMillis</code> or
   *         <code>logInterval</code> is negative
   * @throws InterruptedException if the thread is interrupted while sleeping
   *         between checks
   */
  public void waitFor(Supplier<Boolean> check, int checkEveryMillis,
      int logInterval) throws InterruptedException {
    Preconditions.checkNotNull(check, "check should not be null");
    Preconditions.checkArgument(checkEveryMillis >= 0,
        "checkEveryMillis should be a non-negative value");
    Preconditions.checkArgument(logInterval >= 0,
        "logInterval should be a non-negative value");

    int loggingCounter = logInterval;
    do {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Check the condition for main loop.");
      }

      boolean result = check.get();
      if (result) {
        LOG.info("Exits the main loop.");
        return;
      }
      // Count down failed checks; emit a liveness message each time the
      // counter runs out, then start counting again.
      if (--loggingCounter <= 0) {
        LOG.info("Waiting in main loop.");
        loggingCounter = logInterval;
      }

      Thread.sleep(checkEveryMillis);
    } while (true);
  }

  /**
   * <p>
   * The callback abstract class. The callback functions need to be implemented
   * by {@link AMRMClientAsync} users. The APIs are called when responses from
   * the <code>ResourceManager</code> are available.
   * </p>
   */
  public abstract static class AbstractCallbackHandler
      implements CallbackHandler {

    /**
     * Called when the ResourceManager responds to a heartbeat with completed
     * containers. If the response contains both completed containers and
     * allocated containers, this will be called before containersAllocated.
     *
     * @param statuses statuses of the completed containers
     */
    public abstract void onContainersCompleted(List<ContainerStatus> statuses);

    /**
     * Called when the ResourceManager responds to a heartbeat with allocated
     * containers. If the response contains both completed containers and
     * allocated containers, this will be called after containersCompleted.
     *
     * @param containers the newly allocated containers
     */
    public abstract void onContainersAllocated(List<Container> containers);

    /**
     * Called when the ResourceManager responds to a heartbeat with containers
     * whose resource allocation has been changed.
     *
     * @param containers the containers whose resource allocation changed
     */
    public abstract void onContainersResourceChanged(
        List<Container> containers);

    /**
     * Called when the ResourceManager wants the ApplicationMaster to shutdown
     * for being out of sync etc. The ApplicationMaster should not unregister
     * with the RM unless the ApplicationMaster wants to be the last attempt.
     */
    public abstract void onShutdownRequest();

    /**
     * Called when nodes tracked by the ResourceManager have changed in health,
     * availability etc.
     *
     * @param updatedNodes reports for the nodes that changed
     */
    public abstract void onNodesUpdated(List<NodeReport> updatedNodes);

    /**
     * Returns the current progress of the application.
     * NOTE(review): presumably reported to the ResourceManager with the
     * heartbeats; confirm the expected value range against the implementation.
     *
     * @return the application's progress
     */
    public abstract float getProgress();

    /**
     * Called when error comes from RM communications as well as from errors in
     * the callback itself from the app. Calling
     * stop() is the recommended action.
     *
     * @param e the error that occurred
     */
    public abstract void onError(Throwable e);
  }

  /**
   * @deprecated Use {@link AMRMClientAsync.AbstractCallbackHandler} instead.
   */
  @Deprecated
  public interface CallbackHandler {

    /**
     * Called when the ResourceManager responds to a heartbeat with completed
     * containers. If the response contains both completed containers and
     * allocated containers, this will be called before containersAllocated.
     *
     * @param statuses statuses of the completed containers
     */
    void onContainersCompleted(List<ContainerStatus> statuses);

    /**
     * Called when the ResourceManager responds to a heartbeat with allocated
     * containers. If the response contains both completed containers and
     * allocated containers, this will be called after containersCompleted.
     *
     * @param containers the newly allocated containers
     */
    void onContainersAllocated(List<Container> containers);

    /**
     * Called when the ResourceManager wants the ApplicationMaster to shutdown
     * for being out of sync etc. The ApplicationMaster should not unregister
     * with the RM unless the ApplicationMaster wants to be the last attempt.
     */
    void onShutdownRequest();

    /**
     * Called when nodes tracked by the ResourceManager have changed in health,
     * availability etc.
     *
     * @param updatedNodes reports for the nodes that changed
     */
    void onNodesUpdated(List<NodeReport> updatedNodes);

    /**
     * Returns the current progress of the application.
     * NOTE(review): presumably reported to the ResourceManager with the
     * heartbeats; confirm the expected value range against the implementation.
     *
     * @return the application's progress
     */
    float getProgress();

    /**
     * Called when error comes from RM communications as well as from errors in
     * the callback itself from the app. Calling
     * stop() is the recommended action.
     *
     * @param e the error that occurred
     */
    void onError(Throwable e);
  }
}