| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; |
| |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.Timer; |
| import java.util.TimerTask; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.service.CompositeService; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.NodeLabel; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMContext; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| |
| /** |
| * Update nodes labels map for ResourceManager periodically. It collects |
| * nodes labels from {@link RMNodeLabelsMappingProvider} and updates the |
| * nodes {@literal ->} labels map via {@link RMNodeLabelsManager}. |
| * This service is enabled when configuration |
| * "yarn.node-labels.configuration-type" is set to "delegated-centralized". |
| */ |
| public class RMDelegatedNodeLabelsUpdater extends CompositeService { |
| |
| private static final Log LOG = LogFactory |
| .getLog(RMDelegatedNodeLabelsUpdater.class); |
| |
| public static final long DISABLE_DELEGATED_NODE_LABELS_UPDATE = -1; |
| |
| // Timer used to schedule node labels fetching |
| private Timer nodeLabelsScheduler; |
| // 30 seconds |
| @VisibleForTesting |
| public long nodeLabelsUpdateInterval = 30 * 1000; |
| |
| private Set<NodeId> newlyRegisteredNodes = new HashSet<NodeId>(); |
| // Lock to protect newlyRegisteredNodes |
| private Object lock = new Object(); |
| private long lastAllNodesLabelUpdateMills = 0L; |
| private long allNodesLabelUpdateInterval; |
| |
| private RMNodeLabelsMappingProvider rmNodeLabelsMappingProvider; |
| |
| private RMContext rmContext; |
| |
| public RMDelegatedNodeLabelsUpdater(RMContext rmContext) { |
| super("RMDelegatedNodeLabelsUpdater"); |
| this.rmContext = rmContext; |
| } |
| |
| @Override |
| protected void serviceInit(Configuration conf) throws Exception { |
| allNodesLabelUpdateInterval = conf.getLong( |
| YarnConfiguration.RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS, |
| YarnConfiguration.DEFAULT_RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS); |
| rmNodeLabelsMappingProvider = createRMNodeLabelsMappingProvider(conf); |
| addService(rmNodeLabelsMappingProvider); |
| super.serviceInit(conf); |
| } |
| |
| @Override |
| protected void serviceStart() throws Exception { |
| nodeLabelsScheduler = new Timer( |
| "RMDelegatedNodeLabelsUpdater-Timer", true); |
| TimerTask delegatedNodeLabelsUpdaterTimerTask = |
| new RMDelegatedNodeLabelsUpdaterTimerTask(); |
| nodeLabelsScheduler.scheduleAtFixedRate( |
| delegatedNodeLabelsUpdaterTimerTask, |
| nodeLabelsUpdateInterval, |
| nodeLabelsUpdateInterval); |
| |
| super.serviceStart(); |
| } |
| |
| /** |
| * Terminate the timer. |
| * @throws Exception |
| */ |
| @Override |
| protected void serviceStop() throws Exception { |
| if (nodeLabelsScheduler != null) { |
| nodeLabelsScheduler.cancel(); |
| } |
| super.serviceStop(); |
| } |
| |
| private class RMDelegatedNodeLabelsUpdaterTimerTask extends TimerTask { |
| @Override |
| public void run() { |
| Set<NodeId> nodesToUpdateLabels = null; |
| boolean isUpdatingAllNodes = false; |
| |
| if (allNodesLabelUpdateInterval != DISABLE_DELEGATED_NODE_LABELS_UPDATE) { |
| long elapsedTimeSinceLastUpdate = |
| System.currentTimeMillis() - lastAllNodesLabelUpdateMills; |
| if (elapsedTimeSinceLastUpdate > allNodesLabelUpdateInterval) { |
| nodesToUpdateLabels = |
| Collections.unmodifiableSet(rmContext.getRMNodes().keySet()); |
| isUpdatingAllNodes = true; |
| } |
| } |
| |
| if (nodesToUpdateLabels == null && !newlyRegisteredNodes.isEmpty()) { |
| synchronized (lock) { |
| if (!newlyRegisteredNodes.isEmpty()) { |
| nodesToUpdateLabels = new HashSet<NodeId>(newlyRegisteredNodes); |
| } |
| } |
| } |
| |
| try { |
| if (nodesToUpdateLabels != null && !nodesToUpdateLabels.isEmpty()) { |
| updateNodeLabelsInternal(nodesToUpdateLabels); |
| if (isUpdatingAllNodes) { |
| lastAllNodesLabelUpdateMills = System.currentTimeMillis(); |
| } |
| synchronized (lock) { |
| newlyRegisteredNodes.removeAll(nodesToUpdateLabels); |
| } |
| } |
| } catch (IOException e) { |
| LOG.error("Failed to update node Labels", e); |
| } |
| } |
| } |
| |
| private void updateNodeLabelsInternal(Set<NodeId> nodes) |
| throws IOException { |
| Map<NodeId, Set<NodeLabel>> labelsUpdated = |
| rmNodeLabelsMappingProvider.getNodeLabels(nodes); |
| if (labelsUpdated != null && labelsUpdated.size() != 0) { |
| Map<NodeId, Set<String>> nodeToLabels = |
| new HashMap<NodeId, Set<String>>(labelsUpdated.size()); |
| for (Map.Entry<NodeId, Set<NodeLabel>> entry |
| : labelsUpdated.entrySet()) { |
| nodeToLabels.put(entry.getKey(), |
| NodeLabelsUtils.convertToStringSet(entry.getValue())); |
| } |
| rmContext.getNodeLabelManager().replaceLabelsOnNode(nodeToLabels); |
| } |
| } |
| |
| /** |
| * Get the RMNodeLabelsMappingProvider which is used to provide node labels. |
| */ |
| private RMNodeLabelsMappingProvider createRMNodeLabelsMappingProvider( |
| Configuration conf) throws IOException { |
| RMNodeLabelsMappingProvider nodeLabelsMappingProvider = null; |
| try { |
| Class<? extends RMNodeLabelsMappingProvider> labelsProviderClass = |
| conf.getClass(YarnConfiguration.RM_NODE_LABELS_PROVIDER_CONFIG, |
| null, RMNodeLabelsMappingProvider.class); |
| if (labelsProviderClass != null) { |
| nodeLabelsMappingProvider = labelsProviderClass.newInstance(); |
| } |
| } catch (InstantiationException | IllegalAccessException |
| | RuntimeException e) { |
| LOG.error("Failed to create RMNodeLabelsMappingProvider based on" |
| + " Configuration", e); |
| throw new IOException("Failed to create RMNodeLabelsMappingProvider : " |
| + e.getMessage(), e); |
| } |
| |
| if (nodeLabelsMappingProvider == null) { |
| String msg = "RMNodeLabelsMappingProvider should be configured when " |
| + "delegated-centralized node label configuration is enabled"; |
| LOG.error(msg); |
| throw new IOException(msg); |
| } else if (LOG.isDebugEnabled()) { |
| LOG.debug("RM Node labels mapping provider class is : " |
| + nodeLabelsMappingProvider.getClass().toString()); |
| } |
| |
| return nodeLabelsMappingProvider; |
| } |
| |
| /** |
| * Update node labels for a specified node. |
| * @param node the node to update node labels |
| */ |
| public void updateNodeLabels(NodeId node) { |
| synchronized (lock) { |
| newlyRegisteredNodes.add(node); |
| } |
| } |
| } |