/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.appmaster.state;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.slider.api.types.NodeInformation;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.core.exceptions.BadConfigException;
import org.apache.slider.providers.ProviderRole;
import org.apache.slider.server.appmaster.management.BoolMetric;
import org.apache.slider.server.appmaster.management.MetricsAndMonitoring;
import org.apache.slider.server.appmaster.management.Timestamp;
import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
import org.apache.slider.server.appmaster.operations.UpdateBlacklistOperation;
import org.apache.slider.server.avro.LoadedRoleHistory;
import org.apache.slider.server.avro.NodeEntryRecord;
import org.apache.slider.server.avro.RoleHistoryHeader;
import org.apache.slider.server.avro.RoleHistoryWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
/**
* The Role History.
* <p>
* Synchronization policy: all public operations are synchronized.
* Protected methods are in place for testing -no guarantees are made.
* <p>
* Inner classes have no synchronization guarantees; they should be manipulated
* in these classes and not externally.
* <p>
* Note that as well as some methods marked visible for testing, there
* is the option for the time generator method, {@link #now()} to
* be overridden so that a repeatable time series can be used.
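* <p>
* A minimal, illustrative lifecycle sketch (variable names are placeholders
* and exception handling is omitted):
* <pre>{@code
*   RoleHistory history = new RoleHistory(roleStatuses, clusterServices);
*   history.onStart(fs, historyDir);              // thaw any saved history
*   OutstandingRequest request = history.requestContainerForRole(roleStatus);
*   // ...once YARN allocates a container:
*   history.onContainerAllocated(container, desiredCount, actualCount);
*   history.onContainerStarted(container);
*   history.saveHistoryIfDirty();
* }</pre>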
*
*/
public class RoleHistory {
protected static final Logger log =
LoggerFactory.getLogger(RoleHistory.class);
private final List<ProviderRole> providerRoles;
/** the roles in here are shared with App State */
private final Map<Integer, RoleStatus> roleStatusMap = new HashMap<>();
private final AbstractClusterServices recordFactory;
private long startTime;
/** Time when saved */
private final Timestamp saveTime = new Timestamp(0);
/** If the history was loaded, the time at which the history was saved.
* That is: the time the data was valid */
private final Timestamp thawedDataTime = new Timestamp(0);
private NodeMap nodemap;
private int roleSize;
private final BoolMetric dirty = new BoolMetric(false);
private FileSystem filesystem;
private Path historyPath;
private RoleHistoryWriter historyWriter = new RoleHistoryWriter();
/**
* When were the nodes updated in a {@link #onNodesUpdated(List)} call?
* If zero: never.
*/
private final Timestamp nodesUpdatedTime = new Timestamp(0);
private final BoolMetric nodeUpdateReceived = new BoolMetric(false);
private OutstandingRequestTracker outstandingRequests =
new OutstandingRequestTracker();
/**
* For each role, lists nodes that are available for data-local allocation,
* ordered by most recently released first, to accelerate node selection.
* That is, they are the "recently used" nodes.
*/
private Map<Integer, LinkedList<NodeInstance>> recentNodes;
/**
* Instantiate
* @param roles initial role list
* @param recordFactory yarn record factory
* @throws BadConfigException if a role key is invalid or duplicates an existing role
*/
public RoleHistory(Collection<RoleStatus> roles, AbstractClusterServices recordFactory) throws BadConfigException {
this.recordFactory = recordFactory;
roleSize = roles.size();
providerRoles = new ArrayList<>(roleSize);
for (RoleStatus role : roles) {
addNewRole(role);
}
reset();
}
/**
* Reset the variables -this does not adjust the fixed attributes
* of the history, but the node map, the available node lists and the
* outstanding request tracker are all reset.
*/
protected synchronized void reset() throws BadConfigException {
nodemap = new NodeMap(roleSize);
resetAvailableNodeLists();
outstandingRequests = new OutstandingRequestTracker();
}
/**
* Register all metrics with the metrics infrastructure
* @param metrics metrics
*/
public void register(MetricsAndMonitoring metrics) {
metrics.register(RoleHistory.class, dirty, "dirty");
metrics.register(RoleHistory.class, nodesUpdatedTime, "nodes-updated.time");
metrics.register(RoleHistory.class, nodeUpdateReceived, "nodes-updated.flag");
metrics.register(RoleHistory.class, thawedDataTime, "thawed.time");
metrics.register(RoleHistory.class, saveTime, "saved.time");
}
/**
* Safety check: add a role to the role status map, verifying that its
* key is valid and unique amongst the existing role statuses.
* @param roleStatus role
* @throws BadConfigException if the key is negative or duplicates that of
* an existing role
*/
protected void putRole(RoleStatus roleStatus) throws BadConfigException {
int index = roleStatus.getKey();
if (index < 0) {
throw new BadConfigException("Provider " + roleStatus + " id is out of range");
}
if (roleStatusMap.get(index) != null) {
throw new BadConfigException(
roleStatus.toString() + " id duplicates that of " +
roleStatusMap.get(index));
}
roleStatusMap.put(index, roleStatus);
}
/**
* Add a new role
* @param roleStatus new role
*/
public void addNewRole(RoleStatus roleStatus) throws BadConfigException {
log.debug("Validating/adding new role to role history: {} ", roleStatus);
putRole(roleStatus);
this.providerRoles.add(roleStatus.getProviderRole());
}
/**
* Lookup a role by ID
* @param roleId role Id
* @return role or null if not found
*/
public ProviderRole lookupRole(int roleId) {
for (ProviderRole role : providerRoles) {
if (role.id == roleId) {
return role;
}
}
return null;
}
/**
* Clear the lists of available nodes
*/
private synchronized void resetAvailableNodeLists() {
recentNodes = new ConcurrentHashMap<>(roleSize);
}
/**
* Prepare the history for re-reading its state.
* <p>
* This is intended for use by the role history rebuild logic.
* @throws BadConfigException if there is a problem rebuilding the state
*/
private void prepareForReading(RoleHistoryHeader header)
throws BadConfigException {
reset();
int roleCountInSource = header.getRoles();
if (roleCountInSource != roleSize) {
log.warn("Number of roles in source {}"
+" does not match the expected number of {}",
roleCountInSource,
roleSize);
}
//record when the data was loaded
setThawedDataTime(header.getSaved());
}
/**
* rebuild the placement history from the loaded role history
* @param loadedRoleHistory loaded history
* @return the number of entries discarded
* @throws BadConfigException if there is a problem rebuilding the state
*/
@VisibleForTesting
public synchronized int rebuild(LoadedRoleHistory loadedRoleHistory) throws BadConfigException {
RoleHistoryHeader header = loadedRoleHistory.getHeader();
prepareForReading(header);
int discarded = 0;
Long saved = header.getSaved();
for (NodeEntryRecord nodeEntryRecord : loadedRoleHistory.records) {
Integer roleId = nodeEntryRecord.getRole();
NodeEntry nodeEntry = new NodeEntry(roleId);
nodeEntry.setLastUsed(nodeEntryRecord.getLastUsed());
if (nodeEntryRecord.getActive()) {
//if active at the time of save, make the last used time the save time
nodeEntry.setLastUsed(saved);
}
String hostname = SliderUtils.sequenceToString(nodeEntryRecord.getHost());
ProviderRole providerRole = lookupRole(roleId);
if (providerRole == null) {
// discarding entry
log.info("Discarding history entry with unknown role: {} on host {}",
roleId, hostname);
discarded ++;
} else {
NodeInstance instance = getOrCreateNodeInstance(hostname);
instance.set(roleId, nodeEntry);
}
}
return discarded;
}
public synchronized long getStartTime() {
return startTime;
}
public synchronized long getSaveTime() {
return saveTime.get();
}
public long getThawedDataTime() {
return thawedDataTime.get();
}
public void setThawedDataTime(long thawedDataTime) {
this.thawedDataTime.set(thawedDataTime);
}
public synchronized int getRoleSize() {
return roleSize;
}
/**
* Get the total size of the cluster -the number of NodeInstances
* @return a count
*/
public synchronized int getClusterSize() {
return nodemap.size();
}
public synchronized boolean isDirty() {
return dirty.get();
}
public synchronized void setDirty(boolean dirty) {
this.dirty.set(dirty);
}
/**
* Tell the history that it has been saved; marks itself as clean
* @param timestamp the time of the save; updates the saveTime field
*/
public synchronized void saved(long timestamp) {
setDirty(false);
saveTime.set(timestamp);
}
/**
* Get a clone of the nodemap.
* The instances inside are not cloned
* @return the map
*/
public synchronized NodeMap cloneNodemap() {
return (NodeMap) nodemap.clone();
}
/**
* Get a snapshot of the node map.
* @param naming naming map of priority to entry name; entries must be unique.
* It's OK for it to be incomplete; missing entries fall back to numbers.
* @return a snapshot of the current node state
*/
public synchronized Map<String, NodeInformation> getNodeInformationSnapshot(
Map<Integer, String> naming) {
Map<String, NodeInformation> result = new HashMap<>(nodemap.size());
for (Map.Entry<String, NodeInstance> entry : nodemap.entrySet()) {
result.put(entry.getKey(), entry.getValue().serialize(naming));
}
return result;
}
/**
* Get the information on a node
* @param hostname hostname
* @param naming naming map of priority to entry name; entries must be unique.
* It's OK for it to be incomplete; missing entries fall back to numbers.
* @return the information about that host, or null if there is none
*/
public synchronized NodeInformation getNodeInformation(String hostname,
Map<Integer, String> naming) {
NodeInstance nodeInstance = nodemap.get(hostname);
return nodeInstance != null ? nodeInstance.serialize(naming) : null;
}
/**
* Get the node instance for the specific node -creating it if needed
* @param hostname node address
* @return the instance
*/
public synchronized NodeInstance getOrCreateNodeInstance(String hostname) {
// delegate to the node map, which creates an entry if absent
return nodemap.getOrCreate(hostname);
}
/**
* Insert a list of nodes into the map, overwriting any existing entries with the same hostname.
* This is a bulk operation for testing.
* Important: this does not update the available node lists, these
* must be rebuilt afterwards.
* @param nodes collection of nodes.
*/
@VisibleForTesting
public synchronized void insert(Collection<NodeInstance> nodes) {
nodemap.insert(nodes);
}
/**
* Get the current time; overridable by test subclasses.
* @return current time in millis
*/
protected long now() {
return System.currentTimeMillis();
}
/**
* Mark the history as dirty and attempt to save it.
*/
public void touch() {
setDirty(true);
try {
saveHistoryIfDirty();
} catch (IOException e) {
log.warn("Failed to save history file ", e);
}
}
/**
* reset the failed recently counters
*/
public synchronized void resetFailedRecently() {
log.info("Resetting failure history");
nodemap.resetFailedRecently();
}
/**
* Get the path used for history files
* @return the directory used for history files
*/
public Path getHistoryPath() {
return historyPath;
}
/**
* Save the history to its location using the timestamp as part of
* the filename. The saveTime and dirty fields are updated
* @param time timestamp to use as the save time
* @return the path saved to
* @throws IOException IO problems
*/
@VisibleForTesting
public synchronized Path saveHistory(long time) throws IOException {
Path filename = historyWriter.createHistoryFilename(historyPath, time);
historyWriter.write(filesystem, filename, true, this, time);
saved(time);
return filename;
}
/**
* Save the history with the current timestamp if it is dirty;
* return the path saved to if this is the case
* @return the path or null if the history was not saved
* @throws IOException failed to save for some reason
*/
public synchronized Path saveHistoryIfDirty() throws IOException {
if (isDirty()) {
return saveHistory(now());
} else {
return null;
}
}
/**
* Start up
* @param fs filesystem
* @param historyDir path in FS for history
* @return true if the history was thawed
* @throws BadConfigException if there is a problem rebuilding the state
*/
public boolean onStart(FileSystem fs, Path historyDir) throws BadConfigException {
assert filesystem == null;
filesystem = fs;
historyPath = historyDir;
startTime = now();
//assume the history is being thawed; this will downgrade as appropriate
return onThaw();
}
/**
* Handler for bootstrap event: there was no history to thaw
*/
public void onBootstrap() {
log.debug("Role history bootstrapped");
}
/**
* Handle the start process <i>after the history has been rebuilt</i>,
* and after any gc/purge
* @return true if the history was loaded and rebuilt
*/
public synchronized boolean onThaw() throws BadConfigException {
assert filesystem != null;
assert historyPath != null;
boolean thawSuccessful = false;
//load in files from data dir
LoadedRoleHistory loadedRoleHistory = null;
try {
loadedRoleHistory = historyWriter.loadFromHistoryDir(filesystem, historyPath);
} catch (IOException e) {
log.warn("Exception trying to load history from {}", historyPath, e);
}
if (loadedRoleHistory != null) {
rebuild(loadedRoleHistory);
thawSuccessful = true;
Path loadPath = loadedRoleHistory.getPath();
log.debug("loaded history from {}", loadPath);
// delete any old entries
try {
int count = historyWriter.purgeOlderHistoryEntries(filesystem, loadPath);
log.debug("Deleted {} old history entries", count);
} catch (IOException e) {
log.info("Ignoring exception raised while trying to delete old entries",
e);
}
//start is then completed
buildRecentNodeLists();
} else {
//fallback to bootstrap procedure
onBootstrap();
}
return thawSuccessful;
}
/**
* (After the start), rebuild the availability data structures
*/
@VisibleForTesting
public synchronized void buildRecentNodeLists() {
resetAvailableNodeLists();
// build the list of available nodes
for (Map.Entry<String, NodeInstance> entry : nodemap.entrySet()) {
NodeInstance ni = entry.getValue();
for (int i = 0; i < roleSize; i++) {
NodeEntry nodeEntry = ni.get(i);
if (nodeEntry != null && nodeEntry.isAvailable()) {
log.debug("Adding {} for role {}", ni, i);
listRecentNodesForRoleId(i).add(ni);
}
}
}
// sort the resulting arrays
for (int i = 0; i < roleSize; i++) {
sortRecentNodeList(i);
}
}
/**
* Get the nodes for an ID -may be null
* @param id role ID
* @return potentially null list
*/
@VisibleForTesting
public List<NodeInstance> getRecentNodesForRoleId(int id) {
return recentNodes.get(id);
}
/**
* Get the live list of recent nodes for a role, creating and registering it if needed.
* @param id role ID
* @return list
*/
private LinkedList<NodeInstance> listRecentNodesForRoleId(int id) {
LinkedList<NodeInstance> instances = recentNodes.get(id);
if (instances == null) {
synchronized (this) {
// recheck in the synchronized block and recreate
if (recentNodes.get(id) == null) {
recentNodes.put(id, new LinkedList<NodeInstance>());
}
instances = recentNodes.get(id);
}
}
return instances;
}
/**
* Sort the recent node list for a single role
* @param role role to sort
*/
private void sortRecentNodeList(int role) {
List<NodeInstance> nodesForRoleId = getRecentNodesForRoleId(role);
if (nodesForRoleId != null) {
Collections.sort(nodesForRoleId, new NodeInstance.Preferred(role));
}
}
public synchronized UpdateBlacklistOperation updateBlacklist(
Collection<RoleStatus> roleStatuses) {
List<String> blacklistAdditions = new ArrayList<>();
List<String> blacklistRemovals = new ArrayList<>();
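// walk every known node; a node should be blacklisted if it exceeds
// the failure threshold for any of the supplied roles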
for (Entry<String, NodeInstance> nodeInstanceEntry : nodemap.entrySet()) {
boolean shouldBeBlacklisted = false;
String nodeHost = nodeInstanceEntry.getKey();
NodeInstance nodeInstance = nodeInstanceEntry.getValue();
for (RoleStatus roleStatus : roleStatuses) {
if (nodeInstance.exceedsFailureThreshold(roleStatus)) {
shouldBeBlacklisted = true;
break;
}
}
if (shouldBeBlacklisted) {
if (!nodeInstance.isBlacklisted()) {
blacklistAdditions.add(nodeHost);
nodeInstance.setBlacklisted(true);
}
} else {
if (nodeInstance.isBlacklisted()) {
blacklistRemovals.add(nodeHost);
nodeInstance.setBlacklisted(false);
}
}
}
if (blacklistAdditions.isEmpty() && blacklistRemovals.isEmpty()) {
return null;
}
return new UpdateBlacklistOperation(blacklistAdditions, blacklistRemovals);
}
/**
* Find a recently used node suitable for a new instance of a role
* @param role role
* @return the instance, or null for none
*/
@VisibleForTesting
public synchronized NodeInstance findRecentNodeForNewInstance(RoleStatus role) {
if (!role.isPlacementDesired()) {
// no data locality policy
return null;
}
int roleId = role.getKey();
boolean strictPlacement = role.isStrictPlacement();
NodeInstance nodeInstance = null;
// Get the list of possible targets.
// This is a live list: changes here are preserved
List<NodeInstance> targets = getRecentNodesForRoleId(roleId);
if (targets == null) {
// nothing to allocate on
return null;
}
int cnt = targets.size();
log.debug("There are {} node(s) to consider for {}", cnt, role.getName());
for (int i = 0; i < cnt && nodeInstance == null; i++) {
NodeInstance candidate = targets.get(i);
if (candidate.getActiveRoleInstances(roleId) == 0) {
// no active instances: check failure statistics
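// under strict placement the candidate is accepted regardless of
// its online state or failure history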
if (strictPlacement
|| (candidate.isOnline() && !candidate.exceedsFailureThreshold(role))) {
targets.remove(i);
// exit criterion for the loop is now met
nodeInstance = candidate;
} else {
// too many failures for this node
log.info("Recent node failures is higher than threshold {}. Not requesting host {}",
role.getNodeFailureThreshold(), candidate.hostname);
}
}
}
if (nodeInstance == null) {
log.info("No node found for {}", role.getName());
}
return nodeInstance;
}
/**
* Find all live nodes which can host a new anti-affine instance of a role.
* No attempt is made to exclude nodes considered to be failing.
* @param role role
* @return a possibly empty list of candidate nodes
*/
@VisibleForTesting
public synchronized List<NodeInstance> findNodeForNewAAInstance(RoleStatus role) {
// all nodes that are live and can host the role; no attempt to exclude ones
// considered failing
return nodemap.findAllNodesForRole(role.getKey(), role.getLabelExpression());
}
/**
* Request an instance on a given node.
* An outstanding request is created & tracked, with the
* relevant node entry for that role updated.
* <p>
* The role status entries will also be tracked.
* <p>
* Returns the request that is now being tracked.
* If the node instance is not null, its details for the role are updated.
*
* @param node node to target or null for "any"
* @param role role to request
* @param resource resource requirements for the container
* @return the request being tracked
*/
public synchronized OutstandingRequest requestInstanceOnNode(
NodeInstance node, RoleStatus role, Resource resource) {
OutstandingRequest outstanding = outstandingRequests.newRequest(node, role.getKey());
outstanding.buildContainerRequest(resource, role, now());
return outstanding;
}
/**
* Find a node for a role and request an instance on that (or a location-less
* instance)
* @param role role status
* @return a request ready to go, or null if this is an AA request and no
* location can be found.
*/
public synchronized OutstandingRequest requestContainerForRole(RoleStatus role) {
if (role.isAntiAffinePlacement()) {
return requestContainerForAARole(role);
} else {
Resource resource = recordFactory.newResource();
role.copyResourceRequirements(resource);
NodeInstance node = findRecentNodeForNewInstance(role);
return requestInstanceOnNode(node, role, resource);
}
}
/**
* Find a node for an AA role and request an instance on that (or a location-less
* instance)
* @param role role status
* @return a request ready to go, or null if no location can be found.
*/
public synchronized OutstandingRequest requestContainerForAARole(RoleStatus role) {
List<NodeInstance> nodes = findNodeForNewAAInstance(role);
if (!nodes.isEmpty()) {
OutstandingRequest outstanding = outstandingRequests.newAARequest(
role.getKey(), nodes, role.getLabelExpression());
Resource resource = recordFactory.newResource();
role.copyResourceRequirements(resource);
outstanding.buildContainerRequest(resource, role, now());
return outstanding;
} else {
log.warn("No suitable location for {}", role.getName());
return null;
}
}
/**
* Get the list of active nodes ... this walks the node map, so it
* is {@code O(nodes)}
* @param role role index
* @return a possibly empty list of nodes with an instance of that node
*/
public synchronized List<NodeInstance> listActiveNodes(int role) {
return nodemap.listActiveNodes(role);
}
/**
* Get the node entry of a container
* @param container container to look up
* @return the entry
* @throws RuntimeException if the container has no hostname
*/
public NodeEntry getOrCreateNodeEntry(Container container) {
return getOrCreateNodeInstance(container).getOrCreate(container);
}
/**
* Get the node instance of a container -always returns something
* @param container container to look up
* @return a (possibly new) node instance
* @throws RuntimeException if the container has no hostname
*/
public synchronized NodeInstance getOrCreateNodeInstance(Container container) {
return nodemap.getOrCreate(RoleHistoryUtils.hostnameOf(container));
}
/**
* Get the node instance of a host if defined
* @param hostname hostname to look up
* @return a node instance or null
*/
public synchronized NodeInstance getExistingNodeInstance(String hostname) {
return nodemap.get(hostname);
}
/**
* Get the node instance of a container <i>if there's an entry in the history</i>
* @param container container to look up
* @return a node instance or null
* @throws RuntimeException if the container has no hostname
*/
public synchronized NodeInstance getExistingNodeInstance(Container container) {
return nodemap.get(RoleHistoryUtils.hostnameOf(container));
}
/**
* Perform any pre-allocation operations on the list of allocated containers
* based on knowledge of system state.
* Currently this places requested hosts ahead of unrequested ones.
* @param allocatedContainers list of allocated containers
* @return list of containers potentially reordered
*/
public synchronized List<Container> prepareAllocationList(List<Container> allocatedContainers) {
//partition into requested and unrequested
List<Container> requested =
new ArrayList<>(allocatedContainers.size());
List<Container> unrequested =
new ArrayList<>(allocatedContainers.size());
outstandingRequests.partitionRequests(this, allocatedContainers, requested, unrequested);
//give the unrequested ones lower priority
requested.addAll(unrequested);
return requested;
}
/**
* A container has been allocated on a node -update the data structures
* @param container container
* @param desiredCount desired #of instances
* @param actualCount current count of instances
* @return The allocation outcome
*/
public synchronized ContainerAllocationResults onContainerAllocated(Container container,
long desiredCount,
long actualCount) {
int role = ContainerPriority.extractRole(container);
String hostname = RoleHistoryUtils.hostnameOf(container);
List<NodeInstance> nodeInstances = listRecentNodesForRoleId(role);
ContainerAllocationResults outcome =
outstandingRequests.onContainerAllocated(role, hostname, container);
if (desiredCount <= actualCount) {
// all outstanding requests have been satisfied
// clear all the lists, returning nodes to the available set
List<NodeInstance> hosts = outstandingRequests.resetOutstandingRequests(role);
if (!hosts.isEmpty()) {
//add the list
log.info("Adding {} hosts for role {}", hosts.size(), role);
nodeInstances.addAll(hosts);
sortRecentNodeList(role);
}
}
return outcome;
}
/**
* A container has been assigned to a role instance on a node -update the data structures
* @param container container
*/
public void onContainerAssigned(Container container) {
NodeInstance node = getOrCreateNodeInstance(container);
NodeEntry nodeEntry = node.getOrCreate(container);
nodeEntry.onStarting();
log.debug("Node {} has updated NodeEntry {}", node, nodeEntry);
}
/**
* Event: a container start has been submitted
* @param container container being started
* @param instance instance bound to the container
*/
public void onContainerStartSubmitted(Container container,
RoleInstance instance) {
// no actions here
}
/**
* Container start event
* @param container container that just started
*/
public void onContainerStarted(Container container) {
NodeEntry nodeEntry = getOrCreateNodeEntry(container);
nodeEntry.onStartCompleted();
touch();
}
/**
* A container failed to start: update the node entry state
* and return the container to the queue
* @param container container that failed
* @return true if the node was queued
*/
public boolean onNodeManagerContainerStartFailed(Container container) {
return markContainerFinished(container, false, true, ContainerOutcome.Failed);
}
/**
* Does the RoleHistory have enough information about the YARN cluster
* to start placing AA requests? That is: does it have the node map and
* any label information needed?
* @return true if the caller can start requesting AA nodes
*/
public boolean canPlaceAANodes() {
return nodeUpdateReceived.get();
}
/**
* Get the last time the nodes were updated from YARN
* @return the update time or zero if never updated.
*/
public long getNodesUpdatedTime() {
return nodesUpdatedTime.get();
}
/**
* Update failedNodes and nodemap based on the node state
*
* @param updatedNodes list of updated nodes
* @return true if a review should be triggered.
*/
public synchronized boolean onNodesUpdated(List<NodeReport> updatedNodes) {
log.debug("Updating {} nodes", updatedNodes.size());
nodesUpdatedTime.set(now());
nodeUpdateReceived.set(true);
int printed = 0;
boolean triggerReview = false;
for (NodeReport updatedNode : updatedNodes) {
String hostname = updatedNode.getNodeId() == null
? ""
: updatedNode.getNodeId().getHost();
NodeState nodeState = updatedNode.getNodeState();
if (hostname.isEmpty() || nodeState == null) {
log.warn("Ignoring incomplete update");
continue;
}
if (log.isDebugEnabled() && printed++ < 10) {
// log the first few, but avoid overloading the logs for a full cluster
// update
log.debug("Node \"{}\" is in state {}", hostname, nodeState);
}
// update the node; this also creates an instance if needed
boolean updated = nodemap.updateNode(hostname, updatedNode);
triggerReview |= updated;
}
return triggerReview;
}
/**
* A container release request was issued
* @param container container submitted
*/
public void onContainerReleaseSubmitted(Container container) {
NodeEntry nodeEntry = getOrCreateNodeEntry(container);
nodeEntry.release();
}
/**
* App state notified of a container completed
* @param container completed container
* @return true if the node was queued
*/
public boolean onReleaseCompleted(Container container) {
return markContainerFinished(container, true, false, ContainerOutcome.Failed);
}
/**
* App state notified of a container completed -but as
* it wasn't being released it is marked as failed
*
* @param container completed container
* @param shortLived was the container short lived?
* @param outcome outcome of the container
* @return true if the node is considered available for work
*/
public boolean onFailedContainer(Container container,
boolean shortLived,
ContainerOutcome outcome) {
return markContainerFinished(container, false, shortLived, outcome);
}
/**
* Mark a container finished; if it was released then that is treated
* differently. The history is {@code touch()}-ed.
*
* @param container completed container
* @param wasReleased was the container released?
* @param shortLived was the container short lived?
* @param outcome outcome of the container
* @return true if the node was queued
*/
protected synchronized boolean markContainerFinished(Container container,
boolean wasReleased,
boolean shortLived,
ContainerOutcome outcome) {
NodeEntry nodeEntry = getOrCreateNodeEntry(container);
log.info("Finished container for node {}, released={}, shortlived={}",
nodeEntry.rolePriority, wasReleased, shortLived);
boolean available;
if (shortLived) {
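// a short-lived container counts as a start failure;
// the node is not returned to the available list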
nodeEntry.onStartFailed();
available = false;
} else {
available = nodeEntry.containerCompleted(wasReleased, outcome);
maybeQueueNodeForWork(container, nodeEntry, available);
}
touch();
return available;
}
/**
* If the node is marked as available, queue it for assignments.
* Unsynced: requires caller to be in a sync block.
* @param container completed container
* @param nodeEntry node
* @param available available flag
* @return true if the node was queued
*/
private boolean maybeQueueNodeForWork(Container container,
NodeEntry nodeEntry,
boolean available) {
if (available) {
//node is free
nodeEntry.setLastUsed(now());
NodeInstance ni = getOrCreateNodeInstance(container);
int roleId = ContainerPriority.extractRole(container);
log.debug("Node {} is now available for role id {}", ni, roleId);
listRecentNodesForRoleId(roleId).addFirst(ni);
}
return available;
}
/**
* Print the history to the log. This is for testing and diagnostics
*/
public synchronized void dump() {
for (ProviderRole role : providerRoles) {
log.info(role.toString());
List<NodeInstance> instances = listRecentNodesForRoleId(role.id);
log.info(" available: " + instances.size()
+ " " + SliderUtils.joinWithInnerSeparator(" ", instances));
}
log.info("Nodes in Cluster: {}", getClusterSize());
for (NodeInstance node : nodemap.values()) {
log.info(node.toFullString());
}
}
/**
* Build the mapping entry for persisting to the role history
* @return a mapping object
*/
public synchronized Map<CharSequence, Integer> buildMappingForHistoryFile() {
Map<CharSequence, Integer> mapping = new HashMap<>(getRoleSize());
for (ProviderRole role : providerRoles) {
mapping.put(role.name, role.id);
}
return mapping;
}
/**
* Get a clone of the recent node list for a role
* @param role role index
* @return a clone of the list
*/
@VisibleForTesting
public List<NodeInstance> cloneRecentNodeList(int role) {
return new LinkedList<>(listRecentNodesForRoleId(role));
}
/**
* Get a snapshot of the outstanding placed request list
* @return a list of the requests outstanding at the time of requesting
*/
@VisibleForTesting
public List<OutstandingRequest> listPlacedRequests() {
return outstandingRequests.listPlacedRequests();
}
/**
* Get a snapshot of the outstanding open (unplaced) request list
* @return a list of the requests outstanding at the time of requesting
*/
@VisibleForTesting
public List<OutstandingRequest> listOpenRequests() {
return outstandingRequests.listOpenRequests();
}
/**
* Escalate operation as triggered by external timer.
* @return a (usually empty) list of cancel/request operations.
*/
public synchronized List<AbstractRMOperation> escalateOutstandingRequests() {
return outstandingRequests.escalateOutstandingRequests(now());
}
/**
* Cancel all outstanding anti-affinity (AA) requests.
* @return a (usually empty) list of cancel operations.
*/
public List<AbstractRMOperation> cancelOutstandingAARequests() {
return outstandingRequests.cancelOutstandingAARequests();
}
/**
* Cancel a number of outstanding requests for a role -that is, not
* actual containers, just requests for new ones.
* @param role role
* @param toCancel number to cancel
* @return a list of cancellable operations.
*/
public List<AbstractRMOperation> cancelRequestsForRole(RoleStatus role, int toCancel) {
return role.isAntiAffinePlacement() ?
cancelRequestsForAARole(role, toCancel)
: cancelRequestsForSimpleRole(role, toCancel);
}
/**
* Build the list of requests to cancel from the outstanding list.
* @param role role
* @param toCancel number to cancel
* @return a list of cancellable operations.
*/
private synchronized List<AbstractRMOperation> cancelRequestsForSimpleRole(RoleStatus role, int toCancel) {
Preconditions.checkArgument(toCancel > 0,
"trying to cancel invalid number of requests: " + toCancel);
List<AbstractRMOperation> results = new ArrayList<>(toCancel);
// first scan through the unplaced request list to find all of a role
int roleId = role.getKey();
List<OutstandingRequest> requests =
outstandingRequests.extractOpenRequestsForRole(roleId, toCancel);
// are there any left?
int remaining = toCancel - requests.size();
// ask for some placed nodes
requests.addAll(outstandingRequests.extractPlacedRequestsForRole(roleId, remaining));
// build cancellations
for (OutstandingRequest request : requests) {
results.add(request.createCancelOperation());
}
return results;
}
/**
* Build the list of requests to cancel for an AA role. This reduces the number
* of outstanding pending requests first, then cancels any active request,
* before finally asking for any placed containers
* @param role role
* @param toCancel number to cancel
* @return a list of cancellable operations.
*/
private synchronized List<AbstractRMOperation> cancelRequestsForAARole(RoleStatus role, int toCancel) {
List<AbstractRMOperation> results = new ArrayList<>(toCancel);
int roleId = role.getKey();
List<OutstandingRequest> requests = new ArrayList<>(toCancel);
// there may be pending requests which can be cancelled here
long pending = role.getPendingAntiAffineRequests();
if (pending > 0) {
// there are some pending ones which can be cancelled first
long pendingToCancel = Math.min(pending, toCancel);
log.info("Cancelling {} pending AA allocations, leaving {}", toCancel,
pendingToCancel);
role.setPendingAntiAffineRequests(pending - pendingToCancel);
toCancel -= pendingToCancel;
}
if (toCancel > 0 && role.isAARequestOutstanding()) {
// still not enough cancelled; cancel the live AA request
log.info("Cancelling current AA request");
// find the single entry which may be running
requests = outstandingRequests.extractOpenRequestsForRole(roleId, toCancel);
role.cancelOutstandingAARequest();
toCancel--;
}
// ask for some excess nodes
if (toCancel > 0) {
requests.addAll(outstandingRequests.extractPlacedRequestsForRole(roleId, toCancel));
}
// build cancellations
for (OutstandingRequest request : requests) {
results.add(request.createCancelOperation());
}
return results;
}
}