blob: c8369c1acf3d50a759f14d217be8dd546f5202a5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.appmaster.state;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.slider.api.ClusterDescription;
import org.apache.slider.api.ClusterDescriptionKeys;
import org.apache.slider.api.ClusterDescriptionOperations;
import org.apache.slider.api.ClusterNode;
import org.apache.slider.api.InternalKeys;
import org.apache.slider.api.ResourceKeys;
import org.apache.slider.api.RoleKeys;
import org.apache.slider.api.StatusKeys;
import org.apache.slider.api.types.ApplicationLivenessInformation;
import org.apache.slider.api.types.ComponentInformation;
import org.apache.slider.api.types.RoleStatistics;
import org.apache.slider.common.SliderExitCodes;
import org.apache.slider.common.SliderKeys;
import org.apache.slider.common.tools.ConfigHelper;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.core.conf.AggregateConf;
import org.apache.slider.core.conf.ConfTree;
import org.apache.slider.core.conf.ConfTreeOperations;
import org.apache.slider.core.conf.MapOperations;
import org.apache.slider.core.exceptions.BadClusterStateException;
import org.apache.slider.core.exceptions.BadConfigException;
import org.apache.slider.core.exceptions.ErrorStrings;
import org.apache.slider.core.exceptions.NoSuchNodeException;
import org.apache.slider.core.exceptions.SliderInternalStateException;
import org.apache.slider.core.exceptions.TriggerClusterTeardownException;
import org.apache.slider.core.persist.AggregateConfSerDeser;
import org.apache.slider.core.persist.ConfTreeSerDeser;
import org.apache.slider.providers.PlacementPolicy;
import org.apache.slider.providers.ProviderRole;
import org.apache.slider.server.appmaster.management.LongGauge;
import org.apache.slider.server.appmaster.management.MetricsAndMonitoring;
import org.apache.slider.server.appmaster.management.MetricsConstants;
import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
import org.apache.slider.server.appmaster.operations.ContainerReleaseOperation;
import org.apache.slider.server.appmaster.operations.ContainerRequestOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.slider.api.ResourceKeys.*;
import static org.apache.slider.api.RoleKeys.*;
import static org.apache.slider.api.StateValues.*;
/**
* The model of all the ongoing state of a Slider AM.
*
* concurrency rules: any method which begins with <i>build</i>
* is not synchronized and intended to be used during
* initialization.
*/
public class AppState {
protected static final Logger log =
LoggerFactory.getLogger(AppState.class);
private final AbstractClusterServices recordFactory;
private final MetricsAndMonitoring metricsAndMonitoring;
/**
* Flag set to indicate the application is live -this only happens
* after the buildInstance operation
*/
private boolean applicationLive = false;
/**
* The definition of the instance. Flexing updates the resources section
* This is used as a synchronization point on activities that update
* the CD, and also to update some of the structures that
* feed in to the CD
*/
private AggregateConf instanceDefinition;
/**
* Time the instance definition snapshots were created
*/
private long snapshotTime;
/**
* Snapshot of the instance definition. This is fully
* resolved.
*/
private AggregateConf instanceDefinitionSnapshot;
/**
* Snapshot of the raw instance definition; unresolved and
* without any patch of an AM into it.
*/
private AggregateConf unresolvedInstanceDefinition;
/**
* snapshot of resources as of last update time
*/
private ConfTreeOperations resourcesSnapshot;
private ConfTreeOperations appConfSnapshot;
private ConfTreeOperations internalsSnapshot;
/**
* This is the status, the live model
*/
private ClusterDescription clusterStatus = new ClusterDescription();
/**
* Metadata provided by the AM for use in filling in status requests
*/
private Map<String, String> applicationInfo;
/**
* Client properties created via the provider -static for the life
* of the application
*/
private Map<String, String> clientProperties = new HashMap<>();
/**
* This is a template of the cluster status
*/
private ClusterDescription clusterStatusTemplate = new ClusterDescription();
private final Map<Integer, RoleStatus> roleStatusMap =
new ConcurrentHashMap<>();
private final Map<String, ProviderRole> roles =
new ConcurrentHashMap<>();
private final Map<Integer, ProviderRole> rolePriorityMap =
new ConcurrentHashMap<>();
/**
* The master node.
*/
private RoleInstance appMasterNode;
/**
* Hash map of the containers we have. This includes things that have
* been allocated but are not live; it is a superset of the live list
*/
private final ConcurrentMap<ContainerId, RoleInstance> ownedContainers =
new ConcurrentHashMap<>();
/**
* Hash map of the containers we have released, but we
* are still awaiting acknowledgements on. Any failure of these
* containers is treated as a successful outcome
*/
private final ConcurrentMap<ContainerId, Container> containersBeingReleased =
new ConcurrentHashMap<>();
/**
* Counter for completed containers ( complete denotes successful or failed )
*/
private final LongGauge completedContainerCount = new LongGauge();
/**
* Count of failed containers
*/
private final LongGauge failedContainerCount = new LongGauge();
/**
* # of started containers
*/
private final LongGauge startedContainers = new LongGauge();
/**
* # of containers that failed to start
*/
private final LongGauge startFailedContainerCount = new LongGauge();
/**
* Track the number of surplus containers received and discarded
*/
private final LongGauge surplusContainers = new LongGauge();
/**
* Track the number of requested containers.
* Important: this does not include AA requests which are yet to be issued.
*/
private final LongGauge outstandingContainerRequests = new LongGauge();
/**
* Map of requested nodes. This records the command used to start it,
* resources, etc. When container started callback is received,
* the node is promoted from here to the containerMap
*/
private final Map<ContainerId, RoleInstance> startingContainers =
new ConcurrentHashMap<>();
/**
* List of completed nodes. This isn't kept in the CD as it gets too
* big for the RPC responses. Indeed, we should think about how deep to get this
*/
private final Map<ContainerId, RoleInstance> completedContainers
= new ConcurrentHashMap<>();
/**
* Nodes that failed to start.
* Again, kept out of the CD
*/
private final Map<ContainerId, RoleInstance> failedContainers =
new ConcurrentHashMap<>();
/**
* Nodes that came assigned to a role above that
* which were asked for -this appears to happen
*/
private final Set<ContainerId> surplusNodes = new HashSet<>();
/**
* Map of containerID to cluster nodes, for status reports.
* Access to this should be synchronized on the clusterDescription
*/
private final Map<ContainerId, RoleInstance> liveNodes =
new ConcurrentHashMap<>();
private final AtomicInteger completionOfNodeNotInLiveListEvent =
new AtomicInteger();
private final AtomicInteger completionOfUnknownContainerEvent =
new AtomicInteger();
/**
* limits of container core numbers in this queue
*/
private int containerMaxCores;
private int containerMinCores;
/**
* limits of container memory in this queue
*/
private int containerMaxMemory;
private int containerMinMemory;
private RoleHistory roleHistory;
private Configuration publishedProviderConf;
private long startTimeThreshold;
private int failureThreshold = 10;
private int nodeFailureThreshold = 3;
private String logServerURL = "";
/**
* Selector of containers to release; application wide.
*/
private ContainerReleaseSelector containerReleaseSelector;
private Resource minResource;
private Resource maxResource;
/**
 * Create an instance.
 * Both arguments are mandatory; null values are rejected up front.
 * @param recordFactory factory for YARN records
 * @param metricsAndMonitoring metrics and monitoring services
 */
public AppState(AbstractClusterServices recordFactory,
    MetricsAndMonitoring metricsAndMonitoring) {
  Preconditions.checkArgument(recordFactory != null, "null recordFactory");
  Preconditions.checkArgument(metricsAndMonitoring != null, "null metricsAndMonitoring");
  this.recordFactory = recordFactory;
  this.metricsAndMonitoring = metricsAndMonitoring;
  // register the container gauges with the metrics registry so they are
  // published alongside the rest of the AM metrics
  register(MetricsConstants.CONTAINERS_OUTSTANDING_REQUESTS, outstandingContainerRequests);
  register(MetricsConstants.CONTAINERS_SURPLUS, surplusContainers);
  register(MetricsConstants.CONTAINERS_STARTED, startedContainers);
  register(MetricsConstants.CONTAINERS_COMPLETED, completedContainerCount);
  register(MetricsConstants.CONTAINERS_FAILED, failedContainerCount);
  register(MetricsConstants.CONTAINERS_START_FAILED, startFailedContainerCount);
}

/**
 * Register a metric under a name qualified by this class.
 * @param name short metric name
 * @param counter the metric to register
 */
private void register(String name, Metric counter) {
  this.metricsAndMonitoring.getMetrics().register(
      MetricRegistry.name(AppState.class, name), counter);
}
// NOTE(review): "Countainer" in the method names below is a typo for
// "Container", but these are public methods -renaming them would break
// callers, so the names are preserved.

/**
 * Get the number of failed containers.
 * @return current failed container count
 */
public long getFailedCountainerCount() {
  return failedContainerCount.getCount();
}

/**
 * Increment the failed container count.
 */
public void incFailedCountainerCount() {
  failedContainerCount.inc();
}

/**
 * Get the number of containers that failed to start.
 * @return current start-failed container count
 */
public long getStartFailedCountainerCount() {
  return startFailedContainerCount.getCount();
}

/**
 * Increment the started container count.
 * (The previous javadoc claimed a value was returned; the method is void.)
 */
public void incStartedCountainerCount() {
  startedContainers.inc();
}

/**
 * Get the number of started containers.
 * @return current started container count
 */
public long getStartedCountainerCount() {
  return startedContainers.getCount();
}

/**
 * Increment the start-failed container count.
 * (The previous javadoc claimed a value was returned; the method is void.)
 */
public void incStartFailedCountainerCount() {
  startFailedContainerCount.inc();
}
/**
 * Counter of completion events for nodes not in the live list.
 * @return the live (mutable) counter
 */
public AtomicInteger getCompletionOfNodeNotInLiveListEvent() {
  return completionOfNodeNotInLiveListEvent;
}

/**
 * Counter of completion events for unknown containers.
 * @return the live (mutable) counter
 */
public AtomicInteger getCompletionOfUnknownContainerEvent() {
  return completionOfUnknownContainerEvent;
}

/** @return map of role priority to role status (live view, not a copy) */
public Map<Integer, RoleStatus> getRoleStatusMap() {
  return roleStatusMap;
}

/** @return map of role name to provider role (live view, not a copy) */
protected Map<String, ProviderRole> getRoleMap() {
  return roles;
}

/** @return map of role priority to provider role (live view, not a copy) */
public Map<Integer, ProviderRole> getRolePriorityMap() {
  return rolePriorityMap;
}

/** @return containers which have been requested but not yet started */
private Map<ContainerId, RoleInstance> getStartingContainers() {
  return startingContainers;
}

/** @return containers which have completed (successfully or not) */
private Map<ContainerId, RoleInstance> getCompletedContainers() {
  return completedContainers;
}

/** @return containers which failed to start */
public Map<ContainerId, RoleInstance> getFailedContainers() {
  return failedContainers;
}

/** @return the live container map (live view, not a copy) */
public Map<ContainerId, RoleInstance> getLiveContainers() {
  return liveNodes;
}
/**
 * Get the current view of the cluster status.
 * <p>
 * Calls to {@link #refreshClusterStatus()} trigger a
 * refresh of this field.
 * <p>
 * This is read-only
 * to the extent that changes here do not trigger updates in the
 * application state.
 * @return the cluster status
 */
public synchronized ClusterDescription getClusterStatus() {
  return clusterStatus;
}

/**
 * Replace the cluster status. Test hook; production code updates
 * the status via {@link #initClusterStatus()}.
 * @param clusterDesc the new status
 */
@VisibleForTesting
protected synchronized void setClusterStatus(ClusterDescription clusterDesc) {
  this.clusterStatus = clusterDesc;
}
/**
 * Set the instance definition -this also builds the (now obsolete)
 * cluster specification from it.
 *
 * Important: this is for early binding and must not be used after the build
 * operation is complete.
 * @param definition initial definition
 * @throws BadConfigException on a configuration problem during resolution
 * @throws IOException on a serialization failure while snapshotting
 */
public synchronized void setInitialInstanceDefinition(AggregateConf definition)
    throws BadConfigException, IOException {
  log.debug("Setting initial instance definition");
  // snapshot the definition: two independent deep copies are taken so the
  // unresolved original survives the resolve() in onInstanceDefinitionUpdated()
  AggregateConfSerDeser serDeser = new AggregateConfSerDeser();
  unresolvedInstanceDefinition = serDeser.fromInstance(definition);
  this.instanceDefinition = serDeser.fromInstance(definition);
  onInstanceDefinitionUpdated();
}

/** @return the (resolved) instance definition */
public synchronized AggregateConf getInstanceDefinition() {
  return instanceDefinition;
}
public synchronized AggregateConf getInstanceDefinition() {
return instanceDefinition;
}
/**
 * Get the role history of the application
 * @return the role history
 */
@VisibleForTesting
public RoleHistory getRoleHistory() {
  return roleHistory;
}

/**
 * Get the path used for history files
 * @return the directory used for history files
 */
@VisibleForTesting
public Path getHistoryPath() {
  return roleHistory.getHistoryPath();
}

/**
 * Set the container limits -the min and max values for
 * resource requests. All requests must be multiples of the min
 * values.
 * Also builds the min/max Resource records via the record factory.
 * @param minMemory min memory MB
 * @param maxMemory maximum memory
 * @param minCores min v core count
 * @param maxCores maximum cores
 */
public void setContainerLimits(int minMemory,int maxMemory, int minCores, int maxCores) {
  containerMinCores = minCores;
  containerMaxCores = maxCores;
  containerMinMemory = minMemory;
  containerMaxMemory = maxMemory;
  minResource = recordFactory.newResource(containerMinMemory, containerMinCores);
  maxResource = recordFactory.newResource(containerMaxMemory, containerMaxCores);
}

/** @return snapshot of the resources configuration as of last update */
public ConfTreeOperations getResourcesSnapshot() {
  return resourcesSnapshot;
}

/** @return snapshot of the application configuration as of last update */
public ConfTreeOperations getAppConfSnapshot() {
  return appConfSnapshot;
}

/** @return snapshot of the internal configuration as of last update */
public ConfTreeOperations getInternalsSnapshot() {
  return internalsSnapshot;
}

/** @return true once {@link #buildInstance(AppStateBindingInfo)} completes */
public boolean isApplicationLive() {
  return applicationLive;
}

/** @return timestamp at which the definition snapshots were taken */
public long getSnapshotTime() {
  return snapshotTime;
}

/** @return fully resolved snapshot of the instance definition */
public AggregateConf getInstanceDefinitionSnapshot() {
  return instanceDefinitionSnapshot;
}

/** @return the raw, unresolved instance definition */
public AggregateConf getUnresolvedInstanceDefinition() {
  return unresolvedInstanceDefinition;
}
/**
 * Build up the application state from the binding information.
 * This is the main initialization operation; the ordering of the steps
 * below matters (e.g. client properties must exist before the instance
 * definition is set). On success {@link #isApplicationLive()} is true.
 * @param binding validated binding information
 * @throws BadClusterStateException if the binding is inconsistent
 * @throws BadConfigException on a configuration problem
 * @throws IOException on serialization/persistence failures
 */
public synchronized void buildInstance(AppStateBindingInfo binding)
    throws BadClusterStateException, BadConfigException, IOException {
  binding.validate();
  log.debug("Building application state");
  publishedProviderConf = binding.publishedProviderConf;
  applicationInfo = binding.applicationInfo != null ? binding.applicationInfo
      : new HashMap<String, String>();
  clientProperties = new HashMap<>();
  containerReleaseSelector = binding.releaseSelector;
  Set<String> confKeys = ConfigHelper.sortedConfigKeys(publishedProviderConf);
  // Add the -site configuration properties
  for (String key : confKeys) {
    String val = publishedProviderConf.get(key);
    clientProperties.put(key, val);
  }
  // set the cluster specification (once its dependency the client properties
  // is out the way
  setInitialInstanceDefinition(binding.instanceDefinition);
  //build the initial role list from the statically configured roles
  List<ProviderRole> roleList = new ArrayList<>(binding.roles);
  for (ProviderRole providerRole : roleList) {
    buildRole(providerRole);
  }
  // any component named in the resources config but not statically
  // declared becomes a dynamically created role
  ConfTreeOperations resources = instanceDefinition.getResourceOperations();
  Set<String> roleNames = resources.getComponentNames();
  for (String name : roleNames) {
    if (!roles.containsKey(name)) {
      // this is a new value
      log.info("Adding role {}", name);
      MapOperations resComponent = resources.getComponent(name);
      ProviderRole dynamicRole = createDynamicProviderRole(name, resComponent);
      buildRole(dynamicRole);
      roleList.add(dynamicRole);
    }
  }
  //then pick up the requirements
  buildRoleRequirementsFromResources();
  //set the livespan
  MapOperations globalResOpts = instanceDefinition.getResourceOperations().getGlobalOptions();
  startTimeThreshold = globalResOpts.getOptionInt(
      InternalKeys.INTERNAL_CONTAINER_FAILURE_SHORTLIFE,
      InternalKeys.DEFAULT_INTERNAL_CONTAINER_FAILURE_SHORTLIFE);
  failureThreshold = globalResOpts.getOptionInt(
      CONTAINER_FAILURE_THRESHOLD,
      DEFAULT_CONTAINER_FAILURE_THRESHOLD);
  nodeFailureThreshold = globalResOpts.getOptionInt(
      NODE_FAILURE_THRESHOLD,
      DEFAULT_NODE_FAILURE_THRESHOLD);
  initClusterStatus();
  // set up the role history
  roleHistory = new RoleHistory(roleStatusMap.values(), recordFactory);
  roleHistory.register(metricsAndMonitoring);
  roleHistory.onStart(binding.fs, binding.historyPath);
  // trigger first node update
  roleHistory.onNodesUpdated(binding.nodeReports);
  //rebuild any live containers (i.e. this is an AM restart)
  rebuildModelFromRestart(binding.liveContainers);
  // any am config options to pick up
  logServerURL = binding.serviceConfig.get(YarnConfiguration.YARN_LOG_SERVER_URL, "");
  //mark as live
  applicationLive = true;
}
/**
 * Initialize the cluster status from the status template:
 * copies the template, stamps build/time info onto it, sets the
 * create time if unset, marks the state live and publishes it
 * via {@link #setClusterStatus(ClusterDescription)}.
 */
public void initClusterStatus() {
  //copy into cluster status.
  ClusterDescription status = ClusterDescription.copy(clusterStatusTemplate);
  status.state = STATE_CREATED;
  MapOperations infoOps = new MapOperations("info", status.info);
  infoOps.mergeWithoutOverwrite(applicationInfo);
  SliderUtils.addBuildInfo(infoOps, "status");
  long now = now();
  // the live time is written both directly on the status record and
  // into the info map; infoOps wraps status.info, so both paths appear
  // intentional duplication -NOTE(review): confirm both are still needed
  status.setInfoTime(StatusKeys.INFO_LIVE_TIME_HUMAN,
      StatusKeys.INFO_LIVE_TIME_MILLIS,
      now);
  SliderUtils.setInfoTime(infoOps,
      StatusKeys.INFO_LIVE_TIME_HUMAN,
      StatusKeys.INFO_LIVE_TIME_MILLIS,
      now);
  if (0 == status.createTime) {
    status.createTime = now;
    SliderUtils.setInfoTime(infoOps,
        StatusKeys.INFO_CREATE_TIME_HUMAN,
        StatusKeys.INFO_CREATE_TIME_MILLIS,
        now);
  }
  status.state = STATE_LIVE;
  //set the app state to this status
  setClusterStatus(status);
}
/**
 * Build a dynamic provider role
 * @param name name of role
 * @param component component options; must contain the mandatory
 * priority option
 * @return a new provider role
 * @throws BadConfigException bad configuration (missing priority,
 * out-of-range priority or placement values)
 */
public ProviderRole createDynamicProviderRole(String name, MapOperations component)
    throws BadConfigException {
  // priority is mandatory and must be >= 1
  String priOpt = component.getMandatoryOption(COMPONENT_PRIORITY);
  int priority = SliderUtils.parseAndValidate(
      "value of " + name + " " + COMPONENT_PRIORITY, priOpt, 0, 1, -1);
  // placement policy defaults to PlacementPolicy.DEFAULT, must be >= 0
  String placementOpt = component.getOption(COMPONENT_PLACEMENT_POLICY,
      Integer.toString(PlacementPolicy.DEFAULT));
  int placement = SliderUtils.parseAndValidate(
      "value of " + name + " " + COMPONENT_PLACEMENT_POLICY, placementOpt, 0, 0, -1);
  int placementTimeout = component.getOptionInt(PLACEMENT_ESCALATE_DELAY,
      DEFAULT_PLACEMENT_ESCALATE_DELAY_SECONDS);
  ProviderRole newRole = new ProviderRole(name,
      priority,
      placement,
      getNodeFailureThresholdForRole(name),
      placementTimeout,
      component.getOption(YARN_LABEL_EXPRESSION, DEF_YARN_LABEL_EXPRESSION));
  log.info("New {} ", newRole);
  return newRole;
}
/**
 * Actions to perform when an instance definition is updated
 * Currently:
 * <ol>
 * <li>
 * resolve the configuration
 * </li>
 * <li>
 * update the cluster spec derivative
 * </li>
 * </ol>
 *
 * @throws BadConfigException on a resolution failure
 * @throws IOException on a serialization failure
 */
private synchronized void onInstanceDefinitionUpdated()
    throws BadConfigException, IOException {
  log.debug("Instance definition updated");
  //note the time
  snapshotTime = now();
  // resolve references if not already done
  instanceDefinition.resolve();
  // force in the AM desired state values: exactly one AM instance
  ConfTreeOperations resources = instanceDefinition.getResourceOperations();
  if (resources.getComponent(SliderKeys.COMPONENT_AM) != null) {
    resources.setComponentOpt(
        SliderKeys.COMPONENT_AM, COMPONENT_INSTANCES, "1");
  }
  //snapshot all three sections
  resourcesSnapshot = ConfTreeOperations.fromInstance(instanceDefinition.getResources());
  appConfSnapshot = ConfTreeOperations.fromInstance(instanceDefinition.getAppConf());
  internalsSnapshot = ConfTreeOperations.fromInstance(instanceDefinition.getInternal());
  //build a new aggregate from the snapshots
  instanceDefinitionSnapshot = new AggregateConf(resourcesSnapshot.confTree,
      appConfSnapshot.confTree,
      internalsSnapshot.confTree);
  instanceDefinitionSnapshot.setName(instanceDefinition.getName());
  clusterStatusTemplate = ClusterDescriptionOperations.buildFromInstanceDefinition(
      instanceDefinition);
  // Add the -site configuration properties
  for (Map.Entry<String, String> prop : clientProperties.entrySet()) {
    clusterStatusTemplate.clientProperties.put(prop.getKey(), prop.getValue());
  }
}
/**
 * The resource configuration is updated -review and update state.
 * Snapshots the new resources into both the unresolved and resolved
 * definitions, rebuilds derived state, then updates the role table of
 * the cluster status.
 * @param resources updated resources specification
 * @return a list of any dynamically added provider roles
 * (purely for testing purposes)
 * @throws BadConfigException on a configuration problem in the new resources
 * @throws IOException on a serialization failure
 */
@VisibleForTesting
public synchronized List<ProviderRole> updateResourceDefinitions(ConfTree resources)
    throws BadConfigException, IOException {
  log.debug("Updating resources to {}", resources);
  // snapshot the (possibly unresolved) values
  ConfTreeSerDeser serDeser = new ConfTreeSerDeser();
  unresolvedInstanceDefinition.setResources(
      serDeser.fromInstance(resources));
  // assign another copy under the instance definition for resolving
  // and then driving application size
  instanceDefinition.setResources(serDeser.fromInstance(resources));
  onInstanceDefinitionUpdated();
  // propagate the role table
  Map<String, Map<String, String>> updated = resources.components;
  getClusterStatus().roles = SliderUtils.deepClone(updated);
  getClusterStatus().updateTime = now();
  return buildRoleRequirementsFromResources();
}
/**
 * build the role requirements from the cluster specification
 * @return a list of any dynamically added provider roles
 * @throws BadConfigException on an invalid role definition (e.g. a
 * negative instance count or a bad dynamic role)
 */
private List<ProviderRole> buildRoleRequirementsFromResources() throws BadConfigException {
  List<ProviderRole> newRoles = new ArrayList<>(0);
  // now update every role's desired count.
  // if there are no instance values, that role count goes to zero
  ConfTreeOperations resources =
      instanceDefinition.getResourceOperations();
  // Add all the existing roles
  for (RoleStatus roleStatus : getRoleStatusMap().values()) {
    if (roleStatus.isExcludeFromFlexing()) {
      // skip inflexible roles, e.g AM itself
      continue;
    }
    long currentDesired = roleStatus.getDesired();
    String role = roleStatus.getName();
    int desiredInstanceCount = getDesiredInstanceCount(resources, role);
    if (desiredInstanceCount == 0) {
      log.info("Role {} has 0 instances specified", role);
    }
    if (currentDesired != desiredInstanceCount) {
      log.info("Role {} flexed from {} to {}", role, currentDesired,
          desiredInstanceCount);
      roleStatus.setDesired(desiredInstanceCount);
    }
  }
  // now the dynamic ones. Iterate through the cluster spec and
  // add any role status entries not in the role status
  Set<String> roleNames = resources.getComponentNames();
  for (String name : roleNames) {
    if (!roles.containsKey(name)) {
      // this is a new value
      log.info("Adding new role {}", name);
      MapOperations component = resources.getComponent(name);
      ProviderRole dynamicRole = createDynamicProviderRole(name, component);
      RoleStatus roleStatus = buildRole(dynamicRole);
      roleStatus.setDesired(getDesiredInstanceCount(resources, name));
      log.info("New role {}", roleStatus);
      roleHistory.addNewRole(roleStatus);
      newRoles.add(dynamicRole);
    }
  }
  // and fill in all those roles with their requirements
  buildRoleResourceRequirements();
  return newRoles;
}
/**
 * Get the desired instance count of a role, rejecting negative values
 * @param resources resource map
 * @param role role name
 * @return the instance count; 0 when the component declares no instances
 * @throws BadConfigException if the count is negative
 */
private int getDesiredInstanceCount(ConfTreeOperations resources,
    String role) throws BadConfigException {
  int desiredInstanceCount =
      resources.getComponentOptInt(role, COMPONENT_INSTANCES, 0);
  if (desiredInstanceCount < 0) {
    log.error("Role {} has negative desired instances : {}", role,
        desiredInstanceCount);
    // FIX: the format string previously read "(%)" -an invalid conversion
    // for the two arguments supplied; "%d" renders the negative count
    throw new BadConfigException(
        "Negative instance count (%d) requested for component %s",
        desiredInstanceCount, role);
  }
  return desiredInstanceCount;
}
/**
 * Add knowledge of a role.
 * This is a build-time operation that is not synchronized, and
 * should be used while setting up the system state -before servicing
 * requests.
 * Registers the role in the status, name and priority maps, and
 * registers its metrics.
 * @param providerRole role to add
 * @return the role status built up
 * @throws BadConfigException if a role of that priority already exists
 */
public RoleStatus buildRole(ProviderRole providerRole) throws BadConfigException {
  // build role status map; the role's id doubles as its priority key
  int priority = providerRole.id;
  if (roleStatusMap.containsKey(priority)) {
    throw new BadConfigException("Duplicate Provider Key: %s and %s",
        providerRole,
        roleStatusMap.get(priority));
  }
  RoleStatus roleStatus = new RoleStatus(providerRole);
  roleStatusMap.put(priority, roleStatus);
  String name = providerRole.name;
  roles.put(name, providerRole);
  rolePriorityMap.put(priority, providerRole);
  // register its entries
  metricsAndMonitoring.addMetricSet(MetricsConstants.PREFIX_SLIDER_ROLES + name, roleStatus);
  return roleStatus;
}
/**
 * Build up the resource requirements of every role, from the role status
 * and a fresh Resource record supplied by the record factory.
 */
private void buildRoleResourceRequirements() {
  // FIX: removed a stray no-op statement "roleStatusMap.values();"
  // that preceded this loop -it had no effect
  for (RoleStatus role : roleStatusMap.values()) {
    role.setResourceRequirements(
        buildResourceRequirements(role, recordFactory.newResource()));
  }
}
/**
 * build up the special master node, which lives
 * in the live node set but has a lifecycle bonded to the AM
 * @param containerId the AM master
 * @param host hostname
 * @param amPort port
 * @param nodeHttpAddress http address: may be null
 */
public void buildAppMasterNode(ContainerId containerId,
    String host,
    int amPort,
    String nodeHttpAddress) {
  // synthesize a Container record for the AM itself
  Container container = new ContainerPBImpl();
  container.setId(containerId);
  NodeId nodeId = NodeId.newInstance(host, amPort);
  container.setNodeId(nodeId);
  container.setNodeHttpAddress(nodeHttpAddress);
  RoleInstance am = new RoleInstance(container);
  am.role = SliderKeys.COMPONENT_AM;
  am.roleId = SliderKeys.ROLE_AM_PRIORITY_INDEX;
  am.createTime =now();
  am.startTime = am.createTime;
  appMasterNode = am;
  //it is also added to the set of live nodes
  getLiveContainers().put(containerId, am);
  putOwnedContainer(containerId, am);
  // patch up the role status: desired/actual/started all reflect the AM
  RoleStatus roleStatus = roleStatusMap.get(SliderKeys.ROLE_AM_PRIORITY_INDEX);
  roleStatus.setDesired(1);
  roleStatus.incActual();
  roleStatus.incStarted();
}
/**
 * Note that the master node has been launched,
 * though it isn't considered live until any forked
 * processes are running. It is NOT registered with
 * the role history -the container is incomplete
 * and it will just cause confusion
 */
public void noteAMLaunched() {
  getLiveContainers().put(appMasterNode.getContainerId(), appMasterNode);
}

/**
 * AM declares ourselves live in the cluster description.
 * This is meant to be triggered from the callback
 * indicating the spawned process is up and running.
 */
public void noteAMLive() {
  appMasterNode.state = STATE_LIVE;
}
/**
 * Look up the status entry of a role or raise an exception
 * @param key role ID
 * @return the status entry
 * @throws RuntimeException if the role cannot be found
 */
public RoleStatus lookupRoleStatus(int key) {
  RoleStatus rs = getRoleStatusMap().get(key);
  if (rs == null) {
    throw new RuntimeException("Cannot find role for role ID " + key);
  }
  return rs;
}

/**
 * Look up the status entry of a container or raise an exception.
 * The role ID is extracted from the container's priority field.
 *
 * @param c container
 * @return the status entry
 * @throws RuntimeException if the role cannot be found
 */
public RoleStatus lookupRoleStatus(Container c) {
  return lookupRoleStatus(ContainerPriority.extractRole(c));
}
/**
 * Get a deep clone of the role status list. Concurrent events may mean this
 * list (or indeed, some of the role status entries) may be inconsistent
 * @return a snapshot of the role status entries
 */
public List<RoleStatus> cloneRoleStatusList() {
  Collection<RoleStatus> statuses = roleStatusMap.values();
  List<RoleStatus> statusList = new ArrayList<>(statuses.size());
  try {
    for (RoleStatus status : statuses) {
      statusList.add((RoleStatus)(status.clone()));
    }
  } catch (CloneNotSupportedException e) {
    // RoleStatus is expected to be cloneable; log and return the
    // (possibly partial) list rather than fail the caller
    log.warn("Unexpected cloning failure: {}", e, e);
  }
  return statusList;
}
/**
 * Look up a role in the map
 * @param name role name
 * @return the instance
 * @throws YarnRuntimeException if not found
 */
public RoleStatus lookupRoleStatus(String name) throws YarnRuntimeException {
  ProviderRole providerRole = roles.get(name);
  if (providerRole == null) {
    throw new YarnRuntimeException("Unknown role " + name);
  }
  return lookupRoleStatus(providerRole.id);
}
/**
 * Clone the list of active (==owned) containers
 * @return the list of role instances representing all owned containers
 */
public synchronized List<RoleInstance> cloneOwnedContainerList() {
  Collection<RoleInstance> values = ownedContainers.values();
  return new ArrayList<>(values);
}

/**
 * Get the number of active (==owned) containers
 * @return the size of the owned container map
 */
public int getNumOwnedContainers() {
  return ownedContainers.size();
}

/**
 * Look up an active container: any container that the AM has, even
 * if it is not currently running/live
 * @param id container ID
 * @return the instance, or null if not owned
 */
public RoleInstance getOwnedContainer(ContainerId id) {
  return ownedContainers.get(id);
}

/**
 * Remove an owned container
 * @param id container ID
 * @return the instance removed, or null if it was not in the map
 */
private RoleInstance removeOwnedContainer(ContainerId id) {
  return ownedContainers.remove(id);
}

/**
 * set/update an owned container
 * @param id container ID
 * @param instance the instance to store under that ID
 * @return the previous entry for that ID, or null if there was none
 */
private RoleInstance putOwnedContainer(ContainerId id,
    RoleInstance instance) {
  return ownedContainers.put(id, instance);
}
/**
 * Clone the live container list. This is synchronized.
 * @return a snapshot of the live node list
 */
public synchronized List<RoleInstance> cloneLiveContainerInfoList() {
  // copy the values into a fresh list so callers get a stable snapshot
  return new ArrayList<>(getLiveContainers().values());
}
/**
 * Lookup live instance by string value of container ID
 * @param containerId container ID as a string
 * @return the role instance for that container
 * @throws NoSuchNodeException if it does not exist
 */
public synchronized RoleInstance getLiveInstanceByContainerID(String containerId)
    throws NoSuchNodeException {
  Collection<RoleInstance> nodes = getLiveContainers().values();
  return findNodeInCollection(containerId, nodes);
}

/**
 * Lookup owned instance by string value of container ID
 * @param containerId container ID as a string
 * @return the role instance for that container
 * @throws NoSuchNodeException if it does not exist
 */
public synchronized RoleInstance getOwnedInstanceByContainerID(String containerId)
    throws NoSuchNodeException {
  Collection<RoleInstance> nodes = ownedContainers.values();
  return findNodeInCollection(containerId, nodes);
}
/**
 * Iterate through a collection of role instances to find one with a
 * specific (string) container ID
 * @param containerId container ID as a string
 * @param nodes collection to scan
 * @return the first node whose id matches
 * @throws NoSuchNodeException if there was no match
 */
private RoleInstance findNodeInCollection(String containerId,
    Collection<RoleInstance> nodes) throws NoSuchNodeException {
  // return as soon as a match is found; fall through to the failure case
  for (RoleInstance candidate : nodes) {
    if (containerId.equals(candidate.id)) {
      return candidate;
    }
  }
  throw new NoSuchNodeException("Unknown node: " + containerId);
}
/**
 * Get the live role instances whose container IDs appear in the
 * supplied collection.
 * @param containerIDs container IDs to select
 * @return the matching instances; possibly empty, never null
 */
public synchronized List<RoleInstance> getLiveInstancesByContainerIDs(
    Collection<String> containerIDs) {
  // build a set for O(1) membership checks while scanning the live list
  Set<String> wanted = new HashSet<>(containerIDs);
  List<RoleInstance> matches = new ArrayList<>(wanted.size());
  for (RoleInstance instance : getLiveContainers().values()) {
    if (wanted.contains(instance.id)) {
      matches.add(instance);
    }
  }
  return matches;
}
/**
 * Enumerate all live nodes in a role.
 * @param role role name, or "" to match every role
 * @return a (possibly empty) list of live instances in that role
 */
public synchronized List<RoleInstance> enumLiveNodesInRole(String role) {
  boolean matchAll = role.isEmpty();
  List<RoleInstance> matching = new ArrayList<>();
  for (RoleInstance candidate : getLiveContainers().values()) {
    if (matchAll || role.equals(candidate.role)) {
      matching.add(candidate);
    }
  }
  return matching;
}
/**
 * Enumerate nodes by role ID, drawn from either the owned or the
 * live node list.
 * @param roleId role the container must be in
 * @param owned if true scan the (larger) owned list rather than the
 * live list
 * @return a (possibly empty) list of matching instances
 */
public synchronized List<RoleInstance> enumNodesWithRoleId(int roleId,
    boolean owned) {
  Collection<RoleInstance> source =
      owned ? ownedContainers.values() : liveNodes.values();
  List<RoleInstance> matching = new ArrayList<>();
  for (RoleInstance candidate : source) {
    if (candidate.roleId == roleId) {
      matching.add(candidate);
    }
  }
  return matching;
}
/**
 * Build a map from role name to the container IDs of its live instances.
 * @return the map of role name to list of container ID strings
 */
private synchronized Map<String, List<String>> createRoleToInstanceMap() {
  Map<String, List<String>> roleToIds = new HashMap<>();
  for (RoleInstance instance : getLiveContainers().values()) {
    List<String> ids = roleToIds.get(instance.role);
    if (ids == null) {
      // first instance seen for this role: create its bucket
      ids = new ArrayList<>();
      roleToIds.put(instance.role, ids);
    }
    ids.add(instance.id);
  }
  return roleToIds;
}
/**
 * Build a map of role name -&gt; (node name -&gt; cluster node details)
 * from the current live container set.
 *
 * @return the map of role name to the cluster nodes of that role
 */
public synchronized Map<String, Map<String, ClusterNode>> createRoleToClusterNodeMap() {
  Map<String, Map<String, ClusterNode>> roleToNodes = new HashMap<>();
  for (RoleInstance instance : getLiveContainers().values()) {
    Map<String, ClusterNode> nodesInRole = roleToNodes.get(instance.role);
    if (nodesInRole == null) {
      // first instance seen for this role: create its bucket
      nodesInRole = new HashMap<>();
      roleToNodes.put(instance.role, nodesInRole);
    }
    ClusterNode clusterNode = instance.toClusterNode();
    nodesInRole.put(clusterNode.name, clusterNode);
  }
  return roleToNodes;
}
/**
 * Notification called just before the NM is asked to start a container:
 * binds the instance to the container, marks it submitted, and registers
 * it in the starting and owned maps.
 * @param container container about to be started
 * @param instance role instance structure for that container
 */
public void containerStartSubmitted(Container container,
    RoleInstance instance) {
  instance.container = container;
  instance.state = STATE_SUBMITTED;
  instance.createTime = now();
  ContainerId id = container.getId();
  getStartingContainers().put(id, instance);
  putOwnedContainer(id, instance);
  // keep the role history in sync for placement tracking
  roleHistory.onContainerStartSubmitted(container, instance);
}
/**
 * Note that a container has been submitted for release; update internal state
 * and mark the associated ContainerInfo released field to indicate that
 * while it is still in the active list, it has been queued for release.
 *
 * @param container container
 * @throws SliderInternalStateException if there is no container of that ID
 * on the active list
 */
public synchronized void containerReleaseSubmitted(Container container)
throws SliderInternalStateException {
ContainerId id = container.getId();
//look up the container
RoleInstance instance = getOwnedContainer(id);
if (instance == null) {
throw new SliderInternalStateException(
"No active container with ID " + id);
}
//verify that it isn't already released
if (containersBeingReleased.containsKey(id)) {
throw new SliderInternalStateException(
"Container %s already queued for release", id);
}
// flag the instance so the pending release is visible while it is
// still in the owned list
instance.released = true;
containersBeingReleased.put(id, instance.container);
// bump the role's "releasing" counter
RoleStatus role = lookupRoleStatus(instance.roleId);
role.incReleasing();
// tell role history so placement data stays in sync
roleHistory.onContainerReleaseSubmitted(container);
}
/**
 * Create a container request.
 * Update internal state, such as the role request count.
 * Anti-Affine: the {@link RoleStatus#outstandingAArequest} is set here.
 * This is where role history information will be used for placement decisions.
 * @param role role
 * @return the container request to submit or null if there is none
 */
private AMRMClient.ContainerRequest createContainerRequest(RoleStatus role) {
  if (!role.isAntiAffinePlacement()) {
    // plain placement: let the role history pick a location
    incrementRequestCount(role);
    return roleHistory.requestContainerForRole(role).getIssuedRequest();
  }
  // anti-affine roles take a separate request path
  return createAAContainerRequest(role);
}
/**
 * Create an anti-affine container request, if one can currently
 * be placed.
 * Updates internal state (request count) and records the request as
 * the role's outstanding AA request via
 * {@link RoleStatus#setOutstandingAArequest}.
 * @param role role
 * @return the container request to submit, or null if the role history
 * could not produce a placement right now
 */
private AMRMClient.ContainerRequest createAAContainerRequest(RoleStatus role) {
  OutstandingRequest outstanding = roleHistory.requestContainerForAARole(role);
  if (outstanding == null) {
    // no placement possible at present
    return null;
  }
  incrementRequestCount(role);
  role.setOutstandingAArequest(outstanding);
  return outstanding.getIssuedRequest();
}
/**
 * Increment the request count of a role.
 * <p>
 * Also updates application state counters
 * @param role role being requested.
 */
protected void incrementRequestCount(RoleStatus role) {
// per-role counter
role.incRequested();
// application-wide counter
incOutstandingContainerRequests();
}
/**
 * Inc #of outstanding requests.
 * Counterpart of {@code decOutstandingContainerRequests()}.
 */
private void incOutstandingContainerRequests() {
outstandingContainerRequests.inc();
}
/**
 * Decrement the number of outstanding requests. This never goes below zero.
 * <p>
 * NOTE(review): this synchronizes on the counter object itself so the
 * read-then-decrement is atomic with respect to other callers of this
 * method; increments elsewhere do not take this lock — confirm that is
 * acceptable for the intended "never below zero" guarantee.
 */
private void decOutstandingContainerRequests() {
synchronized (outstandingContainerRequests) {
if (outstandingContainerRequests.getCount() > 0) {
// decrement but never go below zero
outstandingContainerRequests.dec();
}
}
}
/**
 * Get the value of a YARN requirement (cores, RAM, etc).
 * These are returned as integers, but there is special handling of the
 * string {@link ResourceKeys#YARN_RESOURCE_MAX}, which triggers
 * the return of the maximum value.
 * @param resources resource tree to query
 * @param name component to get from
 * @param option option name
 * @param defVal default value
 * @param maxVal value to return if the max val is requested
 * @return parsed value
 * @throws NumberFormatException if the value could not be parsed.
 */
private int getResourceRequirement(ConfTreeOperations resources,
    String name,
    String option,
    int defVal,
    int maxVal) {
  String raw = resources.getComponentOpt(name, option,
      Integer.toString(defVal));
  if (YARN_RESOURCE_MAX.equals(raw)) {
    // the special "max" marker string
    return maxVal;
  }
  // Integer.decode also accepts hex/octal forms
  return Integer.decode(raw);
}
/**
 * Build up the resource requirements for this role from the
 * cluster specification, including substituting max allowed values
 * if the specification asked for it.
 * @param role role
 * @param capability capability to set up. A new one may be created
 * during normalization
 * @return the normalized resource requirements; may differ from the
 * requested capability if YARN normalization changed the values
 */
public Resource buildResourceRequirements(RoleStatus role, Resource capability) {
// Set up resource requirements from role values
String name = role.getName();
ConfTreeOperations resources = getResourcesSnapshot();
// vcores, with YARN_RESOURCE_MAX mapping to the container max
int cores = getResourceRequirement(resources,
name,
YARN_CORES,
DEF_YARN_CORES,
containerMaxCores);
capability.setVirtualCores(cores);
// memory, with YARN_RESOURCE_MAX mapping to the container max
int ram = getResourceRequirement(resources, name,
YARN_MEMORY,
DEF_YARN_MEMORY,
containerMaxMemory);
capability.setMemory(ram);
log.debug("Component {} has RAM={}, vCores ={}", name, ram, cores);
// let YARN round the request to its allocation granularity/limits
Resource normalized = recordFactory.normalize(capability, minResource,
maxResource);
if (!Resources.equals(normalized, capability)) {
// resource requirements normalized to something other than asked for.
// LOG @ WARN so users can see why this is happening.
log.warn("Resource requirements of {} normalized" +
" from {} to {}", name, capability, normalized);
}
return normalized;
}
/**
 * Add a launched container to the live node map for status responses,
 * and notify the role history that it has started.
 * @param container the container which has been launched
 * @param node the role instance owning that container
 * @throws YarnRuntimeException if the node has no role name; thrown as
 * YarnRuntimeException (rather than a raw RuntimeException) so it is
 * handled the same way as the other container-start consistency checks.
 */
private void addLaunchedContainer(Container container, RoleInstance node) {
  node.container = container;
  if (node.role == null) {
    // a live instance must always be bound to a role
    throw new YarnRuntimeException(
        "Unknown role for node " + node);
  }
  getLiveContainers().put(node.getContainerId(), node);
  //tell role history, for future placement decisions
  roleHistory.onContainerStarted(container);
}
/**
 * container start event
 * Delegates to {@code innerOnNodeManagerContainerStarted} and converts
 * any YarnRuntimeException raised there into a logged error + null
 * return, so a bad callback cannot propagate out of the NM handler.
 * @param containerId container that is to be started
 * @return the role instance, or null if there was a problem
 */
public synchronized RoleInstance onNodeManagerContainerStarted(ContainerId containerId) {
try {
return innerOnNodeManagerContainerStarted(containerId);
} catch (YarnRuntimeException e) {
// swallow the failure after logging: the NM callback must not throw
log.error("NodeManager callback on started container {} failed",
containerId,
e);
return null;
}
}
/**
 * container start event handler -throwing an exception on problems
 * Validates that the container is owned, has a role, and is in the
 * starting set; then transitions it to the live state.
 * @param containerId container that is to be started
 * @return the role instance
 * @throws YarnRuntimeException on problems: unknown container, missing
 * role, or a container that was already started
 */
@VisibleForTesting
public RoleInstance innerOnNodeManagerContainerStarted(ContainerId containerId) {
incStartedCountainerCount();
RoleInstance instance = getOwnedContainer(containerId);
if (instance == null) {
//serious problem
throw new YarnRuntimeException("Container not in active containers start "+
containerId);
}
if (instance.role == null) {
throw new YarnRuntimeException("Component instance has no instance name " +
instance);
}
instance.startTime = now();
// removal from the starting set doubles as a "not yet started" check
RoleInstance starting = getStartingContainers().remove(containerId);
if (null == starting) {
throw new YarnRuntimeException(
"Container "+ containerId +" is already started");
}
instance.state = STATE_LIVE;
RoleStatus roleStatus = lookupRoleStatus(instance.roleId);
roleStatus.incStarted();
Container container = instance.container;
// move it into the live map and update role history
addLaunchedContainer(container, instance);
return instance;
}
/**
 * update the application state after a failure to start a container.
 * This is perhaps where blacklisting could be most useful: failure
 * to start a container is a sign of a more serious problem
 * than a later exit.
 *
 * -relayed from NMClientAsync.CallbackHandler
 * @param containerId failing container
 * @param thrown what was thrown; may be null, in which case a generic
 * diagnostics message is recorded
 */
public synchronized void onNodeManagerContainerStartFailed(ContainerId containerId,
Throwable thrown) {
// drop it from the owned set and bump the failure counters
removeOwnedContainer(containerId);
incFailedCountainerCount();
incStartFailedCountainerCount();
RoleInstance instance = getStartingContainers().remove(containerId);
if (null != instance) {
RoleStatus roleStatus = lookupRoleStatus(instance.roleId);
String text;
if (null != thrown) {
text = SliderUtils.stringify(thrown);
} else {
text = "container start failure";
}
instance.diagnostics = text;
// a start failure is always treated as "short lived"
roleStatus.noteFailed(true, text, ContainerOutcome.Failed);
getFailedContainers().put(containerId, instance);
roleHistory.onNodeManagerContainerStartFailed(instance.container);
}
}
/**
 * Handle node update from the RM. This syncs up the node map with
 * the RM's view; if the cluster changed, all outstanding AA requests
 * are cancelled (they may now be placeable elsewhere).
 * @param updatedNodes updated nodes
 * @return the outcome: whether the cluster changed, plus any cancel ops
 */
public synchronized NodeUpdatedOutcome onNodesUpdated(List<NodeReport> updatedNodes) {
  boolean changed = roleHistory.onNodesUpdated(updatedNodes);
  if (!changed) {
    return new NodeUpdatedOutcome(false, new ArrayList<AbstractRMOperation>(0));
  }
  log.info("YARN cluster changed —cancelling current AA requests");
  List<AbstractRMOperation> cancellations = cancelOutstandingAARequests();
  log.debug("Created {} cancel requests", cancellations.size());
  return new NodeUpdatedOutcome(true, cancellations);
}
/**
 * Return value of the {@link #onNodesUpdated(List)} call.
 * Immutable pairing of a "did the cluster change" flag with the list
 * of RM operations (e.g. request cancellations) to execute.
 */
public static class NodeUpdatedOutcome {
// true if the node map reported a cluster change
public final boolean clusterChanged;
// operations to submit to the RM as a result; never null
public final List<AbstractRMOperation> operations;
public NodeUpdatedOutcome(boolean clusterChanged,
List<AbstractRMOperation> operations) {
this.clusterChanged = clusterChanged;
this.operations = operations;
}
}
/**
 * Is a role instance short lived by the threshold set for this
 * application?
 * An instance with no recorded start time is always considered
 * short-lived.
 * @param instance instance
 * @return true if the instance is considered short lived
 */
@VisibleForTesting
public boolean isShortLived(RoleInstance instance) {
  long started = instance.startTime;
  if (started <= 0) {
    // never even saw a start event
    return true;
  }
  long duration = now() - started;
  log.info("Duration {} and startTimeThreshold {}", duration, startTimeThreshold);
  // threshold is in seconds; duration in milliseconds
  return duration < (startTimeThreshold * 1000);
}
/**
 * Current time in milliseconds. Made protected for
 * the option to override it in tests.
 * @return the current time.
 */
protected long now() {
return System.currentTimeMillis();
}
/**
 * This is a very small class to send a multiple result back from
 * the completion operation.
 * All fields are public and mutable; it is a transfer object only.
 */
public static class NodeCompletionResult {
  /** was this a surplus node being discarded? */
  public boolean surplusNode = false;
  /** the instance which completed; null if the node was unknown */
  public RoleInstance roleInstance;
  // did the container fail for *any* reason?
  public boolean containerFailed = false;
  // detailed outcome on the container failure
  public ContainerOutcome outcome = ContainerOutcome.Completed;
  /** exit status reported for the container */
  public int exitStatus = 0;
  /** true if the container was in no tracked list */
  public boolean unknownNode = false;

  @Override
  public String toString() {
    final StringBuilder sb =
        new StringBuilder("NodeCompletionResult{");
    sb.append("surplusNode=").append(surplusNode);
    sb.append(", roleInstance=").append(roleInstance);
    sb.append(", exitStatus=").append(exitStatus);
    sb.append(", containerFailed=").append(containerFailed);
    sb.append(", outcome=").append(outcome);
    sb.append(", unknownNode=").append(unknownNode);
    sb.append('}');
    return sb.toString();
  }
}
/**
 * handle completed node in the CD -move something from the live
 * server list to the completed server list.
 * Distinguishes three cases: a deliberate release completing, a
 * surplus container being purged, and an unexpected failure/kill.
 * @param status the node that has just completed
 * @return NodeCompletionResult describing which case applied
 */
public synchronized NodeCompletionResult onCompletedNode(ContainerStatus status) {
ContainerId containerId = status.getContainerId();
NodeCompletionResult result = new NodeCompletionResult();
RoleInstance roleInstance;
int exitStatus = status.getExitStatus();
result.exitStatus = exitStatus;
if (containersBeingReleased.containsKey(containerId)) {
// case 1: a release we asked for has completed
log.info("Container was queued for release : {}", containerId);
Container container = containersBeingReleased.remove(containerId);
RoleStatus roleStatus = lookupRoleStatus(container);
long releasing = roleStatus.decReleasing();
long actual = roleStatus.decActual();
long completedCount = roleStatus.incCompleted();
log.info("decrementing role count for role {} to {}; releasing={}, completed={}",
roleStatus.getName(),
actual,
releasing,
completedCount);
result.outcome = ContainerOutcome.Completed;
roleHistory.onReleaseCompleted(container);
} else if (surplusNodes.remove(containerId)) {
//its a surplus one being purged
result.surplusNode = true;
} else {
// case 3: a container has failed or been killed
// use the exit code to determine the outcome
result.containerFailed = true;
result.outcome = ContainerOutcome.fromExitStatus(exitStatus);
roleInstance = removeOwnedContainer(containerId);
if (roleInstance != null) {
//it was active, move it to failed
incFailedCountainerCount();
failedContainers.put(containerId, roleInstance);
} else {
// the container may have been noted as failed already, so look
// it up
roleInstance = failedContainers.get(containerId);
}
if (roleInstance != null) {
int roleId = roleInstance.roleId;
String rolename = roleInstance.role;
log.info("Failed container in role[{}] : {}", roleId, rolename);
try {
RoleStatus roleStatus = lookupRoleStatus(roleId);
roleStatus.decActual();
// short-lived failures are weighted differently by the role history
boolean shortLived = isShortLived(roleInstance);
String message;
Container failedContainer = roleInstance.container;
//build the failure message
if (failedContainer != null) {
String completedLogsUrl = getLogsURLForContainer(failedContainer);
message = String.format("Failure %s on host %s (%d): %s",
roleInstance.getContainerId(),
failedContainer.getNodeId().getHost(),
exitStatus,
completedLogsUrl);
} else {
message = String.format("Failure %s (%d)", containerId, exitStatus);
}
roleStatus.noteFailed(shortLived, message, result.outcome);
long failed = roleStatus.getFailed();
log.info("Current count of failed role[{}] {} = {}",
roleId, rolename, failed);
if (failedContainer != null) {
roleHistory.onFailedContainer(failedContainer, shortLived, result.outcome);
}
} catch (YarnRuntimeException e1) {
// the role ID was unknown to lookupRoleStatus
log.error("Failed container of unknown role {}", roleId);
}
} else {
//this isn't a known container.
log.error("Notified of completed container {} that is not in the list" +
" of active or failed containers", containerId);
completionOfUnknownContainerEvent.incrementAndGet();
result.unknownNode = true;
}
}
if (result.surplusNode) {
//a surplus node: no bookkeeping beyond the set removal above
return result;
}
//record the complete node's details; this pulls it from the livenode set
//remove the node
ContainerId id = status.getContainerId();
log.info("Removing node ID {}", id);
RoleInstance node = getLiveContainers().remove(id);
if (node != null) {
node.state = STATE_DESTROYED;
node.exitCode = exitStatus;
node.diagnostics = status.getDiagnostics();
getCompletedContainers().put(id, node);
result.roleInstance = node;
} else {
// not in the list
log.warn("Received notification of completion of unknown node {}", id);
completionOfNodeNotInLiveListEvent.incrementAndGet();
}
// and the active node list if present
removeOwnedContainer(containerId);
// finally, verify the node doesn't exist any more
assert !containersBeingReleased.containsKey(
containerId) : "container still in release queue";
assert !getLiveContainers().containsKey(
containerId) : " container still in live nodes";
assert getOwnedContainer(containerId) ==
null : "Container still in active container list";
return result;
}
/**
 * Get the URL log for a container
 * @param c container; may be null
 * @return the URL; "" if the user or log-server URL cannot be
 * determined; null if the container itself is null
 */
protected String getLogsURLForContainer(Container c) {
if (c==null) {
return null;
}
String user = null;
try {
user = SliderUtils.getCurrentUser().getShortUserName();
} catch (IOException ignored) {
// best-effort: fall through with user == null and return ""
}
String completedLogsUrl = "";
String url = logServerURL;
if (user != null && SliderUtils.isSet(url)) {
completedLogsUrl = url
+ "/" + c.getNodeId() + "/" + c.getId() + "/ctx/" + user;
}
return completedLogsUrl;
}
/**
 * Return the percentage done that Slider is to have YARN display in its
 * Web UI
 * <p>
 * NOTE(review): the two branches use different scales — an empty
 * cluster returns 100, while a non-empty one returns the raw ratio
 * actual/desired (0..1, no *100). Confirm which scale the caller
 * expects before relying on the documented "0 to 100" range.
 * @return an number from 0 to 100
 */
public synchronized float getApplicationProgressPercentage() {
float percentage;
long desired = 0;
float actual = 0;
// aggregate desired/actual counts across every role
for (RoleStatus role : getRoleStatusMap().values()) {
desired += role.getDesired();
actual += role.getActual();
}
if (desired == 0) {
// nothing is wanted: report complete
percentage = 100;
} else {
percentage = actual / desired;
}
return percentage;
}
/**
 * Update the cluster description with the current application state,
 * with no provider-supplied info entries.
 * @return the refreshed cluster description
 */
public ClusterDescription refreshClusterStatus() {
return refreshClusterStatus(null);
}
/**
 * Update the cluster description with the current application state:
 * timestamps, provider info, instance/cluster-node maps, per-role
 * option values and statistics, and liveness information.
 * @param providerStatus status from the provider for the cluster info
 * section; may be null
 * @return the refreshed cluster description
 */
public synchronized ClusterDescription refreshClusterStatus(Map<String, String> providerStatus) {
ClusterDescription cd = getClusterStatus();
long now = now();
// stamp the status with the current time, human + millis forms
cd.setInfoTime(StatusKeys.INFO_STATUS_TIME_HUMAN,
StatusKeys.INFO_STATUS_TIME_MILLIS,
now);
if (providerStatus != null) {
// copy provider-supplied entries into the info section
for (Map.Entry<String, String> entry : providerStatus.entrySet()) {
cd.setInfo(entry.getKey(), entry.getValue());
}
}
// merge in application info without overwriting provider entries
MapOperations infoOps = new MapOperations("info", cd.info);
infoOps.mergeWithoutOverwrite(applicationInfo);
SliderUtils.addBuildInfo(infoOps, "status");
cd.statistics = new HashMap<>();
// build the map of node -> container IDs
Map<String, List<String>> instanceMap = createRoleToInstanceMap();
cd.instances = instanceMap;
//build the map of node -> containers
Map<String, Map<String, ClusterNode>> clusterNodes =
createRoleToClusterNodeMap();
cd.status = new HashMap<>();
cd.status.put(ClusterDescriptionKeys.KEY_CLUSTER_LIVE, clusterNodes);
// per-role option values: desired/actual/requested/releasing/failure counts
for (RoleStatus role : getRoleStatusMap().values()) {
String rolename = role.getName();
List<String> instances = instanceMap.get(rolename);
int nodeCount = instances != null ? instances.size(): 0;
cd.setRoleOpt(rolename, COMPONENT_INSTANCES,
role.getDesired());
cd.setRoleOpt(rolename, RoleKeys.ROLE_ACTUAL_INSTANCES, nodeCount);
cd.setRoleOpt(rolename, ROLE_REQUESTED_INSTANCES, role.getRequested());
cd.setRoleOpt(rolename, ROLE_RELEASING_INSTANCES, role.getReleasing());
cd.setRoleOpt(rolename, ROLE_FAILED_INSTANCES, role.getFailed());
cd.setRoleOpt(rolename, ROLE_FAILED_STARTING_INSTANCES, role.getStartFailed());
cd.setRoleOpt(rolename, ROLE_FAILED_RECENTLY_INSTANCES, role.getFailedRecently());
cd.setRoleOpt(rolename, ROLE_NODE_FAILED_INSTANCES, role.getNodeFailed());
cd.setRoleOpt(rolename, ROLE_PREEMPTED_INSTANCES, role.getPreempted());
if (role.isAntiAffinePlacement()) {
// pending AA requests only exist for anti-affine roles
cd.setRoleOpt(rolename, ROLE_PENDING_AA_INSTANCES, role.getPendingAntiAffineRequests());
}
Map<String, Integer> stats = role.buildStatistics();
cd.statistics.put(rolename, stats);
}
// AM-level statistics go in under the AM component name
Map<String, Integer> sliderstats = getLiveStatistics();
cd.statistics.put(SliderKeys.COMPONENT_AM, sliderstats);
// liveness
cd.liveness = getApplicationLivenessInformation();
return cd;
}
/**
 * get application liveness information
 * The outstanding-request count is derived from the aggregate
 * desired-vs-actual gap across all roles.
 * @return a snapshot of the current liveness information
 */
public ApplicationLivenessInformation getApplicationLivenessInformation() {
  RoleStatistics stats = getRoleStatistics();
  int outstanding = (int) (stats.desired - stats.actual);
  ApplicationLivenessInformation info = new ApplicationLivenessInformation();
  info.requestsOutstanding = outstanding;
  info.allRequestsSatisfied = outstanding <= 0;
  info.activeRequests = (int) stats.requested;
  return info;
}
/**
 * Get the live statistics map: container counts for live, completed,
 * failed, started, start-failed, surplus and unknown-completed.
 * @return a map of statistics values, defined in the {@link StatusKeys}
 * keylist.
 */
protected Map<String, Integer> getLiveStatistics() {
  Map<String, Integer> stats = new HashMap<>();
  stats.put(StatusKeys.STATISTICS_CONTAINERS_LIVE, liveNodes.size());
  stats.put(StatusKeys.STATISTICS_CONTAINERS_COMPLETED,
      completedContainerCount.intValue());
  stats.put(StatusKeys.STATISTICS_CONTAINERS_FAILED,
      failedContainerCount.intValue());
  stats.put(StatusKeys.STATISTICS_CONTAINERS_STARTED,
      startedContainers.intValue());
  stats.put(StatusKeys.STATISTICS_CONTAINERS_START_FAILED,
      startFailedContainerCount.intValue());
  stats.put(StatusKeys.STATISTICS_CONTAINERS_SURPLUS,
      surplusContainers.intValue());
  stats.put(StatusKeys.STATISTICS_CONTAINERS_UNKNOWN_COMPLETED,
      completionOfUnknownContainerEvent.get());
  return stats;
}
/**
 * Get the aggregate statistics across all roles.
 * @return a new RoleStatistics instance summing every role's statistics
 */
public RoleStatistics getRoleStatistics() {
  RoleStatistics aggregate = new RoleStatistics();
  for (RoleStatus role : getRoleStatusMap().values()) {
    aggregate.add(role.getStatistics());
  }
  return aggregate;
}
/**
 * Get a snapshot of component information.
 * <p>
 * This does <i>not</i> include any container list, which
 * is more expensive to create.
 * @return a map of role name to serialized role status values.
 */
public Map<String, ComponentInformation> getComponentInfoSnapshot() {
  Map<Integer, RoleStatus> statusMap = getRoleStatusMap();
  Map<String, ComponentInformation> snapshot =
      new HashMap<>(statusMap.size());
  for (RoleStatus status : statusMap.values()) {
    snapshot.put(status.getName(), status.serialize());
  }
  return snapshot;
}
/**
 * Look at where the current node state is — and whether it should be
 * changed. Every flexible role is reviewed; the resulting RM operations
 * are concatenated.
 * @return the combined list of request/release operations
 * @throws SliderInternalStateException on inconsistent internal state
 * @throws TriggerClusterTeardownException if a role breached its
 * failure threshold
 */
public synchronized List<AbstractRMOperation> reviewRequestAndReleaseNodes()
    throws SliderInternalStateException, TriggerClusterTeardownException {
  log.debug("in reviewRequestAndReleaseNodes()");
  List<AbstractRMOperation> allOperations = new ArrayList<>();
  for (RoleStatus roleStatus : getRoleStatusMap().values()) {
    if (roleStatus.isExcludeFromFlexing()) {
      // this role is never flexed
      continue;
    }
    allOperations.addAll(reviewOneRole(roleStatus));
  }
  return allOperations;
}
/**
 * Check the "recent" failure threshold for a role.
 * @param role role to examine
 * @throws TriggerClusterTeardownException if the role
 * has failed too many times recently
 */
private void checkFailureThreshold(RoleStatus role)
    throws TriggerClusterTeardownException {
  long failures = role.getFailedRecently();
  int threshold = getFailureThresholdForRole(role);
  if (log.isDebugEnabled() && failures > 0) {
    log.debug("Failure count of component: {}: {}, threshold={}",
        role.getName(), failures, threshold);
  }
  if (failures > threshold) {
    // report the recent-failure count that was actually compared against
    // the threshold (previously this passed the all-time total, which
    // made the "'recently'" wording in the message misleading)
    throw new TriggerClusterTeardownException(
        SliderExitCodes.EXIT_DEPLOYMENT_FAILED,
        FinalApplicationStatus.FAILED, ErrorStrings.E_UNSTABLE_CLUSTER +
        " - failed with component %s failed 'recently' %d times (%d in startup);" +
        " threshold is %d - last failure: %s",
        role.getName(),
        failures,
        role.getStartFailed(),
        threshold,
        role.getFailureMessage());
  }
}
/**
 * Get the failure threshold for a specific role, falling back to
 * the global default if the component does not set one.
 * @param roleStatus role
 * @return the threshold for failures
 */
private int getFailureThresholdForRole(RoleStatus roleStatus) {
  return instanceDefinition.getResourceOperations()
      .getComponentOptInt(roleStatus.getName(),
          CONTAINER_FAILURE_THRESHOLD,
          failureThreshold);
}
/**
 * Get the node failure threshold for a specific role, falling back to
 * the global default if the component does not set one.
 * @param roleName role name
 * @return the threshold for node failures
 */
private int getNodeFailureThresholdForRole(String roleName) {
  return instanceDefinition.getResourceOperations()
      .getComponentOptInt(roleName,
          NODE_FAILURE_THRESHOLD,
          nodeFailureThreshold);
}
/**
 * Reset the "recent" failure counts of all roles, then tell the
 * role history to do the same for its per-node records.
 */
public void resetFailureCounts() {
  for (RoleStatus roleStatus : getRoleStatusMap().values()) {
    long previous = roleStatus.resetFailedRecently();
    log.info("Resetting failure count of {}; was {}",
        roleStatus.getName(),
        previous);
  }
  roleHistory.resetFailedRecently();
}
/**
 * Escalate operation as triggered by external timer.
 * Pure delegation to the role history.
 * @return a (usually empty) list of cancel/request operations.
 */
public List<AbstractRMOperation> escalateOutstandingRequests() {
return roleHistory.escalateOutstandingRequests();
}
/**
 * Cancel any outstanding AA Requests, building up the list of ops to
 * cancel, removing them from RoleHistory structures and the RoleStatus
 * entries.
 * @return a (usually empty) list of cancel/request operations.
 */
public synchronized List<AbstractRMOperation> cancelOutstandingAARequests() {
  // build the cancellation operations from the role history
  List<AbstractRMOperation> cancellations =
      roleHistory.cancelOutstandingAARequests();
  // then clear the outstanding-request marker on every affected role
  for (RoleStatus roleStatus : roleStatusMap.values()) {
    if (roleStatus.isAARequestOutstanding()) {
      log.info("Cancelling outstanding AA request for {}", roleStatus);
      roleStatus.cancelOutstandingAARequest();
    }
  }
  return cancellations;
}
/**
 * Look at the allocation status of one role, and trigger add/release
 * actions if the number of desired role instances doesn't equal
 * (actual + pending).
 * <p>
 * MUST be executed from within a synchronized method
 * <p>
 * Three cases: delta &gt; 0 requests more containers (with special
 * handling for anti-affine roles), delta &lt; 0 first cancels
 * outstanding requests and then releases running containers, and
 * delta == 0 only clears stale pending AA counts.
 * @param role role
 * @return a list of operations
 * @throws SliderInternalStateException if the operation reveals that
 * the internal state of the application is inconsistent.
 * @throws TriggerClusterTeardownException if the role breached its
 * failure threshold or has a negative desired count
 */
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
private List<AbstractRMOperation> reviewOneRole(RoleStatus role)
throws SliderInternalStateException, TriggerClusterTeardownException {
List<AbstractRMOperation> operations = new ArrayList<>();
long delta;
long expected;
String name = role.getName();
// read delta and desired atomically with respect to role updates
synchronized (role) {
delta = role.getDelta();
expected = role.getDesired();
}
log.info("Reviewing {} : ", role);
log.debug("Expected {}, Delta: {}", expected, delta);
// may throw TriggerClusterTeardownException
checkFailureThreshold(role);
if (expected < 0 ) {
// negative value: fail
throw new TriggerClusterTeardownException(
SliderExitCodes.EXIT_DEPLOYMENT_FAILED,
FinalApplicationStatus.FAILED,
"Negative component count of %d desired for component %s",
expected, role);
}
if (delta > 0) {
// more workers needed than we have -ask for more
log.info("{}: Asking for {} more nodes(s) for a total of {} ", name, delta, expected);
if (role.isAntiAffinePlacement()) {
// AA placement: issue at most one request at a time; the remainder
// is tracked as "pending" on the role
long pending = delta;
if (roleHistory.canPlaceAANodes()) {
// build one only if there is none outstanding, the role history knows
// enough about the cluster to ask, and there is somewhere to place
// the node
if (!role.isAARequestOutstanding()) {
// no outstanding AA; try to place things
AMRMClient.ContainerRequest request = createAAContainerRequest(role);
if (request != null) {
pending--;
log.info("Starting an anti-affine request sequence for {} nodes; pending={}",
delta, pending);
addContainerRequest(operations, request);
} else {
log.info("No location for anti-affine request");
}
}
} else {
log.warn("Awaiting node map before generating anti-affinity requests");
}
log.info("Setting pending to {}", pending);
role.setPendingAntiAffineRequests(pending);
} else {
// plain placement: one request per missing instance
for (int i = 0; i < delta; i++) {
//get the role history to select a suitable node, if available
addContainerRequest(operations, createContainerRequest(role));
}
}
} else if (delta < 0) {
log.info("{}: Asking for {} fewer node(s) for a total of {}", name,
-delta,
expected);
// reduce the number expected (i.e. subtract the delta)
long excess = -delta;
// how many requests are outstanding? for AA roles, this includes pending
long outstandingRequests = role.getRequested() + role.getPendingAntiAffineRequests();
if (outstandingRequests > 0) {
// outstanding requests are cancelled before any container is released
int toCancel = (int)Math.min(outstandingRequests, excess);
// Delegate to Role History
List<AbstractRMOperation> cancellations = roleHistory.cancelRequestsForRole(role, toCancel);
log.info("Found {} outstanding requests to cancel", cancellations.size());
operations.addAll(cancellations);
if (toCancel != cancellations.size()) {
log.error("Tracking of outstanding requests is not in sync with the summary statistics:" +
" expected to be able to cancel {} requests, but got {}",
toCancel, cancellations.size());
}
role.cancel(toCancel);
excess -= toCancel;
assert excess >= 0 : "Attempted to cancel too many requests";
log.info("Submitted {} cancellations, leaving {} to release",
toCancel, excess);
if (excess == 0) {
log.info("After cancelling requests, application is now at desired size");
}
}
// after the cancellation there may be no excess
if (excess > 0) {
// there's an excess, so more to cancel
// get the nodes to release
int roleId = role.getKey();
// enum all active nodes that aren't being released
List<RoleInstance> containersToRelease = enumNodesWithRoleId(roleId, true);
if (containersToRelease.isEmpty()) {
log.info("No containers for component {}", roleId);
}
// filter out all release-in-progress nodes
ListIterator<RoleInstance> li = containersToRelease.listIterator();
while (li.hasNext()) {
RoleInstance next = li.next();
if (next.released) {
li.remove();
}
}
// warn if the desired state can't be reached
int numberAvailableForRelease = containersToRelease.size();
if (numberAvailableForRelease < excess) {
log.warn("Not enough containers to release, have {} and need {} more",
numberAvailableForRelease,
excess - numberAvailableForRelease);
}
// ask the release selector to sort the targets
containersToRelease = containerReleaseSelector.sortCandidates(
roleId,
containersToRelease);
// crop to the excess
List<RoleInstance> finalCandidates = (excess < numberAvailableForRelease)
? containersToRelease.subList(0, (int)excess)
: containersToRelease;
// then build up a release operation, logging each container as released
for (RoleInstance possible : finalCandidates) {
log.info("Targeting for release: {}", possible);
containerReleaseSubmitted(possible.container);
operations.add(new ContainerReleaseOperation(possible.getId()));
}
}
} else {
// actual + requested == desired
// there's a special case here: clear all pending AA requests
if (role.getPendingAntiAffineRequests() > 0) {
log.debug("Clearing outstanding pending AA requests");
role.setPendingAntiAffineRequests(0);
}
}
// there's now a list of operations to execute
log.debug("operations scheduled: {}; updated role: {}", operations.size(), role);
return operations;
}
/**
 * Queue a container-request operation, provided a request was actually built.
 * <p>
 * Logs the ask (including its node label) and warns when the requested
 * memory exceeds the cluster's maximum container memory.
 * @param operations list to which the new {@code ContainerRequestOperation}
 * is appended
 * @param containerAsk the request to submit; may be null, in which case
 * nothing is added
 * @return true iff an operation was appended
 */
private boolean addContainerRequest(List<AbstractRMOperation> operations,
    AMRMClient.ContainerRequest containerAsk) {
  if (containerAsk == null) {
    return false;
  }
  log.info("Container ask is {} and label = {}", containerAsk,
      containerAsk.getNodeLabelExpression());
  int askMemory = containerAsk.getCapability().getMemory();
  if (askMemory > this.containerMaxMemory) {
    log.warn("Memory requested: {} > max of {}", askMemory, containerMaxMemory);
  }
  operations.add(new ContainerRequestOperation(containerAsk));
  return true;
}
/**
 * Build the operations needed to release the container with the given ID,
 * if that container is currently owned by this application. Every owned
 * instance whose container ID matches is marked as release-submitted.
 * @param containerId ID of the container to release
 * @return the (possibly empty) list of release operations to execute
 * @throws SliderInternalStateException if the release submission fails
 */
public List<AbstractRMOperation> releaseContainer(ContainerId containerId)
    throws SliderInternalStateException {
  List<AbstractRMOperation> operations = new ArrayList<>();
  // scan a snapshot of the owned containers for the matching ID
  for (RoleInstance instance : cloneOwnedContainerList()) {
    if (instance.container.getId().equals(containerId)) {
      containerReleaseSubmitted(instance.container);
      operations.add(new ContainerReleaseOperation(instance.getId()));
    }
  }
  return operations;
}
/**
 * Find a container of the given role running on a specific host, looking
 * into the container's node ID to determine the host.
 * <p>
 * Containers already in the release-in-progress map are ignored.
 * @param node node whose hostname is matched against owned containers
 * @param roleId role the container must belong to
 * @return a matching instance, or null if no container on this host
 * can be released
 */
private RoleInstance findRoleInstanceOnHost(NodeInstance node, int roleId) {
  String host = node.hostname;
  for (RoleInstance candidate : cloneOwnedContainerList()) {
    boolean onHost = host.equals(RoleHistoryUtils.hostnameOf(candidate.container));
    boolean notBeingReleased =
        containersBeingReleased.get(candidate.getContainerId()) == null;
    if (onHost && candidate.roleId == roleId && notBeingReleased) {
      return candidate;
    }
  }
  return null;
}
/**
 * Build release operations for every owned container except the AM itself.
 * <p>
 * Containers already flagged as released are skipped. A failure while
 * registering an individual release is logged and does not abort the
 * remaining containers.
 * @return a list of release operations to execute
 */
public synchronized List<AbstractRMOperation> releaseAllContainers() {
  Collection<RoleInstance> targets = cloneOwnedContainerList();
  log.info("Releasing {} containers", targets.size());
  List<AbstractRMOperation> operations =
      new ArrayList<>(targets.size());
  for (RoleInstance instance : targets) {
    if (instance.roleId == SliderKeys.ROLE_AM_PRIORITY_INDEX) {
      // don't worry about the AM
      continue;
    }
    Container possible = instance.container;
    ContainerId id = possible.getId();
    if (!instance.released) {
      String url = getLogsURLForContainer(possible);
      // parameterized logging, consistent with the rest of this class
      log.info("Releasing container. Log: {}", url);
      try {
        containerReleaseSubmitted(possible);
      } catch (SliderInternalStateException e) {
        // best-effort: record the failure and continue with the others
        log.warn("when releasing container {} :", possible, e);
      }
      operations.add(new ContainerReleaseOperation(id));
    }
  }
  return operations;
}
/**
 * Event handler for allocated containers: builds up the lists
 * of assignment actions (what to run where), and possibly
 * a list of operations to perform.
 * <p>
 * Both output lists are owned by this method and cleared on entry.
 * Surplus containers (more allocated than desired) are queued for
 * release rather than assigned; for anti-affine roles a follow-up
 * request is issued while AA requests remain pending.
 * @param allocatedContainers the containers allocated
 * @param assignments output: the assignments of roles to containers
 * @param operations output: any allocation or release operations
 */
public synchronized void onContainersAllocated(List<Container> allocatedContainers,
    List<ContainerAssignment> assignments,
    List<AbstractRMOperation> operations) {
  // reset the caller-supplied output lists
  assignments.clear();
  operations.clear();
  // let the role history order the containers for allocation
  List<Container> ordered = roleHistory.prepareAllocationList(allocatedContainers);
  log.debug("onContainersAllocated(): Total containers allocated = {}", ordered.size());
  for (Container container : ordered) {
    final NodeId nodeId = container.getNodeId();
    String containerHostInfo = nodeId.getHost() + ":" + nodeId.getPort();
    //get the role
    final ContainerId cid = container.getId();
    final RoleStatus role = lookupRoleStatus(container);
    //dec requested count
    role.decRequested();
    //inc allocated count -this may need to be dropped in a moment,
    // but is needed for the surplus check below
    final long allocated = role.incActual();
    final long desired = role.getDesired();
    final String roleName = role.getName();
    // let the role history react to the allocation; it may generate
    // further operations (e.g. cancellations of outstanding requests)
    final ContainerAllocationResults allocation =
        roleHistory.onContainerAllocated(container, desired, allocated);
    final ContainerAllocationOutcome outcome = allocation.outcome;
    // add all requests to the operations list
    operations.addAll(allocation.operations);
    //look for condition where we get more back than we asked
    if (allocated > desired) {
      // surplus: release it instead of assigning it
      log.info("Discarding surplus {} container {} on {}", roleName, cid, containerHostInfo);
      operations.add(new ContainerReleaseOperation(cid));
      //register as a surplus node
      surplusNodes.add(cid);
      surplusContainers.inc();
      //and, as we aren't binding it to role, dec that role's actual count
      role.decActual();
    } else {
      // Allocation being accepted -so decrement the number of outstanding requests
      decOutstandingContainerRequests();
      log.info("Assigning role {} to container" +
          " {}," +
          " on {}:{},",
          roleName,
          cid,
          nodeId.getHost(),
          nodeId.getPort());
      assignments.add(new ContainerAssignment(container, role, outcome));
      //add to the history
      roleHistory.onContainerAssigned(container);
      // now for AA (anti-affinity) requests, add some more
      if (role.isAntiAffinePlacement()) {
        role.completeOutstandingAARequest();
        // check invariants. The new node must become unavailable.
        NodeInstance node = roleHistory.getOrCreateNodeInstance(container);
        if (node.canHost(role.getKey(), role.getLabelExpression())) {
          log.error("Assigned node still declares as available {}", node.toFullString() );
        }
        if (role.getPendingAntiAffineRequests() > 0) {
          // still an outstanding AA request: need to issue a new one.
          log.info("Asking for next container for AA role {}", roleName);
          if (!addContainerRequest(operations, createAAContainerRequest(role))) {
            log.info("No capacity in cluster for new requests");
          } else {
            role.decPendingAntiAffineRequests();
          }
          log.debug("Current AA role status {}", role);
        } else {
          log.info("AA request sequence completed for role {}", role);
        }
      }
    }
  }
}
/**
 * Build a diagnostics dump of every role's status, one role per line.
 * @return the concatenated role status lines
 */
public String getContainerDiagnosticInfo() {
  StringBuilder info = new StringBuilder();
  for (RoleStatus status : getRoleStatusMap().values()) {
    info.append(status).append('\n');
  }
  return info.toString();
}
/**
 * Event handler for the list of active containers on restart.
 * Walks each surviving container through the rebuild path, then records
 * the count under the info key
 * {@link StatusKeys#INFO_CONTAINERS_AM_RESTART}
 * (the key is not set when the list is null).
 * @param liveContainers containers still running; may be null
 * @return true if a rebuild took place (even if size 0)
 * @throws BadClusterStateException on problems mapping a container
 */
private boolean rebuildModelFromRestart(List<Container> liveContainers)
    throws BadClusterStateException {
  if (liveContainers == null) {
    return false;
  }
  for (Container survivor : liveContainers) {
    addRestartedContainer(survivor);
  }
  clusterStatus.setInfo(StatusKeys.INFO_CONTAINERS_AM_RESTART,
      Integer.toString(liveContainers.size()));
  return true;
}
/**
 * Add a restarted container by walking it through the create/submit/start
 * lifecycle, so building up the internal structures.
 * @param container container that was running before the AM restarted
 * @throws BadClusterStateException if the container's role is unknown
 */
private void addRestartedContainer(Container container)
    throws BadClusterStateException {
  ContainerId containerId = container.getId();
  NodeId nodeId = container.getNodeId();
  String containerHostInfo = nodeId.getHost() + ":" + nodeId.getPort();
  // resolve the role from the container's priority field
  int roleId = ContainerPriority.extractRole(container);
  RoleStatus role = lookupRoleStatus(roleId);
  // count it as live again
  role.incActual();
  String roleName = role.getName();
  log.info("Rebuilding container {} in role {} on {},",
      containerId,
      roleName,
      containerHostInfo);
  // rebuild app state internal structures and maps
  RoleInstance instance = new RoleInstance(container);
  instance.command = roleName;
  instance.role = roleName;
  instance.roleId = roleId;
  instance.environment = new String[0];
  instance.container = container;
  instance.createTime = now();
  instance.state = STATE_LIVE;
  instance.appVersion = SliderKeys.APP_VERSION_UNKNOWN;
  putOwnedContainer(containerId, instance);
  // role history gets told of the assignment
  roleHistory.onContainerAssigned(container);
  // replay "start submitted" ...
  containerStartSubmitted(container, instance);
  // ... and then "started"
  innerOnNodeManagerContainerStarted(containerId);
}
@Override
public String toString() {
  // summarize the headline application counters in one line
  StringBuilder sb = new StringBuilder("AppState{");
  sb.append("applicationLive=").append(applicationLive)
      .append(", live nodes=").append(liveNodes.size())
      .append(", startedContainers=").append(startedContainers)
      .append(", startFailedContainerCount=").append(startFailedContainerCount)
      .append(", surplusContainers=").append(surplusContainers)
      .append(", failedContainerCount=").append(failedContainerCount)
      .append(", outstanding non-AA Container Requests=")
      .append(outstandingContainerRequests)
      .append('}');
  return sb.toString();
}
/**
 * Build a lookup map from role ID to role name, snapshotting the
 * current role status map.
 * @return a new map of role ID to role name
 */
public Map<Integer, String> buildNamingMap() {
  Map<Integer, RoleStatus> statusMap = getRoleStatusMap();
  Map<Integer, String> naming = new HashMap<>(statusMap.size());
  for (Integer id : statusMap.keySet()) {
    naming.put(id, statusMap.get(id).getName());
  }
  return naming;
}
}