blob: 7778a380dcd99139012a71c80fb86ede1fef0f71 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.job.yarn;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.clustermanager.*;
import org.apache.samza.clustermanager.SamzaApplicationState;
import org.apache.samza.clustermanager.SamzaContainerLaunchException;
import org.apache.samza.config.Config;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.config.YarnConfig;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.job.CommandBuilder;
import org.apache.samza.job.yarn.YarnContainer;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.util.hadoop.HttpFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * A {@link ClusterResourceManager} implementation that uses Yarn as the underlying resource manager.
 * This class acts as an adaptor: it forwards Samza resource requests to the Yarn {@link AMRMClientAsync}
 * and translates the Yarn callbacks it receives into the Samza specific callback methods declared by
 * {@link ClusterResourceManager.Callback}.
 *
 * Thread-safety:
 * 1. Start and stop methods should NOT be called from multiple threads.
 * 2. ALL callbacks from the YarnContainerManager are invoked from a single Callback thread of the AMRMClient.
 * 3. Stop should not be called more than once.
 */
public class YarnClusterResourceManager extends ClusterResourceManager implements AMRMClientAsync.CallbackHandler {

  private static final Logger log = LoggerFactory.getLogger(YarnClusterResourceManager.class);

  /**
   * Sentinel returned by {@link #getIDForContainer(String)} when no running container matches.
   */
  private static final int INVALID_YARN_CONTAINER_ID = -1;

  /**
   * Preferred-host value of a request that may be satisfied on any host.
   */
  private static final String ANY_HOST = "ANY_HOST";

  /**
   * Priority attached to every container request issued to Yarn.
   */
  private static final int DEFAULT_PRIORITY = 0;

  /**
   * The containerProcessManager instance to request resources from yarn.
   */
  private final AMRMClientAsync<AMRMClient.ContainerRequest> amClient;

  /**
   * A helper class to launch Yarn containers.
   */
  private final YarnContainerRunner yarnContainerRunner;

  /**
   * Configuration and state specific to Yarn.
   */
  private final YarnConfiguration hConfig;
  private final YarnAppState state;

  /**
   * SamzaYarnAppMasterLifecycle is responsible for registering, unregistering the AM client.
   */
  private final SamzaYarnAppMasterLifecycle lifecycle;

  /**
   * SamzaAppMasterService is responsible for hosting an AM web UI. This picks up data from both
   * SamzaAppState and YarnAppState.
   */
  private final SamzaYarnAppMasterService service;

  /**
   * State variables to map Yarn specific callbacks into Samza specific callbacks.
   */
  private final ConcurrentHashMap<SamzaResource, Container> allocatedResources = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<SamzaResourceRequest, AMRMClient.ContainerRequest> requestsMap = new ConcurrentHashMap<>();

  /**
   * Set to true by the first call to {@link #start()}; later calls to start return immediately.
   */
  final AtomicBoolean started = new AtomicBoolean(false);

  /**
   * Held while a map above is read or updated together with the matching AM client call, so that the
   * bookkeeping and the request sent to Yarn cannot be interleaved by another caller.
   */
  private final Object lock = new Object();

  /**
   * Creates an YarnClusterResourceManager from config, a jobModelReader and a callback.
   * The container ID and the node manager host/ports of the application master are read from the
   * environment variables set by Yarn.
   *
   * @param config to instantiate the container manager with
   * @param jobModelManager the jobModel manager to get the job model (mostly for the UI)
   * @param callback the callback to receive events from Yarn.
   * @param samzaAppState samza app state for display in the UI
   */
  public YarnClusterResourceManager(Config config, JobModelManager jobModelManager, ClusterResourceManager.Callback callback, SamzaApplicationState samzaAppState) {
    super(callback);
    hConfig = new YarnConfiguration();
    hConfig.set("fs.http.impl", HttpFileSystem.class.getName());

    MetricsRegistryMap registry = new MetricsRegistryMap();

    // parse configs from the Yarn environment
    String containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString());
    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
    String nodeHostString = System.getenv(ApplicationConstants.Environment.NM_HOST.toString());
    String nodePortString = System.getenv(ApplicationConstants.Environment.NM_PORT.toString());
    String nodeHttpPortString = System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.toString());

    int nodePort = Integer.parseInt(nodePortString);
    int nodeHttpPort = Integer.parseInt(nodeHttpPortString);
    YarnConfig yarnConfig = new YarnConfig(config);
    int interval = yarnConfig.getAMPollIntervalMs();

    //Instantiate the AM Client.
    this.amClient = AMRMClientAsync.createAMRMClientAsync(interval, this);

    this.state = new YarnAppState(jobModelManager, -1, containerId, nodeHostString, nodePort, nodeHttpPort, samzaAppState);
    log.info("Initialized YarnAppState: {}", state.toString());

    this.service = new SamzaYarnAppMasterService(config, this.state, registry);
    log.info("ContainerID str {}, Nodehost {} , Nodeport {} , NodeHttpport {}", new Object[] {containerIdStr, nodeHostString, nodePort, nodeHttpPort});

    this.lifecycle = new SamzaYarnAppMasterLifecycle(yarnConfig.getContainerMaxMemoryMb(), yarnConfig.getContainerMaxCpuCores(), state, amClient);
    yarnContainerRunner = new YarnContainerRunner(config, hConfig);
  }

  /**
   * Starts the YarnContainerManager and initialize all its sub-systems.
   * Attempting to start an already started container manager will return immediately.
   * If the lifecycle reports that it should shut down after initialization, the error is forwarded to
   * the cluster manager callback.
   */
  @Override
  public void start() {
    if (!started.compareAndSet(false, true)) {
      log.info("Attempting to start an already started ContainerManager");
      return;
    }
    service.onInit();
    log.info("Starting YarnContainerManager.");
    amClient.init(hConfig);
    amClient.start();
    lifecycle.onInit();

    if (lifecycle.shouldShutdown()) {
      clusterManagerCallback.onError(new SamzaException("Invalid resource request."));
    }

    log.info("Finished starting YarnContainerManager");
  }

  /**
   * Request resources for running container processes.
   * A request whose preferred host is {@code null} or "ANY_HOST" is issued without a node constraint;
   * any other value is passed to Yarn as the single preferred node.
   *
   * @param resourceRequest the memory, cpu and host requirements of the requested resource
   */
  @Override
  public void requestResources(SamzaResourceRequest resourceRequest) {
    String preferredHost = resourceRequest.getPreferredHost();
    log.info("Requesting resources on {} for container {}", preferredHost, resourceRequest.getContainerID());

    Resource capability = Resource.newInstance(resourceRequest.getMemoryMB(), resourceRequest.getNumCores());
    Priority priority = Priority.newInstance(DEFAULT_PRIORITY);

    AMRMClient.ContainerRequest issuedRequest;
    // A missing host is treated like ANY_HOST instead of failing with a NullPointerException.
    if (preferredHost == null || ANY_HOST.equals(preferredHost)) {
      log.info("Making a request for ANY_HOST {}", preferredHost);
      issuedRequest = new AMRMClient.ContainerRequest(capability, null, null, priority);
    } else {
      log.info("Making a preferred host request on {}", preferredHost);
      issuedRequest = new AMRMClient.ContainerRequest(capability, new String[] {preferredHost}, null, priority);
    }

    //ensure that updating the state and making the request are done atomically.
    synchronized (lock) {
      requestsMap.put(resourceRequest, issuedRequest);
      amClient.addContainerRequest(issuedRequest);
    }
  }

  /**
   * Requests the YarnContainerManager to release a resource. If the app cannot use the resource or wants to give up
   * the resource, it can release them. Releasing a resource that is not currently tracked as allocated is a no-op.
   *
   * @param resource to be released
   */
  @Override
  public void releaseResources(SamzaResource resource) {
    log.info("Release resource invoked {} ", resource);
    //ensure that updating state and removing the request are done atomically
    synchronized (lock) {
      Container container = allocatedResources.get(resource);
      if (container == null) {
        log.info("Resource {} already released. ", resource);
        return;
      }
      amClient.releaseAssignedContainer(container.getId());
      allocatedResources.remove(resource);
    }
  }

  /**
   * Requests the launch of a StreamProcessor with the specified ID on the resource.
   * The Samza container ID is read from the environment produced by the builder. If the resource is no longer
   * tracked as allocated, the request is logged and ignored.
   *
   * @param resource the SamzaResource on which to launch the StreamProcessor
   * @param builder the builder to build the resource launch command from
   * @throws SamzaContainerLaunchException declared for failures while launching the container
   *
   * TODO: Support non-builder methods to launch resources. Maybe, refactor into a ContainerLaunchStrategy interface
   */
  @Override
  public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) throws SamzaContainerLaunchException {
    String containerIDStr = builder.buildEnvironment().get(ShellCommandConfig.ENV_CONTAINER_ID());
    int containerID = Integer.parseInt(containerIDStr);
    log.info("Received launch request for {} on hostname {}", containerID, resource.getHost());

    synchronized (lock) {
      Container container = allocatedResources.get(resource);
      if (container == null) {
        log.info("Resource {} already released. ", resource);
        return;
      }
      // The container is recorded as running before the launch is attempted.
      state.runningYarnContainers.put(containerID, new YarnContainer(container));
      yarnContainerRunner.runContainer(containerID, container, builder);
    }
  }

  //TODO: Get rid of the YarnContainer object and just use Container in state.runningYarnContainers hashmap.
  //In that case, this scan will turn into a lookup. This change will require changes/testing in the UI files because
  //those UI stub templates operate on the YarnContainer object.
  /**
   * Given a lookupContainerId from Yarn (for example: containerId_app_12345), this method returns the
   * SamzaContainer ID in the range [0,N-1] that maps to it. The running containers are scanned linearly.
   *
   * @param lookupContainerId the Yarn container ID.
   * @return the samza container ID, or {@link #INVALID_YARN_CONTAINER_ID} if no running container has that Yarn ID.
   */
  private int getIDForContainer(String lookupContainerId) {
    for (Map.Entry<Integer, YarnContainer> entry : state.runningYarnContainers.entrySet()) {
      String yarnContainerId = entry.getValue().id().toString();
      if (yarnContainerId.equals(lookupContainerId)) {
        return entry.getKey();
      }
    }
    return INVALID_YARN_CONTAINER_ID;
  }

  /**
   * Remove a previously submitted resource request. The previous container request may have
   * been submitted. Even after the remove request, a Callback implementation must
   * be prepared to receive an allocation for the previous request. This is merely a best effort cancellation.
   *
   * @param request the request to be cancelled
   */
  @Override
  public void cancelResourceRequest(SamzaResourceRequest request) {
    log.info("Cancelling request {} ", request);
    //ensure that removal and cancellation are done atomically.
    synchronized (lock) {
      AMRMClient.ContainerRequest containerRequest = requestsMap.remove(request);
      if (containerRequest == null) {
        // Log the Samza request; the Yarn request is null on this path and carries no information.
        log.info("Cancellation of {} already done. ", request);
        return;
      }
      amClient.removeContainerRequest(containerRequest);
    }
  }

  /**
   * Stops the YarnContainerManager and all its sub-components.
   * Stop should NOT be called from multiple threads.
   * TODO: fix this to make stop idempotent?.
   *
   * @param status the final application status handed to the lifecycle on shutdown
   */
  @Override
  public void stop(SamzaApplicationState.SamzaAppStatus status) {
    log.info("Stopping AM client " );
    lifecycle.onShutdown(status);
    amClient.stop();
    log.info("Stopping the AM service " );
    service.onShutdown();
  }

  /**
   * Callback invoked from Yarn when containers complete. This translates the yarn callbacks into Samza specific
   * ones. Every status is forwarded to the cluster manager callback; statuses that belong to a container tracked
   * as running additionally update the Yarn app state.
   *
   * @param statuses the YarnContainerStatus callbacks from Yarn.
   */
  @Override
  public void onContainersCompleted(List<ContainerStatus> statuses) {
    List<SamzaResourceStatus> samzaResrcStatuses = new ArrayList<>(statuses.size());

    for (ContainerStatus status : statuses) {
      log.info("Container completed from RM {}", status);

      String yarnContainerId = status.getContainerId().toString();
      samzaResrcStatuses.add(new SamzaResourceStatus(yarnContainerId, status.getDiagnostics(), status.getExitStatus()));

      int completedContainerID = getIDForContainer(yarnContainerId);
      log.info("Completed container had ID: {}", completedContainerID);

      //remove the container from the list of running containers, if failed with a non-zero exit code, add it to the
      //list of failed containers.
      if (completedContainerID != INVALID_YARN_CONTAINER_ID
          && state.runningYarnContainers.containsKey(completedContainerID)) {
        log.info("Removing container ID {} from completed containers", completedContainerID);
        state.runningYarnContainers.remove(completedContainerID);

        if (status.getExitStatus() != ContainerExitStatus.SUCCESS) {
          state.failedContainersStatus.put(yarnContainerId, status);
        }
      }
    }
    clusterManagerCallback.onResourcesCompleted(samzaResrcStatuses);
  }

  /**
   * Callback invoked from Yarn when containers are allocated. This translates the yarn callbacks into Samza
   * specific ones. Each container is remembered so that it can later be launched or released.
   *
   * @param containers the list of {@link Container} returned by Yarn.
   */
  @Override
  public void onContainersAllocated(List<Container> containers) {
    List<SamzaResource> resources = new ArrayList<>(containers.size());
    for (Container container : containers) {
      String host = container.getNodeId().getHost();
      log.info("Container allocated from RM on {}", host);

      String id = container.getId().toString();
      int memory = container.getResource().getMemory();
      int numCores = container.getResource().getVirtualCores();

      SamzaResource resource = new SamzaResource(numCores, memory, host, id);
      allocatedResources.put(resource, container);
      resources.add(resource);
    }
    clusterManagerCallback.onResourcesAvailable(resources);
  }

  //The below methods are specific to the Yarn AMRM Client. We currently don't handle scenarios where there are
  //nodes being updated. We always return 0 when asked for progress by Yarn.

  @Override
  public void onShutdownRequest() {
    //not implemented currently.
  }

  @Override
  public void onNodesUpdated(List<NodeReport> updatedNodes) {
    //not implemented currently.
  }

  @Override
  public float getProgress() {
    //not implemented currently.
    return 0;
  }

  /**
   * Callback invoked when there is an error in the Yarn client. This delegates the
   * callback handling to the {@link ClusterResourceManager.Callback} instance.
   *
   * @param e the error reported by the Yarn client
   */
  @Override
  public void onError(Throwable e) {
    // Pass the throwable as the exception argument (no placeholder) so the stack trace is logged.
    log.error("Exception in the Yarn callback", e);
    clusterManagerCallback.onError(e);
  }
}