blob: d3c5c269abbaaddb0e15fb02d599c721e1680908 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.resolver;
import java.io.BufferedReader;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* Default simple sub-cluster and rack resolver class.
*
* This class expects a three-column comma separated file, specified in
* yarn.federation.machine-list. Each line of the file should be of the format:
*
* nodeName, subClusterId, rackName
*
* Lines that do not follow this format will be ignored. This resolver only
* loads the file when load() is explicitly called; it will not react to changes
* to the file.
*
* It is case-insensitive on the rack and node names and ignores
* leading/trailing whitespace.
*
*/
public class DefaultSubClusterResolverImpl extends AbstractSubClusterResolver
    implements SubClusterResolver {

  private static final Logger LOG =
      LoggerFactory.getLogger(DefaultSubClusterResolverImpl.class);

  // Index of the node hostname in the machine info file.
  private static final int NODE_NAME_INDEX = 0;
  // Index of the sub-cluster ID in the machine info file.
  private static final int SUBCLUSTER_ID_INDEX = 1;
  // Index of the rack name in the machine info file.
  private static final int RACK_NAME_INDEX = 2;
  // Number of comma-separated columns a well-formed line must have.
  private static final int EXPECTED_COLUMNS = 3;

  private Configuration conf;

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Configuration getConf() {
    return this.conf;
  }

  /**
   * Looks up the sub-cluster of a node, ignoring the case of the node name.
   *
   * @param nodename the node name to resolve; must not be null (a null value
   *          fails with a NullPointerException before the lookup happens)
   * @return whatever the superclass lookup returns for the upper-cased name
   * @throws YarnException as raised by the superclass lookup
   */
  @Override
  public SubClusterId getSubClusterForNode(String nodename)
      throws YarnException {
    return super.getSubClusterForNode(toKey(nodename));
  }

  /**
   * Reads the machine list file named by
   * {@link YarnConfiguration#FEDERATION_MACHINE_LIST} and registers every
   * well-formed line in the node and rack maps of this resolver.
   *
   * This method is best-effort: a missing configuration value, an invalid
   * path, or any failure while reading is logged and the method returns
   * normally. Entries registered before a failure are kept.
   */
  @Override
  public void load() {
    String fileName =
        this.conf.get(YarnConfiguration.FEDERATION_MACHINE_LIST, "");

    try {
      if (fileName == null || fileName.trim().isEmpty()) {
        LOG.info(
            "The machine list file path is not specified in the configuration");
        return;
      }

      Path file;
      try {
        file = Paths.get(fileName);
      } catch (InvalidPathException e) {
        // InvalidPathException means the string cannot be turned into a path;
        // a well-formed path to a missing file fails later, when it is opened.
        LOG.info("The configured machine list file path {} is not a valid path",
            fileName);
        return;
      }

      // try-with-resources closes the reader on every exit path.
      try (BufferedReader reader =
          Files.newBufferedReader(file, Charset.defaultCharset())) {
        String line;
        while ((line = reader.readLine()) != null) {
          loadLine(line);
        }
      }
      LOG.info("Successfully loaded file {}", fileName);
    } catch (Exception e) {
      // Deliberately broad: loading must never propagate a failure.
      LOG.error("Failed to parse file {}", fileName, e);
    }
  }

  /**
   * Parses one line of the machine list and registers its node and rack.
   * Lines that do not split into exactly three comma-separated columns are
   * logged and skipped.
   *
   * @param line a single line of the machine list file
   */
  private void loadLine(String line) {
    String[] tokens = line.split(",");
    if (tokens.length != EXPECTED_COLUMNS) {
      LOG.warn("Skipping malformed line in machine list: {}", line);
      return;
    }

    String nodeName = toKey(tokens[NODE_NAME_INDEX].trim());
    SubClusterId subClusterId =
        SubClusterId.newInstance(tokens[SUBCLUSTER_ID_INDEX].trim());
    String rackName = toKey(tokens[RACK_NAME_INDEX].trim());

    if (LOG.isDebugEnabled()) {
      LOG.debug("Loading node into resolver: {} --> {}", nodeName,
          subClusterId);
      LOG.debug("Loading rack into resolver: {} --> {} ", rackName,
          subClusterId);
    }

    this.getNodeToSubCluster().put(nodeName, subClusterId);
    loadRackToSubCluster(rackName, subClusterId);
  }

  /**
   * Adds a sub-cluster to the set registered for a rack, creating the set on
   * first use.
   *
   * @param rackName the rack name; it is upper-cased before being used as key
   * @param subClusterId the sub-cluster to associate with the rack
   */
  private void loadRackToSubCluster(String rackName,
      SubClusterId subClusterId) {
    String rackKey = toKey(rackName);

    if (!this.getRackToSubClusters().containsKey(rackKey)) {
      this.getRackToSubClusters().put(rackKey, new HashSet<SubClusterId>());
    }

    this.getRackToSubClusters().get(rackKey).add(subClusterId);
  }

  /**
   * Looks up the sub-clusters of a rack, ignoring the case of the rack name.
   *
   * @param rackname the rack name to resolve; must not be null (a null value
   *          fails with a NullPointerException before the lookup happens)
   * @return whatever the superclass lookup returns for the upper-cased name
   * @throws YarnException as raised by the superclass lookup
   */
  @Override
  public Set<SubClusterId> getSubClustersForRack(String rackname)
      throws YarnException {
    return super.getSubClustersForRack(toKey(rackname));
  }

  /**
   * Converts a node or rack name to the form used as a map key.
   *
   * Locale.ROOT keeps the mapping independent of the JVM default locale, so
   * keys written by load() and keys built at lookup time always agree.
   *
   * @param name the name to convert; must not be null
   * @return the upper-cased name
   */
  private static String toKey(String name) {
    return name.toUpperCase(Locale.ROOT);
  }
}