blob: 69b2316e510a1c51a48bd1e194f4fba9f5d89e40 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeRequestPBImpl;
import com.google.common.collect.Sets;
public class FileSystemNodeLabelsStore extends NodeLabelsStore {
protected static final Log LOG = LogFactory.getLog(FileSystemNodeLabelsStore.class);
protected static final String DEFAULT_DIR_NAME = "node-labels";
protected static final String MIRROR_FILENAME = "nodelabel.mirror";
protected static final String EDITLOG_FILENAME = "nodelabel.editlog";
protected enum SerializedLogType {
ADD_LABELS, NODE_TO_LABELS, REMOVE_LABELS
}
Path fsWorkingPath;
FileSystem fs;
private FSDataOutputStream editlogOs;
private Path editLogPath;
private String getDefaultFSNodeLabelsRootDir() throws IOException {
// default is in local: /tmp/hadoop-yarn-${user}/node-labels/
return "file:///tmp/hadoop-yarn-"
+ UserGroupInformation.getCurrentUser().getShortUserName() + "/"
+ DEFAULT_DIR_NAME;
}
@Override
public void init(Configuration conf) throws Exception {
fsWorkingPath =
new Path(conf.get(YarnConfiguration.FS_NODE_LABELS_STORE_ROOT_DIR,
getDefaultFSNodeLabelsRootDir()));
setFileSystem(conf);
// mkdir of root dir path
fs.mkdirs(fsWorkingPath);
}
@Override
public void close() throws IOException {
IOUtils.cleanup(LOG, fs, editlogOs);
}
void setFileSystem(Configuration conf) throws IOException {
Configuration confCopy = new Configuration(conf);
confCopy.setBoolean("dfs.client.retry.policy.enabled", true);
String retryPolicy =
confCopy.get(YarnConfiguration.FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC,
YarnConfiguration.DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC);
confCopy.set("dfs.client.retry.policy.spec", retryPolicy);
fs = fsWorkingPath.getFileSystem(confCopy);
// if it's local file system, use RawLocalFileSystem instead of
// LocalFileSystem, the latter one doesn't support append.
if (fs.getScheme().equals("file")) {
fs = ((LocalFileSystem)fs).getRaw();
}
}
private void ensureAppendEditlogFile() throws IOException {
editlogOs = fs.append(editLogPath);
}
private void ensureCloseEditlogFile() throws IOException {
editlogOs.close();
}
@Override
public void updateNodeToLabelsMappings(
Map<NodeId, Set<String>> nodeToLabels) throws IOException {
try {
ensureAppendEditlogFile();
editlogOs.writeInt(SerializedLogType.NODE_TO_LABELS.ordinal());
((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest
.newInstance(nodeToLabels)).getProto().writeDelimitedTo(editlogOs);
} finally {
ensureCloseEditlogFile();
}
}
@Override
public void storeNewClusterNodeLabels(List<NodeLabel> labels)
throws IOException {
try {
ensureAppendEditlogFile();
editlogOs.writeInt(SerializedLogType.ADD_LABELS.ordinal());
((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequest
.newInstance(labels)).getProto().writeDelimitedTo(editlogOs);
} finally {
ensureCloseEditlogFile();
}
}
@Override
public void removeClusterNodeLabels(Collection<String> labels)
throws IOException {
try {
ensureAppendEditlogFile();
editlogOs.writeInt(SerializedLogType.REMOVE_LABELS.ordinal());
((RemoveFromClusterNodeLabelsRequestPBImpl) RemoveFromClusterNodeLabelsRequest.newInstance(Sets
.newHashSet(labels.iterator()))).getProto().writeDelimitedTo(editlogOs);
} finally {
ensureCloseEditlogFile();
}
}
protected void loadFromMirror(Path newMirrorPath, Path oldMirrorPath)
throws IOException {
// If mirror.new exists, read from mirror.new,
FSDataInputStream is = null;
try {
is = fs.open(newMirrorPath);
} catch (FileNotFoundException e) {
try {
is = fs.open(oldMirrorPath);
} catch (FileNotFoundException ignored) {
}
}
if (null != is) {
List<NodeLabel> labels = new AddToClusterNodeLabelsRequestPBImpl(
AddToClusterNodeLabelsRequestProto.parseDelimitedFrom(is))
.getNodeLabels();
mgr.addToCluserNodeLabels(labels);
if (mgr.isCentralizedConfiguration()) {
// Only load node to labels mapping while using centralized configuration
Map<NodeId, Set<String>> nodeToLabels =
new ReplaceLabelsOnNodeRequestPBImpl(
ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is))
.getNodeToLabels();
mgr.replaceLabelsOnNode(nodeToLabels);
}
is.close();
}
}
/* (non-Javadoc)
* @see org.apache.hadoop.yarn.nodelabels.NodeLabelsStore#recover(boolean)
*/
@Override
public void recover() throws YarnException,
IOException {
/*
* Steps of recover
* 1) Read from last mirror (from mirror or mirror.old)
* 2) Read from last edit log, and apply such edit log
* 3) Write new mirror to mirror.writing
* 4) Rename mirror to mirror.old
* 5) Move mirror.writing to mirror
* 6) Remove mirror.old
* 7) Remove edit log and create a new empty edit log
*/
// Open mirror from serialized file
Path mirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME);
Path oldMirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".old");
loadFromMirror(mirrorPath, oldMirrorPath);
// Open and process editlog
editLogPath = new Path(fsWorkingPath, EDITLOG_FILENAME);
FSDataInputStream is;
try {
is = fs.open(editLogPath);
} catch (FileNotFoundException e) {
is = null;
}
if (null != is) {
while (true) {
try {
// read edit log one by one
SerializedLogType type = SerializedLogType.values()[is.readInt()];
switch (type) {
case ADD_LABELS: {
List<NodeLabel> labels =
new AddToClusterNodeLabelsRequestPBImpl(
AddToClusterNodeLabelsRequestProto.parseDelimitedFrom(is))
.getNodeLabels();
mgr.addToCluserNodeLabels(labels);
break;
}
case REMOVE_LABELS: {
Collection<String> labels =
RemoveFromClusterNodeLabelsRequestProto.parseDelimitedFrom(is)
.getNodeLabelsList();
mgr.removeFromClusterNodeLabels(labels);
break;
}
case NODE_TO_LABELS: {
Map<NodeId, Set<String>> map =
new ReplaceLabelsOnNodeRequestPBImpl(
ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is))
.getNodeToLabels();
if (mgr.isCentralizedConfiguration()) {
/*
* In case of Distributed NodeLabels setup,
* ignoreNodeToLabelsMappings will be set to true and recover will
* be invoked. As RM will collect the node labels from NM through
* registration/HB
*/
mgr.replaceLabelsOnNode(map);
}
break;
}
}
} catch (EOFException e) {
// EOF hit, break
break;
}
}
is.close();
}
// Serialize current mirror to mirror.writing
Path writingMirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".writing");
FSDataOutputStream os = fs.create(writingMirrorPath, true);
((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequestPBImpl
.newInstance(mgr.getClusterNodeLabels())).getProto().writeDelimitedTo(os);
((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest
.newInstance(mgr.getNodeLabels())).getProto().writeDelimitedTo(os);
os.close();
// Move mirror to mirror.old
if (fs.exists(mirrorPath)) {
fs.delete(oldMirrorPath, false);
fs.rename(mirrorPath, oldMirrorPath);
}
// move mirror.writing to mirror
fs.rename(writingMirrorPath, mirrorPath);
fs.delete(writingMirrorPath, false);
// remove mirror.old
fs.delete(oldMirrorPath, false);
// create a new editlog file
editlogOs = fs.create(editLogPath, true);
editlogOs.close();
LOG.info("Finished write mirror at:" + mirrorPath.toString());
LOG.info("Finished create editlog file at:" + editLogPath.toString());
}
}