blob: 66e945fc2ff628804fa4cae484168cd3e06f471b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.nodelabels.event.NodeLabelsStoreEvent;
import org.apache.hadoop.yarn.nodelabels.event.NodeLabelsStoreEventType;
import org.apache.hadoop.yarn.nodelabels.event.RemoveClusterNodeLabels;
import org.apache.hadoop.yarn.nodelabels.event.StoreNewClusterNodeLabels;
import org.apache.hadoop.yarn.nodelabels.event.UpdateNodeToLabelsMappingsEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
@Private
public class CommonNodeLabelsManager extends AbstractService {
protected static final Log LOG = LogFactory.getLog(CommonNodeLabelsManager.class);
private static final int MAX_LABEL_LENGTH = 255;
public static final Set<String> EMPTY_STRING_SET = Collections
.unmodifiableSet(new HashSet<String>(0));
public static final Set<NodeLabel> EMPTY_NODELABEL_SET = Collections
.unmodifiableSet(new HashSet<NodeLabel>(0));
public static final String ANY = "*";
public static final Set<String> ACCESS_ANY_LABEL_SET = ImmutableSet.of(ANY);
private static final Pattern LABEL_PATTERN = Pattern
.compile("^[0-9a-zA-Z][0-9a-zA-Z-_]*");
public static final int WILDCARD_PORT = 0;
// Flag to identify startup for removelabel
private boolean initNodeLabelStoreInProgress = false;
/**
* Error messages
*/
@VisibleForTesting
public static final String NODE_LABELS_NOT_ENABLED_ERR =
"Node-label-based scheduling is disabled. Please check "
+ YarnConfiguration.NODE_LABELS_ENABLED;
/**
* If a user doesn't specify label of a queue or node, it belongs
* DEFAULT_LABEL
*/
public static final String NO_LABEL = "";
protected Dispatcher dispatcher;
protected ConcurrentMap<String, RMNodeLabel> labelCollections =
new ConcurrentHashMap<String, RMNodeLabel>();
protected ConcurrentMap<String, Host> nodeCollections =
new ConcurrentHashMap<String, Host>();
protected RMNodeLabel noNodeLabel;
protected final ReadLock readLock;
protected final WriteLock writeLock;
protected NodeLabelsStore store;
private boolean nodeLabelsEnabled = false;
private boolean isCentralizedNodeLabelConfiguration = true;
/**
* A <code>Host</code> can have multiple <code>Node</code>s
*/
protected static class Host {
public Set<String> labels;
public Map<NodeId, Node> nms;
protected Host() {
labels =
Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
nms = new ConcurrentHashMap<NodeId, Node>();
}
public Host copy() {
Host c = new Host();
c.labels = new HashSet<String>(labels);
for (Entry<NodeId, Node> entry : nms.entrySet()) {
c.nms.put(entry.getKey(), entry.getValue().copy());
}
return c;
}
}
protected static class Node {
public Set<String> labels;
public Resource resource;
public boolean running;
public NodeId nodeId;
protected Node(NodeId nodeid) {
labels = null;
resource = Resource.newInstance(0, 0);
running = false;
nodeId = nodeid;
}
public Node copy() {
Node c = new Node(nodeId);
if (labels != null) {
c.labels =
Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
c.labels.addAll(labels);
} else {
c.labels = null;
}
c.resource = Resources.clone(resource);
c.running = running;
return c;
}
}
private enum NodeLabelUpdateOperation {
ADD,
REMOVE,
REPLACE
}
private final class ForwardingEventHandler implements
EventHandler<NodeLabelsStoreEvent> {
@Override
public void handle(NodeLabelsStoreEvent event) {
handleStoreEvent(event);
}
}
// Dispatcher related code
protected void handleStoreEvent(NodeLabelsStoreEvent event) {
try {
switch (event.getType()) {
case ADD_LABELS:
StoreNewClusterNodeLabels storeNewClusterNodeLabelsEvent =
(StoreNewClusterNodeLabels) event;
store.storeNewClusterNodeLabels(storeNewClusterNodeLabelsEvent
.getLabels());
break;
case REMOVE_LABELS:
RemoveClusterNodeLabels removeClusterNodeLabelsEvent =
(RemoveClusterNodeLabels) event;
store.removeClusterNodeLabels(removeClusterNodeLabelsEvent.getLabels());
break;
case STORE_NODE_TO_LABELS:
UpdateNodeToLabelsMappingsEvent updateNodeToLabelsMappingsEvent =
(UpdateNodeToLabelsMappingsEvent) event;
store.updateNodeToLabelsMappings(updateNodeToLabelsMappingsEvent
.getNodeToLabels());
break;
}
} catch (IOException e) {
LOG.error("Failed to store label modification to storage");
throw new YarnRuntimeException(e);
}
}
public CommonNodeLabelsManager() {
super(CommonNodeLabelsManager.class.getName());
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
}
// for UT purpose
protected void initDispatcher(Configuration conf) {
// create async handler
dispatcher = new AsyncDispatcher("NodeLabelManager dispatcher");
AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher;
asyncDispatcher.init(conf);
asyncDispatcher.setDrainEventsOnStop();
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
// set if node labels enabled
nodeLabelsEnabled = YarnConfiguration.areNodeLabelsEnabled(conf);
isCentralizedNodeLabelConfiguration =
YarnConfiguration.isCentralizedNodeLabelConfiguration(conf);
noNodeLabel = new RMNodeLabel(NO_LABEL);
labelCollections.put(NO_LABEL, noNodeLabel);
}
/**
* @return the isStartup
*/
protected boolean isInitNodeLabelStoreInProgress() {
return initNodeLabelStoreInProgress;
}
boolean isCentralizedConfiguration() {
return isCentralizedNodeLabelConfiguration;
}
protected void initNodeLabelStore(Configuration conf) throws Exception {
this.store =
ReflectionUtils
.newInstance(
conf.getClass(YarnConfiguration.FS_NODE_LABELS_STORE_IMPL_CLASS,
FileSystemNodeLabelsStore.class, NodeLabelsStore.class),
conf);
this.store.setNodeLabelsManager(this);
this.store.init(conf);
this.store.recover();
}
// for UT purpose
protected void startDispatcher() {
// start dispatcher
AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher;
asyncDispatcher.start();
}
@Override
protected void serviceStart() throws Exception {
if (nodeLabelsEnabled) {
setInitNodeLabelStoreInProgress(true);
initNodeLabelStore(getConfig());
setInitNodeLabelStoreInProgress(false);
}
// init dispatcher only when service start, because recover will happen in
// service init, we don't want to trigger any event handling at that time.
initDispatcher(getConfig());
if (null != dispatcher) {
dispatcher.register(NodeLabelsStoreEventType.class,
new ForwardingEventHandler());
}
startDispatcher();
}
// for UT purpose
protected void stopDispatcher() {
AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher;
if (null != asyncDispatcher) {
asyncDispatcher.stop();
}
}
@Override
protected void serviceStop() throws Exception {
// finalize store
stopDispatcher();
// only close store when we enabled store persistent
if (null != store) {
store.close();
}
}
@SuppressWarnings("unchecked")
public void addToCluserNodeLabels(Collection<NodeLabel> labels)
throws IOException {
if (!nodeLabelsEnabled) {
LOG.error(NODE_LABELS_NOT_ENABLED_ERR);
throw new IOException(NODE_LABELS_NOT_ENABLED_ERR);
}
if (null == labels || labels.isEmpty()) {
return;
}
List<NodeLabel> newLabels = new ArrayList<NodeLabel>();
normalizeNodeLabels(labels);
// check any mismatch in exclusivity no mismatch with skip
checkExclusivityMatch(labels);
// do a check before actual adding them, will throw exception if any of them
// doesn't meet label name requirement
for (NodeLabel label : labels) {
checkAndThrowLabelName(label.getName());
}
for (NodeLabel label : labels) {
// shouldn't overwrite it to avoid changing the Label.resource
if (this.labelCollections.get(label.getName()) == null) {
this.labelCollections.put(label.getName(), new RMNodeLabel(label));
newLabels.add(label);
}
}
if (null != dispatcher && !newLabels.isEmpty()) {
dispatcher.getEventHandler().handle(
new StoreNewClusterNodeLabels(newLabels));
}
LOG.info("Add labels: [" + StringUtils.join(labels.iterator(), ",") + "]");
}
/**
* Add multiple node labels to repository
*
* @param labels
* new node labels added
*/
@VisibleForTesting
public void addToCluserNodeLabelsWithDefaultExclusivity(Set<String> labels)
throws IOException {
Set<NodeLabel> nodeLabels = new HashSet<NodeLabel>();
for (String label : labels) {
nodeLabels.add(NodeLabel.newInstance(label));
}
addToCluserNodeLabels(nodeLabels);
}
protected void checkAddLabelsToNode(
Map<NodeId, Set<String>> addedLabelsToNode) throws IOException {
if (null == addedLabelsToNode || addedLabelsToNode.isEmpty()) {
return;
}
// check all labels being added existed
Set<String> knownLabels = labelCollections.keySet();
for (Entry<NodeId, Set<String>> entry : addedLabelsToNode.entrySet()) {
NodeId nodeId = entry.getKey();
Set<String> labels = entry.getValue();
if (!knownLabels.containsAll(labels)) {
String msg =
"Not all labels being added contained by known "
+ "label collections, please check" + ", added labels=["
+ StringUtils.join(labels, ",") + "]";
LOG.error(msg);
throw new IOException(msg);
}
// In YARN-2694, we temporarily disable user add more than 1 labels on a
// same host
if (!labels.isEmpty()) {
Set<String> newLabels = new HashSet<String>(getLabelsByNode(nodeId));
newLabels.addAll(labels);
// we don't allow number of labels on a node > 1 after added labels
if (newLabels.size() > 1) {
String msg =
String.format(
"%d labels specified on host=%s after add labels to node"
+ ", please note that we do not support specifying multiple"
+ " labels on a single host for now.",
newLabels.size(), nodeId.getHost());
LOG.error(msg);
throw new IOException(msg);
}
}
}
}
/**
* add more labels to nodes
*
* @param addedLabelsToNode node {@literal ->} labels map
*/
public void addLabelsToNode(Map<NodeId, Set<String>> addedLabelsToNode)
throws IOException {
if (!nodeLabelsEnabled) {
LOG.error(NODE_LABELS_NOT_ENABLED_ERR);
throw new IOException(NODE_LABELS_NOT_ENABLED_ERR);
}
addedLabelsToNode = normalizeNodeIdToLabels(addedLabelsToNode);
checkAddLabelsToNode(addedLabelsToNode);
internalUpdateLabelsOnNodes(addedLabelsToNode, NodeLabelUpdateOperation.ADD);
}
protected void checkRemoveFromClusterNodeLabels(
Collection<String> labelsToRemove) throws IOException {
if (null == labelsToRemove || labelsToRemove.isEmpty()) {
return;
}
// Check if label to remove doesn't existed or null/empty, will throw
// exception if any of labels to remove doesn't meet requirement
for (String label : labelsToRemove) {
label = normalizeLabel(label);
if (label == null || label.isEmpty()) {
throw new IOException("Label to be removed is null or empty");
}
if (!labelCollections.containsKey(label)) {
throw new IOException("Node label=" + label
+ " to be removed doesn't existed in cluster "
+ "node labels collection.");
}
}
}
@SuppressWarnings("unchecked")
protected void internalRemoveFromClusterNodeLabels(Collection<String> labelsToRemove) {
// remove labels from nodes
for (Map.Entry<String,Host> nodeEntry : nodeCollections.entrySet()) {
Host host = nodeEntry.getValue();
if (null != host) {
host.labels.removeAll(labelsToRemove);
for (Node nm : host.nms.values()) {
if (nm.labels != null) {
nm.labels.removeAll(labelsToRemove);
}
}
}
}
// remove labels from node labels collection
for (String label : labelsToRemove) {
labelCollections.remove(label);
}
// create event to remove labels
if (null != dispatcher) {
dispatcher.getEventHandler().handle(
new RemoveClusterNodeLabels(labelsToRemove));
}
LOG.info("Remove labels: ["
+ StringUtils.join(labelsToRemove.iterator(), ",") + "]");
}
/**
* Remove multiple node labels from repository
*
* @param labelsToRemove
* node labels to remove
* @throws IOException
*/
public void removeFromClusterNodeLabels(Collection<String> labelsToRemove)
throws IOException {
if (!nodeLabelsEnabled) {
LOG.error(NODE_LABELS_NOT_ENABLED_ERR);
throw new IOException(NODE_LABELS_NOT_ENABLED_ERR);
}
labelsToRemove = normalizeLabels(labelsToRemove);
checkRemoveFromClusterNodeLabels(labelsToRemove);
internalRemoveFromClusterNodeLabels(labelsToRemove);
}
protected void checkRemoveLabelsFromNode(
Map<NodeId, Set<String>> removeLabelsFromNode) throws IOException {
// check all labels being added existed
Set<String> knownLabels = labelCollections.keySet();
for (Entry<NodeId, Set<String>> entry : removeLabelsFromNode.entrySet()) {
NodeId nodeId = entry.getKey();
Set<String> labels = entry.getValue();
if (!knownLabels.containsAll(labels)) {
String msg =
"Not all labels being removed contained by known "
+ "label collections, please check" + ", removed labels=["
+ StringUtils.join(labels, ",") + "]";
LOG.error(msg);
throw new IOException(msg);
}
Set<String> originalLabels = null;
boolean nodeExisted = false;
if (WILDCARD_PORT != nodeId.getPort()) {
Node nm = getNMInNodeSet(nodeId);
if (nm != null) {
originalLabels = nm.labels;
nodeExisted = true;
}
} else {
Host host = nodeCollections.get(nodeId.getHost());
if (null != host) {
originalLabels = host.labels;
nodeExisted = true;
}
}
if (!nodeExisted) {
String msg =
"Try to remove labels from NM=" + nodeId
+ ", but the NM doesn't existed";
LOG.error(msg);
throw new IOException(msg);
}
// the labels will never be null
if (labels.isEmpty()) {
continue;
}
// originalLabels may be null,
// because when a Node is created, Node.labels can be null.
if (originalLabels == null || !originalLabels.containsAll(labels)) {
String msg =
"Try to remove labels = [" + StringUtils.join(labels, ",")
+ "], but not all labels contained by NM=" + nodeId;
LOG.error(msg);
throw new IOException(msg);
}
}
}
private void addNodeToLabels(NodeId node, Set<String> labels) {
for(String l : labels) {
labelCollections.get(l).addNodeId(node);
}
}
protected void removeNodeFromLabels(NodeId node, Set<String> labels) {
for(String l : labels) {
labelCollections.get(l).removeNodeId(node);
}
}
private void replaceNodeForLabels(NodeId node, Set<String> oldLabels,
Set<String> newLabels) {
if(oldLabels != null) {
removeNodeFromLabels(node, oldLabels);
}
addNodeToLabels(node, newLabels);
}
@SuppressWarnings("unchecked")
protected void internalUpdateLabelsOnNodes(
Map<NodeId, Set<String>> nodeToLabels, NodeLabelUpdateOperation op)
throws IOException {
// do update labels from nodes
Map<NodeId, Set<String>> newNMToLabels =
new HashMap<NodeId, Set<String>>();
Set<String> oldLabels;
for (Entry<NodeId, Set<String>> entry : nodeToLabels.entrySet()) {
NodeId nodeId = entry.getKey();
Set<String> labels = entry.getValue();
createHostIfNonExisted(nodeId.getHost());
if (nodeId.getPort() == WILDCARD_PORT) {
Host host = nodeCollections.get(nodeId.getHost());
switch (op) {
case REMOVE:
removeNodeFromLabels(nodeId, labels);
host.labels.removeAll(labels);
for (Node node : host.nms.values()) {
if (node.labels != null) {
node.labels.removeAll(labels);
}
removeNodeFromLabels(node.nodeId, labels);
}
break;
case ADD:
addNodeToLabels(nodeId, labels);
host.labels.addAll(labels);
for (Node node : host.nms.values()) {
if (node.labels != null) {
node.labels.addAll(labels);
}
addNodeToLabels(node.nodeId, labels);
}
break;
case REPLACE:
replaceNodeForLabels(nodeId, host.labels, labels);
host.labels.clear();
host.labels.addAll(labels);
for (Node node : host.nms.values()) {
replaceNodeForLabels(node.nodeId, node.labels, labels);
node.labels = null;
}
break;
default:
break;
}
newNMToLabels.put(nodeId, host.labels);
} else {
if (EnumSet.of(NodeLabelUpdateOperation.ADD,
NodeLabelUpdateOperation.REPLACE).contains(op)) {
// Add and replace
createNodeIfNonExisted(nodeId);
Node nm = getNMInNodeSet(nodeId);
switch (op) {
case ADD:
addNodeToLabels(nodeId, labels);
if (nm.labels == null) {
nm.labels = new HashSet<String>();
}
nm.labels.addAll(labels);
break;
case REPLACE:
oldLabels = getLabelsByNode(nodeId);
replaceNodeForLabels(nodeId, oldLabels, labels);
if (nm.labels == null) {
nm.labels = new HashSet<String>();
}
nm.labels.clear();
nm.labels.addAll(labels);
break;
default:
break;
}
newNMToLabels.put(nodeId, nm.labels);
} else {
// remove
removeNodeFromLabels(nodeId, labels);
Node nm = getNMInNodeSet(nodeId);
if (nm.labels != null) {
nm.labels.removeAll(labels);
newNMToLabels.put(nodeId, nm.labels);
}
}
}
}
if (null != dispatcher && isCentralizedNodeLabelConfiguration) {
// In case of DistributedNodeLabelConfiguration or
// DelegatedCentralizedNodeLabelConfiguration, no need to save the the
// NodeLabels Mapping to the back-end store, as on RM restart/failover
// NodeLabels are collected from NM through Register/Heartbeat again
// in case of DistributedNodeLabelConfiguration and collected from
// RMNodeLabelsMappingProvider in case of
// DelegatedCentralizedNodeLabelConfiguration
dispatcher.getEventHandler().handle(
new UpdateNodeToLabelsMappingsEvent(newNMToLabels));
}
// shows node->labels we added
LOG.info(op.name() + " labels on nodes:");
for (Entry<NodeId, Set<String>> entry : newNMToLabels.entrySet()) {
LOG.info(" NM=" + entry.getKey() + ", labels=["
+ StringUtils.join(entry.getValue().iterator(), ",") + "]");
}
}
/**
* remove labels from nodes, labels being removed most be contained by these
* nodes
*
* @param removeLabelsFromNode node {@literal ->} labels map
*/
public void
removeLabelsFromNode(Map<NodeId, Set<String>> removeLabelsFromNode)
throws IOException {
if (!nodeLabelsEnabled) {
LOG.error(NODE_LABELS_NOT_ENABLED_ERR);
throw new IOException(NODE_LABELS_NOT_ENABLED_ERR);
}
removeLabelsFromNode = normalizeNodeIdToLabels(removeLabelsFromNode);
checkRemoveLabelsFromNode(removeLabelsFromNode);
internalUpdateLabelsOnNodes(removeLabelsFromNode,
NodeLabelUpdateOperation.REMOVE);
}
protected void checkReplaceLabelsOnNode(
Map<NodeId, Set<String>> replaceLabelsToNode) throws IOException {
if (null == replaceLabelsToNode || replaceLabelsToNode.isEmpty()) {
return;
}
// check all labels being added existed
Set<String> knownLabels = labelCollections.keySet();
for (Entry<NodeId, Set<String>> entry : replaceLabelsToNode.entrySet()) {
NodeId nodeId = entry.getKey();
Set<String> labels = entry.getValue();
// As in YARN-2694, we disable user add more than 1 labels on a same host
if (labels.size() > 1) {
String msg = String.format("%d labels specified on host=%s"
+ ", please note that we do not support specifying multiple"
+ " labels on a single host for now.", labels.size(),
nodeId.getHost());
LOG.error(msg);
throw new IOException(msg);
}
if (!knownLabels.containsAll(labels)) {
String msg =
"Not all labels being replaced contained by known "
+ "label collections, please check" + ", new labels=["
+ StringUtils.join(labels, ",") + "]";
LOG.error(msg);
throw new IOException(msg);
}
}
}
/**
* replace labels to nodes
*
* @param replaceLabelsToNode node {@literal ->} labels map
*/
public void replaceLabelsOnNode(Map<NodeId, Set<String>> replaceLabelsToNode)
throws IOException {
if (!nodeLabelsEnabled) {
LOG.error(NODE_LABELS_NOT_ENABLED_ERR);
throw new IOException(NODE_LABELS_NOT_ENABLED_ERR);
}
replaceLabelsToNode = normalizeNodeIdToLabels(replaceLabelsToNode);
checkReplaceLabelsOnNode(replaceLabelsToNode);
internalUpdateLabelsOnNodes(replaceLabelsToNode,
NodeLabelUpdateOperation.REPLACE);
}
/**
* Get mapping of nodes to labels
*
* @return nodes to labels map
*/
public Map<NodeId, Set<String>> getNodeLabels() {
Map<NodeId, Set<String>> nodeToLabels =
generateNodeLabelsInfoPerNode(String.class);
return nodeToLabels;
}
/**
* Get mapping of nodes to label info
*
* @return nodes to labels map
*/
public Map<NodeId, Set<NodeLabel>> getNodeLabelsInfo() {
Map<NodeId, Set<NodeLabel>> nodeToLabels =
generateNodeLabelsInfoPerNode(NodeLabel.class);
return nodeToLabels;
}
@SuppressWarnings("unchecked")
private <T> Map<NodeId, Set<T>> generateNodeLabelsInfoPerNode(Class<T> type) {
try {
readLock.lock();
Map<NodeId, Set<T>> nodeToLabels = new HashMap<>();
for (Entry<String, Host> entry : nodeCollections.entrySet()) {
String hostName = entry.getKey();
Host host = entry.getValue();
for (NodeId nodeId : host.nms.keySet()) {
if (type.isAssignableFrom(String.class)) {
Set<String> nodeLabels = getLabelsByNode(nodeId);
if (nodeLabels == null || nodeLabels.isEmpty()) {
continue;
}
nodeToLabels.put(nodeId, (Set<T>) nodeLabels);
} else {
Set<NodeLabel> nodeLabels = getLabelsInfoByNode(nodeId);
if (nodeLabels == null || nodeLabels.isEmpty()) {
continue;
}
nodeToLabels.put(nodeId, (Set<T>) nodeLabels);
}
}
if (!host.labels.isEmpty()) {
if (type.isAssignableFrom(String.class)) {
nodeToLabels.put(NodeId.newInstance(hostName, WILDCARD_PORT),
(Set<T>) host.labels);
} else {
nodeToLabels.put(NodeId.newInstance(hostName, WILDCARD_PORT),
(Set<T>) createNodeLabelFromLabelNames(host.labels));
}
}
}
return Collections.unmodifiableMap(nodeToLabels);
} finally {
readLock.unlock();
}
}
/**
* Get nodes that have no labels.
*
* @return set of nodes with no labels
*/
public Set<NodeId> getNodesWithoutALabel() {
try {
readLock.lock();
Set<NodeId> nodes = new HashSet<>();
for (Host host : nodeCollections.values()) {
for (NodeId nodeId : host.nms.keySet()) {
if (getLabelsByNode(nodeId).isEmpty()) {
nodes.add(nodeId);
}
}
}
return Collections.unmodifiableSet(nodes);
} finally {
readLock.unlock();
}
}
/**
* Get mapping of labels to nodes for all the labels.
*
* @return labels to nodes map
*/
public Map<String, Set<NodeId>> getLabelsToNodes() {
try {
readLock.lock();
return getLabelsToNodes(labelCollections.keySet());
} finally {
readLock.unlock();
}
}
/**
* Get mapping of labels to nodes for specified set of labels.
*
* @param labels set of labels for which labels to nodes mapping will be
* returned.
* @return labels to nodes map
*/
public Map<String, Set<NodeId>> getLabelsToNodes(Set<String> labels) {
try {
readLock.lock();
Map<String, Set<NodeId>> labelsToNodes = getLabelsToNodesMapping(labels,
String.class);
return Collections.unmodifiableMap(labelsToNodes);
} finally {
readLock.unlock();
}
}
/**
* Get mapping of labels to nodes for all the labels.
*
* @return labels to nodes map
*/
public Map<NodeLabel, Set<NodeId>> getLabelsInfoToNodes() {
try {
readLock.lock();
return getLabelsInfoToNodes(labelCollections.keySet());
} finally {
readLock.unlock();
}
}
/**
* Get mapping of labels info to nodes for specified set of labels.
*
* @param labels
* set of nodelabels for which labels to nodes mapping will be
* returned.
* @return labels to nodes map
*/
public Map<NodeLabel, Set<NodeId>> getLabelsInfoToNodes(Set<String> labels) {
try {
readLock.lock();
Map<NodeLabel, Set<NodeId>> labelsToNodes = getLabelsToNodesMapping(
labels, NodeLabel.class);
return Collections.unmodifiableMap(labelsToNodes);
} finally {
readLock.unlock();
}
}
private <T> Map<T, Set<NodeId>> getLabelsToNodesMapping(Set<String> labels,
Class<T> type) {
Map<T, Set<NodeId>> labelsToNodes = new HashMap<T, Set<NodeId>>();
for (String label : labels) {
if (label.equals(NO_LABEL)) {
continue;
}
RMNodeLabel nodeLabelInfo = labelCollections.get(label);
if (nodeLabelInfo != null) {
Set<NodeId> nodeIds = nodeLabelInfo.getAssociatedNodeIds();
if (!nodeIds.isEmpty()) {
if (type.isAssignableFrom(String.class)) {
labelsToNodes.put(type.cast(label), nodeIds);
} else {
labelsToNodes.put(type.cast(nodeLabelInfo.getNodeLabel()), nodeIds);
}
}
} else {
LOG.warn("getLabelsToNodes : Label [" + label + "] cannot be found");
}
}
return labelsToNodes;
}
/**
* Get existing valid labels in repository
*
* @return existing valid labels in repository
*/
public Set<String> getClusterNodeLabelNames() {
try {
readLock.lock();
Set<String> labels = new HashSet<String>(labelCollections.keySet());
labels.remove(NO_LABEL);
return Collections.unmodifiableSet(labels);
} finally {
readLock.unlock();
}
}
public List<NodeLabel> getClusterNodeLabels() {
try {
readLock.lock();
List<NodeLabel> nodeLabels = new ArrayList<>();
for (RMNodeLabel label : labelCollections.values()) {
if (!label.getLabelName().equals(NO_LABEL)) {
nodeLabels.add(NodeLabel.newInstance(label.getLabelName(),
label.getIsExclusive()));
}
}
return nodeLabels;
} finally {
readLock.unlock();
}
}
public boolean isExclusiveNodeLabel(String nodeLabel) throws IOException {
if (nodeLabel.equals(NO_LABEL)) {
return noNodeLabel.getIsExclusive();
}
try {
readLock.lock();
RMNodeLabel label = labelCollections.get(nodeLabel);
if (label == null) {
String message =
"Getting is-exclusive-node-label, node-label = " + nodeLabel
+ ", is not existed.";
LOG.error(message);
throw new IOException(message);
}
return label.getIsExclusive();
} finally {
readLock.unlock();
}
}
public static void checkAndThrowLabelName(String label) throws IOException {
if (label == null || label.isEmpty() || label.length() > MAX_LABEL_LENGTH) {
throw new IOException("label added is empty or exceeds "
+ MAX_LABEL_LENGTH + " character(s)");
}
label = label.trim();
boolean match = LABEL_PATTERN.matcher(label).matches();
if (!match) {
throw new IOException("label name should only contains "
+ "{0-9, a-z, A-Z, -, _} and should not started with {-,_}"
+ ", now it is=" + label);
}
}
private void checkExclusivityMatch(Collection<NodeLabel> labels)
throws IOException {
ArrayList<NodeLabel> mismatchlabels = new ArrayList<NodeLabel>();
for (NodeLabel label : labels) {
RMNodeLabel rmNodeLabel = this.labelCollections.get(label.getName());
if (rmNodeLabel != null
&& rmNodeLabel.getIsExclusive() != label.isExclusive()) {
mismatchlabels.add(label);
}
}
if (mismatchlabels.size() > 0) {
throw new IOException(
"Exclusivity cannot be modified for an existing label with : "
+ StringUtils.join(mismatchlabels.iterator(), ","));
}
}
protected String normalizeLabel(String label) {
if (label != null) {
return label.trim();
}
return NO_LABEL;
}
private Set<String> normalizeLabels(Collection<String> labels) {
Set<String> newLabels = new HashSet<String>();
for (String label : labels) {
newLabels.add(normalizeLabel(label));
}
return newLabels;
}
private void normalizeNodeLabels(Collection<NodeLabel> labels) {
for (NodeLabel label : labels) {
label.setName(normalizeLabel(label.getName()));
}
}
protected Node getNMInNodeSet(NodeId nodeId) {
return getNMInNodeSet(nodeId, nodeCollections);
}
protected Node getNMInNodeSet(NodeId nodeId, Map<String, Host> map) {
return getNMInNodeSet(nodeId, map, false);
}
protected Node getNMInNodeSet(NodeId nodeId, Map<String, Host> map,
boolean checkRunning) {
Host host = map.get(nodeId.getHost());
if (null == host) {
return null;
}
Node nm = host.nms.get(nodeId);
if (null == nm) {
return null;
}
if (checkRunning) {
return nm.running ? nm : null;
}
return nm;
}
protected Set<String> getLabelsByNode(NodeId nodeId) {
return getLabelsByNode(nodeId, nodeCollections);
}
protected Set<String> getLabelsByNode(NodeId nodeId, Map<String, Host> map) {
Host host = map.get(nodeId.getHost());
if (null == host) {
return EMPTY_STRING_SET;
}
Node nm = host.nms.get(nodeId);
if (null != nm && null != nm.labels) {
return nm.labels;
} else {
return host.labels;
}
}
public Set<NodeLabel> getLabelsInfoByNode(NodeId nodeId) {
try {
readLock.lock();
Set<String> labels = getLabelsByNode(nodeId, nodeCollections);
if (labels.isEmpty()) {
return EMPTY_NODELABEL_SET;
}
Set<NodeLabel> nodeLabels = createNodeLabelFromLabelNames(labels);
return nodeLabels;
} finally {
readLock.unlock();
}
}
private Set<NodeLabel> createNodeLabelFromLabelNames(Set<String> labels) {
Set<NodeLabel> nodeLabels = new HashSet<NodeLabel>();
for (String label : labels) {
if (label.equals(NO_LABEL)) {
continue;
}
RMNodeLabel rmLabel = labelCollections.get(label);
if (rmLabel == null) {
continue;
}
nodeLabels.add(rmLabel.getNodeLabel());
}
return nodeLabels;
}
protected void createNodeIfNonExisted(NodeId nodeId) throws IOException {
Host host = nodeCollections.get(nodeId.getHost());
if (null == host) {
throw new IOException("Should create host before creating node.");
}
Node nm = host.nms.get(nodeId);
if (null == nm) {
host.nms.put(nodeId, new Node(nodeId));
}
}
protected void createHostIfNonExisted(String hostName) {
Host host = nodeCollections.get(hostName);
if (null == host) {
host = new Host();
nodeCollections.put(hostName, host);
}
}
protected Map<NodeId, Set<String>> normalizeNodeIdToLabels(
Map<NodeId, Set<String>> nodeIdToLabels) {
Map<NodeId, Set<String>> newMap = new TreeMap<NodeId, Set<String>>();
for (Entry<NodeId, Set<String>> entry : nodeIdToLabels.entrySet()) {
NodeId id = entry.getKey();
Set<String> labels = entry.getValue();
newMap.put(id, normalizeLabels(labels));
}
return newMap;
}
public void setInitNodeLabelStoreInProgress(
boolean initNodeLabelStoreInProgress) {
this.initNodeLabelStoreInProgress = initNodeLabelStoreInProgress;
}
}