blob: 53966b8afbfccc455d134f4292a4dcd86538965b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.viewfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
/**
* Nfly is a multi filesystem mount point.
*/
@Private
final class NflyFSystem extends FileSystem {
private static final Log LOG = LogFactory.getLog(NflyFSystem.class);
private static final String NFLY_TMP_PREFIX = "_nfly_tmp_";
enum NflyKey {
// minimum replication, if local filesystem is included +1 is recommended
minReplication,
// forces to check all the replicas and fetch the one with the most recent
// time stamp
//
readMostRecent,
// create missing replica from far to near, including local?
repairOnRead
}
private static final int DEFAULT_MIN_REPLICATION = 2;
private static URI nflyURI = URI.create("nfly:///");
private final NflyNode[] nodes;
private final int minReplication;
private final EnumSet<NflyKey> nflyFlags;
private final Node myNode;
private final NetworkTopology topology;
/**
* URI's authority is used as an approximation of the distance from the
* client. It's sufficient for DC but not accurate because worker nodes can be
* closer.
*/
private static class NflyNode extends NodeBase {
private final ChRootedFileSystem fs;
NflyNode(String hostName, String rackName, URI uri,
Configuration conf) throws IOException {
this(hostName, rackName, new ChRootedFileSystem(uri, conf));
}
NflyNode(String hostName, String rackName, ChRootedFileSystem fs) {
super(hostName, rackName);
this.fs = fs;
}
ChRootedFileSystem getFs() {
return fs;
}
@Override
public boolean equals(Object o) {
// satisfy findbugs
return super.equals(o);
}
@Override
public int hashCode() {
// satisfy findbugs
return super.hashCode();
}
}
private static final class MRNflyNode
extends NflyNode implements Comparable<MRNflyNode> {
private FileStatus status;
private MRNflyNode(NflyNode n) {
super(n.getName(), n.getNetworkLocation(), n.fs);
}
private void updateFileStatus(Path f) throws IOException {
final FileStatus tmpStatus = getFs().getFileStatus(f);
status = tmpStatus == null
? notFoundStatus(f)
: tmpStatus;
}
// TODO allow configurable error margin for FileSystems with different
// timestamp precisions
@Override
public int compareTo(MRNflyNode other) {
if (status == null) {
return other.status == null ? 0 : 1; // move non-null towards head
} else if (other.status == null) {
return -1; // move this towards head
} else {
final long mtime = status.getModificationTime();
final long their = other.status.getModificationTime();
return Long.compare(their, mtime); // move more recent towards head
}
}
@Override
public boolean equals(Object o) {
if (!(o instanceof MRNflyNode)) {
return false;
}
MRNflyNode other = (MRNflyNode) o;
return 0 == compareTo(other);
}
@Override
public int hashCode() {
// satisfy findbugs
return super.hashCode();
}
private FileStatus nflyStatus() throws IOException {
return new NflyStatus(getFs(), status);
}
private FileStatus cloneStatus() throws IOException {
return new FileStatus(status.getLen(),
status.isDirectory(),
status.getReplication(),
status.getBlockSize(),
status.getModificationTime(),
status.getAccessTime(),
null, null, null,
status.isSymlink() ? status.getSymlink() : null,
status.getPath());
}
}
private MRNflyNode[] workSet() {
final MRNflyNode[] res = new MRNflyNode[nodes.length];
for (int i = 0; i < res.length; i++) {
res[i] = new MRNflyNode(nodes[i]);
}
return res;
}
/**
* Utility to replace null with DEFAULT_RACK.
*
* @param rackString rack value, can be null
* @return non-null rack string
*/
private static String getRack(String rackString) {
return rackString == null ? NetworkTopology.DEFAULT_RACK : rackString;
}
/**
* Creates a new Nfly instance.
*
* @param uris the list of uris in the mount point
* @param conf configuration object
* @param minReplication minimum copies to commit a write op
* @param nflyFlags modes such readMostRecent
* @throws IOException
*/
private NflyFSystem(URI[] uris, Configuration conf, int minReplication,
EnumSet<NflyKey> nflyFlags) throws IOException {
if (uris.length < minReplication) {
throw new IOException(minReplication + " < " + uris.length
+ ": Minimum replication < #destinations");
}
setConf(conf);
final String localHostName = InetAddress.getLocalHost().getHostName();
// build a list for topology resolution
final List<String> hostStrings = new ArrayList<String>(uris.length + 1);
for (URI uri : uris) {
final String uriHost = uri.getHost();
// assume local file system or another closest filesystem if no authority
hostStrings.add(uriHost == null ? localHostName : uriHost);
}
// resolve the client node
hostStrings.add(localHostName);
final DNSToSwitchMapping tmpDns = ReflectionUtils.newInstance(conf.getClass(
CommonConfigurationKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
ScriptBasedMapping.class, DNSToSwitchMapping.class), conf);
// this is an ArrayList
final List<String> rackStrings = tmpDns.resolve(hostStrings);
nodes = new NflyNode[uris.length];
final Iterator<String> rackIter = rackStrings.iterator();
for (int i = 0; i < nodes.length; i++) {
nodes[i] = new NflyNode(hostStrings.get(i), rackIter.next(), uris[i],
conf);
}
// sort all the uri's by distance from myNode, the local file system will
// automatically be the the first one.
//
myNode = new NodeBase(localHostName, getRack(rackIter.next()));
topology = NetworkTopology.getInstance(conf);
topology.sortByDistance(myNode, nodes, nodes.length);
this.minReplication = minReplication;
this.nflyFlags = nflyFlags;
statistics = getStatistics(nflyURI.getScheme(), getClass());
}
/**
* Transactional output stream. When creating path /dir/file
* 1) create invisible /real/dir_i/_nfly_tmp_file
* 2) when more than min replication was written, write is committed by
* renaming all successfully written files to /real/dir_i/file
*/
private final class NflyOutputStream extends OutputStream {
// actual path
private final Path nflyPath;
// tmp path before commit
private final Path tmpPath;
// broadcast set
private final FSDataOutputStream[] outputStreams;
// status set: 1 working, 0 problem
private final BitSet opSet;
private final boolean useOverwrite;
private NflyOutputStream(Path f, FsPermission permission, boolean overwrite,
int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
nflyPath = f;
tmpPath = getNflyTmpPath(f);
outputStreams = new FSDataOutputStream[nodes.length];
for (int i = 0; i < outputStreams.length; i++) {
outputStreams[i] = nodes[i].fs.create(tmpPath, permission, true,
bufferSize, replication, blockSize, progress);
}
opSet = new BitSet(outputStreams.length);
opSet.set(0, outputStreams.length);
useOverwrite = false;
}
//
// TODO consider how to clean up and throw an exception early when the clear
// bits under min replication
//
private void mayThrow(List<IOException> ioExceptions) throws IOException {
final IOException ioe = MultipleIOException
.createIOException(ioExceptions);
if (opSet.cardinality() < minReplication) {
throw ioe;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Exceptions occurred: " + ioe);
}
}
}
@Override
public void write(int d) throws IOException {
final List<IOException> ioExceptions = new ArrayList<IOException>();
for (int i = opSet.nextSetBit(0);
i >=0;
i = opSet.nextSetBit(i + 1)) {
try {
outputStreams[i].write(d);
} catch (Throwable t) {
osException(i, "write", t, ioExceptions);
}
}
mayThrow(ioExceptions);
}
private void osException(int i, String op, Throwable t,
List<IOException> ioExceptions) {
opSet.clear(i);
processThrowable(nodes[i], op, t, ioExceptions, tmpPath, nflyPath);
}
@Override
public void write(byte[] bytes, int offset, int len) throws IOException {
final List<IOException> ioExceptions = new ArrayList<IOException>();
for (int i = opSet.nextSetBit(0);
i >= 0;
i = opSet.nextSetBit(i + 1)) {
try {
outputStreams[i].write(bytes, offset, len);
} catch (Throwable t) {
osException(i, "write", t, ioExceptions);
}
}
mayThrow(ioExceptions);
}
@Override
public void flush() throws IOException {
final List<IOException> ioExceptions = new ArrayList<IOException>();
for (int i = opSet.nextSetBit(0);
i >= 0;
i = opSet.nextSetBit(i + 1)) {
try {
outputStreams[i].flush();
} catch (Throwable t) {
osException(i, "flush", t, ioExceptions);
}
}
mayThrow(ioExceptions);
}
@Override
public void close() throws IOException {
final List<IOException> ioExceptions = new ArrayList<IOException>();
for (int i = opSet.nextSetBit(0);
i >= 0;
i = opSet.nextSetBit(i + 1)) {
try {
outputStreams[i].close();
} catch (Throwable t) {
osException(i, "close", t, ioExceptions);
}
}
if (opSet.cardinality() < minReplication) {
cleanupAllTmpFiles();
throw new IOException("Failed to sufficiently replicate: min="
+ minReplication + " actual=" + opSet.cardinality());
} else {
commit();
}
}
private void cleanupAllTmpFiles() throws IOException {
for (int i = 0; i < outputStreams.length; i++) {
try {
nodes[i].fs.delete(tmpPath);
} catch (Throwable t) {
processThrowable(nodes[i], "delete", t, null, tmpPath);
}
}
}
private void commit() throws IOException {
final List<IOException> ioExceptions = new ArrayList<IOException>();
for (int i = opSet.nextSetBit(0);
i >= 0;
i = opSet.nextSetBit(i + 1)) {
final NflyNode nflyNode = nodes[i];
try {
if (useOverwrite) {
nflyNode.fs.delete(nflyPath);
}
nflyNode.fs.rename(tmpPath, nflyPath);
} catch (Throwable t) {
osException(i, "commit", t, ioExceptions);
}
}
if (opSet.cardinality() < minReplication) {
// cleanup should be done outside. If rename failed, it's unlikely that
// delete will work either. It's the same kind of metadata-only op
//
throw MultipleIOException.createIOException(ioExceptions);
}
// best effort to have a consistent timestamp
final long commitTime = System.currentTimeMillis();
for (int i = opSet.nextSetBit(0);
i >= 0;
i = opSet.nextSetBit(i + 1)) {
try {
nodes[i].fs.setTimes(nflyPath, commitTime, commitTime);
} catch (Throwable t) {
LOG.info("Failed to set timestamp: " + nodes[i] + " " + nflyPath);
}
}
}
}
private Path getNflyTmpPath(Path f) {
return new Path(f.getParent(), NFLY_TMP_PREFIX + f.getName());
}
/**
* // TODO
* Some file status implementations have expensive deserialization or metadata
* retrieval. This probably does not go beyond RawLocalFileSystem. Wrapping
* the the real file status to preserve this behavior. Otherwise, calling
* realStatus getters in constructor defeats this design.
*/
static final class NflyStatus extends FileStatus {
private static final long serialVersionUID = 0x21f276d8;
private final FileStatus realStatus;
private final String strippedRoot;
private NflyStatus(ChRootedFileSystem realFs, FileStatus realStatus)
throws IOException {
this.realStatus = realStatus;
this.strippedRoot = realFs.stripOutRoot(realStatus.getPath());
}
String stripRoot() throws IOException {
return strippedRoot;
}
@Override
public long getLen() {
return realStatus.getLen();
}
@Override
public boolean isFile() {
return realStatus.isFile();
}
@Override
public boolean isDirectory() {
return realStatus.isDirectory();
}
@Override
public boolean isSymlink() {
return realStatus.isSymlink();
}
@Override
public long getBlockSize() {
return realStatus.getBlockSize();
}
@Override
public short getReplication() {
return realStatus.getReplication();
}
@Override
public long getModificationTime() {
return realStatus.getModificationTime();
}
@Override
public long getAccessTime() {
return realStatus.getAccessTime();
}
@Override
public FsPermission getPermission() {
return realStatus.getPermission();
}
@Override
public String getOwner() {
return realStatus.getOwner();
}
@Override
public String getGroup() {
return realStatus.getGroup();
}
@Override
public Path getPath() {
return realStatus.getPath();
}
@Override
public void setPath(Path p) {
realStatus.setPath(p);
}
@Override
public Path getSymlink() throws IOException {
return realStatus.getSymlink();
}
@Override
public void setSymlink(Path p) {
realStatus.setSymlink(p);
}
@Override
public boolean equals(Object o) {
return realStatus.equals(o);
}
@Override
public int hashCode() {
return realStatus.hashCode();
}
@Override
public String toString() {
return realStatus.toString();
}
}
@Override
public URI getUri() {
return nflyURI;
}
/**
* Category: READ.
*
* @param f the file name to open
* @param bufferSize the size of the buffer to be used.
* @return input stream according to nfly flags (closest, most recent)
* @throws IOException
* @throws FileNotFoundException iff all destinations generate this exception
*/
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
// TODO proxy stream for reads
final List<IOException> ioExceptions =
new ArrayList<IOException>(nodes.length);
int numNotFounds = 0;
final MRNflyNode[] mrNodes = workSet();
// naively iterate until one can be opened
//
for (final MRNflyNode nflyNode : mrNodes) {
try {
if (nflyFlags.contains(NflyKey.repairOnRead)
|| nflyFlags.contains(NflyKey.readMostRecent)) {
// calling file status to avoid pulling bytes prematurely
nflyNode.updateFileStatus(f);
} else {
return nflyNode.getFs().open(f, bufferSize);
}
} catch (FileNotFoundException fnfe) {
nflyNode.status = notFoundStatus(f);
numNotFounds++;
processThrowable(nflyNode, "open", fnfe, ioExceptions, f);
} catch (Throwable t) {
processThrowable(nflyNode, "open", t, ioExceptions, f);
}
}
if (nflyFlags.contains(NflyKey.readMostRecent)) {
// sort from most recent to least recent
Arrays.sort(mrNodes);
}
final FSDataInputStream fsdisAfterRepair = repairAndOpen(mrNodes, f,
bufferSize);
if (fsdisAfterRepair != null) {
return fsdisAfterRepair;
}
mayThrowFileNotFound(ioExceptions, numNotFounds);
throw MultipleIOException.createIOException(ioExceptions);
}
private static FileStatus notFoundStatus(Path f) {
return new FileStatus(-1, false, 0, 0, 0, f);
}
/**
* Iterate all available nodes in the proximity order to attempt repair of all
* FileNotFound nodes.
*
* @param mrNodes work set copy of nodes
* @param f path to repair and open
* @param bufferSize buffer size for read RPC
* @return the closest/most recent replica stream AFTER repair
*/
private FSDataInputStream repairAndOpen(MRNflyNode[] mrNodes, Path f,
int bufferSize) {
long maxMtime = 0L;
for (final MRNflyNode srcNode : mrNodes) {
if (srcNode.status == null // not available
|| srcNode.status.getLen() < 0L) { // not found
continue; // not available
}
if (srcNode.status.getModificationTime() > maxMtime) {
maxMtime = srcNode.status.getModificationTime();
}
// attempt to repair all notFound nodes with srcNode
//
for (final MRNflyNode dstNode : mrNodes) {
if (dstNode.status == null // not available
|| srcNode.compareTo(dstNode) == 0) { // same mtime
continue;
}
try {
// status is absolute from the underlying mount, making it chrooted
//
final FileStatus srcStatus = srcNode.cloneStatus();
srcStatus.setPath(f);
final Path tmpPath = getNflyTmpPath(f);
FileUtil.copy(srcNode.getFs(), srcStatus, dstNode.getFs(), tmpPath,
false, // don't delete
true, // overwrite
getConf());
dstNode.getFs().delete(f, false);
if (dstNode.getFs().rename(tmpPath, f)) {
try {
dstNode.getFs().setTimes(f, srcNode.status.getModificationTime(),
srcNode.status.getAccessTime());
} finally {
// save getFileStatus rpc
srcStatus.setPath(dstNode.getFs().makeQualified(f));
dstNode.status = srcStatus;
}
}
} catch (IOException ioe) {
// can blame the source by statusSet.clear(ai), however, it would
// cost an extra RPC, so just rely on the loop below that will attempt
// an open anyhow
//
LOG.info(f + " " + srcNode + "->" + dstNode + ": Failed to repair",
ioe);
}
}
}
// Since Java7, QuickSort is used instead of MergeSort.
// QuickSort may not be stable and thus the equal most recent nodes, may no
// longer appear in the NetworkTopology order.
//
if (maxMtime > 0) {
final List<MRNflyNode> mrList = new ArrayList<MRNflyNode>();
for (final MRNflyNode openNode : mrNodes) {
if (openNode.status != null && openNode.status.getLen() >= 0L) {
if (openNode.status.getModificationTime() == maxMtime) {
mrList.add(openNode);
}
}
}
// assert mrList.size > 0
final MRNflyNode[] readNodes = mrList.toArray(new MRNflyNode[0]);
topology.sortByDistance(myNode, readNodes, readNodes.length);
for (final MRNflyNode rNode : readNodes) {
try {
return rNode.getFs().open(f, bufferSize);
} catch (IOException e) {
LOG.info(f + ": Failed to open at " + rNode.getFs().getUri());
}
}
}
return null;
}
private void mayThrowFileNotFound(List<IOException> ioExceptions,
int numNotFounds) throws FileNotFoundException {
if (numNotFounds == nodes.length) {
throw (FileNotFoundException)ioExceptions.get(nodes.length - 1);
}
}
// WRITE
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
return new FSDataOutputStream(new NflyOutputStream(f, permission, overwrite,
bufferSize, replication, blockSize, progress), statistics);
}
// WRITE
@Override
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
return null;
}
// WRITE
@Override
public boolean rename(Path src, Path dst) throws IOException {
final List<IOException> ioExceptions = new ArrayList<IOException>();
int numNotFounds = 0;
boolean succ = true;
for (final NflyNode nflyNode : nodes) {
try {
succ &= nflyNode.fs.rename(src, dst);
} catch (FileNotFoundException fnfe) {
numNotFounds++;
processThrowable(nflyNode, "rename", fnfe, ioExceptions, src, dst);
} catch (Throwable t) {
processThrowable(nflyNode, "rename", t, ioExceptions, src, dst);
succ = false;
}
}
mayThrowFileNotFound(ioExceptions, numNotFounds);
// if all destinations threw exceptions throw, otherwise return
//
if (ioExceptions.size() == nodes.length) {
throw MultipleIOException.createIOException(ioExceptions);
}
return succ;
}
// WRITE
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
final List<IOException> ioExceptions = new ArrayList<IOException>();
int numNotFounds = 0;
boolean succ = true;
for (final NflyNode nflyNode : nodes) {
try {
succ &= nflyNode.fs.delete(f);
} catch (FileNotFoundException fnfe) {
numNotFounds++;
processThrowable(nflyNode, "delete", fnfe, ioExceptions, f);
} catch (Throwable t) {
processThrowable(nflyNode, "delete", t, ioExceptions, f);
succ = false;
}
}
mayThrowFileNotFound(ioExceptions, numNotFounds);
// if all destinations threw exceptions throw, otherwise return
//
if (ioExceptions.size() == nodes.length) {
throw MultipleIOException.createIOException(ioExceptions);
}
return succ;
}
/**
* Returns the closest non-failing destination's result.
*
* @param f given path
* @return array of file statuses according to nfly modes
* @throws FileNotFoundException
* @throws IOException
*/
@Override
public FileStatus[] listStatus(Path f) throws FileNotFoundException,
IOException {
final List<IOException> ioExceptions =
new ArrayList<IOException>(nodes.length);
final MRNflyNode[] mrNodes = workSet();
if (nflyFlags.contains(NflyKey.readMostRecent)) {
int numNotFounds = 0;
for (final MRNflyNode nflyNode : mrNodes) {
try {
nflyNode.updateFileStatus(f);
} catch (FileNotFoundException fnfe) {
numNotFounds++;
processThrowable(nflyNode, "listStatus", fnfe, ioExceptions, f);
} catch (Throwable t) {
processThrowable(nflyNode, "listStatus", t, ioExceptions, f);
}
}
mayThrowFileNotFound(ioExceptions, numNotFounds);
Arrays.sort(mrNodes);
}
int numNotFounds = 0;
for (final MRNflyNode nflyNode : mrNodes) {
try {
final FileStatus[] realStats = nflyNode.getFs().listStatus(f);
final FileStatus[] nflyStats = new FileStatus[realStats.length];
for (int i = 0; i < realStats.length; i++) {
nflyStats[i] = new NflyStatus(nflyNode.getFs(), realStats[i]);
}
return nflyStats;
} catch (FileNotFoundException fnfe) {
numNotFounds++;
processThrowable(nflyNode, "listStatus", fnfe, ioExceptions, f);
} catch (Throwable t) {
processThrowable(nflyNode, "listStatus", t, ioExceptions, f);
}
}
mayThrowFileNotFound(ioExceptions, numNotFounds);
throw MultipleIOException.createIOException(ioExceptions);
}
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
throws FileNotFoundException, IOException {
// TODO important for splits
return super.listLocatedStatus(f);
}
@Override
public void setWorkingDirectory(Path newDir) {
for (final NflyNode nflyNode : nodes) {
nflyNode.fs.setWorkingDirectory(newDir);
}
}
@Override
public Path getWorkingDirectory() {
return nodes[0].fs.getWorkingDirectory(); // 0 is as good as any
}
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
boolean succ = true;
for (final NflyNode nflyNode : nodes) {
succ &= nflyNode.fs.mkdirs(f, permission);
}
return succ;
}
@Override
public FileStatus getFileStatus(Path f) throws IOException {
// TODO proxy stream for reads
final List<IOException> ioExceptions =
new ArrayList<IOException>(nodes.length);
int numNotFounds = 0;
final MRNflyNode[] mrNodes = workSet();
long maxMtime = Long.MIN_VALUE;
int maxMtimeIdx = Integer.MIN_VALUE;
// naively iterate until one can be returned
//
for (int i = 0; i < mrNodes.length; i++) {
MRNflyNode nflyNode = mrNodes[i];
try {
nflyNode.updateFileStatus(f);
if (nflyFlags.contains(NflyKey.readMostRecent)) {
final long nflyTime = nflyNode.status.getModificationTime();
if (nflyTime > maxMtime) {
maxMtime = nflyTime;
maxMtimeIdx = i;
}
} else {
return nflyNode.nflyStatus();
}
} catch (FileNotFoundException fnfe) {
numNotFounds++;
processThrowable(nflyNode, "getFileStatus", fnfe, ioExceptions, f);
} catch (Throwable t) {
processThrowable(nflyNode, "getFileStatus", t, ioExceptions, f);
}
}
if (maxMtimeIdx >= 0) {
return mrNodes[maxMtimeIdx].nflyStatus();
}
mayThrowFileNotFound(ioExceptions, numNotFounds);
throw MultipleIOException.createIOException(ioExceptions);
}
private static void processThrowable(NflyNode nflyNode, String op,
Throwable t, List<IOException> ioExceptions,
Path... f) {
final String errMsg = Arrays.toString(f)
+ ": failed to " + op + " " + nflyNode.fs.getUri();
final IOException ioex;
if (t instanceof FileNotFoundException) {
ioex = new FileNotFoundException(errMsg);
ioex.initCause(t);
} else {
ioex = new IOException(errMsg, t);
}
if (ioExceptions != null) {
ioExceptions.add(ioex);
}
}
/**
* Initializes an nfly mountpoint in viewfs.
*
* @param uris destinations to replicate writes to
* @param conf file system configuration
* @param settings comma-separated list of k=v pairs.
* @return an Nfly filesystem
* @throws IOException
*/
static FileSystem createFileSystem(URI[] uris, Configuration conf,
String settings) throws IOException {
// assert settings != null
int minRepl = DEFAULT_MIN_REPLICATION;
EnumSet<NflyKey> nflyFlags = EnumSet.noneOf(NflyKey.class);
final String[] kvPairs = StringUtils.split(settings);
for (String kv : kvPairs) {
final String[] kvPair = StringUtils.split(kv, '=');
if (kvPair.length != 2) {
throw new IllegalArgumentException(kv);
}
NflyKey nflyKey = NflyKey.valueOf(kvPair[0]);
switch (nflyKey) {
case minReplication:
minRepl = Integer.parseInt(kvPair[1]);
break;
case repairOnRead:
case readMostRecent:
if (Boolean.valueOf(kvPair[1])) {
nflyFlags.add(nflyKey);
}
break;
default:
throw new IllegalArgumentException(nflyKey + ": Infeasible");
}
}
return new NflyFSystem(uris, conf, minRepl, nflyFlags);
}
}