blob: 8d0141014918ce307bf8002190fed859070b140d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.api.RegistryOperationsFactory;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
import org.apache.hadoop.security.HadoopKerberosName;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.service.api.ServiceApiConstants;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.api.records.ConfigFile;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.component.ComponentEventType;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
import org.apache.hadoop.yarn.service.provider.ProviderUtils;
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
import org.apache.hadoop.yarn.service.timelineservice.ServiceMetricsSink;
import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils;
import org.apache.hadoop.yarn.util.BoundedAppender;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.registry.client.api.RegistryConstants.*;
import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_AFTER_APP_COMPLETION;
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*;
import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
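/**
 * Scheduler at the core of the YARN service application master. It wires up
 * the RM and NM clients, the registry view and the event dispatcher, creates
 * one {@link Component} per component declared in the {@link Service} spec,
 * and routes container allocation, recovery and completion callbacks to them.
 */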
public class ServiceScheduler extends CompositeService {
private static final Logger LOG =
LoggerFactory.getLogger(ServiceScheduler.class);
// The service specification, taken from the ServiceContext in buildInstance().
private Service app;
// This encapsulates the <code>app</code> with methods to upgrade the app.
private ServiceManager serviceManager;
// component_name -> component
private final Map<String, Component> componentsByName =
new ConcurrentHashMap<>();
// component id -> component; the id is also what is looked up with a
// container's allocation request id.
protected final Map<Long, Component> componentsById =
new ConcurrentHashMap<>();
// container id -> component instance currently tracked as live
private final Map<ContainerId, ComponentInstance> liveInstances =
new ConcurrentHashMap<>();
private ServiceMetrics serviceMetrics;
// Only created when timeline service v2 is enabled; null otherwise.
private ServiceTimelinePublisher serviceTimelinePublisher;
// Global diagnostics that will be reported to RM on exit.
// The unit is the number of characters. This will be limited to 64 * 1024
// characters.
private BoundedAppender diagnostics = new BoundedAppender(64 * 1024);
// A cache for loading config files from remote such as hdfs
public LoadingCache<ConfigFile, Object> configFileCache = null;
public ScheduledExecutorService executorService;
// Token name -> value map used for substitution (e.g. in quicklinks).
public Map<String, String> globalTokens = new HashMap<>();
private AMRMClientAsync<AMRMClient.ContainerRequest> amRMClient;
private NMClientAsync nmClient;
private AsyncDispatcher dispatcher;
private YarnRegistryViewForProviders yarnRegistryOperations;
private ServiceContext context;
private ContainerLaunchService containerLaunchService;
// Instances found in the registry whose containers were not (yet) handed
// back by the RM at registration; see recoverComponents().
private final Map<ContainerId, ComponentInstance> unRecoveredInstances =
new ConcurrentHashMap<>();
// How long (ms) to wait for the RM to return un-recovered containers.
private long containerRecoveryTimeout;
// If even one component of a service uses placement constraints, then use
// placement scheduler to schedule containers for all components (including
// the ones with no constraints). Mixing of container requests and scheduling
// requests for a single service is not recommended.
private boolean hasAtLeastOnePlacementConstraint;
// Set by setGracefulStop(); gates AM unregistration in serviceStop().
private boolean gracefulStop = false;
/**
 * Creates the scheduler, naming the composite service after the service
 * held by the given context.
 *
 * @param context service context; stored and later used by
 *                {@link #serviceInit(Configuration)}
 */
public ServiceScheduler(ServiceContext context) {
super(context.service.getName());
this.context = context;
}
/**
 * Builds and wires everything the scheduler needs: the registry client,
 * service metrics, the RM/NM clients, the event dispatcher, the container
 * launch service, the optional timeline publisher, the global substitution
 * tokens, the config-file cache and all components.
 * <p>
 * Child services are added in the order they are created here.
 *
 * @param context       the service context (service spec, attempt id,
 *                      principal/keytab, file system)
 * @param configuration configuration used for the non-kerberos registry
 *                      client and the timeline-v2 check
 * @throws YarnException declared by this method; the specific call that
 *                       raises it is not visible in this block
 * @throws IOException   declared by this method; the specific call that
 *                       raises it is not visible in this block
 */
public void buildInstance(ServiceContext context, Configuration configuration)
throws YarnException, IOException {
app = context.service;
executorService = Executors.newScheduledThreadPool(10);
RegistryOperations registryClient = null;
// A kerberos registry client is used only when security is on AND both a
// principal and a keytab were supplied.
if (UserGroupInformation.isSecurityEnabled() &&
!StringUtils.isEmpty(context.principal)
&& !StringUtils.isEmpty(context.keytab)) {
Configuration conf = getConfig();
// Only take the first section of the principal
// e.g. hdfs-demo@EXAMPLE.COM will take hdfs-demo
// This is because somehow zookeeper client only uses the first section
// for acl validations.
String username = new HadoopKerberosName(context.principal.trim())
.getServiceName();
LOG.info("Set registry user accounts: sasl:" + username);
conf.set(KEY_REGISTRY_USER_ACCOUNTS, "sasl:" + username);
registryClient = RegistryOperationsFactory
.createKerberosInstance(conf,
"Client", context.principal, context.keytab);
} else {
registryClient = RegistryOperationsFactory
.createInstance("ServiceScheduler", configuration);
}
addIfService(registryClient);
yarnRegistryOperations =
createYarnRegistryOperations(context, registryClient);
// register metrics,
serviceMetrics = ServiceMetrics
.register(app.getName(), "Metrics for service");
serviceMetrics.tag("type", "Metrics type [component or service]", "service");
serviceMetrics.tag("appId", "Service id for service", app.getId());
amRMClient = createAMRMClient();
addIfService(amRMClient);
nmClient = createNMClient();
// Disabled by default; setGracefulStop() turns this back on.
nmClient.getClient().cleanupRunningContainersOnStop(false);
addIfService(nmClient);
// One handler per event family: service, component, component instance.
dispatcher = new AsyncDispatcher("Component dispatcher");
dispatcher.register(ServiceEventType.class, new ServiceEventHandler());
dispatcher.register(ComponentEventType.class,
new ComponentEventHandler());
dispatcher.register(ComponentInstanceEventType.class,
new ComponentInstanceEventHandler());
dispatcher.setDrainEventsOnStop();
addIfService(dispatcher);
containerLaunchService = new ContainerLaunchService(context);
addService(containerLaunchService);
// The timeline publisher and metrics sink exist only when ATS v2 is enabled.
if (YarnConfiguration.timelineServiceV2Enabled(configuration)) {
TimelineV2Client timelineClient = TimelineV2Client
.createTimelineClient(context.attemptId.getApplicationId());
amRMClient.registerTimelineV2Client(timelineClient);
serviceTimelinePublisher = new ServiceTimelinePublisher(timelineClient);
addService(serviceTimelinePublisher);
DefaultMetricsSystem.instance().register("ServiceMetricsSink",
"For processing metrics to ATS",
new ServiceMetricsSink(serviceTimelinePublisher));
LOG.info("Timeline v2 is enabled.");
}
initGlobalTokensForSubstitute(context);
//substitute quicklinks
ProviderUtils.substituteMapWithTokens(app.getQuicklinks(), globalTokens);
createConfigFileCache(context.fs.getFileSystem());
createAllComponents();
containerRecoveryTimeout = YarnServiceConf.getInt(
YarnServiceConf.CONTAINER_RECOVERY_TIMEOUT_MS,
YarnServiceConf.DEFAULT_CONTAINER_RECOVERY_TIMEOUT_MS,
app.getConfiguration(), getConfig());
}
/**
 * Creates the registry view for this service, bound to the current registry
 * user, the YARN service application type, the service name and the
 * attempt id of the given context. Protected so it can be overridden.
 *
 * @param context        service context supplying the attempt id
 * @param registryClient registry operations the view delegates to
 * @return a new registry view
 */
protected YarnRegistryViewForProviders createYarnRegistryOperations(
    ServiceContext context, RegistryOperations registryClient) {
  final String registryUser = RegistryUtils.currentUser();
  final String serviceName = app.getName();
  return new YarnRegistryViewForProviders(registryClient, registryUser,
      YarnServiceConstants.APP_TYPE, serviceName, context.attemptId);
}
/**
 * Creates the asynchronous NM client, wired to a fresh
 * {@link NMClientCallback}. Protected so it can be overridden.
 *
 * @return a new, not yet started NM client
 */
protected NMClientAsync createNMClient() {
  final NMClientCallback callback = new NMClientCallback();
  return NMClientAsync.createNMClientAsync(callback);
}
/**
 * Creates the asynchronous AM-RM client, wired to a fresh
 * {@link AMRMClientCallback}. Protected so it can be overridden.
 *
 * @return a new, not yet started AM-RM client
 */
protected AMRMClientAsync<AMRMClient.ContainerRequest> createAMRMClient() {
  // First argument of createAMRMClientAsync; presumably the heartbeat
  // interval in milliseconds -- confirm against the AMRMClientAsync docs.
  final int intervalMs = 1000;
  final AMRMClientCallback callback = new AMRMClientCallback();
  return AMRMClientAsync.createAMRMClientAsync(intervalMs, callback);
}
/**
 * Marks the next stop as graceful, so that {@link #serviceStop()}
 * unregisters the AM, and re-enables cleanup of running containers when the
 * NM client stops.
 */
protected void setGracefulStop() {
  gracefulStop = true;
  nmClient.getClient().cleanupRunningContainersOnStop(true);
}
/**
 * Builds the scheduler's internals and then initializes the child services.
 *
 * @param conf configuration to initialize with
 * @throws YarnRuntimeException wrapping any {@link YarnException} raised
 *                              while building the instance
 * @throws Exception            any other initialization failure
 */
@Override
public void serviceInit(Configuration conf) throws Exception {
  try {
    buildInstance(context, conf);
  } catch (YarnException buildFailure) {
    // Surface as unchecked while keeping the original cause.
    throw new YarnRuntimeException(buildFailure);
  }
  super.serviceInit(conf);
}
/**
 * Stops the scheduler. The executor and the metrics system are always shut
 * down. Only when a graceful stop was requested are live instances and the
 * attempt reported to the timeline service (if v2 is enabled) and the AM
 * unregistered from the RM. Child services are stopped last.
 *
 * @throws Exception if unregistering or stopping child services fails
 */
@Override
public void serviceStop() throws Exception {
  LOG.info("Stopping service scheduler");
  if (executorService != null) {
    executorService.shutdownNow();
  }
  DefaultMetricsSystem.shutdown();

  // only stop the entire service when a graceful stop has been initiated
  // (e.g. via client RPC, not through the AM receiving a SIGTERM)
  if (!gracefulStop) {
    super.serviceStop();
    return;
  }

  if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
    // mark component-instances/containers as STOPPED
    for (ContainerId liveContainerId : getLiveInstances().keySet()) {
      serviceTimelinePublisher.componentInstanceFinished(liveContainerId,
          KILLED_AFTER_APP_COMPLETION, diagnostics.toString());
    }
    // mark attempt as unregistered
    serviceTimelinePublisher.serviceAttemptUnregistered(context,
        diagnostics.toString());
  }

  // unregister AM
  amRMClient.unregisterApplicationMaster(FinalApplicationStatus.ENDED,
      diagnostics.toString(), "");
  LOG.info("Service {} unregistered with RM, with attemptId = {} " +
      ", diagnostics = {} ", app.getName(), context.attemptId, diagnostics);

  super.serviceStop();
}
/**
 * Starts the child services, registers the AM with the RM, applies the
 * resource types and client-to-AM master key from the response, registers
 * the service in the registry, recovers containers from previous attempts,
 * and finally triggers an initial FLEX on every component whose
 * dependencies are ready.
 *
 * @throws Exception if starting child services or registering fails
 */
@Override
public void serviceStart() throws Exception {
  super.serviceStart();

  final InetSocketAddress amAddress =
      context.clientAMService.getBindAddress();
  // When yarn.resourcemanager.placement-constraints.handler is set to
  // placement-processor then constraints need to be added during
  // registerApplicationMaster.
  final RegisterApplicationMasterResponse response =
      amRMClient.registerApplicationMaster(amAddress.getHostName(),
          amAddress.getPort(), "N/A");

  // Update internal resource types according to response.
  if (response.getResourceTypes() != null) {
    ResourceUtils.reinitializeResources(response.getResourceTypes());
  }

  final ByteBuffer masterKey = response.getClientToAMTokenMasterKey();
  if (masterKey != null && masterKey.remaining() != 0) {
    context.secretManager.setMasterKey(masterKey.array());
  }

  registerServiceInstance(context.attemptId, app);

  // Since AM has been started and registered, the service is in STARTED state
  app.setState(ServiceState.STARTED);
  serviceManager = new ServiceManager(context);

  // recover components based on containers sent from RM
  recoverComponents(response);

  // Trigger initial evaluation of components
  for (Component component : componentsById.values()) {
    if (!component.areDependenciesReady()) {
      continue;
    }
    LOG.info("Triggering initial evaluation of component {}",
        component.getName());
    ComponentEvent flexEvent = new ComponentEvent(component.getName(), FLEX)
        .setDesired(component.getComponentSpec().getNumberOfContainers());
    component.handle(flexEvent);
  }
}
/**
 * Recovers component instances after an AM restart.
 * <ol>
 *   <li>Loads the component records currently stored in the registry.</li>
 *   <li>For every container the RM returned from previous attempts: if a
 *   registry record exists, sends CONTAINER_RECOVERED to the owning
 *   component; otherwise releases the container.</li>
 *   <li>Registry records left over (no container returned yet) that belong
 *   to this application are parked in {@code unRecoveredInstances}.</li>
 *   <li>If anything was parked, schedules a task that, after
 *   {@code containerRecoveryTimeout} ms, gives up on those containers and
 *   requests replacements.</li>
 * </ol>
 *
 * @param response the RM's response to registerApplicationMaster
 */
private void recoverComponents(RegisterApplicationMasterResponse response) {
  List<Container> containersFromPrevAttempt = response
      .getContainersFromPreviousAttempts();
  LOG.info("Received {} containers from previous attempt.",
      containersFromPrevAttempt.size());
  Map<String, ServiceRecord> existingRecords = new HashMap<>();
  List<String> existingComps = null;
  try {
    existingComps = yarnRegistryOperations.listComponents();
    LOG.info("Found {} containers from ZK registry: {}", existingComps.size(),
        existingComps);
  } catch (Exception e) {
    // Best effort: recovery continues with no registry records.
    LOG.info("Could not read component paths: {}", e.getMessage());
  }
  if (existingComps != null) {
    for (String existingComp : existingComps) {
      try {
        ServiceRecord record =
            yarnRegistryOperations.getComponent(existingComp);
        existingRecords.put(existingComp, record);
      } catch (Exception e) {
        // The trailing Throwable is logged as the exception (with stack
        // trace), so the message must contain only one placeholder.
        LOG.warn("Could not resolve record for component {}",
            existingComp, e);
      }
    }
  }
  for (Container container : containersFromPrevAttempt) {
    LOG.info("Handling {} from previous attempt", container.getId());
    ServiceRecord record = existingRecords.remove(RegistryPathUtils
        .encodeYarnID(container.getId().toString()));
    if (record != null) {
      Component comp = componentsById.get(container.getAllocationRequestId());
      if (comp == null) {
        // No component is registered under this allocation id, so nothing
        // can own the container; release it instead of failing with an NPE.
        LOG.warn("No component exists for allocateId {} of container {} from"
            + " previous attempt, releasing",
            container.getAllocationRequestId(), container.getId());
        amRMClient.releaseAssignedContainer(container.getId());
        continue;
      }
      ComponentEvent event =
          new ComponentEvent(comp.getName(), CONTAINER_RECOVERED)
              .setContainer(container)
              .setInstance(comp.getComponentInstance(record.description));
      comp.handle(event);
      // do not remove requests in this case because we do not know if they
      // have already been removed
    } else {
      LOG.info("Record not found in registry for container {} from previous" +
          " attempt, releasing", container.getId());
      amRMClient.releaseAssignedContainer(container.getId());
    }
  }
  ApplicationId appId = ApplicationId.fromString(app.getId());
  existingRecords.forEach((encodedContainerId, record) -> {
    String componentName = record.get(YarnRegistryAttributes.YARN_COMPONENT);
    if (componentName == null) {
      return;
    }
    Component component = componentsByName.get(componentName);
    if (component == null) {
      return;
    }
    ComponentInstance compInstance = component.getComponentInstance(
        record.description);
    if (compInstance == null) {
      // ConcurrentHashMap rejects null values; a record whose instance is
      // unknown to the component cannot be waited on.
      LOG.warn("No component instance {} exists in component {} for registry"
          + " record {}, skipping", record.description, componentName,
          encodedContainerId);
      return;
    }
    ContainerId containerId = ContainerId.fromString(record.get(
        YarnRegistryAttributes.YARN_ID));
    // Only wait for containers that belong to this application.
    if (containerId.getApplicationAttemptId().getApplicationId()
        .equals(appId)) {
      unRecoveredInstances.put(containerId, compInstance);
      component.removePendingInstance(compInstance);
    }
  });
  if (unRecoveredInstances.size() > 0) {
    executorService.schedule(() -> {
      synchronized (unRecoveredInstances) {
        // after containerRecoveryTimeout, all the containers that haven't
        // been recovered by the RM will be released. The corresponding
        // Component Instances are added to the pending queues of their
        // respective component.
        unRecoveredInstances.forEach((containerId, instance) -> {
          LOG.info("{}, wait on container {} expired",
              instance.getCompInstanceId(), containerId);
          instance.cleanupRegistryAndCompHdfsDir(containerId);
          Component component = componentsByName.get(instance.getCompName());
          component.requestContainers(1);
          component.reInsertPendingInstance(instance);
          amRMClient.releaseAssignedContainer(containerId);
        });
        unRecoveredInstances.clear();
      }
    }, containerRecoveryTimeout, TimeUnit.MILLISECONDS);
  }
}
/**
 * Populates {@code globalTokens} with the cluster- and service-level values
 * available for token substitution: ZK quorum and service ZK path, user,
 * DNS domain (if configured), default file system URI and host (if
 * configured), the service HDFS directory and the service name.
 *
 * @param context service context supplying the service HDFS directory
 */
private void initGlobalTokensForSubstitute(ServiceContext context) {
  final Configuration conf = getConfig();
  final String serviceName = app.getName();

  // ZK
  final String zkQuorum =
      conf.getTrimmed(KEY_REGISTRY_ZK_QUORUM, DEFAULT_REGISTRY_ZK_QUORUM);
  globalTokens.put(ServiceApiConstants.CLUSTER_ZK_QUORUM, zkQuorum);
  final String user = RegistryUtils.currentUser();
  globalTokens.put(SERVICE_ZK_PATH,
      ServiceRegistryUtils.mkServiceHomePath(user, serviceName));
  globalTokens.put(ServiceApiConstants.USER, user);

  final String dnsDomain = conf.getTrimmed(KEY_DNS_DOMAIN);
  if (!StringUtils.isEmpty(dnsDomain)) {
    globalTokens.put(ServiceApiConstants.DOMAIN, dnsDomain);
  }

  // HDFS
  final String clusterFs = conf.getTrimmed(FS_DEFAULT_NAME_KEY);
  if (!StringUtils.isEmpty(clusterFs)) {
    globalTokens.put(ServiceApiConstants.CLUSTER_FS_URI, clusterFs);
    globalTokens.put(ServiceApiConstants.CLUSTER_FS_HOST,
        URI.create(clusterFs).getHost());
  }
  globalTokens.put(SERVICE_HDFS_DIR, context.serviceHdfsDir);

  // service name
  globalTokens.put(SERVICE_NAME_LC, serviceName.toLowerCase());
  globalTokens.put(SERVICE_NAME, serviceName);
}
/**
 * Creates the cache of remote config files and publishes it on the service
 * context. Entries expire 10 minutes after their last access.
 * <ul>
 *   <li>HADOOP_XML files are loaded as a key/value map.</li>
 *   <li>TEMPLATE files are loaded as a single string.</li>
 *   <li>Any other type makes the loader return null.
 *   NOTE(review): Guava documents that a loader must not return null --
 *   confirm callers never request other types.</li>
 * </ul>
 *
 * @param fileSystem file system the source files are read from
 */
private void createConfigFileCache(final FileSystem fileSystem) {
  final CacheLoader<ConfigFile, Object> loader =
      new CacheLoader<ConfigFile, Object>() {
        @Override
        public Object load(ConfigFile key) throws Exception {
          switch (key.getType()) {
          case HADOOP_XML:
            try (FSDataInputStream xmlIn =
                fileSystem.open(new Path(key.getSrcFile()))) {
              // "false": do not pull in the default resources.
              org.apache.hadoop.conf.Configuration parsed =
                  new org.apache.hadoop.conf.Configuration(false);
              parsed.addResource(xmlIn);
              Map<String, String> properties = new HashMap<>(parsed.size());
              parsed.forEach(property ->
                  properties.put(property.getKey(), property.getValue()));
              return properties;
            }
          case TEMPLATE:
            try (FSDataInputStream templateIn =
                fileSystem.open(new Path(key.getSrcFile()))) {
              return IOUtils.toString(templateIn);
            }
          default:
            return null;
          }
        }
      };
  this.configFileCache = CacheBuilder.newBuilder()
      .expireAfterAccess(10, TimeUnit.MINUTES)
      .build(loader);
  context.configCache = configFileCache;
}
/**
 * Registers this AM in the registry and, if timeline service v2 is enabled,
 * publishes the attempt registration. The registry write is submitted to
 * {@code executorService}; an {@link IOException} from it is logged and not
 * propagated.
 *
 * @param attemptId the current application attempt
 * @param service   the service being registered (used for logging)
 * @throws IOException declared by this method
 */
private void registerServiceInstance(ApplicationAttemptId attemptId,
    Service service) throws IOException {
  LOG.info("Registering " + attemptId + ", " + service.getName()
      + " into registry");

  final ServiceRecord selfRecord = new ServiceRecord();
  selfRecord.set(YarnRegistryAttributes.YARN_ID,
      attemptId.getApplicationId().toString());
  selfRecord.set(YarnRegistryAttributes.YARN_PERSISTENCE,
      PersistencePolicies.APPLICATION);
  selfRecord.description = "YarnServiceMaster";

  executorService.submit(() -> {
    try {
      yarnRegistryOperations.registerSelf(selfRecord, false);
      LOG.info("Registered service under {}; absolute path {}",
          yarnRegistryOperations.getSelfRegistrationPath(),
          yarnRegistryOperations.getAbsoluteSelfRegistrationPath());
      // delete the children in case there are any and this is an AM startup.
      // just to make sure everything underneath is purged
      if (attemptId.getAttemptId() == 1) {
        yarnRegistryOperations.deleteChildren(
            yarnRegistryOperations.getSelfRegistrationPath(), true);
      }
    } catch (IOException e) {
      LOG.error(
          "Failed to register app " + app.getName() + " in registry", e);
    }
  });

  if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
    serviceTimelinePublisher.serviceAttemptRegistered(app, getConfig());
  }
}
/**
 * Creates one {@link Component} per component spec, in dependency order,
 * assigning consecutive ids starting at 0 and indexing each component by id
 * and by name. Also records whether any spec declares a non-empty set of
 * placement constraints.
 */
private void createAllComponents() {
  // sort components by dependencies
  final Collection<org.apache.hadoop.yarn.service.api.records.Component>
      orderedSpecs = ServiceApiUtil.sortByDependencies(app.getComponents());

  long nextAllocateId = 0;
  for (org.apache.hadoop.yarn.service.api.records.Component spec
      : orderedSpecs) {
    final long allocateId = nextAllocateId;
    final Component component = new Component(spec, allocateId, context);
    componentsById.put(allocateId, component);
    componentsByName.put(component.getName(), component);
    nextAllocateId++;

    if (hasAtLeastOnePlacementConstraint) {
      // Already known; no need to inspect further specs.
      continue;
    }
    if (spec.getPlacementPolicy() != null
        && spec.getPlacementPolicy().getConstraints() != null
        && !spec.getPlacementPolicy().getConstraints().isEmpty()) {
      hasAtLeastOnePlacementConstraint = true;
    }
  }
}
private final class ServiceEventHandler
implements EventHandler<ServiceEvent> {
@Override
public void handle(ServiceEvent event) {
try {
serviceManager.handle(event);
} catch (Throwable t) {
LOG.error(MessageFormat
.format("[SERVICE]: Error in handling event type {0}",
event.getType()), t);
}
}
}
private final class ComponentEventHandler
implements EventHandler<ComponentEvent> {
@Override
public void handle(ComponentEvent event) {
Component component = componentsByName.get(event.getName());
if (component == null) {
LOG.error("No component exists for " + event.getName());
return;
}
try {
component.handle(event);
} catch (Throwable t) {
LOG.error(MessageFormat
.format("[COMPONENT {0}]: Error in handling event type {1}",
component.getName(), event.getType()), t);
}
}
}
private final class ComponentInstanceEventHandler
implements EventHandler<ComponentInstanceEvent> {
@Override
public void handle(ComponentInstanceEvent event) {
ComponentInstance instance =
liveInstances.get(event.getContainerId());
if (instance == null) {
LOG.error("No component instance exists for " + event.getContainerId());
return;
}
try {
instance.handle(event);
} catch (Throwable t) {
LOG.error(instance.getCompInstanceId() +
": Error in handling event type " + event.getType(), t);
}
}
}
class AMRMClientCallback extends AMRMClientAsync.AbstractCallbackHandler {
/**
 * Handles containers newly allocated by the RM. Each container is routed,
 * via its allocation request id, to the owning component as a
 * CONTAINER_ALLOCATED event, and one matching outstanding container request
 * is removed.
 * <p>
 * A container whose allocation request id maps to no component is released
 * and skipped, so that one unknown container cannot abort processing of the
 * rest of the batch.
 *
 * @param containers the containers allocated in this heartbeat
 */
@Override
public void onContainersAllocated(List<Container> containers) {
  LOG.info(containers.size() + " containers allocated. ");
  for (Container container : containers) {
    final long allocateId = container.getAllocationRequestId();
    Component comp = componentsById.get(allocateId);
    if (comp == null) {
      LOG.warn("No component exists for allocateId {}, releasing container {}",
          allocateId, container.getId());
      amRMClient.releaseAssignedContainer(container.getId());
      continue;
    }
    ComponentEvent event =
        new ComponentEvent(comp.getName(), CONTAINER_ALLOCATED)
            .setContainer(container);
    dispatcher.getEventHandler().handle(event);
    try {
      Collection<AMRMClient.ContainerRequest> requests = amRMClient
          .getMatchingRequests(allocateId);
      LOG.info("[COMPONENT {}]: remove {} outstanding container requests " +
          "for allocateId {}", comp.getName(), requests.size(), allocateId);
      // remove the corresponding request
      if (!requests.isEmpty()) {
        amRMClient.removeContainerRequest(requests.iterator().next());
      }
    } catch (Exception e) {
      //TODO Due to YARN-7490, exception may be thrown, catch and ignore for
      //now.
      LOG.error("Exception when removing the matching requests. ", e);
    }
  }
}
/**
 * Handles containers from previous attempts that the RM reports after
 * registration. A container that recovery is still waiting for is handed to
 * its component as CONTAINER_RECOVERED; any other container is released.
 *
 * @param containers containers from previous attempts; may be null or empty
 */
@Override
public void onContainersReceivedFromPreviousAttempts(
    List<Container> containers) {
  if (containers == null || containers.isEmpty()) {
    return;
  }
  for (Container recovered : containers) {
    final ComponentInstance awaited;
    // Same lock as the recovery-timeout task, which iterates and clears
    // this map.
    synchronized (unRecoveredInstances) {
      awaited = unRecoveredInstances.remove(recovered.getId());
    }
    if (awaited == null) {
      LOG.info("Not waiting to recover container {}, releasing",
          recovered.getId());
      amRMClient.releaseAssignedContainer(recovered.getId());
      continue;
    }
    final Component owner =
        componentsById.get(recovered.getAllocationRequestId());
    ComponentEvent recoveredEvent =
        new ComponentEvent(owner.getName(), CONTAINER_RECOVERED)
            .setInstance(awaited)
            .setContainerId(recovered.getId())
            .setContainer(recovered);
    owner.handle(recoveredEvent);
  }
}
/**
 * Handles completed containers reported by the RM. For each status whose
 * container maps to a live component instance, a CONTAINER_COMPLETED event
 * is dispatched to the owning component.
 * <p>
 * A status for an unknown container is logged and skipped; the remaining
 * statuses in the batch are still processed.
 *
 * @param statuses completion statuses received in this heartbeat
 */
@Override
public void onContainersCompleted(List<ContainerStatus> statuses) {
  for (ContainerStatus status : statuses) {
    ContainerId containerId = status.getContainerId();
    ComponentInstance instance = liveInstances.get(containerId);
    if (instance == null) {
      LOG.warn(
          "Container {} Completed. No component instance exists. exitStatus={}. diagnostics={} ",
          containerId, status.getExitStatus(), status.getDiagnostics());
      // Skip only this status; returning here would drop every completion
      // that follows it in the same batch.
      continue;
    }
    ComponentEvent event =
        new ComponentEvent(instance.getCompName(), CONTAINER_COMPLETED)
            .setStatus(status).setInstance(instance);
    dispatcher.getEventHandler().handle(event);
  }
}
// Container updates are intentionally ignored: this callback does nothing.
@Override
public void onContainersUpdated(List<UpdatedContainer> containers) {
}
// Shutdown requests from the RM are ignored: this callback does nothing.
@Override public void onShutdownRequest() {
//Was used for non-work-preserving restart in YARN, should be deprecated.
}
/**
 * Logs, at WARN level, the id, state and health report of every node the
 * RM reported as updated. No other action is taken.
 *
 * @param updatedNodes node reports received in this heartbeat
 */
@Override
public void onNodesUpdated(List<NodeReport> updatedNodes) {
  final String newline = System.lineSeparator();
  final StringBuilder summary = new StringBuilder("Nodes updated info: ");
  summary.append(newline);
  for (NodeReport node : updatedNodes) {
    summary.append(node.getNodeId());
    summary.append(", state = ").append(node.getNodeState());
    summary.append(", healthDiagnostics = ").append(node.getHealthReport());
    summary.append(newline);
  }
  LOG.warn(summary.toString());
}
/**
 * Reports progress as live component instances over desired containers,
 * expressed as a percentage and capped at 100.
 * <p>
 * NOTE(review): the value is on a 0-100 scale, as in the original code;
 * confirm whether the RM expects 0-1 instead.
 *
 * @return 100 when no containers are desired, otherwise the percentage of
 *         desired containers that are live, never more than 100
 */
@Override public float getProgress() {
  // get running containers over desired containers
  long total = 0;
  for (org.apache.hadoop.yarn.service.api.records.Component component : app
      .getComponents()) {
    total += component.getNumberOfContainers();
  }
  // Probably due to user flexed down to 0
  if (total == 0) {
    return 100;
  }
  // Cap at 100: the number of live instances can temporarily exceed the
  // desired count. (Math.max here would always report at least 100.)
  return Math.min((float) liveInstances.size() / total * 100, 100);
}
/**
 * Logs an error raised by the AM-RM client; no other action is taken.
 *
 * @param e the error reported by the client
 */
@Override
public void onError(Throwable e) {
  LOG.error("Error in AMRMClient callback handler ", e);
}
/**
 * Logs the scheduling requests rejected by the RM; no other action is
 * taken.
 *
 * @param rejectedSchedulingRequests the rejected requests
 */
@Override
public void onRequestsRejected(
    List<RejectedSchedulingRequest> rejectedSchedulingRequests) {
  LOG.error(
      "Error in AMRMClient callback handler. Following scheduling "
          + "requests were rejected: {}",
      rejectedSchedulingRequests);
}
}
/**
 * Callbacks from the asynchronous NM client. Only container start and
 * start-failure are acted upon; every other callback is a no-op.
 */
private class NMClientCallback extends NMClientAsync.AbstractCallbackHandler {

  /**
   * Dispatches CONTAINER_STARTED to the component owning the started
   * container. Unknown containers are logged and ignored.
   */
  @Override
  public void onContainerStarted(ContainerId containerId,
      Map<String, ByteBuffer> allServiceResponse) {
    final ComponentInstance started = liveInstances.get(containerId);
    if (started == null) {
      LOG.error("No component instance exists for " + containerId);
      return;
    }
    ComponentEvent startedEvent =
        new ComponentEvent(started.getCompName(), CONTAINER_STARTED)
            .setInstance(started)
            .setContainerId(containerId);
    dispatcher.getEventHandler().handle(startedEvent);
  }

  @Override
  public void onContainerStatusReceived(ContainerId containerId,
      ContainerStatus containerStatus) {
    // no-op
  }

  @Override
  public void onContainerStopped(ContainerId containerId) {
    // no-op
  }

  /**
   * Releases a container that failed to start. Unknown containers are
   * logged and ignored.
   */
  @Override
  public void onStartContainerError(ContainerId containerId, Throwable t) {
    if (liveInstances.get(containerId) == null) {
      LOG.error("No component instance exists for " + containerId);
      return;
    }
    LOG.error("Failed to start " + containerId, t);
    amRMClient.releaseAssignedContainer(containerId);
    // After container released, it'll get CONTAINER_COMPLETED event from RM
    // automatically which will trigger stopping COMPONENT INSTANCE
  }

  @Override
  public void onContainerResourceIncreased(ContainerId containerId,
      Resource resource) {
    // no-op
  }

  @Override
  public void onContainerResourceUpdated(ContainerId containerId,
      Resource resource) {
    // no-op
  }

  @Override
  public void onGetContainerStatusError(ContainerId containerId,
      Throwable t) {
    // no-op
  }

  @Override
  public void onIncreaseContainerResourceError(ContainerId containerId,
      Throwable t) {
    // no-op
  }

  @Override
  public void onUpdateContainerResourceError(ContainerId containerId,
      Throwable t) {
    // no-op
  }

  @Override
  public void onStopContainerError(ContainerId containerId, Throwable t) {
    // no-op
  }
}
/** @return the metrics object registered for this service */
public ServiceMetrics getServiceMetrics() {
return serviceMetrics;
}
/** @return the asynchronous AM-RM client */
public AMRMClientAsync<AMRMClient.ContainerRequest> getAmRMClient() {
return amRMClient;
}
/** @return the asynchronous NM client */
public NMClientAsync getNmClient() {
return nmClient;
}
/**
 * Starts tracking a component instance as live under the given container.
 *
 * @param containerId container the instance runs in
 * @param instance    the instance to track
 */
public void addLiveCompInstance(ContainerId containerId,
ComponentInstance instance) {
liveInstances.put(containerId, instance);
}
/**
 * Stops tracking the live instance of the given container, if any.
 *
 * @param containerId container whose instance is no longer live
 */
public void removeLiveCompInstance(ContainerId containerId) {
liveInstances.remove(containerId);
}
/** @return the registry view used by this service */
public YarnRegistryViewForProviders getYarnRegistryOperations() {
return yarnRegistryOperations;
}
/**
 * @return the timeline publisher; it is only created in buildInstance()
 *         when timeline service v2 is enabled, so it may be null
 */
public ServiceTimelinePublisher getServiceTimelinePublisher() {
return serviceTimelinePublisher;
}
/** @return the backing (not copied) map of container id to live instance */
public Map<ContainerId, ComponentInstance> getLiveInstances() {
return liveInstances;
}
/** @return the service used to launch containers */
public ContainerLaunchService getContainerLaunchService() {
return containerLaunchService;
}
/** @return the service context this scheduler was created with */
public ServiceContext getContext() {
return context;
}
/** @return the backing (not copied) map of component name to component */
public Map<String, Component> getAllComponents() {
return componentsByName;
}
/** @return the service specification being scheduled */
public Service getApp() {
return app;
}
/** @return the dispatcher that delivers service/component/instance events */
public AsyncDispatcher getDispatcher() {
return dispatcher;
}
/** @return the bounded diagnostics buffer reported to the RM on stop */
public BoundedAppender getDiagnostics() {
return diagnostics;
}
/** @return true if at least one component declares placement constraints */
public boolean hasAtLeastOnePlacementConstraint() {
return hasAtLeastOnePlacementConstraint;
}
}