blob: 33c344242c913a843ce635f3006490dea552f01f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.appmaster.state;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.core.exceptions.BadConfigException;
import org.apache.slider.providers.ProviderRole;
import org.apache.slider.server.avro.RoleHistoryHeader;
import org.apache.slider.server.avro.RoleHistoryWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* The Role History.
*
* Synchronization policy: all public operations are synchronized.
* Protected methods are in place for testing -no guarantees are made.
*
* Inner classes have no synchronization guarantees; they should be manipulated
* in these classes and not externally.
*
* Note that as well as some methods marked visible for testing, there
* is the option for the time generator method, {@link #now()} to
* be overridden so that a repeatable time series can be used.
*
*/
public class RoleHistory {
protected static final Logger log =
LoggerFactory.getLogger(RoleHistory.class);
private final List<ProviderRole> providerRoles;
private final Map<String, ProviderRole> providerRoleMap =
new HashMap<String, ProviderRole>();
private long startTime;
/**
* Time when saved
*/
private long saveTime;
/**
* If the history was loaded, the time at which the history was saved
*
*/
private long thawedDataTime;
private NodeMap nodemap;
private int roleSize;
private boolean dirty;
private FileSystem filesystem;
private Path historyPath;
private RoleHistoryWriter historyWriter = new RoleHistoryWriter();
private OutstandingRequestTracker outstandingRequests =
new OutstandingRequestTracker();
/**
* For each role, lists nodes that are available for data-local allocation,
ordered by more recently released - To accelerate node selection
*/
private Map<Integer, LinkedList<NodeInstance>> availableNodes;
/**
* Track the failed nodes. Currently used to make wiser decision of container
* ask with/without locality. Has other potential uses as well.
*/
private Set<String> failedNodes = new HashSet<String>();
// dummy to be used in maps for faster lookup where we don't care about values
private final Object DUMMY_VALUE = new Object();
public RoleHistory(List<ProviderRole> providerRoles) throws
BadConfigException {
this.providerRoles = providerRoles;
roleSize = providerRoles.size();
for (ProviderRole providerRole : providerRoles) {
providerRoleMap.put(providerRole.name, providerRole);
}
reset();
}
/**
* Reset the variables -this does not adjust the fixed attributes
* of the history
*/
protected synchronized void reset() throws BadConfigException {
nodemap = new NodeMap(roleSize);
resetAvailableNodeLists();
outstandingRequests = new OutstandingRequestTracker();
Map<Integer, RoleStatus> roleStats = new HashMap<Integer, RoleStatus>();
for (ProviderRole providerRole : providerRoles) {
addProviderRole(roleStats, providerRole);
}
}
private void addProviderRole(Map<Integer, RoleStatus> roleStats,
ProviderRole providerRole)
throws ArrayIndexOutOfBoundsException, BadConfigException {
int index = providerRole.id;
if (index < 0) {
throw new BadConfigException("Provider " + providerRole
+ " id is out of range");
}
if (roleStats.get(index) != null) {
throw new BadConfigException(
providerRole.toString() + " id duplicates that of " +
roleStats.get(index));
}
roleStats.put(index, new RoleStatus(providerRole));
}
/**
* Add a new provider role to the map
* @param providerRole new provider role
*/
public void addNewProviderRole(ProviderRole providerRole)
throws BadConfigException {
Map<Integer, RoleStatus> roleStats = new HashMap<Integer, RoleStatus>();
for (ProviderRole role : providerRoles) {
roleStats.put(role.id, new RoleStatus(role));
}
addProviderRole(roleStats, providerRole);
}
/**
* Clear the lists of available nodes
*/
private synchronized void resetAvailableNodeLists() {
availableNodes = new HashMap<Integer, LinkedList<NodeInstance>>(roleSize);
}
/**
* Reset the variables -this does not adjust the fixed attributes
* of the history.
* This intended for use by the RoleWriter logic.
*/
public synchronized void prepareForReading(RoleHistoryHeader header) throws
IOException,
BadConfigException {
reset();
int roleCountInSource = header.getRoles();
if (roleCountInSource != roleSize) {
throw new IOException("Number of roles in source " + roleCountInSource
+ " does not match the expected number of " +
roleSize);
}
//record when the data was loaded
setThawedDataTime(header.getSaved());
}
public synchronized long getStartTime() {
return startTime;
}
public synchronized long getSaveTime() {
return saveTime;
}
public long getThawedDataTime() {
return thawedDataTime;
}
public void setThawedDataTime(long thawedDataTime) {
this.thawedDataTime = thawedDataTime;
}
public synchronized int getRoleSize() {
return roleSize;
}
/**
* Get the total size of the cluster -the number of NodeInstances
* @return a count
*/
public synchronized int getClusterSize() {
return nodemap.size();
}
public synchronized boolean isDirty() {
return dirty;
}
public synchronized void setDirty(boolean dirty) {
this.dirty = dirty;
}
/**
* Tell the history that it has been saved; marks itself as clean
* @param timestamp timestamp -updates the savetime field
*/
public synchronized void saved(long timestamp) {
dirty = false;
saveTime = timestamp;
}
/**
* Get a clone of the nodemap.
* The instances inside are not cloned
* @return the map
*/
public synchronized NodeMap cloneNodemap() {
return (NodeMap) nodemap.clone();
}
/**
* Get the node instance for the specific node -creating it if needed
* @param hostname node address
* @return the instance
*/
public synchronized NodeInstance getOrCreateNodeInstance(String hostname) {
//convert to a string
return nodemap.getOrCreate(hostname);
}
/**
* Insert a list of nodes into the map; overwrite any with that name.
* This is a bulk operation for testing.
* Important: this does not update the available node lists, these
* must be rebuilt afterwards.
* @param nodes collection of nodes.
*/
@VisibleForTesting
public synchronized void insert(Collection<NodeInstance> nodes) {
nodemap.insert(nodes);
}
/**
* Get current time. overrideable for test subclasses
* @return current time in millis
*/
protected long now() {
return System.currentTimeMillis();
}
/**
* Garbage collect the structure -this will dropp
* all nodes that have been inactive since the (relative) age
* @param age relative age
*/
public void gc(long age) {
long absoluteTime = now() - age;
purgeUnusedEntries(absoluteTime);
}
/**
* Mark ourselves as dirty
*/
public void touch() {
setDirty(true);
try {
saveHistoryIfDirty();
} catch (IOException e) {
log.warn("Failed to save history file ", e);
}
}
/**
* purge the history of
* all nodes that have been inactive since the absolute time
* @param absoluteTime time
*/
public synchronized void purgeUnusedEntries(long absoluteTime) {
nodemap.purgeUnusedEntries(absoluteTime);
}
/**
* Get the path used for history files
* @return the directory used for history files
*/
public Path getHistoryPath() {
return historyPath;
}
/**
* Save the history to its location using the timestamp as part of
* the filename. The saveTime and dirty fields are updated
* @param time timestamp timestamp to use as the save time
* @return the path saved to
* @throws IOException IO problems
*/
@VisibleForTesting
public synchronized Path saveHistory(long time) throws IOException {
Path filename = historyWriter.createHistoryFilename(historyPath, time);
historyWriter.write(filesystem, filename, true, this, time);
saved(time);
return filename;
}
/**
* Save the history with the current timestamp if it is dirty;
* return the path saved to if this is the case
* @return the path or null if the history was not saved
* @throws IOException failed to save for some reason
*/
public synchronized Path saveHistoryIfDirty() throws IOException {
if (isDirty()) {
long time = now();
return saveHistory(time);
} else {
return null;
}
}
/**
* Start up
* @param fs filesystem
* @param historyDir path in FS for history
* @return true if the history was thawed
*/
public boolean onStart(FileSystem fs, Path historyDir) throws
BadConfigException {
assert filesystem == null;
filesystem = fs;
historyPath = historyDir;
startTime = now();
//assume the history is being thawed; this will downgrade as appropriate
return onThaw();
}
/**
* Handler for bootstrap event
*/
public void onBootstrap() {
log.debug("Role history bootstrapped");
}
/**
* Handle the start process <i>after the history has been rebuilt</i>,
* and after any gc/purge
*/
@VisibleForTesting
public synchronized boolean onThaw() throws BadConfigException {
assert filesystem != null;
assert historyPath != null;
boolean thawSuccessful = false;
//load in files from data dir
Path loaded = null;
try {
loaded = historyWriter.loadFromHistoryDir(filesystem, historyPath, this);
} catch (IOException e) {
log.warn("Exception trying to load history from {}", historyPath, e);
}
if (loaded != null) {
thawSuccessful = true;
log.debug("loaded history from {}", loaded);
// delete any old entries
try {
int count = historyWriter.purgeOlderHistoryEntries(filesystem, loaded);
log.debug("Deleted {} old history entries", count);
} catch (IOException e) {
log.info("Ignoring exception raised while trying to delete old entries",
e);
}
//start is then completed
buildAvailableNodeLists();
} else {
//fallback to bootstrap procedure
onBootstrap();
}
return thawSuccessful;
}
/**
* (After the start), rebuild the availability data structures
*/
@VisibleForTesting
public synchronized void buildAvailableNodeLists() {
resetAvailableNodeLists();
// build the list of available nodes
for (Map.Entry<String, NodeInstance> entry : nodemap
.entrySet()) {
NodeInstance ni = entry.getValue();
for (int i = 0; i < roleSize; i++) {
NodeEntry nodeEntry = ni.get(i);
if (nodeEntry != null && nodeEntry.isAvailable()) {
log.debug("Adding {} for role {}", ni, i);
getOrCreateNodesForRoleId(i).add(ni);
}
}
}
// sort the resulting arrays
for (int i = 0; i < roleSize; i++) {
sortAvailableNodeList(i);
}
}
/**
* Get the nodes for an ID -may be null
* @param id role ID
* @return potenially null list
*/
private List<NodeInstance> getNodesForRoleId(int id) {
return availableNodes.get(id);
}
/**
* Get the nodes for an ID -may be null
* @param id role ID
* @return list
*/
private LinkedList<NodeInstance> getOrCreateNodesForRoleId(int id) {
LinkedList<NodeInstance> instances = availableNodes.get(id);
if (instances == null) {
instances = new LinkedList<NodeInstance>();
availableNodes.put(id, instances);
}
return instances;
}
/**
* Sort an available node list
* @param role role to sort
*/
private void sortAvailableNodeList(int role) {
List<NodeInstance> nodesForRoleId = getNodesForRoleId(role);
if (nodesForRoleId != null) {
Collections.sort(nodesForRoleId, new NodeInstance.newerThan(role));
}
}
/**
* Find a node for use
* @param role role
* @return the instance, or null for none
*/
@VisibleForTesting
public synchronized NodeInstance findNodeForNewInstance(RoleStatus role) {
if (role.getNoDataLocality()) {
return null;
}
int roleKey = role.getKey();
NodeInstance nodeInstance = null;
List<NodeInstance> targets = getNodesForRoleId(roleKey);
int cnt = targets == null ? 0 : targets.size();
log.debug("There are {} node(s) to consider for {}", cnt, role.getName());
while (targets != null && !targets.isEmpty() && nodeInstance == null) {
NodeInstance head = targets.remove(0);
if (head.getActiveRoleInstances(roleKey) == 0) {
nodeInstance = head;
}
}
if (nodeInstance == null) {
log.debug("No historical node found for {}", role.getName());
}
return nodeInstance;
}
/**
* Request an instance on a given node.
* An outstanding request is created & tracked, with the
* relevant node entry for that role updated.
*
* The role status entries will also be tracked
*
* Returns the request that is now being tracked.
* If the node instance is not null, it's details about the role is incremented
*
*
* @param node node to target or null for "any"
* @param role role to request
* @param labelExpression label to satisfy
* @return the container priority
*/
public synchronized AMRMClient.ContainerRequest requestInstanceOnNode(
NodeInstance node, RoleStatus role, Resource resource, String labelExpression) {
OutstandingRequest outstanding = outstandingRequests.addRequest(node, role.getKey());
return outstanding.buildContainerRequest(resource, role, now(), labelExpression);
}
/**
* Find a node for a role and request an instance on that (or a location-less
* instance) with a label expression
* @param role role status
* @param resource resource capabilities
* @param labelExpression label to satisfy
* @return a request ready to go
*/
public synchronized AMRMClient.ContainerRequest requestNode(RoleStatus role,
Resource resource,
String labelExpression) {
NodeInstance node = findNodeForNewInstance(role);
return requestInstanceOnNode(node, role, resource, labelExpression);
}
/**
* Find a node for a role and request an instance on that (or a location-less
* instance)
* @param role role status
* @param resource resource capabilities
* @return a request ready to go
*/
public synchronized AMRMClient.ContainerRequest requestNode(RoleStatus role,
Resource resource) {
NodeInstance node = findNodeForNewInstance(role);
return requestInstanceOnNode(node, role, resource, null);
}
/**
* Get the list of active nodes ... walks the node map so
* is O(nodes)
* @param role role index
* @return a possibly empty list of nodes with an instance of that node
*/
public synchronized List<NodeInstance> listActiveNodes(int role) {
return nodemap.listActiveNodes(role);
}
/**
* Get the node entry of a container
* @param container container to look up
* @return the entry
*/
public NodeEntry getOrCreateNodeEntry(Container container) {
NodeInstance node = getOrCreateNodeInstance(container);
return node.getOrCreate(ContainerPriority.extractRole(container));
}
/**
* Get the node instance of a container -always returns something
* @param container container to look up
* @return a (possibly new) node instance
*/
public synchronized NodeInstance getOrCreateNodeInstance(Container container) {
String hostname = RoleHistoryUtils.hostnameOf(container);
return nodemap.getOrCreate(hostname);
}
/**
* Get the node instance of a host if defined
* @param hostname hostname to look up
* @return a node instance or null
*/
public synchronized NodeInstance getExistingNodeInstance(String hostname) {
return nodemap.get(hostname);
}
/**
* Perform any pre-allocation operations on the list of allocated containers
* based on knowledge of system state.
* Currently this places requested hosts ahead of unrequested ones.
* @param allocatedContainers list of allocated containers
* @return list of containers potentially reordered
*/
public synchronized List<Container> prepareAllocationList(List<Container> allocatedContainers) {
//partition into requested and unrequested
List<Container> requested =
new ArrayList<Container>(allocatedContainers.size());
List<Container> unrequested =
new ArrayList<Container>(allocatedContainers.size());
outstandingRequests.partitionRequests(this, allocatedContainers, requested, unrequested);
//give the unrequested ones lower priority
requested.addAll(unrequested);
return requested;
}
/**
* A container has been allocated on a node -update the data structures
* @param container container
* @param desiredCount desired #of instances
* @param actualCount current count of instances
* @return true if an entry was found and dropped
*/
public synchronized boolean onContainerAllocated(Container container, int desiredCount, int actualCount) {
int role = ContainerPriority.extractRole(container);
String hostname = RoleHistoryUtils.hostnameOf(container);
boolean requestFound =
outstandingRequests.onContainerAllocated(role, hostname);
if (desiredCount <= actualCount) {
//cancel the nodes
List<NodeInstance>
hosts = outstandingRequests.cancelOutstandingRequests(role);
if (!hosts.isEmpty()) {
//add the list
log.debug("Adding {} hosts for role {}", hosts.size(), role);
getOrCreateNodesForRoleId(role).addAll(hosts);
sortAvailableNodeList(role);
}
}
return requestFound;
}
/**
* A container has been assigned to a role instance on a node -update the data structures
* @param container container
*/
public void onContainerAssigned(Container container) {
NodeEntry nodeEntry = getOrCreateNodeEntry(container);
nodeEntry.onStarting();
}
/**
* Event: a container start has been submitter
* @param container container being started
* @param instance instance bound to the container
*/
public void onContainerStartSubmitted(Container container,
RoleInstance instance) {
NodeEntry nodeEntry = getOrCreateNodeEntry(container);
int role = ContainerPriority.extractRole(container);
// any actions we want here
}
/**
* Container start event
* @param container
*/
public void onContainerStarted(Container container) {
NodeEntry nodeEntry = getOrCreateNodeEntry(container);
nodeEntry.onStartCompleted();
touch();
}
/**
* A container failed to start: update the node entry state
* and return the container to the queue
* @param container container that failed
* @return true if the node was queued
*/
public boolean onNodeManagerContainerStartFailed(Container container) {
return markContainerFinished(container, false, true);
}
/**
* Update failedNodes and nodemap based on the node state
*
* @param updatedNodes list of updated nodes
*/
public synchronized void onNodesUpdated(List<NodeReport> updatedNodes) {
for (NodeReport updatedNode : updatedNodes) {
String hostname = updatedNode.getNodeId() == null ? null : updatedNode
.getNodeId().getHost();
if (hostname == null) {
continue;
}
if (updatedNode.getNodeState() != null
&& updatedNode.getNodeState().isUnusable()) {
failedNodes.add(hostname);
nodemap.remove(hostname);
} else {
failedNodes.remove(hostname);
}
}
}
/**
* A container release request was issued
* @param container container submitted
*/
public void onContainerReleaseSubmitted(Container container) {
NodeEntry nodeEntry = getOrCreateNodeEntry(container);
nodeEntry.release();
}
/**
* App state notified of a container completed
* @param container completed container
* @param wasReleased
* @return true if the node was queued
*/
public boolean onReleaseCompleted(Container container, boolean wasReleased) {
return markContainerFinished(container, wasReleased, false);
}
/**
* App state notified of a container completed -but as
* it wasn't being released it is marked as failed
*
* @param container completed container
* @param shortLived was the container short lived?
* @return true if the node is considered available for work
*/
public boolean onFailedContainer(Container container, boolean shortLived) {
return markContainerFinished(container, false, shortLived);
}
/**
* Mark a container finished; if it was released then that is treated
* differently. history is touch()ed
*
*
* @param container completed container
* @param wasReleased was the container released?
* @param shortLived was the container short lived?
* @return true if the node was queued
*/
protected synchronized boolean markContainerFinished(Container container,
boolean wasReleased,
boolean shortLived) {
NodeEntry nodeEntry = getOrCreateNodeEntry(container);
boolean available;
if (shortLived) {
nodeEntry.onStartFailed();
available = false;
} else {
available = nodeEntry.containerCompleted(wasReleased);
boolean isFailedNode = failedNodes.contains(RoleHistoryUtils
.hostnameOf(container));
if (!isFailedNode) {
maybeQueueNodeForWork(container, nodeEntry, available);
}
}
touch();
return available;
}
/**
* If the node is marked as available; queue it for assignments.
* Unsynced: expects caller to be in a sync block.
* @param container completed container
* @param nodeEntry node
* @param available available flag
* @return true if the node was queued
*/
private boolean maybeQueueNodeForWork(Container container,
NodeEntry nodeEntry,
boolean available) {
if (available) {
//node is free
nodeEntry.setLastUsed(now());
NodeInstance ni = getOrCreateNodeInstance(container);
int roleId = ContainerPriority.extractRole(container);
log.debug("Node {} is now available for role id {}", ni, roleId);
getOrCreateNodesForRoleId(roleId).addFirst(ni);
}
return available;
}
/**
* Print the history to the log. This is for testing and diagnostics
*/
public synchronized void dump() {
for (ProviderRole role : providerRoles) {
log.info(role.toString());
List<NodeInstance> instances =
getOrCreateNodesForRoleId(role.id);
log.info(" available: " + instances.size()
+ " " + SliderUtils.joinWithInnerSeparator(", ", instances));
}
log.info("Nodes in Cluster: {}", getClusterSize());
for (NodeInstance node : nodemap.values()) {
log.info(node.toFullString());
}
}
/**
* Get a clone of the available list
* @param role role index
* @return a clone of the list
*/
@VisibleForTesting
public List<NodeInstance> cloneAvailableList(int role) {
return new LinkedList<NodeInstance>(getOrCreateNodesForRoleId(role));
}
/**
* Get a snapshot of the outstanding request list
* @return a list of the requests outstanding at the time of requesting
*/
@VisibleForTesting
public synchronized List<OutstandingRequest> getOutstandingRequestList() {
return outstandingRequests.listOutstandingRequests();
}
/**
* Get a clone of the failedNodes
*
* @return the list
*/
public List<String> cloneFailedNodes() {
List<String> lst = new ArrayList<String>();
lst.addAll(failedNodes);
return lst;
}
}