blob: 2f8a06c88a3bb7bc48795a4b9a40988dc84e4bb9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.yarn;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.AllArgsConstructor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.Records;
import org.apache.helix.HelixManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
import lombok.AccessLevel;
import lombok.Getter;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.GobblinClusterMetricTagNames;
import org.apache.gobblin.cluster.GobblinClusterUtils;
import org.apache.gobblin.cluster.HelixUtils;
import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MetricReporterException;
import org.apache.gobblin.metrics.MultiReporterException;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
import org.apache.gobblin.yarn.event.ContainerShutdownRequest;
import org.apache.gobblin.yarn.event.NewContainerRequest;
import static org.apache.gobblin.yarn.GobblinYarnTaskRunner.HELIX_YARN_INSTANCE_NAME_PREFIX;
/**
* This class is responsible for all Yarn-related stuffs including ApplicationMaster registration,
* ApplicationMaster un-registration, Yarn container management, etc.
*
* @author Yinan Li
*/
public class YarnService extends AbstractIdleService {
private static final Logger LOGGER = LoggerFactory.getLogger(YarnService.class);
private static final String UNKNOWN_HELIX_INSTANCE = "UNKNOWN";
private final String applicationName;
private final String applicationId;
private final String appViewAcl;
//Default helix instance tag derived from cluster level config
private final String helixInstanceTags;
private final Config config;
private final EventBus eventBus;
private final Configuration yarnConfiguration;
private final FileSystem fs;
private final Optional<GobblinMetrics> gobblinMetrics;
private final Optional<EventSubmitter> eventSubmitter;
@VisibleForTesting
@Getter(AccessLevel.PROTECTED)
private final AMRMClientAsync<AMRMClient.ContainerRequest> amrmClientAsync;
private final NMClientAsync nmClientAsync;
private final ExecutorService containerLaunchExecutor;
private final int initialContainers;
private final int requestedContainerMemoryMbs;
private final int requestedContainerCores;
private final int jvmMemoryOverheadMbs;
private final double jvmMemoryXmxRatio;
private final boolean containerHostAffinityEnabled;
private final int helixInstanceMaxRetries;
private final Optional<String> containerJvmArgs;
private final String containerTimezone;
private final HelixManager helixManager;
@Getter(AccessLevel.PROTECTED)
private volatile Optional<Resource> maxResourceCapacity = Optional.absent();
// Security tokens for accessing HDFS
private ByteBuffer tokens;
private final Closer closer = Closer.create();
private final Object allContainersStopped = new Object();
// A map from container IDs to Container instances, Helix participant IDs of the containers and Helix Tag
@VisibleForTesting
@Getter(AccessLevel.PROTECTED)
private final ConcurrentMap<ContainerId, ContainerInfo> containerMap = Maps.newConcurrentMap();
// A cache of the containers with an outstanding container release request.
// This is a cache instead of a set to get the automatic cleanup in case a container completes before the requested
// release.
@VisibleForTesting
@Getter(AccessLevel.PROTECTED)
private final Cache<ContainerId, String> releasedContainerCache;
// A generator for an integer ID of a Helix instance (participant)
private final AtomicInteger helixInstanceIdGenerator = new AtomicInteger(0);
// A map from Helix instance names to the number times the instances are retried to be started
private final ConcurrentMap<String, AtomicInteger> helixInstanceRetryCount = Maps.newConcurrentMap();
// A concurrent HashSet of unused Helix instance names. An unused Helix instance name gets put
// into the set if the container running the instance completes. Unused Helix
// instance names get picked up when replacement containers get allocated.
private final Set<String> unusedHelixInstanceNames = ConcurrentHashMap.newKeySet();
// The map from helix tag to allocated container count
private final Map<String, Integer> allocatedContainerCountMap = Maps.newConcurrentMap();
private volatile YarnContainerRequestBundle yarnContainerRequest;
private final AtomicInteger priorityNumGenerator = new AtomicInteger(0);
private final Map<String, Integer> resourcePriorityMap = new HashMap<>();
private volatile boolean shutdownInProgress = false;
public YarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
FileSystem fs, EventBus eventBus, HelixManager helixManager) throws Exception {
this.applicationName = applicationName;
this.applicationId = applicationId;
this.config = config;
this.eventBus = eventBus;
this.helixManager = helixManager;
this.gobblinMetrics = config.getBoolean(ConfigurationKeys.METRICS_ENABLED_KEY) ?
Optional.of(buildGobblinMetrics()) : Optional.<GobblinMetrics>absent();
this.eventSubmitter = config.getBoolean(ConfigurationKeys.METRICS_ENABLED_KEY) ?
Optional.of(buildEventSubmitter()) : Optional.<EventSubmitter>absent();
this.yarnConfiguration = yarnConfiguration;
this.fs = fs;
int amRmHeartbeatIntervalMillis = Long.valueOf(TimeUnit.SECONDS.toMillis(
ConfigUtils.getInt(config, GobblinYarnConfigurationKeys.AMRM_HEARTBEAT_INTERVAL_SECS,
GobblinYarnConfigurationKeys.DEFAULT_AMRM_HEARTBEAT_INTERVAL_SECS))).intValue();
this.amrmClientAsync = closer.register(
AMRMClientAsync.createAMRMClientAsync(amRmHeartbeatIntervalMillis, new AMRMClientCallbackHandler()));
this.amrmClientAsync.init(this.yarnConfiguration);
this.nmClientAsync = closer.register(NMClientAsync.createNMClientAsync(getNMClientCallbackHandler()));
this.nmClientAsync.init(this.yarnConfiguration);
this.initialContainers = config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY);
this.requestedContainerMemoryMbs = config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
this.requestedContainerCores = config.getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY);
this.containerHostAffinityEnabled = config.getBoolean(GobblinYarnConfigurationKeys.CONTAINER_HOST_AFFINITY_ENABLED);
this.helixInstanceMaxRetries = config.getInt(GobblinYarnConfigurationKeys.HELIX_INSTANCE_MAX_RETRIES);
this.helixInstanceTags = ConfigUtils.getString(config,
GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, GobblinClusterConfigurationKeys.HELIX_DEFAULT_TAG);
this.containerJvmArgs = config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY) ?
Optional.of(config.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY)) :
Optional.<String>absent();
int numContainerLaunchThreads =
ConfigUtils.getInt(config, GobblinYarnConfigurationKeys.MAX_CONTAINER_LAUNCH_THREADS_KEY,
GobblinYarnConfigurationKeys.DEFAULT_MAX_CONTAINER_LAUNCH_THREADS);
this.containerLaunchExecutor = ScalingThreadPoolExecutor.newScalingThreadPool(5, numContainerLaunchThreads, 0L,
ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), Optional.of("ContainerLaunchExecutor")));
this.tokens = getSecurityTokens();
this.releasedContainerCache = CacheBuilder.newBuilder().expireAfterAccess(ConfigUtils.getInt(config,
GobblinYarnConfigurationKeys.RELEASED_CONTAINERS_CACHE_EXPIRY_SECS,
GobblinYarnConfigurationKeys.DEFAULT_RELEASED_CONTAINERS_CACHE_EXPIRY_SECS), TimeUnit.SECONDS).build();
this.jvmMemoryXmxRatio = ConfigUtils.getDouble(this.config,
GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY,
GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO);
Preconditions.checkArgument(this.jvmMemoryXmxRatio >= 0 && this.jvmMemoryXmxRatio <= 1,
GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY + " must be between 0 and 1 inclusive");
this.jvmMemoryOverheadMbs = ConfigUtils.getInt(this.config,
GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY,
GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS);
Preconditions.checkArgument(this.jvmMemoryOverheadMbs < this.requestedContainerMemoryMbs * this.jvmMemoryXmxRatio,
GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY + " cannot be more than "
+ GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY + " * "
+ GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY);
this.appViewAcl = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.APP_VIEW_ACL,
GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL);
this.containerTimezone = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE,
GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE);
}
@SuppressWarnings("unused")
@Subscribe
public void handleNewContainerRequest(NewContainerRequest newContainerRequest) {
if (!this.maxResourceCapacity.isPresent()) {
LOGGER.error(String.format(
"Unable to handle new container request as maximum resource capacity is not available: "
+ "[memory (MBs) requested = %d, vcores requested = %d]", this.requestedContainerMemoryMbs,
this.requestedContainerCores));
return;
}
requestContainer(newContainerRequest.getReplacedContainer().transform(container -> container.getNodeId().getHost()),
newContainerRequest.getResource());
}
protected NMClientCallbackHandler getNMClientCallbackHandler() {
return new NMClientCallbackHandler();
}
@SuppressWarnings("unused")
@Subscribe
public void handleContainerShutdownRequest(ContainerShutdownRequest containerShutdownRequest) {
for (Container container : containerShutdownRequest.getContainers()) {
LOGGER.info(String.format("Stopping container %s running on %s", container.getId(), container.getNodeId()));
this.nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
}
}
/**
* Request the Resource Manager to release the container
* @param containerReleaseRequest containers to release
*/
@Subscribe
public void handleContainerReleaseRequest(ContainerReleaseRequest containerReleaseRequest) {
for (Container container : containerReleaseRequest.getContainers()) {
LOGGER.info(String.format("Releasing container %s running on %s", container.getId(), container.getNodeId()));
// Record that this container was explicitly released so that a new one is not spawned to replace it
// Put the container id in the releasedContainerCache before releasing it so that handleContainerCompletion()
// can check for the container id and skip spawning a replacement container.
// Note that this is best effort since these are asynchronous operations and a container may abort concurrently
// with the release call. So in some cases a replacement container may have already been spawned before
// the container is put into the black list.
this.releasedContainerCache.put(container.getId(), "");
this.amrmClientAsync.releaseAssignedContainer(container.getId());
}
}
@Override
protected void startUp() throws Exception {
LOGGER.info("Starting the YarnService");
// Register itself with the EventBus for container-related requests
this.eventBus.register(this);
this.amrmClientAsync.start();
this.nmClientAsync.start();
// The ApplicationMaster registration response is used to determine the maximum resource capacity of the cluster
RegisterApplicationMasterResponse response = this.amrmClientAsync.registerApplicationMaster(
GobblinClusterUtils.getHostname(), -1, "");
LOGGER.info("ApplicationMaster registration response: " + response);
this.maxResourceCapacity = Optional.of(response.getMaximumResourceCapability());
LOGGER.info("Requesting initial containers");
requestInitialContainers(this.initialContainers);
}
@Override
protected void shutDown() throws IOException {
LOGGER.info("Stopping the YarnService");
this.shutdownInProgress = true;
try {
ExecutorsUtils.shutdownExecutorService(this.containerLaunchExecutor, Optional.of(LOGGER));
// Stop the running containers
for (ContainerInfo containerInfo : this.containerMap.values()) {
LOGGER.info("Stopping container {} running participant {}", containerInfo.getContainer().getId(),
containerInfo.getHelixParticipantId());
this.nmClientAsync.stopContainerAsync(containerInfo.getContainer().getId(), containerInfo.getContainer().getNodeId());
}
if (!this.containerMap.isEmpty()) {
synchronized (this.allContainersStopped) {
try {
// Wait 5 minutes for the containers to stop
this.allContainersStopped.wait(5 * 60 * 1000);
LOGGER.info("All of the containers have been stopped");
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, null);
} catch (IOException | YarnException e) {
LOGGER.error("Failed to unregister the ApplicationMaster", e);
} finally {
try {
this.closer.close();
} finally {
if (this.gobblinMetrics.isPresent()) {
this.gobblinMetrics.get().stopMetricsReporting();
}
}
}
}
public void updateToken() throws IOException{
this.tokens = getSecurityTokens();
}
private GobblinMetrics buildGobblinMetrics() {
// Create tags list
ImmutableList.Builder<Tag<?>> tags = new ImmutableList.Builder<>();
tags.add(new Tag<>(GobblinClusterMetricTagNames.APPLICATION_ID, this.applicationId));
tags.add(new Tag<>(GobblinClusterMetricTagNames.APPLICATION_NAME, this.applicationName));
// Intialize Gobblin metrics and start reporters
GobblinMetrics gobblinMetrics = GobblinMetrics.get(this.applicationId, null, tags.build());
try {
gobblinMetrics.startMetricReporting(ConfigUtils.configToProperties(config));
} catch (MultiReporterException ex) {
for (MetricReporterException e: ex.getExceptions()) {
LOGGER.error("Failed to start {} {} reporter.", e.getSinkType().name(), e.getReporterType().name(), e);
}
}
return gobblinMetrics;
}
private EventSubmitter buildEventSubmitter() {
return new EventSubmitter.Builder(this.gobblinMetrics.get().getMetricContext(),
GobblinYarnEventConstants.EVENT_NAMESPACE)
.build();
}
/**
* Request an allocation of containers. If numTargetContainers is larger than the max of current and expected number
* of containers then additional containers are requested.
*
* If numTargetContainers is less than the current number of allocated containers then release free containers.
* Shrinking is relative to the number of currently allocated containers since it takes time for containers
* to be allocated and assigned work and we want to avoid releasing a container prematurely before it is assigned
* work. This means that a container may not be released even though numTargetContainers is less than the requested
* number of containers. The intended usage is for the caller of this method to make periodic calls to attempt to
* adjust the cluster towards the desired number of containers.
*
* @param yarnContainerRequestBundle the desired containers information, including numbers, resource and helix tag
* @param inUseInstances a set of in use instances
*/
public synchronized void requestTargetNumberOfContainers(YarnContainerRequestBundle yarnContainerRequestBundle, Set<String> inUseInstances) {
LOGGER.debug("Requesting numTargetContainers {}, in use instances count is {}, container map size is {}",
yarnContainerRequestBundle.getTotalContainers(), inUseInstances, this.containerMap.size());
int numTargetContainers = yarnContainerRequestBundle.getTotalContainers();
// YARN can allocate more than the requested number of containers, compute additional allocations and deallocations
// based on the max of the requested and actual allocated counts
int numAllocatedContainers = this.containerMap.size();
// Request additional containers if the desired count is higher than the max of the current allocation or previously
// requested amount. Note that there may be in-flight or additional allocations after numContainers has been computed
// so overshooting can occur, but periodic calls to this method will make adjustments towards the target.
for (Map.Entry<String, Integer> entry : yarnContainerRequestBundle.getHelixTagContainerCountMap().entrySet()) {
String currentHelixTag = entry.getKey();
int desiredContainerCount = entry.getValue();
// Calculate requested container count based on adding allocated count and outstanding ContainerRequests in Yarn
int requestedContainerCount = allocatedContainerCountMap.getOrDefault(currentHelixTag, 0)
+ getMatchingRequestsCount(yarnContainerRequestBundle.getHelixTagResourceMap().get(currentHelixTag));
for(; requestedContainerCount < desiredContainerCount; requestedContainerCount++) {
requestContainer(Optional.absent(), yarnContainerRequestBundle.getHelixTagResourceMap().get(currentHelixTag));
}
}
// If the total desired is lower than the currently allocated amount then release free containers.
// This is based on the currently allocated amount since containers may still be in the process of being allocated
// and assigned work. Resizing based on numRequestedContainers at this point may release a container right before
// or soon after it is assigned work.
if (numTargetContainers < numAllocatedContainers) {
LOGGER.debug("Shrinking number of containers by {}", (numAllocatedContainers - numTargetContainers));
List<Container> containersToRelease = new ArrayList<>();
int numToShutdown = numAllocatedContainers - numTargetContainers;
// Look for eligible containers to release. If a container is in use then it is not released.
for (Map.Entry<ContainerId, ContainerInfo> entry : this.containerMap.entrySet()) {
ContainerInfo containerInfo = entry.getValue();
if (!inUseInstances.contains(containerInfo.getHelixParticipantId())) {
containersToRelease.add(containerInfo.getContainer());
}
if (containersToRelease.size() == numToShutdown) {
break;
}
}
LOGGER.debug("Shutting down containers {}", containersToRelease);
this.eventBus.post(new ContainerReleaseRequest(containersToRelease));
}
this.yarnContainerRequest = yarnContainerRequestBundle;
LOGGER.info("Current tag-container desired count:{}, tag-container allocated: {}",
yarnContainerRequestBundle.getHelixTagContainerCountMap(), this.allocatedContainerCountMap);
}
// Request initial containers with default resource and helix tag
private void requestInitialContainers(int containersRequested) {
YarnContainerRequestBundle initialYarnContainerRequest = new YarnContainerRequestBundle();
Resource capability = Resource.newInstance(this.requestedContainerMemoryMbs, this.requestedContainerCores);
initialYarnContainerRequest.add(this.helixInstanceTags, containersRequested, capability);
requestTargetNumberOfContainers(initialYarnContainerRequest, Collections.EMPTY_SET);
}
private void requestContainer(Optional<String> preferredNode, Optional<Resource> resourceOptional) {
Resource desiredResource = resourceOptional.or(Resource.newInstance(
this.requestedContainerMemoryMbs, this.requestedContainerCores));
requestContainer(preferredNode, desiredResource);
}
// Request containers with specific resource requirement
private void requestContainer(Optional<String> preferredNode, Resource resource) {
// Fail if Yarn cannot meet container resource requirements
Preconditions.checkArgument(resource.getMemory() <= this.maxResourceCapacity.get().getMemory() &&
resource.getVirtualCores() <= this.maxResourceCapacity.get().getVirtualCores(),
"Resource requirement must less than the max resource capacity. Requested resource" + resource.toString()
+ " exceed the max resource limit " + this.maxResourceCapacity.get().toString());
// Due to YARN-314, different resource capacity needs different priority, otherwise Yarn will not allocate container
Priority priority = Records.newRecord(Priority.class);
if(!resourcePriorityMap.containsKey(resource.toString())) {
resourcePriorityMap.put(resource.toString(), priorityNumGenerator.getAndIncrement());
}
int priorityNum = resourcePriorityMap.get(resource.toString());
priority.setPriority(priorityNum);
String[] preferredNodes = preferredNode.isPresent() ? new String[] {preferredNode.get()} : null;
this.amrmClientAsync.addContainerRequest(
new AMRMClient.ContainerRequest(resource, preferredNodes, null, priority));
}
protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo containerInfo)
throws IOException {
Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, this.applicationId);
Path containerWorkDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
Map<String, LocalResource> resourceMap = Maps.newHashMap();
addContainerLocalResources(new Path(appWorkDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME), resourceMap);
addContainerLocalResources(new Path(containerWorkDir, GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME), resourceMap);
addContainerLocalResources(
new Path(containerWorkDir, GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME), resourceMap);
if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_FILES_REMOTE_KEY)) {
YarnHelixUtils.addRemoteFilesToLocalResources(this.config.getString(GobblinYarnConfigurationKeys.CONTAINER_FILES_REMOTE_KEY),
resourceMap, yarnConfiguration);
}
if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_ZIPS_REMOTE_KEY)) {
YarnHelixUtils.addRemoteZipsToLocalResources(this.config.getString(GobblinYarnConfigurationKeys.CONTAINER_ZIPS_REMOTE_KEY),
resourceMap, yarnConfiguration);
}
ContainerLaunchContext containerLaunchContext = Records.newRecord(ContainerLaunchContext.class);
containerLaunchContext.setLocalResources(resourceMap);
containerLaunchContext.setEnvironment(YarnHelixUtils.getEnvironmentVariables(this.yarnConfiguration));
containerLaunchContext.setCommands(Lists.newArrayList(buildContainerCommand(containerInfo)));
Map<ApplicationAccessType, String> acls = new HashMap<>(1);
acls.put(ApplicationAccessType.VIEW_APP, this.appViewAcl);
containerLaunchContext.setApplicationACLs(acls);
if (UserGroupInformation.isSecurityEnabled()) {
containerLaunchContext.setTokens(this.tokens.duplicate());
}
return containerLaunchContext;
}
private void addContainerLocalResources(Path destDir, Map<String, LocalResource> resourceMap) throws IOException {
if (!this.fs.exists(destDir)) {
LOGGER.warn(String.format("Path %s does not exist so no container LocalResource to add", destDir));
return;
}
FileStatus[] statuses = this.fs.listStatus(destDir);
if (statuses != null) {
for (FileStatus status : statuses) {
YarnHelixUtils.addFileAsLocalResource(this.fs, status.getPath(), LocalResourceType.FILE, resourceMap);
}
}
}
private ByteBuffer getSecurityTokens() throws IOException {
Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
Closer closer = Closer.create();
try {
DataOutputBuffer dataOutputBuffer = closer.register(new DataOutputBuffer());
credentials.writeTokenStorageToStream(dataOutputBuffer);
// Remove the AM->RM token so that containers cannot access it
Iterator<Token<?>> tokenIterator = credentials.getAllTokens().iterator();
while (tokenIterator.hasNext()) {
Token<?> token = tokenIterator.next();
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
tokenIterator.remove();
}
}
return ByteBuffer.wrap(dataOutputBuffer.getData(), 0, dataOutputBuffer.getLength());
} catch (Throwable t) {
throw closer.rethrow(t);
} finally {
closer.close();
}
}
@VisibleForTesting
protected String buildContainerCommand(ContainerInfo containerInfo) {
String containerProcessName = GobblinYarnTaskRunner.class.getSimpleName();
StringBuilder containerCommand = new StringBuilder()
.append(ApplicationConstants.Environment.JAVA_HOME.$()).append("/bin/java")
.append(" -Xmx").append((int) (containerInfo.getContainer().getResource().getMemory() * this.jvmMemoryXmxRatio) -
this.jvmMemoryOverheadMbs).append("M")
.append(" -D").append(GobblinYarnConfigurationKeys.JVM_USER_TIMEZONE_CONFIG).append("=").append(this.containerTimezone)
.append(" -D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME).append("=").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR)
.append(" -D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME).append("=").append(containerProcessName).append(".").append(ApplicationConstants.STDOUT)
.append(" ").append(JvmUtils.formatJvmArguments(this.containerJvmArgs))
.append(" ").append(GobblinYarnTaskRunner.class.getName())
.append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)
.append(" ").append(this.applicationName)
.append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME)
.append(" ").append(this.applicationId)
.append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME)
.append(" ").append(containerInfo.getHelixParticipantId());
if (!Strings.isNullOrEmpty(containerInfo.getHelixTag())) {
containerCommand.append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME)
.append(" ").append(containerInfo.getHelixTag());
}
return containerCommand.append(" 1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
containerProcessName).append(".").append(ApplicationConstants.STDOUT)
.append(" 2>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
containerProcessName).append(".").append(ApplicationConstants.STDERR).toString();
}
/**
* Check the exit status of a completed container and see if the replacement container
* should try to be started on the same node. Some exit status indicates a disk or
* node failure and in such cases the replacement container should try to be started on
* a different node.
*/
private boolean shouldStickToTheSameNode(int containerExitStatus) {
switch (containerExitStatus) {
case ContainerExitStatus.DISKS_FAILED:
return false;
case ContainerExitStatus.ABORTED:
// Mostly likely this exit status is due to node failures because the
// application itself will not release containers.
return false;
default:
// Stick to the same node for other cases if host affinity is enabled.
return this.containerHostAffinityEnabled;
}
}
/**
* Handle the completion of a container. A new container will be requested to replace the one
* that just exited. Depending on the exit status and if container host affinity is enabled,
* the new container may or may not try to be started on the same node.
*
* A container completes in either of the following conditions: 1) some error happens in the
* container and caused the container to exit, 2) the container gets killed due to some reason,
* for example, if it runs over the allowed amount of virtual or physical memory, 3) the gets
* preempted by the ResourceManager, or 4) the container gets stopped by the ApplicationMaster.
* A replacement container is needed in all but the last case.
*/
protected void handleContainerCompletion(ContainerStatus containerStatus) {
ContainerInfo completedContainerInfo = this.containerMap.remove(containerStatus.getContainerId());
//Get the Helix instance name for the completed container. Because callbacks are processed asynchronously, we might
//encounter situations where handleContainerCompletion() is called before onContainersAllocated(), resulting in the
//containerId missing from the containersMap.
String completedInstanceName = completedContainerInfo == null? UNKNOWN_HELIX_INSTANCE : completedContainerInfo.getHelixParticipantId();
String helixTag = completedContainerInfo == null ? helixInstanceTags : completedContainerInfo.getHelixTag();
allocatedContainerCountMap.put(helixTag, allocatedContainerCountMap.get(helixTag) - 1);
LOGGER.info(String.format("Container %s running Helix instance %s with tag %s has completed with exit status %d",
containerStatus.getContainerId(), completedInstanceName, helixTag, containerStatus.getExitStatus()));
if (!Strings.isNullOrEmpty(containerStatus.getDiagnostics())) {
LOGGER.info(String.format("Received the following diagnostics information for container %s: %s",
containerStatus.getContainerId(), containerStatus.getDiagnostics()));
}
if (containerStatus.getExitStatus() == ContainerExitStatus.ABORTED) {
if (this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) != null) {
LOGGER.info("Container release requested, so not spawning a replacement for containerId {}", containerStatus.getContainerId());
if (completedContainerInfo != null) {
LOGGER.info("Adding instance {} to the pool of unused instances", completedInstanceName);
this.unusedHelixInstanceNames.add(completedInstanceName);
}
return;
} else {
LOGGER.info("Container {} aborted due to lost NM", containerStatus.getContainerId());
// Container release was not requested. Likely, the container was running on a node on which the NM died.
// In this case, RM assumes that the containers are "lost", even though the container process may still be
// running on the node. We need to ensure that the Helix instances running on the orphaned containers
// are fenced off from the Helix cluster to avoid double publishing and state being committed by the
// instances.
if (!UNKNOWN_HELIX_INSTANCE.equals(completedInstanceName)) {
String clusterName = this.helixManager.getClusterName();
//Disable the orphaned instance.
if (HelixUtils.isInstanceLive(helixManager, completedInstanceName)) {
LOGGER.info("Disabling the Helix instance {}", completedInstanceName);
this.helixManager.getClusterManagmentTool().enableInstance(clusterName, completedInstanceName, false);
}
}
}
}
if (this.shutdownInProgress) {
return;
}
if(completedContainerInfo != null) {
this.helixInstanceRetryCount.putIfAbsent(completedInstanceName, new AtomicInteger(0));
int retryCount = this.helixInstanceRetryCount.get(completedInstanceName).incrementAndGet();
// Populate event metadata
Optional<ImmutableMap.Builder<String, String>> eventMetadataBuilder = Optional.absent();
if (this.eventSubmitter.isPresent()) {
eventMetadataBuilder = Optional.of(buildContainerStatusEventMetadata(containerStatus));
eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.HELIX_INSTANCE_ID, completedInstanceName);
eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.CONTAINER_STATUS_RETRY_ATTEMPT, retryCount + "");
}
if (this.helixInstanceMaxRetries > 0 && retryCount > this.helixInstanceMaxRetries) {
if (this.eventSubmitter.isPresent()) {
this.eventSubmitter.get()
.submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION, eventMetadataBuilder.get().build());
}
LOGGER.warn("Maximum number of retries has been achieved for Helix instance " + completedInstanceName);
return;
}
// Add the Helix instance name of the completed container to the set of unused
// instance names so they can be reused by a replacement container.
LOGGER.info("Adding instance {} to the pool of unused instances", completedInstanceName);
this.unusedHelixInstanceNames.add(completedInstanceName);
if (this.eventSubmitter.isPresent()) {
this.eventSubmitter.get()
.submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION, eventMetadataBuilder.get().build());
}
}
Optional<Resource> newContainerResource = completedContainerInfo != null ?
Optional.of(completedContainerInfo.getContainer().getResource()) : Optional.absent();
LOGGER.info("Requesting a new container to replace {} to run Helix instance {} with helix tag {} and resource {}",
containerStatus.getContainerId(), completedInstanceName, helixTag, newContainerResource.orNull());
this.eventBus.post(new NewContainerRequest(
shouldStickToTheSameNode(containerStatus.getExitStatus()) && completedContainerInfo != null ?
Optional.of(completedContainerInfo.getContainer()) : Optional.absent(), newContainerResource));
}
private ImmutableMap.Builder<String, String> buildContainerStatusEventMetadata(ContainerStatus containerStatus) {
ImmutableMap.Builder<String, String> eventMetadataBuilder = new ImmutableMap.Builder<>();
eventMetadataBuilder.put(GobblinYarnMetricTagNames.CONTAINER_ID, containerStatus.getContainerId().toString());
eventMetadataBuilder.put(GobblinYarnEventConstants.EventMetadata.CONTAINER_STATUS_CONTAINER_STATE,
containerStatus.getState().toString());
if (ContainerExitStatus.INVALID != containerStatus.getExitStatus()) {
eventMetadataBuilder.put(GobblinYarnEventConstants.EventMetadata.CONTAINER_STATUS_EXIT_STATUS,
containerStatus.getExitStatus() + "");
}
if (!Strings.isNullOrEmpty(containerStatus.getDiagnostics())) {
eventMetadataBuilder.put(GobblinYarnEventConstants.EventMetadata.CONTAINER_STATUS_EXIT_DIAGNOSTICS,
containerStatus.getDiagnostics());
}
return eventMetadataBuilder;
}
/**
* Get the number of matching container requests for the specified resource memory and cores.
*/
private int getMatchingRequestsCount(Resource resource) {
int priorityNum = resourcePriorityMap.getOrDefault(resource.toString(), 0);
Priority priority = Priority.newInstance(priorityNum);
return getAmrmClientAsync().getMatchingRequests(priority, ResourceRequest.ANY, resource).size();
}
/**
* A custom implementation of {@link AMRMClientAsync.CallbackHandler}.
*/
private class AMRMClientCallbackHandler implements AMRMClientAsync.CallbackHandler {
private volatile boolean done = false;
@Override
public void onContainersCompleted(List<ContainerStatus> statuses) {
for (ContainerStatus containerStatus : statuses) {
handleContainerCompletion(containerStatus);
}
}
@Override
public void onContainersAllocated(List<Container> containers) {
for (final Container container : containers) {
String containerId = container.getId().toString();
String containerHelixTag = YarnHelixUtils.findHelixTagForContainer(container, allocatedContainerCountMap, yarnContainerRequest);
if (Strings.isNullOrEmpty(containerHelixTag)) {
containerHelixTag = helixInstanceTags;
}
if (eventSubmitter.isPresent()) {
eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.CONTAINER_ALLOCATION,
GobblinYarnMetricTagNames.CONTAINER_ID, containerId);
}
LOGGER.info("Container {} has been allocated with resource {} for helix tag {}",
container.getId(), container.getResource(), containerHelixTag);
//Iterate over the (thread-safe) set of unused instances to find the first instance that is not currently live.
//Once we find a candidate instance, it is removed from the set.
String instanceName = null;
//Ensure that updates to unusedHelixInstanceNames are visible to other threads that might concurrently
//invoke the callback on container allocation.
synchronized (this) {
Iterator<String> iterator = unusedHelixInstanceNames.iterator();
while (iterator.hasNext()) {
instanceName = iterator.next();
if (!HelixUtils.isInstanceLive(helixManager, instanceName)) {
iterator.remove();
LOGGER.info("Found an unused instance {}", instanceName);
break;
} else {
//Reset the instance name to null, since this instance name is live.
instanceName = null;
}
}
allocatedContainerCountMap.put(containerHelixTag,
allocatedContainerCountMap.getOrDefault(containerHelixTag, 0) + 1);
}
if (Strings.isNullOrEmpty(instanceName)) {
// No unused instance name, so generating a new one.
instanceName = HelixUtils
.getHelixInstanceName(HELIX_YARN_INSTANCE_NAME_PREFIX, helixInstanceIdGenerator.incrementAndGet());
}
ContainerInfo containerInfo = new ContainerInfo(container, instanceName, containerHelixTag);
containerMap.put(container.getId(), containerInfo);
// Find matching requests and remove the request to reduce the chance that a subsequent request
// will request extra containers. YARN does not have a delta request API and the requests are not
// cleaned up automatically.
// Try finding a match first with the host as the resource name then fall back to any resource match.
// See YARN-1902.
List<? extends Collection<AMRMClient.ContainerRequest>> matchingRequests = amrmClientAsync
.getMatchingRequests(container.getPriority(), container.getNodeHttpAddress(), container.getResource());
if (matchingRequests.isEmpty()) {
LOGGER.debug("Matching request by host {} not found", container.getNodeHttpAddress());
matchingRequests = amrmClientAsync
.getMatchingRequests(container.getPriority(), ResourceRequest.ANY, container.getResource());
}
if (!matchingRequests.isEmpty()) {
AMRMClient.ContainerRequest firstMatchingContainerRequest = matchingRequests.get(0).iterator().next();
LOGGER.debug("Found matching requests {}, removing first matching request {}",
matchingRequests, firstMatchingContainerRequest);
amrmClientAsync.removeContainerRequest(firstMatchingContainerRequest);
}
containerLaunchExecutor.submit(new Runnable() {
@Override
public void run() {
try {
LOGGER.info("Starting container " + containerId);
nmClientAsync.startContainerAsync(container, newContainerLaunchContext(containerInfo));
} catch (IOException ioe) {
LOGGER.error("Failed to start container " + containerId, ioe);
}
}
});
}
}
@Override
public void onShutdownRequest() {
if (eventSubmitter.isPresent()) {
eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.SHUTDOWN_REQUEST);
}
LOGGER.info("Received shutdown request from the ResourceManager");
this.done = true;
eventBus.post(new ClusterManagerShutdownRequest());
}
@Override
public void onNodesUpdated(List<NodeReport> updatedNodes) {
for (NodeReport nodeReport : updatedNodes) {
LOGGER.info("Received node update report: " + nodeReport);
}
}
@Override
public float getProgress() {
return this.done ? 1.0f : 0.0f;
}
@Override
public void onError(Throwable t) {
if (eventSubmitter.isPresent()) {
eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.ERROR,
GobblinYarnEventConstants.EventMetadata.ERROR_EXCEPTION, Throwables.getStackTraceAsString(t));
}
LOGGER.error("Received error: " + t, t);
this.done = true;
eventBus.post(new ClusterManagerShutdownRequest());
}
}
/**
* A custom implementation of {@link NMClientAsync.CallbackHandler}.
*/
class NMClientCallbackHandler implements NMClientAsync.CallbackHandler {
@Override
public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
if (eventSubmitter.isPresent()) {
eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.CONTAINER_STARTED,
GobblinYarnMetricTagNames.CONTAINER_ID, containerId.toString());
}
LOGGER.info(String.format("Container %s has been started", containerId));
}
@Override
public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
if (eventSubmitter.isPresent()) {
eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.CONTAINER_STATUS_RECEIVED,
buildContainerStatusEventMetadata(containerStatus).build());
}
LOGGER.info(String.format("Received container status for container %s: %s", containerId, containerStatus));
}
@Override
public void onContainerStopped(ContainerId containerId) {
if (eventSubmitter.isPresent()) {
eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.CONTAINER_STOPPED,
GobblinYarnMetricTagNames.CONTAINER_ID, containerId.toString());
}
LOGGER.info(String.format("Container %s has been stopped", containerId));
if (containerMap.isEmpty()) {
synchronized (allContainersStopped) {
allContainersStopped.notify();
}
}
}
@Override
public void onStartContainerError(ContainerId containerId, Throwable t) {
if (eventSubmitter.isPresent()) {
eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.CONTAINER_START_ERROR,
GobblinYarnMetricTagNames.CONTAINER_ID, containerId.toString(),
GobblinYarnEventConstants.EventMetadata.ERROR_EXCEPTION, Throwables.getStackTraceAsString(t));
}
LOGGER.error(String.format("Failed to start container %s due to error %s", containerId, t));
}
@Override
public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
if (eventSubmitter.isPresent()) {
eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.CONTAINER_GET_STATUS_ERROR,
GobblinYarnMetricTagNames.CONTAINER_ID, containerId.toString(),
GobblinYarnEventConstants.EventMetadata.ERROR_EXCEPTION, Throwables.getStackTraceAsString(t));
}
LOGGER.error(String.format("Failed to get status for container %s due to error %s", containerId, t));
}
@Override
public void onStopContainerError(ContainerId containerId, Throwable t) {
if (eventSubmitter.isPresent()) {
eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.CONTAINER_STOP_ERROR,
GobblinYarnMetricTagNames.CONTAINER_ID, containerId.toString(),
GobblinYarnEventConstants.EventMetadata.ERROR_EXCEPTION, Throwables.getStackTraceAsString(t));
}
LOGGER.error(String.format("Failed to stop container %s due to error %s", containerId, t));
}
}
//A class encapsulates Container instances, Helix participant IDs of the containers and Helix Tag
@AllArgsConstructor
@Getter
static class ContainerInfo {
private final Container container;
private final String helixParticipantId;
private final String helixTag;
}
}