blob: ef5290b40c4126b54e0f97745d711e322e409fa9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.appmaster;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.health.HealthCheckRegistry;
import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
import com.google.common.base.Preconditions;
import com.google.protobuf.BlockingService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.service.ServiceStateChangeListener;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.*;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
import org.apache.hadoop.registry.server.integration.RMRegistryOperationsService;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.slider.api.ClusterDescription;
import org.apache.slider.api.InternalKeys;
import org.apache.slider.api.ResourceKeys;
import org.apache.slider.api.RoleKeys;
import org.apache.slider.api.StatusKeys;
import org.apache.slider.api.proto.SliderClusterAPI;
import org.apache.slider.client.SliderYarnClientImpl;
import org.apache.slider.common.SliderExitCodes;
import org.apache.slider.common.SliderKeys;
import org.apache.slider.common.params.AbstractActionArgs;
import org.apache.slider.common.params.SliderAMArgs;
import org.apache.slider.common.params.SliderAMCreateAction;
import org.apache.slider.common.params.SliderActions;
import org.apache.slider.common.tools.ConfigHelper;
import org.apache.slider.common.tools.PortScanner;
import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.common.tools.SliderVersionInfo;
import org.apache.slider.core.build.InstanceIO;
import org.apache.slider.core.conf.AggregateConf;
import org.apache.slider.core.conf.ConfTree;
import org.apache.slider.core.conf.ConfTreeOperations;
import org.apache.slider.core.conf.MapOperations;
import org.apache.slider.core.exceptions.BadConfigException;
import org.apache.slider.core.exceptions.SliderException;
import org.apache.slider.core.exceptions.SliderInternalStateException;
import org.apache.slider.core.exceptions.TriggerClusterTeardownException;
import org.apache.slider.core.main.ExitCodeProvider;
import org.apache.slider.core.main.LauncherExitCodes;
import org.apache.slider.core.main.RunService;
import org.apache.slider.core.main.ServiceLauncher;
import org.apache.slider.core.registry.info.CustomRegistryConstants;
import org.apache.slider.providers.ProviderCompleted;
import org.apache.slider.providers.ProviderRole;
import org.apache.slider.providers.ProviderService;
import org.apache.slider.providers.SliderProviderFactory;
import org.apache.slider.providers.agent.AgentKeys;
import org.apache.slider.providers.agent.AgentProviderService;
import org.apache.slider.providers.slideram.SliderAMClientProvider;
import org.apache.slider.providers.slideram.SliderAMProviderService;
import org.apache.slider.server.appmaster.actions.ActionRegisterServiceInstance;
import org.apache.slider.server.appmaster.actions.EscalateOutstandingRequests;
import org.apache.slider.server.appmaster.actions.RegisterComponentInstance;
import org.apache.slider.server.appmaster.actions.QueueExecutor;
import org.apache.slider.server.appmaster.actions.QueueService;
import org.apache.slider.server.appmaster.actions.ActionStopSlider;
import org.apache.slider.server.appmaster.actions.ActionUpgradeContainers;
import org.apache.slider.server.appmaster.actions.AsyncAction;
import org.apache.slider.server.appmaster.actions.RenewingAction;
import org.apache.slider.server.appmaster.actions.ResetFailureWindow;
import org.apache.slider.server.appmaster.actions.ReviewAndFlexApplicationSize;
import org.apache.slider.server.appmaster.actions.UnregisterComponentInstance;
import org.apache.slider.server.appmaster.management.MetricsAndMonitoring;
import org.apache.slider.server.appmaster.management.YarnServiceHealthCheck;
import org.apache.slider.server.appmaster.monkey.ChaosKillAM;
import org.apache.slider.server.appmaster.monkey.ChaosKillContainer;
import org.apache.slider.server.appmaster.monkey.ChaosMonkeyService;
import org.apache.slider.server.appmaster.operations.AsyncRMOperationHandler;
import org.apache.slider.server.appmaster.operations.ProviderNotifyingOperationHandler;
import org.apache.slider.server.appmaster.rpc.RpcBinder;
import org.apache.slider.server.appmaster.rpc.SliderAMPolicyProvider;
import org.apache.slider.server.appmaster.rpc.SliderClusterProtocolPBImpl;
import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
import org.apache.slider.server.appmaster.rpc.SliderIPCService;
import org.apache.slider.server.appmaster.security.SecurityConfiguration;
import org.apache.slider.server.appmaster.state.AppState;
import org.apache.slider.server.appmaster.state.AppStateBindingInfo;
import org.apache.slider.server.appmaster.state.ContainerAssignment;
import org.apache.slider.server.appmaster.state.ProviderAppState;
import org.apache.slider.server.appmaster.operations.RMOperationHandler;
import org.apache.slider.server.appmaster.state.RoleInstance;
import org.apache.slider.server.appmaster.web.AgentService;
import org.apache.slider.server.appmaster.web.rest.InsecureAmFilterInitializer;
import org.apache.slider.server.appmaster.web.rest.agent.AgentWebApp;
import org.apache.slider.server.appmaster.web.SliderAMWebApp;
import org.apache.slider.server.appmaster.web.WebAppApi;
import org.apache.slider.server.appmaster.web.WebAppApiImpl;
import org.apache.slider.server.appmaster.web.rest.RestPaths;
import org.apache.slider.server.appmaster.web.rest.application.ApplicationResouceContentCacheFactory;
import org.apache.slider.server.appmaster.web.rest.application.resources.ContentCache;
import org.apache.slider.server.services.security.CertificateManager;
import org.apache.slider.server.services.utility.AbstractSliderLaunchedService;
import org.apache.slider.server.services.utility.WebAppService;
import org.apache.slider.server.services.workflow.ServiceThreadFactory;
import org.apache.slider.server.services.workflow.WorkflowExecutorService;
import org.apache.slider.server.services.workflow.WorkflowRpcService;
import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * This is the Slider Application Master.
 * <p>
 * It directly implements the asynchronous callbacks from the Resource
 * Manager ({@link AMRMClientAsync.CallbackHandler}) and the Node Manager
 * ({@link NMClientAsync.CallbackHandler}).
 */
public class SliderAppMaster extends AbstractSliderLaunchedService
implements AMRMClientAsync.CallbackHandler,
NMClientAsync.CallbackHandler,
RunService,
SliderExitCodes,
SliderKeys,
ServiceStateChangeListener,
RoleKeys,
ProviderCompleted,
AppMasterActionOperations {
protected static final Logger log =
LoggerFactory.getLogger(SliderAppMaster.class);
/**
 * log for YARN events
 */
protected static final Logger LOG_YARN = log;
/** Short classname, used as the service name */
public static final String SERVICE_CLASSNAME_SHORT = "SliderAppMaster";
/** Fully qualified classname of this service */
public static final String SERVICE_CLASSNAME =
"org.apache.slider.server.appmaster." + SERVICE_CLASSNAME_SHORT;
/** RM heartbeat interval passed to AMRMClientAsync; presumably milliseconds
 * -- TODO confirm against AMRMClientAsync.createAMRMClientAsync() */
public static final int HEARTBEAT_INTERVAL = 1000;
/** NOTE(review): not referenced in this chunk; presumably sizes the
 * Slider IPC server handler pool -- verify against startSliderRPCServer() */
public static final int NUM_RPC_HANDLERS = 5;
/**
 * Metrics and monitoring services.
 * Deployed in {@link #serviceInit(Configuration)}
 */
private final MetricsAndMonitoring metricsAndMonitoring = new MetricsAndMonitoring();
/**
 * metrics registry; assigned from {@link #metricsAndMonitoring} in
 * {@link #serviceInit(Configuration)}
 */
public MetricRegistry metrics;
/** Error string on chaos monkey launch failure action: {@value} */
public static final String E_TRIGGERED_LAUNCH_FAILURE =
"Chaos monkey triggered launch failure";
/** YARN RPC to communicate with the Resource Manager or Node Manager */
private YarnRPC yarnRPC;
/** Handle to communicate with the Resource Manager*/
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private AMRMClientAsync asyncRMClient;
/** Handler for container request/release operations against the RM */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private RMOperationHandler rmOperationHandler;
/** RM operation handler wrapper that also notifies the provider service */
private RMOperationHandler providerRMOperationHandler;
/** Handle to communicate with the Node Manager*/
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
public NMClientAsync nmClientAsync;
/**
 * token blob; presumably the credentials handed to launched containers
 * -- population not visible in this chunk
 */
private Credentials containerCredentials;
/**
 * Slider IPC: Real service handler
 */
private SliderIPCService sliderIPCService;
/**
 * Slider IPC: binding
 */
private WorkflowRpcService rpcService;
/**
 * Secret manager for client-to-AM tokens
 */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private ClientToAMTokenSecretManager secretManager;
/** Hostname of the container*/
private String appMasterHostname = "";
/** Port on which the app master listens for status updates from clients*/
private int appMasterRpcPort = 0;
/** Tracking url to which app master publishes info for clients to monitor*/
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private String appMasterTrackingUrl = "";
/** Application Attempt Id ( combination of attemptId and fail count )*/
private ApplicationAttemptId appAttemptID;
/**
 * Security info client to AM key returned after registration
 */
private ByteBuffer clientToAMKey;
/**
 * App ACLs, as returned by the RM at AM registration time
 */
protected Map<ApplicationAccessType, String> applicationACLs;
/**
 * Ongoing state of the cluster: containers, nodes they
 * live on, etc.
 */
private final AppState appState =
new AppState(new ProtobufClusterServices(), metricsAndMonitoring);
/**
 * App state for external objects. This is almost entirely
 * a read-only view of the application state. To change the state,
 * Providers (or anything else) are expected to queue async changes.
 */
private final ProviderAppState stateForProviders =
new ProviderAppState("undefined", appState);
/**
 * model the state using locks and conditions: this lock guards the
 * AM-completed condition below
 */
private final ReentrantLock AMExecutionStateLock = new ReentrantLock();
/** Condition signalled when the AM has completed */
private final Condition isAMCompleted = AMExecutionStateLock.newCondition();
/**
 * Flag set if the AM is to be shutdown
 */
private final AtomicBoolean amCompletionFlag = new AtomicBoolean(false);
/**
 * Flag set during the init process
 */
private final AtomicBoolean initCompleted = new AtomicBoolean(false);
/**
 * Flag to set if the process exit code was set before shutdown started
 */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private boolean spawnedProcessExitedBeforeShutdownTriggered;
/** Arguments passed in : raw*/
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private SliderAMArgs serviceArgs;
/**
 * ID of the AM container
 */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private ContainerId appMasterContainerID;
/**
 * Monkey Service -may be null
 */
private ChaosMonkeyService monkey;
/**
 * ProviderService of this cluster
 */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private ProviderService providerService;
/**
 * The YARN registry service
 */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private RegistryOperations registryOperations;
/**
 * The stop request received...the exit details are extracted
 * from this
 */
private volatile ActionStopSlider stopAction;
/** Service that launches role instances into allocated containers */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private RoleLaunchService launchService;
//username -null if it is not known/not to be set
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private String hadoop_user_name;
// name of the service user; assignment not visible in this chunk
private String service_user_name;
/** The AM web application */
private SliderAMWebApp webApp;
/** Address the Slider IPC service ends up bound to */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private InetSocketAddress rpcServiceAddress;
/** Provider service for the Slider AM's own component */
private SliderAMProviderService sliderAMProvider;
/** Certificate manager; initialized in createAndRunCluster() */
private CertificateManager certificateManager;
/**
 * Executor.
 * Assigned in {@link #serviceInit(Configuration)}
 */
private WorkflowExecutorService<ExecutorService> executorService;
/**
 * Action queues. Created at instance creation, but
 * added as a child and inited in {@link #serviceInit(Configuration)}
 */
private final QueueService actionQueues = new QueueService();
// agent web endpoint URLs; assignment not visible in this chunk
private String agentOpsUrl;
private String agentStatusUrl;
/** View of the YARN registry scoped for provider use */
private YarnRegistryViewForProviders yarnRegistryOperations;
//private FsDelegationTokenManager fsDelegationTokenManager;
/** Response data from registering this AM with the RM */
private RegisterApplicationMasterResponse amRegistrationData;
/** Port scanner; presumably backs getPortToRequest() -- TODO confirm */
private PortScanner portScanner;
/** Security configuration, built in createAndRunCluster() */
private SecurityConfiguration securityConfiguration;
/**
 * The port for the web application
 */
private int webAppPort;
/**
 * Is security enabled?
 * Set early on in the {@link #createAndRunCluster(String)} operation.
 */
private boolean securityEnabled;
/** Cache of application REST resources */
private ContentCache contentCache;
/** YARN client; used here to fetch node reports */
private SliderYarnClientImpl yarnClient;
/**
 * resource limits: the maximum capability returned by the RM
 * at AM registration time
 */
private Resource maximumResourceCapability;
/**
 * Service Constructor.
 * <p>
 * The discarded {@code HdfsConfiguration} and {@code YarnConfiguration}
 * instances are created purely for their side effect: constructing them
 * forces the HDFS and YARN default resources (hdfs-site.xml,
 * yarn-site.xml, ...) to be registered with the {@code Configuration}
 * class before any configuration is loaded.
 */
public SliderAppMaster() {
super(SERVICE_CLASSNAME_SHORT);
// deliberately discarded: constructors trigger default-resource registration
new HdfsConfiguration();
new YarnConfiguration();
}
/* =================================================================== */
/* service lifecycle methods */
/* =================================================================== */
/**
 * Initialize the service: load and merge the slider client/server
 * configurations, apply command-line overrides, set up Hadoop security,
 * validate the server environment, and create/register the monitoring,
 * content-cache, executor and action-queue child services.
 * @param conf base configuration; receives the merged customizations and
 * is then passed down to all child services
 * @throws Exception on any failure
 */
@Override //AbstractService
public synchronized void serviceInit(Configuration conf) throws Exception {
// slider client if found
Configuration customConf = SliderUtils.loadSliderClientXML();
// Load in the server configuration - if it is actually on the Classpath
URL serverXmlUrl = ConfigHelper.getResourceUrl(SLIDER_SERVER_XML);
if (serverXmlUrl != null) {
log.info("Loading {} at {}", SLIDER_SERVER_XML, serverXmlUrl);
Configuration serverConf = ConfigHelper.loadFromResource(SLIDER_SERVER_XML);
ConfigHelper.mergeConfigurations(customConf, serverConf,
SLIDER_SERVER_XML, true);
}
serviceArgs.applyDefinitions(customConf);
serviceArgs.applyFileSystemBinding(customConf);
// conf now contains all customizations
AbstractActionArgs action = serviceArgs.getCoreAction();
// NOTE(review): unchecked cast -- assumes the core action is always a
// create action when the AM is launched; any other action type would
// fail here with a ClassCastException
SliderAMCreateAction createAction = (SliderAMCreateAction) action;
// sort out the location of the AM
String rmAddress = createAction.getRmAddress();
if (rmAddress != null) {
log.debug("Setting RM address from the command line: {}", rmAddress);
SliderUtils.setRmSchedulerAddress(customConf, rmAddress);
}
log.info("AM configuration:\n{}",
ConfigHelper.dumpConfigToString(customConf));
// dump the process environment for diagnostics
for (Map.Entry<String, String> envs : System.getenv().entrySet()) {
log.info("System env {}={}", envs.getKey(), envs.getValue());
}
ConfigHelper.mergeConfigurations(conf, customConf, SLIDER_CLIENT_XML, true);
//init security with our conf
if (SliderUtils.isHadoopClusterSecure(conf)) {
log.info("Secure mode with kerberos realm {}",
SliderUtils.getKerberosRealm());
UserGroupInformation.setConfiguration(conf);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
log.debug("Authenticating as {}", ugi);
SliderUtils.verifyPrincipalSet(conf, DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY);
} else {
log.info("Cluster is insecure");
}
log.info("Login user is {}", UserGroupInformation.getLoginUser());
//look at settings of Hadoop Auth, to pick up a problem seen once
checkAndWarnForAuthTokenProblems();
// validate server env
boolean dependencyChecks =
!conf.getBoolean(KEY_SLIDER_AM_DEPENDENCY_CHECKS_DISABLED,
false);
SliderUtils.validateSliderServerEnvironment(log, dependencyChecks);
// create and register monitoring services
addService(metricsAndMonitoring);
metrics = metricsAndMonitoring.getMetrics();
/* TODO: turn these one once the metrics testing is more under control
metrics.registerAll(new ThreadStatesGaugeSet());
metrics.registerAll(new MemoryUsageGaugeSet());
metrics.registerAll(new GarbageCollectorMetricSet());
*/
contentCache = ApplicationResouceContentCacheFactory.createContentCache(stateForProviders);
// fixed pool of two daemon threads; presumably one each for the queue
// service and the queue executor (see startQueueProcessing)
executorService = new WorkflowExecutorService<>("AmExecutor",
Executors.newFixedThreadPool(2,
new ServiceThreadFactory("AmExecutor", true)));
addService(executorService);
addService(actionQueues);
//init all child services
super.serviceInit(conf);
}
/**
 * Start the service: start all child services via the superclass, then
 * register this AM's health check with the health-check registry.
 * @throws Exception if a child service fails to start
 */
@Override
protected void serviceStart() throws Exception {
  super.serviceStart();
  YarnServiceHealthCheck healthCheck = new YarnServiceHealthCheck(this);
  metricsAndMonitoring.getHealth().register("AM Health", healthCheck);
}
/**
 * Start the queue processing: hand the action-queue service and then
 * a queue executor bound to it over to the AM executor service.
 */
private void startQueueProcessing() {
  log.info("Queue Processing started");
  executorService.execute(actionQueues);
  QueueExecutor queueExecutor = new QueueExecutor(this, actionQueues);
  executorService.execute(queueExecutor);
}
/* =================================================================== */
/* RunService methods called from ServiceLauncher */
/* =================================================================== */
/**
 * Pick up the args from the service launcher: let the superclass process
 * them, inject the slider XML resource, parse the AM-side arguments, and
 * hand back a YARN-ified, patched configuration.
 * @param config configuration
 * @param args argument list
 * @return the patched YARN configuration
 * @throws Exception on argument parsing or configuration problems
 */
@Override // RunService
public Configuration bindArgs(Configuration config, String... args) throws Exception {
  // let the superclass process it
  Configuration processed = super.bindArgs(config, args);
  // add the slider XML config
  ConfigHelper.injectSliderXMLResource();
  // parse the AM arguments
  serviceArgs = new SliderAMArgs(args);
  serviceArgs.parse();
  // yarn-ify, then patch and return
  return SliderUtils.patchConfiguration(new YarnConfiguration(processed));
}
/**
 * This is called by the service launcher; when it returns the application
 * finishes.
 * <p>
 * Dispatches on the parsed action name: help logs the usage text and
 * returns the usage exit code; create brings the cluster up via
 * {@link #createAndRunCluster(String)}.
 * @return the exit code to return by the app
 * @throws Throwable on any failure, including a missing cluster-name
 * argument or an unimplemented action
 */
@Override
public int runService() throws Throwable {
  SliderVersionInfo.loadAndPrintVersionInfo(log);
  //dump the system properties if in debug mode
  if (log.isDebugEnabled()) {
    log.debug("System properties:\n" + SliderUtils.propertiesToString(System.getProperties()));
  }
  //choose the action
  String action = serviceArgs.getAction();
  List<String> actionArgs = serviceArgs.getActionArgs();
  int exitCode;
  switch (action) {
    case SliderActions.ACTION_HELP:
      log.info("{}: {}", getName(), serviceArgs.usage());
      exitCode = SliderExitCodes.EXIT_USAGE;
      break;
    case SliderActions.ACTION_CREATE:
      // guard: a missing cluster-name argument would otherwise surface as
      // an opaque IndexOutOfBoundsException from actionArgs.get(0)
      if (actionArgs == null || actionArgs.isEmpty()) {
        throw new SliderException("Missing cluster name argument for action " + action);
      }
      exitCode = createAndRunCluster(actionArgs.get(0));
      break;
    default:
      throw new SliderException("Unimplemented: " + action);
  }
  log.info("Exiting AM; final exit code = {}", exitCode);
  return exitCode;
}
/**
 * Initialize a newly created service with this AM's configuration, then
 * add it as a child.
 * Because the service is not started, this MUST be done before
 * the AM itself starts, or it is explicitly added after.
 * @param service the service to init
 * @return the same service, now inited and added
 */
public Service initAndAddService(Service service) {
  Configuration conf = getConfig();
  service.init(conf);
  addService(service);
  return service;
}
/* =================================================================== */
/**
* Create and run the cluster.
* @param clustername cluster name
* @return exit code
* @throws Throwable on a failure
*/
private int createAndRunCluster(String clustername) throws Throwable {
//load the cluster description from the cd argument
String sliderClusterDir = serviceArgs.getSliderClusterURI();
URI sliderClusterURI = new URI(sliderClusterDir);
Path clusterDirPath = new Path(sliderClusterURI);
log.info("Application defined at {}", sliderClusterURI);
SliderFileSystem fs = getClusterFS();
// build up information about the running application -this
// will be passed down to the cluster status
MapOperations appInformation = new MapOperations();
AggregateConf instanceDefinition =
InstanceIO.loadInstanceDefinitionUnresolved(fs, clusterDirPath);
instanceDefinition.setName(clustername);
log.info("Deploying cluster {}:", instanceDefinition);
stateForProviders.setApplicationName(clustername);
Configuration serviceConf = getConfig();
securityConfiguration = new SecurityConfiguration(serviceConf, instanceDefinition, clustername);
// obtain security state
securityEnabled = securityConfiguration.isSecurityEnabled();
// set the global security flag for the instance definition
instanceDefinition.getAppConfOperations().set(KEY_SECURITY_ENABLED, securityEnabled);
// triggers resolution and snapshotting for agent
appState.setInitialInstanceDefinition(instanceDefinition);
File confDir = getLocalConfDir();
if (!confDir.exists() || !confDir.isDirectory()) {
log.info("Conf dir {} does not exist.", confDir);
File parentFile = confDir.getParentFile();
log.info("Parent dir {}:\n{}", parentFile, SliderUtils.listDir(parentFile));
}
//get our provider
MapOperations globalInternalOptions = getGlobalInternalOptions();
String providerType = globalInternalOptions.getMandatoryOption(
InternalKeys.INTERNAL_PROVIDER_NAME);
log.info("Cluster provider type is {}", providerType);
SliderProviderFactory factory =
SliderProviderFactory.createSliderProviderFactory(providerType);
providerService = factory.createServerProvider();
// init the provider BUT DO NOT START IT YET
initAndAddService(providerService);
providerRMOperationHandler = new ProviderNotifyingOperationHandler(providerService);
// create a slider AM provider
sliderAMProvider = new SliderAMProviderService();
initAndAddService(sliderAMProvider);
InetSocketAddress rmSchedulerAddress = SliderUtils.getRmSchedulerAddress(serviceConf);
log.info("RM is at {}", rmSchedulerAddress);
yarnRPC = YarnRPC.create(serviceConf);
// set up the YARN client. This may require patching in the RM client-API address if it
// is (somehow) unset server-side. String clientRMaddr = serviceConf.get(YarnConfiguration.RM_ADDRESS);
InetSocketAddress clientRpcAddress = SliderUtils.getRmAddress(serviceConf);
if (!SliderUtils.isAddressDefined(clientRpcAddress)) {
// client addr is being unset. We can lift it from the other RM APIs
log.warn("Yarn RM address was unbound; attempting to fix up");
serviceConf.set(YarnConfiguration.RM_ADDRESS,
String.format("%s:%d", rmSchedulerAddress.getHostString(), clientRpcAddress.getPort() ));
}
initAndAddService(yarnClient = new SliderYarnClientImpl());
yarnClient.start();
/*
* Extract the container ID. This is then
* turned into an (incompete) container
*/
appMasterContainerID = ConverterUtils.toContainerId(
SliderUtils.mandatoryEnvVariable(ApplicationConstants.Environment.CONTAINER_ID.name()));
appAttemptID = appMasterContainerID.getApplicationAttemptId();
ApplicationId appid = appAttemptID.getApplicationId();
log.info("AM for ID {}", appid.getId());
appInformation.put(StatusKeys.INFO_AM_CONTAINER_ID, appMasterContainerID.toString());
appInformation.put(StatusKeys.INFO_AM_APP_ID, appid.toString());
appInformation.put(StatusKeys.INFO_AM_ATTEMPT_ID, appAttemptID.toString());
Map<String, String> envVars;
List<Container> liveContainers;
List<NodeReport> nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
log.info("Yarn node report count: {}", nodeReports.size());
/*
* It is critical this section is synchronized, to stop async AM events
* arriving while registering a restarting AM.
*/
synchronized (appState) {
int heartbeatInterval = HEARTBEAT_INTERVAL;
// add the RM client -this brings the callbacks in
asyncRMClient = AMRMClientAsync.createAMRMClientAsync(heartbeatInterval, this);
addService(asyncRMClient);
//now bring it up
deployChildService(asyncRMClient);
// nmclient relays callbacks back to this class
nmClientAsync = new NMClientAsyncImpl("nmclient", this);
deployChildService(nmClientAsync);
// set up secret manager
secretManager = new ClientToAMTokenSecretManager(appAttemptID, null);
if (securityEnabled) {
// fix up the ACLs if they are not set
String acls = serviceConf.get(KEY_PROTOCOL_ACL);
if (acls == null) {
getConfig().set(KEY_PROTOCOL_ACL, "*");
}
}
certificateManager = new CertificateManager();
//bring up the Slider RPC service
buildPortScanner(instanceDefinition);
startSliderRPCServer(instanceDefinition);
rpcServiceAddress = rpcService.getConnectAddress();
appMasterHostname = rpcServiceAddress.getHostName();
appMasterRpcPort = rpcServiceAddress.getPort();
appMasterTrackingUrl = null;
log.info("AM Server is listening at {}:{}", appMasterHostname, appMasterRpcPort);
appInformation.put(StatusKeys.INFO_AM_HOSTNAME, appMasterHostname);
appInformation.set(StatusKeys.INFO_AM_RPC_PORT, appMasterRpcPort);
log.info("Starting Yarn registry");
registryOperations = startRegistryOperationsService();
log.info(registryOperations.toString());
//build the role map
List<ProviderRole> providerRoles = new ArrayList<>(providerService.getRoles());
providerRoles.addAll(SliderAMClientProvider.ROLES);
// Start up the WebApp and track the URL for it
MapOperations component = instanceDefinition.getAppConfOperations()
.getComponent(SliderKeys.COMPONENT_AM);
certificateManager.initialize(component, appMasterHostname,
appMasterContainerID.toString(),
clustername);
certificateManager.setPassphrase(instanceDefinition.getPassphrase());
if (component.getOptionBool(
AgentKeys.KEY_AGENT_TWO_WAY_SSL_ENABLED, false)) {
uploadServerCertForLocalization(clustername, fs);
}
webAppPort = getPortToRequest();
if (webAppPort == 0) {
// failure to find a port
throw new BadConfigException("Failed to fix a web application port");
}
String scheme = WebAppUtils.HTTP_PREFIX;
appMasterTrackingUrl = scheme + appMasterHostname + ":" + webAppPort;
appInformation.put(StatusKeys.INFO_AM_WEB_URL, appMasterTrackingUrl + "/");
appInformation.set(StatusKeys.INFO_AM_WEB_PORT, webAppPort);
// *****************************************************
// Register self with ResourceManager
// This will start heartbeating to the RM
// address = SliderUtils.getRmSchedulerAddress(asyncRMClient.getConfig());
// *****************************************************
log.info("Connecting to RM at {},address tracking URL={}",
appMasterRpcPort, appMasterTrackingUrl);
amRegistrationData = asyncRMClient.registerApplicationMaster(appMasterHostname,
appMasterRpcPort,
appMasterTrackingUrl);
maximumResourceCapability = amRegistrationData.getMaximumResourceCapability();
int minMemory = serviceConf.getInt(RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
// validate scheduler vcores allocation setting
int minCores = serviceConf.getInt(RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
int maxMemory = maximumResourceCapability.getMemory();
int maxCores = maximumResourceCapability.getVirtualCores();
appState.setContainerLimits(minMemory,maxMemory, minCores, maxCores );
// build the handler for RM request/release operations; this uses
// the max value as part of its lookup
rmOperationHandler = new AsyncRMOperationHandler(asyncRMClient, maximumResourceCapability);
// set the RM-defined maximum cluster values
appInformation.put(ResourceKeys.YARN_CORES, Integer.toString(maxCores));
appInformation.put(ResourceKeys.YARN_MEMORY, Integer.toString(maxMemory));
processAMCredentials(securityConfiguration);
if (securityEnabled) {
secretManager.setMasterKey(
amRegistrationData.getClientToAMTokenMasterKey().array());
applicationACLs = amRegistrationData.getApplicationACLs();
//tell the server what the ACLs are
rpcService.getServer().refreshServiceAcl(serviceConf,
new SliderAMPolicyProvider());
if (securityConfiguration.isKeytabProvided()) {
// perform keytab based login to establish kerberos authenticated
// principal. Can do so now since AM registration with RM above required
// tokens associated to principal
String principal = securityConfiguration.getPrincipal();
File localKeytabFile = securityConfiguration.getKeytabFile(instanceDefinition);
// Now log in...
login(principal, localKeytabFile);
// obtain new FS reference that should be kerberos based and different
// than the previously cached reference
fs = getClusterFS();
}
}
// extract container list
liveContainers = amRegistrationData.getContainersFromPreviousAttempts();
//now validate the installation
Configuration providerConf =
providerService.loadProviderConfigurationInformation(confDir);
providerService.initializeApplicationConfiguration(instanceDefinition, fs);
providerService.validateApplicationConfiguration(instanceDefinition,
confDir,
securityEnabled);
//determine the location for the role history data
Path historyDir = new Path(clusterDirPath, HISTORY_DIR_NAME);
//build the instance
AppStateBindingInfo binding = new AppStateBindingInfo();
binding.instanceDefinition = instanceDefinition;
binding.serviceConfig = serviceConf;
binding.publishedProviderConf = providerConf;
binding.roles = providerRoles;
binding.fs = fs.getFileSystem();
binding.historyPath = historyDir;
binding.liveContainers = liveContainers;
binding.applicationInfo = appInformation;
binding.releaseSelector = providerService.createContainerReleaseSelector();
binding.nodeReports = nodeReports;
appState.buildInstance(binding);
providerService.rebuildContainerDetails(liveContainers,
instanceDefinition.getName(), appState.getRolePriorityMap());
// add the AM to the list of nodes in the cluster
appState.buildAppMasterNode(appMasterContainerID,
appMasterHostname,
webAppPort,
appMasterHostname + ":" + webAppPort);
// build up environment variables that the AM wants set in every container
// irrespective of provider and role.
envVars = new HashMap<>();
if (hadoop_user_name != null) {
envVars.put(HADOOP_USER_NAME, hadoop_user_name);
}
}
String rolesTmpSubdir = appMasterContainerID.toString() + "/roles";
String amTmpDir = globalInternalOptions.getMandatoryOption(InternalKeys.INTERNAL_AM_TMP_DIR);
Path tmpDirPath = new Path(amTmpDir);
Path launcherTmpDirPath = new Path(tmpDirPath, rolesTmpSubdir);
fs.getFileSystem().mkdirs(launcherTmpDirPath);
//launcher service
launchService = new RoleLaunchService(actionQueues,
providerService,
fs,
new Path(getGeneratedConfDir()),
envVars,
launcherTmpDirPath);
deployChildService(launchService);
appState.noteAMLaunched();
//Give the provider access to the state, and AM
providerService.bind(stateForProviders, actionQueues, liveContainers);
sliderAMProvider.bind(stateForProviders, actionQueues, liveContainers);
// chaos monkey
maybeStartMonkey();
// setup token renewal and expiry handling for long lived apps
// if (!securityConfiguration.isKeytabProvided() &&
// SliderUtils.isHadoopClusterSecure(getConfig())) {
// fsDelegationTokenManager = new FsDelegationTokenManager(actionQueues);
// fsDelegationTokenManager.acquireDelegationToken(getConfig());
// }
// if not a secure cluster, extract the username -it will be
// propagated to workers
if (!UserGroupInformation.isSecurityEnabled()) {
hadoop_user_name = System.getenv(HADOOP_USER_NAME);
log.info(HADOOP_USER_NAME + "='{}'", hadoop_user_name);
}
service_user_name = RegistryUtils.currentUser();
log.info("Registry service username ={}", service_user_name);
// declare the cluster initialized
log.info("Application Master Initialization Completed");
initCompleted.set(true);
scheduleFailureWindowResets(instanceDefinition.getResources());
scheduleEscalation(instanceDefinition.getInternal());
try {
// Web service endpoints: initialize
WebAppApiImpl webAppApi =
new WebAppApiImpl(
stateForProviders,
providerService,
certificateManager,
registryOperations,
metricsAndMonitoring,
actionQueues,
this,
contentCache);
initAMFilterOptions(serviceConf);
// start the agent web app
startAgentWebApp(appInformation, serviceConf, webAppApi);
deployWebApplication(webAppPort, webAppApi);
// schedule YARN Registry registration
queue(new ActionRegisterServiceInstance(clustername, appid));
// log the YARN and web UIs
log.info("RM Webapp address {}",
serviceConf.get(YarnConfiguration.RM_WEBAPP_ADDRESS));
log.info("slider Webapp address {}", appMasterTrackingUrl);
// Start the Slider AM provider
sliderAMProvider.start();
// launch the real provider; this is expected to trigger a callback that
// starts the node review process
launchProviderService(instanceDefinition, confDir);
// start handling any scheduled events
startQueueProcessing();
//now block waiting to be told to exit the process
waitForAMCompletionSignal();
} catch(Exception e) {
log.error("Exception : {}", e, e);
onAMStop(new ActionStopSlider(e));
}
//shutdown time
return finish();
}
/**
 * Deploy the web application.
 * <p>
 * Creates and starts the web application, and adds a
 * <code>WebAppService</code> service under the AM, to ensure
 * a managed web application shutdown.
 * @param port port to deploy the web application on
 * @param webAppApi web app API instance backing the REST views
 */
private void deployWebApplication(int port, WebAppApiImpl webAppApi) {
  log.info("Creating and launching web application");
  webApp = new SliderAMWebApp(webAppApi);
  // build and start the web app with a plain-HTTP policy,
  // bound to the requested port
  WebApps.$for(SliderAMWebApp.BASE_PATH,
      WebAppApi.class,
      webAppApi,
      RestPaths.WS_CONTEXT)
      .withHttpPolicy(getConfig(), HttpConfig.Policy.HTTP_ONLY)
      .at(port)
      .inDevMode()
      .start(webApp);
  // wrap the web app in a YARN service so it is stopped when the AM's
  // service hierarchy shuts down
  WebAppService<SliderAMWebApp> webAppService =
    new WebAppService<>("slider", webApp);
  deployChildService(webAppService);
}
/**
 * Process the AM's own credentials: snapshot the tokens the client passed
 * in with the launch context and strip those that must not be propagated
 * to containers. The AMRM token is always removed; when a keytab is
 * provided the HDFS delegation token is removed too, since an up to date
 * token is issued for container launches (getContainerCredentials()).
 * @param securityConfiguration security settings for this instance
 * @throws IOException problems reading the current user's credentials
 */
private void processAMCredentials(SecurityConfiguration securityConfiguration)
throws IOException {
  Credentials credentials =
      UserGroupInformation.getCurrentUser().getCredentials();
  boolean keytabProvided = securityConfiguration.isKeytabProvided();
  log.info("Slider AM Security Mode: {}", keytabProvided ? "KEYTAB" : "TOKEN");
  // token kinds to strip from the propagated credential set
  List<Text> excludedKinds = new ArrayList<>();
  excludedKinds.add(AMRMTokenIdentifier.KIND_NAME);
  if (keytabProvided) {
    excludedKinds.add(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
  }
  for (Iterator<Token<? extends TokenIdentifier>> it =
      credentials.getAllTokens().iterator(); it.hasNext();) {
    Token<? extends TokenIdentifier> token = it.next();
    log.info("Token {}", token.getKind());
    if (excludedKinds.contains(token.getKind())) {
      log.debug("Filtering token {} from AM tokens", token.getKind());
      it.remove();
    }
  }
  // usually empty by this point, but keep whatever survives so future
  // token types are passed through
  containerCredentials = credentials;
}
/**
 * Build up the port scanner, optionally restricting it to the
 * configured allowed-port range.
 * @param instanceDefinition aggregate configuration holding the option
 */
private void buildPortScanner(AggregateConf instanceDefinition) {
  portScanner = new PortScanner();
  // "0" is the sentinel default meaning "no restriction"
  String portRange = instanceDefinition.getAppConfOperations()
      .getGlobalOptions()
      .getOption(SliderKeys.KEY_ALLOWED_PORT_RANGE, "0");
  if (!"0".equals(portRange)) {
    portScanner.setPortRange(portRange);
  }
}
/**
 * Locate a port to request for a service such as RPC or web/REST.
 * This uses port range definitions in the <code>instanceDefinition</code>
 * to fix the port range —if one is set (see {@code buildPortScanner}).
 * <p>
 * The port returned is available at the time of the request; there are
 * no guarantees as to how long that situation will last.
 * @return the port to request.
 * @throws SliderException if the scanner cannot supply a port
 */
private int getPortToRequest()
throws SliderException {
return portScanner.getAvailablePort();
}
/**
 * Upload the AM's server certificate into the cluster security directory
 * so it can be localized into containers. The upload is skipped when the
 * certificate is already present; permissions are (re)applied either way.
 * @param clustername cluster name, used to build the security dir path
 * @param fs cluster filesystem
 * @throws IOException filesystem problems
 */
private void uploadServerCertForLocalization(String clustername,
    SliderFileSystem fs)
    throws IOException {
  Path certsDir = fs.buildClusterSecurityDirPath(clustername);
  if (!fs.getFileSystem().exists(certsDir)) {
    // owner-only access to the security directory
    fs.getFileSystem().mkdirs(certsDir,
        new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
  }
  Path certPath = new Path(certsDir, SliderKeys.CRT_FILE_NAME);
  if (!fs.getFileSystem().exists(certPath)) {
    Path localCert = new Path(
        CertificateManager.getServerCertficateFilePath().getAbsolutePath());
    fs.getFileSystem().copyFromLocalFile(localCert, certPath);
    log.info("Uploaded server cert to localization path {}", certPath);
  }
  // owner read-only, applied whether freshly uploaded or pre-existing
  fs.getFileSystem().setPermission(certPath,
      new FsPermission(FsAction.READ, FsAction.NONE, FsAction.NONE));
}
/**
 * Perform a keytab-based Kerberos login, then validate the resulting
 * login user.
 * @param principal principal to log in as
 * @param localKeytabFile local keytab file
 * @throws IOException login failure
 * @throws SliderException validation failure
 */
protected void login(String principal, File localKeytabFile)
    throws IOException, SliderException {
  String keytabPath = localKeytabFile.getAbsolutePath();
  log.info("Logging in as {} with keytab {}", principal, localKeytabFile);
  UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
  validateLoginUser(UserGroupInformation.getLoginUser());
}
/**
 * Ensure that the user is generated from a keytab (logged as an error if
 * not) and strip any HDFS delegation tokens it holds, since a keytab
 * login must not rely on tokens that expire.
 * @param user user to validate
 * @throws SliderException declared for the contract; not raised by the
 * current checks
 */
protected void validateLoginUser(UserGroupInformation user)
    throws SliderException {
  if (!user.isFromKeytab()) {
    log.error("User is not holding on a keytab in a secure deployment:" +
        " slider will fail as tokens expire");
  }
  Iterator<Token<? extends TokenIdentifier>> tokens =
      user.getCredentials().getAllTokens().iterator();
  while (tokens.hasNext()) {
    Token<? extends TokenIdentifier> token = tokens.next();
    log.info("Token {}", token.getKind());
    if (DelegationTokenIdentifier.HDFS_DELEGATION_KIND
        .equals(token.getKind())) {
      log.info("HDFS delegation token {}. Removing...", token);
      // safe removal mid-iteration via the iterator
      tokens.remove();
    }
  }
}
/**
 * Set up and start the agent web application.
 * Dumps the AM classpath at debug level, starts the HTTPS-based agent
 * web app, wraps it in a managed service, and records its
 * operations/status URLs and ports in the application information map.
 * @param appInformation application information, updated with URLs/ports
 * @param serviceConf service configuration
 * @param webAppApi web app API instance to bind to
 * @throws IOException on startup problems
 */
private void startAgentWebApp(MapOperations appInformation,
Configuration serviceConf, WebAppApiImpl webAppApi) throws IOException {
// log the AM classpath for diagnostics
URL[] urls = ((URLClassLoader) AgentWebApp.class.getClassLoader() ).getURLs();
StringBuilder sb = new StringBuilder("AM classpath:");
for (URL url : urls) {
sb.append("\n").append(url.toString());
}
LOG_YARN.debug(sb.append("\n").toString());
initAMFilterOptions(serviceConf);
// Start up the agent web app and track the URL for it
// using the AM component's configuration
MapOperations appMasterConfig = getInstanceDefinition()
.getAppConfOperations().getComponent(SliderKeys.COMPONENT_AM);
AgentWebApp agentWebApp = AgentWebApp.$for(AgentWebApp.BASE_PATH,
webAppApi,
RestPaths.AGENT_WS_CONTEXT)
.withComponentConfig(appMasterConfig)
.start();
// both endpoints are served over https; ops uses the secured port
agentOpsUrl =
"https://" + appMasterHostname + ":" + agentWebApp.getSecuredPort();
agentStatusUrl =
"https://" + appMasterHostname + ":" + agentWebApp.getPort();
// wrap in a service: init, start, then attach to the AM's service
// hierarchy so it is stopped on AM shutdown
AgentService agentService =
new AgentService("slider-agent", agentWebApp);
agentService.init(serviceConf);
agentService.start();
addService(agentService);
// publish the agent endpoints for clients
appInformation.put(StatusKeys.INFO_AM_AGENT_OPS_URL, agentOpsUrl + "/");
appInformation.put(StatusKeys.INFO_AM_AGENT_STATUS_URL, agentStatusUrl + "/");
appInformation.set(StatusKeys.INFO_AM_AGENT_STATUS_PORT,
agentWebApp.getPort());
appInformation.set(StatusKeys.INFO_AM_AGENT_OPS_PORT,
agentWebApp.getSecuredPort());
}
/**
 * Set up the AM filter: patches the hadoop HTTP filter-initializer list
 * in the given configuration.
 * @param serviceConf configuration to patch
 */
private void initAMFilterOptions(Configuration serviceConf) {
  // default: the standard AM IP filter
  String filterName = AM_FILTER_NAME;
  // This is here until YARN supports proxy & redirect operations
  // on verbs other than GET, and is only supported for testing
  if (X_DEV_INSECURE_REQUIRED && serviceConf.getBoolean(X_DEV_INSECURE_WS,
      X_DEV_INSECURE_DEFAULT)) {
    log.warn("Insecure filter enabled: REST operations are unauthenticated");
    filterName = InsecureAmFilterInitializer.NAME;
  }
  serviceConf.set(HADOOP_HTTP_FILTER_INITIALIZERS, filterName);
}
/**
 * This registers the service instance and its external values in the
 * YARN registry: binds the providers to the registry, builds the AM's
 * service record (with provider-supplied endpoints) and registers it.
 * On the first attempt, any stale child entries from a previous
 * application are purged.
 * @param instanceName name of this instance
 * @param appId application ID
 * @throws IOException registry or URL problems
 */
public void registerServiceInstance(String instanceName,
ApplicationId appId) throws IOException {
// the registry is running, so register services
URL amWebURI = new URL(appMasterTrackingUrl);
URL agentOpsURI = new URL(agentOpsUrl);
URL agentStatusURI = new URL(agentStatusUrl);
//Give the provider restricted access to the state, registry
setupInitialRegistryPaths();
yarnRegistryOperations = new YarnRegistryViewForProviders(
registryOperations,
service_user_name,
SliderKeys.APP_TYPE,
instanceName,
appAttemptID);
// both providers get the registry view before any records are written
providerService.bindToYarnRegistry(yarnRegistryOperations);
sliderAMProvider.bindToYarnRegistry(yarnRegistryOperations);
// Yarn registry: record lives for the lifetime of the application
ServiceRecord serviceRecord = new ServiceRecord();
serviceRecord.set(YarnRegistryAttributes.YARN_ID, appId.toString());
serviceRecord.set(YarnRegistryAttributes.YARN_PERSISTENCE,
PersistencePolicies.APPLICATION);
serviceRecord.description = "Slider Application Master";
// the AM IPC endpoint is always published
serviceRecord.addExternalEndpoint(
RegistryTypeUtils.ipcEndpoint(
CustomRegistryConstants.AM_IPC_PROTOCOL,
rpcServiceAddress));
// internal services
sliderAMProvider.applyInitialRegistryDefinitions(amWebURI,
agentOpsURI,
agentStatusURI,
serviceRecord);
// provider service dynamic definitions.
providerService.applyInitialRegistryDefinitions(amWebURI,
agentOpsURI,
agentStatusURI,
serviceRecord);
// register the service's entry
log.info("Service Record \n{}", serviceRecord);
yarnRegistryOperations.registerSelf(serviceRecord, true);
log.info("Registered service under {}; absolute path {}",
yarnRegistryOperations.getSelfRegistrationPath(),
yarnRegistryOperations.getAbsoluteSelfRegistrationPath());
boolean isFirstAttempt = 1 == appAttemptID.getAttemptId();
// delete the children in case there are any and this is an AM startup.
// just to make sure everything underneath is purged
if (isFirstAttempt) {
yarnRegistryOperations.deleteChildren(
yarnRegistryOperations.getSelfRegistrationPath(),
true);
}
}
/**
 * Set up the initial registry paths for the current user.
 * TODO: purge this once RM is doing the work
 * @throws IOException registry problems
 */
protected void setupInitialRegistryPaths() throws IOException {
  // only the RM-hosted registry service implementation supports this
  if (!(registryOperations instanceof RMRegistryOperationsService)) {
    return;
  }
  ((RMRegistryOperationsService) registryOperations)
      .initUserRegistryAsync(service_user_name);
}
/**
 * Handler for {@link RegisterComponentInstance action}:
 * register/re-register an ephemeral container that is already in the
 * application state.
 * @param id the component's container ID
 * @param description component description
 * @return true if the component was registered
 * @throws IOException registry problems other than the put itself
 */
public boolean registerComponent(ContainerId id, String description) throws
    IOException {
  if (appState.getOwnedContainer(id) == null) {
    // not a container this AM owns: nothing to register
    return false;
  }
  // this is where component registrations go
  log.info("Registering component {}", id);
  String cid = RegistryPathUtils.encodeYarnID(id.toString());
  ServiceRecord record = new ServiceRecord();
  record.set(YarnRegistryAttributes.YARN_ID, cid);
  record.description = description;
  // records for containers only live as long as the container
  record.set(YarnRegistryAttributes.YARN_PERSISTENCE,
      PersistencePolicies.CONTAINER);
  try {
    yarnRegistryOperations.putComponent(cid, record);
  } catch (IOException e) {
    // a failed put is downgraded to a warning and a false result
    log.warn("Failed to register container {}/{}: {}",
        id, description, e, e);
    return false;
  }
  return true;
}
/**
 * Handler for {@link UnregisterComponentInstance}.
 *
 * Unregister a component; at the time this message is received the
 * component may never have been registered.
 * @param id the component's container ID
 */
public void unregisterComponent(ContainerId id) {
  log.info("Unregistering component {}", id);
  if (yarnRegistryOperations == null) {
    // events can arrive before AM initialization has finished
    log.warn("Processing unregister component event before initialization " +
        "completed; init flag ={}", initCompleted);
    return;
  }
  try {
    yarnRegistryOperations.deleteComponent(
        RegistryPathUtils.encodeYarnID(id.toString()));
  } catch (IOException e) {
    // best-effort delete: failure is logged, not propagated
    log.warn("Failed to delete container {} : {}", id, e, e);
  }
}
/**
 * Looks for a specific case where a token file is provided as an
 * environment variable, yet the file is not there.
 *
 * This surfaced (once) in HBase, where its HDFS library was looking for
 * this, and somehow the token was missing. This is a check in the AM so
 * that if the problem re-occurs, the AM can fail with a more meaningful
 * message.
 */
private void checkAndWarnForAuthTokenProblems() {
  String fileLocation =
      System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
  if (fileLocation == null) {
    // no token file declared: nothing to check
    return;
  }
  File tokenFile = new File(fileLocation);
  if (!tokenFile.exists()) {
    log.warn("Token file {} specified in {} not found", tokenFile,
        UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
  }
}
/**
 * Build the local configuration directory: the propagated conf dir name
 * resolved against the process's current working directory.
 * @return the absolute file for the local conf dir
 */
public File getLocalConfDir() {
  return new File(SliderKeys.PROPAGATED_CONF_DIR_NAME).getAbsoluteFile();
}
/**
 * Get the path to the DFS configuration that is defined in the cluster
 * specification (the internal "generated conf path" option).
 * @return the generated configuration dir
 */
public String getGeneratedConfDir() {
return getGlobalInternalOptions().get(
InternalKeys.INTERNAL_GENERATED_CONF_PATH);
}
/**
 * Get the global internal options for the AM.
 * @return a map to access the internals
 */
public MapOperations getGlobalInternalOptions() {
  AggregateConf instanceDefinition = getInstanceDefinition();
  return instanceDefinition.getInternalOperations().getGlobalOptions();
}
/**
 * Get the filesystem of this cluster.
 * Note: constructs a new SliderFileSystem from the current configuration
 * on every call.
 * @return the FS of the config
 * @throws IOException on filesystem client creation problems
 */
public SliderFileSystem getClusterFS() throws IOException {
return new SliderFileSystem(getConfig());
}
/**
 * Get the AM's shared logger instance.
 * @return the log of the AM
 */
public static Logger getLog() {
return log;
}
/**
 * Get the application state: the model of the cluster that this AM
 * manages.
 * @return the application state
 */
public AppState getAppState() {
return appState;
}
/**
 * Block until it is signalled that the AM is done
 * (see {@code onAMStop()}, which sets the flag and signals).
 */
private void waitForAMCompletionSignal() {
  AMExecutionStateLock.lock();
  try {
    // loop rather than a single check: Condition.awaitUninterruptibly is
    // permitted to wake spuriously, so the completion flag must be
    // re-tested after every wakeup
    while (!amCompletionFlag.get()) {
      log.debug("blocking until signalled to terminate");
      isAMCompleted.awaitUninterruptibly();
    }
  } finally {
    AMExecutionStateLock.unlock();
  }
}
/**
 * Signal that the AM is complete .. queues it in a separate thread:
 * the stop action is placed on the action queues rather than handled
 * inline, so the caller returns immediately.
 *
 * @param stopActionRequest request containing shutdown details
 */
public synchronized void signalAMComplete(ActionStopSlider stopActionRequest) {
// this is a queued action: schedule it through the queues
schedule(stopActionRequest);
}
/**
 * Signal that the AM is complete.
 * Only the first stop request wins: the completion flag is flipped with
 * a CAS under the execution lock, the stop action recorded, and the
 * condition signalled to release {@code waitForAMCompletionSignal()}.
 * Subsequent calls are no-ops.
 *
 * @param stopActionRequest request containing shutdown details
 */
public synchronized void onAMStop(ActionStopSlider stopActionRequest) {
AMExecutionStateLock.lock();
try {
if (amCompletionFlag.compareAndSet(false, true)) {
// first stop request received
this.stopAction = stopActionRequest;
// wake the thread blocked in waitForAMCompletionSignal()
isAMCompleted.signal();
}
} finally {
AMExecutionStateLock.unlock();
}
}
/**
 * Trigger the YARN cluster termination process: stop the forked process
 * (if it has not already exited), stop launches, release all containers,
 * then unregister the AM from the RM.
 * @return the exit code for the AM process
 */
private synchronized int finish() {
  Preconditions.checkNotNull(stopAction, "null stop action");
  FinalApplicationStatus appStatus;
  log.info("Triggering shutdown of the AM: {}", stopAction);
  String appMessage = stopAction.getMessage();
  //stop the daemon & grab its exit code
  int exitCode = stopAction.getExitCode();
  appStatus = stopAction.getFinalApplicationStatus();
  if (!spawnedProcessExitedBeforeShutdownTriggered) {
    //stopped the forked process but don't worry about its exit code
    exitCode = stopForkedProcess();
    log.debug("Stopped forked process: exit code={}", exitCode);
  }
  // make sure the AM is actually registered. If not, there's no point
  // trying to unregister it
  if (amRegistrationData == null) {
    log.info("Application attempt not yet registered; skipping unregistration");
    return exitCode;
  }
  //stop any launches in progress
  launchService.stop();
  //now release all containers
  releaseAllContainers();
  // When the application completes, it should send a finish application
  // signal to the RM
  log.info("Application completed. Signalling finish to RM");
  try {
    log.info("Unregistering AM status={} message={}", appStatus, appMessage);
    asyncRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
  } catch (InvalidApplicationMasterRequestException e) {
    // benign: the application is already unknown to YARN
    log.info("Application not found in YARN application list;" +
        " it may have been terminated/YARN shutdown in progress: {}", e, e);
  } catch (IOException | YarnException e) {
    // log but never let an unregister failure mask the real exit code
    log.info("Failed to unregister application: {}", e, e);
  }
  return exitCode;
}
/**
 * Get diagnostics info about containers, as built by the app state.
 * @return a diagnostics string for logging
 */
private String getContainerDiagnosticInfo() {
return appState.getContainerDiagnosticInfo();
}
/**
 * Get an RPC proxy for the given protocol, bound to the given address,
 * using this AM's configuration.
 * NOTE(review): raw {@code Class} parameter — consider {@code Class<?>}
 * unless an external override/contract requires the raw type.
 * @param protocol protocol interface class
 * @param addr target address
 * @return the proxy object
 */
public Object getProxy(Class protocol, InetSocketAddress addr) {
return yarnRPC.getProxy(protocol, addr, getConfig());
}
/**
 * Start the slider RPC server: verifies IPC access rules, deploys the
 * IPC service, wraps it in a protobuf relay/blocking service, and
 * deploys a WorkflowRpcService listening on all interfaces on a port
 * chosen by the port scanner.
 * @param instanceDefinition the instance definition (currently unused here)
 * @throws IOException RPC setup problems
 * @throws SliderException config problems, e.g. no available port
 */
private void startSliderRPCServer(AggregateConf instanceDefinition)
throws IOException, SliderException {
verifyIPCAccess();
sliderIPCService = new SliderIPCService(
this,
certificateManager,
stateForProviders,
actionQueues,
metricsAndMonitoring,
contentCache);
// deploy first so its lifecycle is managed by the AM
deployChildService(sliderIPCService);
SliderClusterProtocolPBImpl protobufRelay =
new SliderClusterProtocolPBImpl(sliderIPCService);
BlockingService blockingService = SliderClusterAPI.SliderClusterProtocolPB
.newReflectiveBlockingService(
protobufRelay);
// bind to a scanner-chosen port on all interfaces
int port = getPortToRequest();
InetSocketAddress rpcAddress = new InetSocketAddress("0.0.0.0", port);
rpcService =
new WorkflowRpcService("SliderRPC",
RpcBinder.createProtobufServer(rpcAddress, getConfig(),
secretManager,
NUM_RPC_HANDLERS,
blockingService,
null));
deployChildService(rpcService);
}
/**
 * Verify that if the cluster is authed, the ACLs are set.
 * @throws BadConfigException if Authorization is set without any ACL
 */
private void verifyIPCAccess() throws BadConfigException {
  boolean authEnabled = getConfig().getBoolean(
      CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
      false);
  if (!authEnabled) {
    // no authorization: nothing to verify
    return;
  }
  String acls = getConfig().get(KEY_PROTOCOL_ACL);
  if (SliderUtils.isUnset(acls)) {
    throw new BadConfigException("Application has IPC authorization enabled in " +
        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION +
        " but no ACLs in " + KEY_PROTOCOL_ACL);
  }
}
/* =================================================================== */
/* AMRMClientAsync callbacks */
/* =================================================================== */
/**
 * Callback event when a container is allocated.
 *
 * The app state is updated with the allocation, and builds up a list
 * of assignments and RM operations. The assignments are
 * handed off into the pool of service launchers to asynchronously schedule
 * container launch operations.
 *
 * The operations are run in sequence; they are expected to be 0 or more
 * release operations (to handle over-allocations)
 *
 * @param allocatedContainers list of containers that are now ready to be
 * given work.
 */
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@Override //AMRMClientAsync
public void onContainersAllocated(List<Container> allocatedContainers) {
  LOG_YARN.info("onContainersAllocated({})", allocatedContainers.size());
  List<ContainerAssignment> assignmentList = new ArrayList<>();
  List<AbstractRMOperation> rmOperations = new ArrayList<>();
  // the app state makes all the decisions; it fills in both lists
  appState.onContainersAllocated(allocatedContainers, assignmentList,
      rmOperations);
  // hand each assignment off to the launch service
  for (ContainerAssignment assignment : assignmentList) {
    launchService.launchRole(assignment, getInstanceDefinition());
  }
  // then execute all generated RM operations
  execute(rmOperations);
  log.info("Diagnostics: {}", getContainerDiagnosticInfo());
}
/**
 * Callback event when containers have completed.
 * Each completion is fed into the app state; known (owned) containers
 * trigger provider notification and registry deregistration. Finally a
 * node review is scheduled so replacement containers can be requested.
 * @param completedContainers list of completed container statuses
 */
@Override //AMRMClientAsync
public synchronized void onContainersCompleted(List<ContainerStatus> completedContainers) {
LOG_YARN.info("onContainersCompleted([{}]", completedContainers.size());
for (ContainerStatus status : completedContainers) {
ContainerId containerId = status.getContainerId();
LOG_YARN.info("Container Completion for" +
" containerID={}," +
" state={}," +
" exitStatus={}," +
" diagnostics={}",
containerId, status.getState(),
status.getExitStatus(),
status.getDiagnostics());
// non complete containers should not be here
assert (status.getState() == ContainerState.COMPLETE);
// the app state decides whether this was a failure and whose role it was
AppState.NodeCompletionResult result = appState.onCompletedNode(status);
if (result.containerFailed) {
RoleInstance ri = result.roleInstance;
log.error("Role instance {} failed ", ri);
}
// known nodes trigger notifications; unknown ones are ignored
if(!result.unknownNode) {
getProviderService().notifyContainerCompleted(containerId);
queue(new UnregisterComponentInstance(containerId, 0,
TimeUnit.MILLISECONDS));
}
}
// ask for replacements if the cluster is now undersized
reviewRequestAndReleaseNodes("onContainersCompleted");
}
/**
 * Signal that containers are being upgraded. Containers specified with
 * --containers option and all containers of all roles specified with
 * --components option are merged and upgraded.
 *
 * @param upgradeContainersRequest request containing upgrade details
 * @throws IOException IO problems
 * @throws SliderException slider-specific problems
 */
public synchronized void onUpgradeContainers(
    ActionUpgradeContainers upgradeContainersRequest) throws IOException,
    SliderException {
  LOG_YARN.info("onUpgradeContainers({})",
      upgradeContainersRequest.getMessage());
  // Work on a defensive copy: the request owns its container set, and it
  // may be unmodifiable; the merge below must not mutate it.
  Set<String> containers = upgradeContainersRequest.getContainers() == null
      ? new HashSet<String>()
      : new HashSet<String>(upgradeContainersRequest.getContainers());
  LOG_YARN.info(" Container list provided (total {}) : {}",
      containers.size(), containers);
  Set<String> components = upgradeContainersRequest.getComponents() == null
      ? new HashSet<String>()
      : upgradeContainersRequest.getComponents();
  LOG_YARN.info(" Component list provided (total {}) : {}",
      components.size(), components);
  // If components are specified as well, then grab all the containers of
  // each of the components (roles)
  if (CollectionUtils.isNotEmpty(components)) {
    Map<ContainerId, RoleInstance> liveContainers = appState.getLiveContainers();
    if (CollectionUtils.isNotEmpty(liveContainers.keySet())) {
      Map<String, Set<String>> roleContainerMap = prepareRoleContainerMap(liveContainers);
      for (String component : components) {
        Set<String> roleContainers = roleContainerMap.get(component);
        if (roleContainers != null) {
          containers.addAll(roleContainers);
        }
      }
    }
  }
  LOG_YARN.info("Final list of containers to be upgraded (total {}) : {}",
      containers.size(), containers);
  // only the agent provider supports the in-place upgrade mode
  if (providerService instanceof AgentProviderService) {
    AgentProviderService agentProviderService = (AgentProviderService) providerService;
    agentProviderService.setInUpgradeMode(true);
    agentProviderService.addUpgradeContainers(containers);
  }
}
/**
 * Create a reverse map of role name to the set of all live container IDs
 * in that role.
 * @param liveContainers live containers (expected non-empty by callers)
 * @return map of role name to container ID strings
 */
private Map<String, Set<String>> prepareRoleContainerMap(
    Map<ContainerId, RoleInstance> liveContainers) {
  Map<String, Set<String>> roleContainerMap = new HashMap<>();
  for (Map.Entry<ContainerId, RoleInstance> entry : liveContainers.entrySet()) {
    String roleName = entry.getValue().role;
    Set<String> containerIds = roleContainerMap.get(roleName);
    if (containerIds == null) {
      // first container seen for this role
      containerIds = new HashSet<String>();
      roleContainerMap.put(roleName, containerIds);
    }
    containerIds.add(entry.getKey().toString());
  }
  return roleContainerMap;
}
/**
 * Implementation of cluster flexing.
 * It should be the only way that anything -even the AM itself on startup-
 * asks for nodes. The new resource tree is validated by both providers
 * before being applied; failure counters are reset because the window
 * values may have changed; finally a node review is scheduled.
 * @param resources the resource tree
 * @throws SliderException slider problems, including invalid configs
 * @throws IOException IO problems
 */
public void flexCluster(ConfTree resources)
throws IOException, SliderException {
// snapshot + overlay the new resources for validation
AggregateConf newConf =
new AggregateConf(appState.getInstanceDefinitionSnapshot());
newConf.setResources(resources);
// verify the new definition is valid
sliderAMProvider.validateInstanceDefinition(newConf);
providerService.validateInstanceDefinition(newConf);
appState.updateResourceDefinitions(resources);
// reset the scheduled windows...the values
// may have changed
appState.resetFailureCounts();
// ask for more containers if needed
reviewRequestAndReleaseNodes("flexCluster");
}
/**
 * Schedule the failure window reset as a renewing action, using the
 * window duration from the global resource options.
 * @param resources the resource tree
 * @throws BadConfigException if the window is out of range
 */
private void scheduleFailureWindowResets(ConfTree resources) throws
    BadConfigException {
  MapOperations globals =
      new ConfTreeOperations(resources).getGlobalOptions();
  long seconds = globals.getTimeRange(ResourceKeys.CONTAINER_FAILURE_WINDOW,
      ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_DAYS,
      ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_HOURS,
      ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_MINUTES, 0);
  if (seconds <= 0) {
    // a zero/negative window disables the reset entirely
    log.info("Failure window reset interval is not set");
    return;
  }
  log.info(
      "Scheduling the failure window reset interval to every {} seconds",
      seconds);
  RenewingAction<ResetFailureWindow> renew = new RenewingAction<>(
      new ResetFailureWindow(), seconds, seconds, TimeUnit.SECONDS, 0);
  actionQueues.renewing("failures", renew);
}
/**
 * Schedule the renewing "escalate outstanding requests" action at the
 * configured check interval.
 * @param internal internal configuration tree holding the interval option
 * @throws BadConfigException if the interval option is invalid
 */
private void scheduleEscalation(ConfTree internal) throws BadConfigException {
  int intervalSecs = new ConfTreeOperations(internal).getGlobalOptions()
      .getOptionInt(InternalKeys.ESCALATION_CHECK_INTERVAL,
          InternalKeys.DEFAULT_ESCALATION_CHECK_INTERVAL);
  RenewingAction<EscalateOutstandingRequests> renewing =
      new RenewingAction<>(new EscalateOutstandingRequests(),
          intervalSecs, intervalSecs, TimeUnit.SECONDS, 0);
  actionQueues.renewing("escalation", renewing);
}
/**
 * Look at where the current node state is -and whether it should be changed.
 * This does not review inline: it queues a
 * {@code ReviewAndFlexApplicationSize} action for asynchronous handling.
 * @param reason reason for operation, used in logs
 */
private synchronized void reviewRequestAndReleaseNodes(String reason) {
log.debug("reviewRequestAndReleaseNodes({})", reason);
queue(new ReviewAndFlexApplicationSize(reason, 0, TimeUnit.SECONDS));
}
/**
 * Handle the event requesting a review ... look at the queue and decide
 * whether to act or not
 * @param action action triggering the event. It may be put
 * back into the queue
 * @throws SliderInternalStateException
 */
public void handleReviewAndFlexApplicationSize(ReviewAndFlexApplicationSize action)
throws SliderInternalStateException {
// skip entirely when a duplicate review or a halt is already queued
if ( actionQueues.hasQueuedActionWithAttribute(
AsyncAction.ATTR_REVIEWS_APP_SIZE | AsyncAction.ATTR_HALTS_APP)) {
// this operation isn't needed at all -existing duplicate or shutdown due
return;
}
// if there is an action which changes cluster size, wait
if (actionQueues.hasQueuedActionWithAttribute(
AsyncAction.ATTR_CHANGES_APP_SIZE)) {
// place the action at the back of the queue
actionQueues.put(action);
// NOTE(review): execution still falls through to executeNodeReview()
// below even though the action was just re-queued — confirm whether a
// return was intended here to actually defer the review.
}
executeNodeReview(action.name);
}
/**
 * Look at where the current node state is -and whether it should be
 * changed: asks the app state for request/release operations, notifies
 * the provider's RM operation handler, then executes them against YARN.
 * @param reason human-readable trigger, for logging
 * @throws SliderInternalStateException internal state problems
 */
public synchronized void executeNodeReview(String reason)
    throws SliderInternalStateException {
  log.debug("in executeNodeReview({})", reason);
  if (amCompletionFlag.get()) {
    // shutdown in progress: honour the log message and actually skip the
    // review (previously this fell through and reviewed anyway)
    log.info("Ignoring node review operation: shutdown in progress");
    return;
  }
  try {
    List<AbstractRMOperation> allOperations = appState.reviewRequestAndReleaseNodes();
    // tell the provider
    providerRMOperationHandler.execute(allOperations);
    //now apply the operations
    execute(allOperations);
  } catch (TriggerClusterTeardownException e) {
    //App state has decided that it is time to exit
    log.error("Cluster teardown triggered {}", e, e);
    queue(new ActionStopSlider(e));
  }
}
/**
 * Escalate operation as triggered by external timer.
 * <p>
 * Gets the list of new operations off the app state, then executes them:
 * provider handler first, then YARN.
 */
public void escalateOutstandingRequests() {
  List<AbstractRMOperation> escalations =
      appState.escalateOutstandingRequests();
  providerRMOperationHandler.execute(escalations);
  execute(escalations);
}
/**
 * Shutdown operation: release all containers.
 * Optionally sleeps first (app-configured timeout) so applications get
 * time to perform a graceful shutdown; the release itself always runs,
 * even if the sleep is interrupted.
 */
private void releaseAllContainers() {
  if (providerService instanceof AgentProviderService) {
    log.info("Setting stopInitiated flag to true");
    AgentProviderService agentProviderService = (AgentProviderService) providerService;
    agentProviderService.setAppStopInitiated(true);
  }
  // Add the sleep here (before releasing containers) so that applications get
  // time to perform graceful shutdown
  try {
    long timeout = getContainerReleaseTimeout();
    if (timeout > 0) {
      Thread.sleep(timeout);
    }
  } catch (InterruptedException e) {
    log.info("Sleep for container release interrupted");
    // restore the interrupt status for callers further up the stack
    Thread.currentThread().interrupt();
  } finally {
    List<AbstractRMOperation> operations = appState.releaseAllContainers();
    providerRMOperationHandler.execute(operations);
    // now apply the operations
    execute(operations);
  }
}
/**
 * Get the container release timeout in milliseconds, or 0 when the
 * option is not set. A non-zero value is padded by the agent heartbeat
 * interval, since it can take up to that long for agents to receive the
 * stop command.
 * @return the timeout in milliseconds
 */
private long getContainerReleaseTimeout() {
  int timeoutSecs = getInstanceDefinition().getAppConfOperations()
      .getGlobalOptions()
      .getOptionInt(SliderKeys.APP_CONTAINER_RELEASE_TIMEOUT, 0);
  if (timeoutSecs > 0) {
    timeoutSecs += SliderKeys.APP_CONTAINER_HEARTBEAT_INTERVAL_SEC;
  }
  // convert to millis; the long literal avoids int overflow on multiply
  long timeoutInMillis = timeoutSecs * 1000L;
  log.info("Container release timeout in millis = {}", timeoutInMillis);
  return timeoutInMillis;
}
/**
 * Callback from YARN: the RM wants this AM to shut down.
 * This is treated as a successful completion.
 */
@Override //AMRMClientAsync
public void onShutdownRequest() {
  LOG_YARN.info("Shutdown Request received");
  ActionStopSlider stopAction = new ActionStopSlider("stop",
      EXIT_SUCCESS,
      FinalApplicationStatus.SUCCEEDED,
      "Shutdown requested from RM");
  signalAMComplete(stopAction);
}
/**
 * Callback from YARN: the state of monitored nodes has changed.
 * Feeds the reports into the application state, applies any resulting
 * RM operations, and triggers a review if the cluster changed.
 *
 * @param updatedNodes list of updated nodes
 */
@Override //AMRMClientAsync
public void onNodesUpdated(List<NodeReport> updatedNodes) {
  LOG_YARN.info("onNodesUpdated({})", updatedNodes.size());
  log.info("Updated nodes {}", updatedNodes);
  // Check if any nodes are lost or revived and update state accordingly
  AppState.NodeUpdatedOutcome result = appState.onNodesUpdated(updatedNodes);
  List<AbstractRMOperation> ops = result.operations;
  if (!ops.isEmpty()) {
    execute(ops);
  }
  // a changed cluster topology triggers a full review
  if (result.clusterChanged) {
    reviewRequestAndReleaseNodes("nodes updated");
  }
}
/**
 * Heartbeat operation: report application progress to the RM.
 *
 * @return the ratio of requested to actual containers, as a percentage
 */
@Override //AMRMClientAsync
public float getProgress() {
  float progress = appState.getApplicationProgressPercentage();
  return progress;
}
/**
 * Callback from YARN on an AMRM client error: log it and signal that
 * the AM must terminate as failed.
 *
 * @param e the error raised by the async client
 */
@Override //AMRMClientAsync
public void onError(Throwable e) {
  // callback says it's time to finish
  LOG_YARN.error("AMRMClientAsync.onError() received {}", e, e);
  ActionStopSlider stop = new ActionStopSlider("stop",
      EXIT_EXCEPTION_THROWN,
      FinalApplicationStatus.FAILED,
      "AMRMClientAsync.onError() received " + e);
  signalAMComplete(stop);
}
/* =================================================================== */
/* RMOperationHandlerActions */
/* =================================================================== */
/**
 * Apply a list of RM operations by delegating to the shared
 * RM operation handler.
 *
 * @param operations operations to apply
 */
@Override
public void execute(List<AbstractRMOperation> operations) {
  rmOperationHandler.execute(operations);
}
/**
 * Release an assigned container; delegates to the RM operation handler.
 *
 * @param containerId container to release
 */
@Override
public void releaseAssignedContainer(ContainerId containerId) {
  rmOperationHandler.releaseAssignedContainer(containerId);
}
/**
 * Add a container request; delegates to the RM operation handler.
 *
 * @param req request to add
 */
@Override
public void addContainerRequest(AMRMClient.ContainerRequest req) {
  rmOperationHandler.addContainerRequest(req);
}
/**
 * Cancel up to {@code count} outstanding container requests at the two
 * given priorities; delegates to the RM operation handler.
 *
 * @param priority1 first priority to cancel at
 * @param priority2 second priority to cancel at
 * @param count maximum number of requests to cancel
 * @return the number of requests actually cancelled
 */
@Override
public int cancelContainerRequests(Priority priority1,
    Priority priority2,
    int count) {
  return rmOperationHandler.cancelContainerRequests(priority1, priority2, count);
}
/**
 * Cancel a single container request; delegates to the RM operation handler.
 *
 * @param request request to cancel
 */
@Override
public void cancelSingleRequest(AMRMClient.ContainerRequest request) {
  rmOperationHandler.cancelSingleRequest(request);
}
/* =================================================================== */
/* END */
/* =================================================================== */
/**
 * Launch the provider service.
 * <p>
 * If the provider's exec call reports that something was started, this AM
 * registers as a service listener before starting the service; otherwise
 * the service is started unregistered and the "started" event is fired
 * directly.
 *
 * @param instanceDefinition definition of the service
 * @param confDir directory of config data
 * @throws IOException IO problems
 * @throws SliderException slider-specific failures
 */
protected synchronized void launchProviderService(AggregateConf instanceDefinition,
    File confDir)
    throws IOException, SliderException {
  Map<String, String> env = new HashMap<>();
  boolean execStarted =
      providerService.exec(instanceDefinition, confDir, env, this);
  if (!execStarted) {
    // nothing was spawned: don't register, start the service,
    // and send the started event ourselves
    providerService.start();
    eventCallbackEvent(null);
  } else {
    providerService.registerServiceListener(this);
    providerService.start();
  }
}
/* =================================================================== */
/* EventCallback from the child or ourselves directly */
/* =================================================================== */
/**
 * Callback signalling that the provider (child process) is up: note the
 * AM as live and flex the cluster to the configured resource spec.
 * A flex failure queues an AM stop.
 *
 * @param parameter opaque callback parameter (unused here)
 */
@Override // ProviderCompleted
public void eventCallbackEvent(Object parameter) {
  // signalled that the child process is up
  appState.noteAMLive();
  // now ask for the cluster nodes
  try {
    flexCluster(getInstanceDefinition().getResources());
  } catch (Exception e) {
    // cluster flex failure: log, then exit via a queued stop action
    log.error("Failed to flex cluster nodes: {}", e, e);
    queue(new ActionStopSlider(e));
  }
}
/**
 * Report container loss from the provider. If the container is in the
 * active set it is released and a node review is triggered; reports for
 * unknown containers are ignored.
 *
 * @param containerId id of the container which has failed
 * @throws SliderException on state problems
 */
public synchronized void providerLostContainer(
    ContainerId containerId)
    throws SliderException {
  log.info("containerLostContactWithProvider: container {} lost",
      containerId);
  RoleInstance activeContainer = appState.getOwnedContainer(containerId);
  if (activeContainer == null) {
    log.info("Container not in active set - ignoring");
    return;
  }
  execute(appState.releaseContainer(containerId));
  // ask for more containers if needed
  log.info("Container released; triggering review");
  reviewRequestAndReleaseNodes("Loss of container");
}
/* =================================================================== */
/* ServiceStateChangeListener */
/* =================================================================== */
/**
 * Received on listening service termination.
 * <p>
 * When the provider service stops, an unexpected non-zero exit code
 * (outside of AM shutdown) triggers termination of the AM itself;
 * otherwise the exit is just logged. Events from other services are
 * passed to the superclass.
 *
 * @param service the service that has changed
 */
@Override //ServiceStateChangeListener
public void stateChanged(Service service) {
  if (service != providerService || !service.isInState(STATE.STOPPED)) {
    super.stateChanged(service);
    return;
  }
  // it's the current master process in play
  int exitCode = providerService.getExitCode();
  int mappedProcessExitCode = exitCode;
  boolean shouldTriggerFailure = !amCompletionFlag.get()
      && (mappedProcessExitCode != 0);
  if (!shouldTriggerFailure) {
    // we don't care
    log.info(
        "Process has exited with exit code {} mapped to {} -ignoring",
        exitCode,
        mappedProcessExitCode);
    return;
  }
  String reason =
      "Spawned process failed with raw " + exitCode + " mapped to " +
      mappedProcessExitCode;
  ActionStopSlider stop = new ActionStopSlider("stop",
      mappedProcessExitCode,
      FinalApplicationStatus.FAILED,
      reason);
  // this wasn't expected: the process finished early
  spawnedProcessExitedBeforeShutdownTriggered = true;
  log.info(
      "Process has exited with exit code {} mapped to {} -triggering termination",
      exitCode,
      mappedProcessExitCode);
  // tell the AM the cluster is complete
  signalAMComplete(stop);
}
/**
 * Stop the forked process if the running process var is not null.
 *
 * @return the process exit code
 */
protected synchronized Integer stopForkedProcess() {
  providerService.stop();
  return providerService.getExitCode();
}
/**
 * Async start container request: attach delegation tokens to the launch
 * context, record the submission in the app state, then hand off to the
 * async NM client.
 * <p>
 * Tokens are mainly needed so NodeManagers can download files from the
 * distributed file-system on behalf of the container.
 *
 * @param container container to start
 * @param ctx launch context for the container
 * @param instance node details
 * @throws IOException problems building the credentials
 */
public void startContainer(Container container,
    ContainerLaunchContext ctx,
    RoleInstance instance) throws IOException {
  // add current HDFS delegation token with an up to date token
  ByteBuffer tokens = getContainerCredentials();
  if (tokens == null) {
    log.warn("No delegation tokens obtained and set for launch context");
  } else {
    ctx.setTokens(tokens);
  }
  appState.containerStartSubmitted(container, instance);
  nmClientAsync.startContainerAsync(container, ctx);
}
/**
 * Build the credentials to pass to launched containers.
 * <p>
 * Starts from a copy of the AM's container credentials; when a keytab is
 * provided, a fresh HDFS delegation token is added (the keytab login makes
 * this retrievable from the filesystem). Returns the serialized token
 * storage, or {@code null} when there are no tokens at all.
 *
 * @return serialized credentials, or null if no tokens were present
 * @throws IOException on filesystem/token problems
 */
private ByteBuffer getContainerCredentials() throws IOException {
  Credentials credentials = new Credentials(containerCredentials);
  ByteBuffer tokens = null;
  if (securityConfiguration.isKeytabProvided()) {
    // fix: the returned token array was assigned to an unused local;
    // the call is kept purely for its side effect of populating "credentials"
    getClusterFS().getFileSystem().addDelegationTokens(
        UserGroupInformation.getLoginUser().getShortUserName(),
        credentials);
  }
  if (!credentials.getAllTokens().isEmpty()) {
    DataOutputBuffer dob = new DataOutputBuffer();
    credentials.writeTokenStorageToStream(dob);
    dob.close();
    tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
  }
  return tokens;
}
/**
 * Callback from the NM client: a container has stopped.
 * Only logged here; container events from the AM are the authoritative
 * source of container halt details to react to.
 *
 * @param containerId container that stopped
 */
@Override // NMClientAsync.CallbackHandler
public void onContainerStopped(ContainerId containerId) {
  // do nothing but log: container events from the AM
  // are the source of container halt details to react to
  log.info("onContainerStopped {} ", containerId);
}
/**
 * Callback from the NM client: a container has started.
 * Records the start in the app state, triggers an async status query and
 * queues a component registration; a container the app state does not
 * recognise is released back to the RM.
 *
 * @param containerId container that started
 * @param allServiceResponse service responses (unused)
 */
@Override // NMClientAsync.CallbackHandler
public void onContainerStarted(ContainerId containerId,
    Map<String, ByteBuffer> allServiceResponse) {
  LOG_YARN.info("Started Container {} ", containerId);
  RoleInstance cinfo = appState.onNodeManagerContainerStarted(containerId);
  if (cinfo == null) {
    // this is a hypothetical path not seen. We react by warning
    log.error("Notified of started container that isn't pending {} - releasing",
        containerId);
    // then release it
    asyncRMClient.releaseAssignedContainer(containerId);
    return;
  }
  LOG_YARN.info("Deployed instance of role {} onto {}",
      cinfo.role, containerId);
  // trigger an async container status query
  nmClientAsync.getContainerStatusAsync(containerId,
      cinfo.container.getNodeId());
  // push out a registration
  queue(new RegisterComponentInstance(containerId, cinfo.role,
      0, TimeUnit.MILLISECONDS));
}
/**
 * Callback from the NM client: a container failed to start.
 * Logs the failure and notifies the app state.
 *
 * @param containerId container that failed to start
 * @param t cause of the failure
 */
@Override // NMClientAsync.CallbackHandler
public void onStartContainerError(ContainerId containerId, Throwable t) {
  LOG_YARN.error("Failed to start Container {}", containerId, t);
  appState.onNodeManagerContainerStartFailed(containerId, t);
}
/**
 * Callback from the NM client: a container status report has arrived.
 * Debug-logged only; no state is updated here.
 *
 * @param containerId container the status refers to
 * @param containerStatus the reported status
 */
@Override // NMClientAsync.CallbackHandler
public void onContainerStatusReceived(ContainerId containerId,
    ContainerStatus containerStatus) {
  LOG_YARN.debug("Container Status: id={}, status={}", containerId,
      containerStatus);
}
/**
 * Callback from the NM client: a container status query failed.
 * Logged only; no state change.
 *
 * @param containerId container whose status query failed
 * @param t cause of the failure
 */
@Override // NMClientAsync.CallbackHandler
public void onGetContainerStatusError(
    ContainerId containerId, Throwable t) {
  // fix: pass the throwable to the logger so the cause and its
  // stack trace are not silently dropped
  LOG_YARN.error("Failed to query the status of Container {}", containerId, t);
}
/**
 * Callback from the NM client: a container stop request failed.
 * Logged only; no state change.
 *
 * @param containerId container whose stop failed
 * @param t cause of the failure
 */
@Override // NMClientAsync.CallbackHandler
public void onStopContainerError(ContainerId containerId, Throwable t) {
  // fix: pass the throwable to the logger so the cause and its
  // stack trace are not silently dropped
  LOG_YARN.warn("Failed to stop Container {}", containerId, t);
}
/**
 * Get the aggregate configuration defining this application instance.
 *
 * @return the instance definition held by the app state
 */
public AggregateConf getInstanceDefinition() {
  return appState.getInstanceDefinition();
}
/**
 * This is the status: the live model of the cluster.
 *
 * @return the current cluster status from the app state
 */
public ClusterDescription getClusterDescription() {
  return appState.getClusterStatus();
}
/**
 * Get the provider service backing this application master.
 *
 * @return the provider service
 */
public ProviderService getProviderService() {
  return providerService;
}
/**
 * Queue an action for immediate execution in the executor thread.
 *
 * @param action action to execute
 */
public void queue(AsyncAction action) {
  actionQueues.put(action);
}
/**
 * Schedule an action for delayed execution.
 *
 * @param action action to schedule
 */
public void schedule(AsyncAction action) {
  actionQueues.schedule(action);
}
/**
 * Handle any exception raised in a thread. If the exception provides an
 * exit code, that code is used for the AM stop action; exceptions during
 * an in-progress shutdown are ignored.
 *
 * @param thread thread throwing the exception
 * @param exception the exception raised
 */
public void onExceptionInThread(Thread thread, Exception exception) {
  log.error("Exception in {}: {}", thread.getName(), exception, exception);
  // if there is a teardown in progress, ignore it
  if (amCompletionFlag.get()) {
    log.info("Ignoring exception: shutdown in progress");
    return;
  }
  int exitCode = (exception instanceof ExitCodeProvider)
      ? ((ExitCodeProvider) exception).getExitCode()
      : EXIT_EXCEPTION_THROWN;
  signalAMComplete(new ActionStopSlider("stop",
      exitCode,
      FinalApplicationStatus.FAILED,
      exception.toString()));
}
/**
 * Start the chaos monkey if it is enabled and configured with a
 * non-zero interval. May also trigger an immediate AM launch failure
 * when that probability fires at startup.
 *
 * @return true if the monkey was scheduled and is running
 */
private boolean maybeStartMonkey() {
  MapOperations internals = getGlobalInternalOptions();
  // is the monkey enabled at all?
  Boolean enabled =
      internals.getOptionBool(InternalKeys.CHAOS_MONKEY_ENABLED,
          InternalKeys.DEFAULT_CHAOS_MONKEY_ENABLED);
  if (!enabled) {
    log.debug("Chaos monkey disabled");
    return false;
  }
  // interval between monkey events, built from day/hour/minute options
  long monkeyInterval = internals.getTimeRange(
      InternalKeys.CHAOS_MONKEY_INTERVAL,
      InternalKeys.DEFAULT_CHAOS_MONKEY_INTERVAL_DAYS,
      InternalKeys.DEFAULT_CHAOS_MONKEY_INTERVAL_HOURS,
      InternalKeys.DEFAULT_CHAOS_MONKEY_INTERVAL_MINUTES,
      0);
  if (monkeyInterval == 0) {
    log.debug(
        "Chaos monkey not configured with a time interval...not enabling");
    return false;
  }
  // initial delay before the first event; defaults to the interval
  // (the last argument is the seconds default — TODO confirm units)
  long monkeyDelay = internals.getTimeRange(
      InternalKeys.CHAOS_MONKEY_DELAY,
      0,
      0,
      0,
      (int)monkeyInterval);
  log.info("Adding Chaos Monkey scheduled every {} seconds ({} hours -delay {}",
      monkeyInterval, monkeyInterval/(60*60), monkeyDelay);
  monkey = new ChaosMonkeyService(metrics, actionQueues);
  initAndAddService(monkey);
  // configure the targets
  // launch failure: special case with explicit failure triggered now
  int amLaunchFailProbability = internals.getOptionInt(
      InternalKeys.CHAOS_MONKEY_PROBABILITY_AM_LAUNCH_FAILURE,
      0);
  if (amLaunchFailProbability> 0 && monkey.chaosCheck(amLaunchFailProbability)) {
    log.info("Chaos Monkey has triggered AM Launch failure");
    // trigger a failure
    // NOTE(review): this ActionStopSlider overload takes a delay of
    // 0 SECONDS before the EXIT_FALSE stop — presumably a scheduled-stop
    // constructor; confirm against ActionStopSlider's signatures
    ActionStopSlider stop = new ActionStopSlider("stop",
        0, TimeUnit.SECONDS,
        LauncherExitCodes.EXIT_FALSE,
        FinalApplicationStatus.FAILED,
        E_TRIGGERED_LAUNCH_FAILURE);
    queue(stop);
  }
  // periodic AM kill target
  int amKillProbability = internals.getOptionInt(
      InternalKeys.CHAOS_MONKEY_PROBABILITY_AM_FAILURE,
      InternalKeys.DEFAULT_CHAOS_MONKEY_PROBABILITY_AM_FAILURE);
  monkey.addTarget("AM killer",
      new ChaosKillAM(actionQueues, -1), amKillProbability);
  // periodic container kill target
  int containerKillProbability = internals.getOptionInt(
      InternalKeys.CHAOS_MONKEY_PROBABILITY_CONTAINER_FAILURE,
      InternalKeys.DEFAULT_CHAOS_MONKEY_PROBABILITY_CONTAINER_FAILURE);
  monkey.addTarget("Container killer",
      new ChaosKillContainer(appState, actionQueues, rmOperationHandler),
      containerKillProbability);
  // and schedule it
  if (monkey.schedule(monkeyDelay, monkeyInterval, TimeUnit.SECONDS)) {
    log.info("Chaos Monkey is running");
    return true;
  } else {
    log.info("Chaos monkey not started");
    return false;
  }
}
/**
 * Main entry point for the service launcher: prepends the AM service
 * classname to the command line, then delegates to the launcher.
 *
 * @param args command line arguments
 */
public static void main(String[] args) {
  // copy into a mutable list (Arrays.asList is fixed-size)
  // so the service name can be inserted at the front
  List<String> extendedArgs = new ArrayList<String>(Arrays.asList(args));
  extendedArgs.add(0, SERVICE_CLASSNAME);
  // now have the service launcher do its work
  ServiceLauncher.serviceMain(extendedArgs);
}
}