blob: 3f5a4b0ea6c39f0d0f00cab0b21619cea78be3cd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Progressable;
/** An implementation of the in-memory filesystem. This implementation assumes
* that the file lengths are known ahead of time and that the total length of
* all the files is below a certain limit (100 MB by default, configurable).
* Use the API reserveSpaceWithCheckSum(Path f, long size) (described below)
* to reserve space in the FS before creating a file. The uri of this
* filesystem starts with ramfs:// .
*/
@Deprecated
public class InMemoryFileSystem extends ChecksumFileSystem {
private static class RawInMemoryFileSystem extends FileSystem {
private URI uri;
private long fsSize;
private volatile long totalUsed;
private Path staticWorkingDir;
//pathToFileAttribs is the final place where a file is put after it is closed
private Map<String, FileAttributes> pathToFileAttribs =
new HashMap<String, FileAttributes>();
//tempFileAttribs is a temporary place that is updated while reserving memory
//for files we are going to create. It is read in the create method, where the
//temporary key/value is discarded. If the file makes it to "close", then it
//ends up in the pathToFileAttribs map.
private Map<String, FileAttributes> tempFileAttribs =
new HashMap<String, FileAttributes>();
public RawInMemoryFileSystem() {
setConf(new Configuration());
}
/**
* Creates an instance and immediately configures it by calling
* initialize(uri, conf).
*/
public RawInMemoryFileSystem(URI uri, Configuration conf) {
initialize(uri, conf);
}
/**
 * Configures this filesystem: stores the configuration, reads the capacity
 * in megabytes from "fs.inmemory.size.mb" (default "100"), keeps only the
 * scheme and authority of the supplied URI, and sets the working directory.
 */
public void initialize(URI uri, Configuration conf) {
  setConf(conf);

  final String configuredMb = conf.get("fs.inmemory.size.mb", "100");
  this.fsSize = Integer.parseInt(configuredMb) * 1024L * 1024L;

  final String schemeAndAuthority = uri.getScheme() + "://" + uri.getAuthority();
  this.uri = URI.create(schemeAndAuthority);

  // Fall back to the current-directory marker when the URI has no path.
  final String uriPath = this.uri.getPath();
  this.staticWorkingDir =
      new Path(uriPath.length() == 0 ? Path.CUR_DIR : uriPath);

  LOG.info("Initialized InMemoryFileSystem: " + uri.toString() +
      " of size (in bytes): " + fsSize);
}
/** Returns the URI stored by initialize (scheme and authority only). */
public URI getUri() {
return uri;
}
private class InMemoryInputStream extends FSInputStream {
private DataInputBuffer din = new DataInputBuffer();
private FileAttributes fAttr;
public InMemoryInputStream(Path f) throws IOException {
synchronized (RawInMemoryFileSystem.this) {
fAttr = pathToFileAttribs.get(getPath(f));
if (fAttr == null) {
throw new FileNotFoundException("File " + f + " does not exist");
}
din.reset(fAttr.data, 0, fAttr.size);
}
}
public long getPos() throws IOException {
return din.getPosition();
}
public void seek(long pos) throws IOException {
if ((int)pos > fAttr.size)
throw new IOException("Cannot seek after EOF");
din.reset(fAttr.data, (int)pos, fAttr.size - (int)pos);
}
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
public int available() throws IOException {
return din.available();
}
public boolean markSupport() { return false; }
public int read() throws IOException {
return din.read();
}
public int read(byte[] b, int off, int len) throws IOException {
return din.read(b, off, len);
}
public long skip(long n) throws IOException { return din.skip(n); }
}
/**
 * Opens a closed in-memory file for reading.
 *
 * @param f path of the file to open
 * @param bufferSize not used by this implementation
 */
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
  final InMemoryInputStream rawStream = new InMemoryInputStream(f);
  return new FSDataInputStream(rawStream);
}
/**
 * Output stream that writes into the pre-allocated byte array of a
 * FileAttributes. The file only becomes visible in pathToFileAttribs when
 * close() is called.
 */
private class InMemoryOutputStream extends OutputStream {
  private int count;
  private FileAttributes fAttr;
  private Path f;

  /**
   * @param f path under which the file is registered on close
   * @param fAttr pre-allocated storage for the file contents
   */
  public InMemoryOutputStream(Path f, FileAttributes fAttr)
      throws IOException {
    this.fAttr = fAttr;
    this.f = f;
  }

  /** Returns the number of bytes written so far. */
  public long getPos() throws IOException {
    return count;
  }

  /**
   * Registers the file in pathToFileAttribs. If this replaces a different,
   * previously closed file, the replaced file's bytes are released from the
   * usage accounting; otherwise an overwrite would count both files forever.
   */
  public void close() throws IOException {
    synchronized (RawInMemoryFileSystem.this) {
      FileAttributes replaced = pathToFileAttribs.put(getPath(f), fAttr);
      // replaced == fAttr happens when close() is called more than once.
      if (replaced != null && replaced != fAttr) {
        totalUsed -= replaced.size;
      }
    }
  }

  /**
   * Copies len bytes from b, starting at off, into the file.
   *
   * @throws IndexOutOfBoundsException if off/len do not describe a valid
   *         range of b
   * @throws IOException if the bytes do not fit in the reserved space
   */
  public void write(byte[] b, int off, int len) throws IOException {
    if ((off < 0) || (off > b.length) || (len < 0) ||
        ((off + len) > b.length) || ((off + len) < 0)) {
      throw new IndexOutOfBoundsException();
    } else if (len == 0) {
      return;
    }
    // Compare against the remaining capacity rather than computing
    // count + len, which can overflow int and bypass the check.
    if (len > fAttr.size - count) {
      throw new IOException("Insufficient space");
    }
    System.arraycopy(b, off, fAttr.data, count, len);
    count += len;
  }

  /**
   * Appends a single byte to the file.
   *
   * @throws IOException if the reserved space is already full
   */
  public void write(int b) throws IOException {
    if (count >= fAttr.size) {
      throw new IOException("Insufficient space");
    }
    fAttr.data[count] = (byte)b;
    count++;
  }
}
/**
* This optional operation is not yet supported.
* @throws IOException always, with the message "Not supported"
*/
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
throw new IOException("Not supported");
}
/**
 * Opens an output stream for a file whose space was previously reserved
 * with reserveSpace. The reservation is consumed by this call.
 *
 * @param f path of the file to create
 * @param permission Currently ignored.
 * @param overwrite whether an already existing file may be replaced
 * @throws IOException if the file exists and overwrite is false, or if no
 *         space has been reserved for f
 */
public FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite, int bufferSize,
    short replication, long blockSize, Progressable progress)
    throws IOException {
  synchronized (this) {
    if (exists(f) && !overwrite) {
      throw new IOException("File already exists:"+f);
    }
    FileAttributes fAttr = tempFileAttribs.remove(getPath(f));
    if (fAttr == null) {
      // Fail with a clear error instead of returning a null stream that
      // would only surface later as a NullPointerException in the caller.
      throw new IOException("No space reserved for file: " + f);
    }
    return create(f, fAttr);
  }
}
/**
 * Wraps the given pre-allocated storage in an output stream.
 * The path is not added to pathToFileAttribs here; that happens when
 * close is called on the returned stream.
 */
public FSDataOutputStream create(Path f, FileAttributes fAttr)
    throws IOException {
  // The stream writes straight into the byte array held by fAttr.
  final InMemoryOutputStream rawStream = new InMemoryOutputStream(f, fAttr);
  return new FSDataOutputStream(rawStream, statistics);
}
/**
 * Closes the filesystem, then empties and drops both file maps.
 * After this call both map fields are null.
 */
public void close() throws IOException {
  super.close();
  synchronized (this) {
    if (pathToFileAttribs != null) {
      pathToFileAttribs.clear();
      pathToFileAttribs = null;
    }
    if (tempFileAttribs != null) {
      tempFileAttribs.clear();
      tempFileAttribs = null;
    }
  }
}
/**
* Does nothing: neither argument is used.
* @return always true
*/
public boolean setReplication(Path src, short replication)
throws IOException {
return true;
}
/**
 * Moves a closed file from src to dst.
 *
 * @return true if src existed and was moved, false if src does not exist
 * @throws IOException if dst already exists
 */
public boolean rename(Path src, Path dst) throws IOException {
  synchronized (this) {
    if (exists(dst)) {
      throw new IOException ("Path " + dst + " already exists");
    }
    final FileAttributes moved = pathToFileAttribs.remove(getPath(src));
    if (moved != null) {
      pathToFileAttribs.put(getPath(dst), moved);
      return true;
    }
    return false;
  }
}
/** Deletes f by delegating to delete(f, true). */
@Deprecated
public boolean delete(Path f) throws IOException {
return delete(f, true);
}
/**
 * Removes a closed file and releases its bytes from the usage accounting.
 *
 * @param f path of the file to remove
 * @param recursive not used by this implementation
 * @return true if the file existed and was removed, false otherwise
 */
public boolean delete(Path f, boolean recursive) throws IOException {
  synchronized (this) {
    final FileAttributes removed = pathToFileAttribs.remove(getPath(f));
    if (removed == null) {
      return false;
    }
    // Drop the byte array reference and give the space back.
    totalUsed -= removed.size;
    removed.data = null;
    return true;
  }
}
/**
* Directory operations are not supported.
* @return always null
*/
public FileStatus[] listStatus(Path f) throws IOException {
return null;
}
/** Replaces the stored working directory with new_dir. */
public void setWorkingDirectory(Path new_dir) {
staticWorkingDir = new_dir;
}
/** Returns the stored working directory. */
public Path getWorkingDirectory() {
return staticWorkingDir;
}
/**
* Does nothing and reports success; no directory entry is recorded.
* @param permission Currently ignored.
* @return always true
*/
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
return true;
}
/**
 * Returns the status of a closed file.
 *
 * @throws FileNotFoundException if no closed file is registered under f
 */
public FileStatus getFileStatus(Path f) throws IOException {
  synchronized (this) {
    final FileAttributes found = pathToFileAttribs.get(getPath(f));
    if (found != null) {
      return new InMemoryFileStatus(f.makeQualified(this), found);
    }
    throw new FileNotFoundException("File " + f + " does not exist.");
  }
}
// Some APIs exclusively for InMemoryFileSystem

/**
 * Registers a path with its size and allocates the backing byte array.
 * The reservation is held in tempFileAttribs until create consumes it or
 * unreserveSpace discards it.
 *
 * @param f path of the file that will be created
 * @param size number of bytes to reserve; must not be negative
 * @return true if the space was reserved; false if size is negative, does
 *         not fit in the remaining capacity, or the allocation failed
 */
public boolean reserveSpace(Path f, long size) {
  synchronized (this) {
    // A negative size would pass the capacity check and then fail with
    // NegativeArraySizeException when the array is allocated.
    if (size < 0 || !canFitInMemory(size)) {
      return false;
    }
    FileAttributes fileAttr;
    try {
      fileAttr = new FileAttributes((int)size);
    } catch (OutOfMemoryError o) {
      // Deliberate best effort: report failure rather than propagate.
      return false;
    }
    totalUsed += size;
    FileAttributes previous = tempFileAttribs.put(getPath(f), fileAttr);
    if (previous != null) {
      // An earlier reservation for the same path was replaced; release its
      // bytes so they are not counted twice.
      totalUsed -= previous.size;
    }
    return true;
  }
}
/**
 * Discards a pending reservation, if any, and releases its bytes from the
 * usage accounting. Does nothing when f has no pending reservation.
 */
public void unreserveSpace(Path f) {
  synchronized (this) {
    final FileAttributes pending = tempFileAttribs.remove(getPath(f));
    if (pending == null) {
      return;
    }
    totalUsed -= pending.size;
    pending.data = null;
  }
}
/**
 * Returns the paths of all closed files accepted by the filter.
 * This could have been implemented over listStatus, but maintaining
 * directory structures is an overhead for this in-memory implementation.
 *
 * @param filter decides which paths are included
 * @return the accepted paths; an empty array if there are none
 */
public Path[] getFiles(PathFilter filter) {
  // Every access to pathToFileAttribs is guarded by this object's monitor,
  // so no additional lock on the map itself is needed.
  synchronized (this) {
    List<Path> accepted = new ArrayList<Path>();
    for (String name : pathToFileAttribs.keySet()) {
      Path candidate = new Path(name);
      if (filter.accept(candidate)) {
        accepted.add(candidate);
      }
    }
    return accepted.toArray(new Path[accepted.size()]);
  }
}
/** Returns the number of paths that getFiles(filter) returns. */
public int getNumFiles(PathFilter filter) {
return getFiles(filter).length;
}
/** Returns the configured capacity of this filesystem in bytes. */
public long getFSSize() {
return fsSize;
}
/**
 * Returns the used share of the capacity as totalUsed / fsSize. Despite the
 * name, the value is a fraction rather than a percentage.
 * When fsSize is not positive the constant 0.1f is returned.
 * NOTE(review): the reason for that particular fallback value is not evident
 * from this file - confirm with callers before changing it.
 */
public float getPercentUsed() {
  if (fsSize <= 0) {
    return 0.1f;
  }
  return (float)totalUsed/fsSize;
}
/**
 * Tells whether a file of the given size can be reserved.
 * The size must be non-negative, must fit in an int (it is used as an array
 * length), and together with the bytes already used must stay strictly
 * below the configured capacity.
 *
 * @TODO: Fix for Java6?
 * As of Java5 it is safe to assume that if the file can fit
 * in-memory then its file-size is less than Integer.MAX_VALUE.
 */
private boolean canFitInMemory(long size) {
  // Negative sizes are rejected: they would otherwise satisfy both
  // comparisons below and reduce the usage accounting.
  return size >= 0
      && size <= Integer.MAX_VALUE
      && (size + totalUsed) < fsSize;
}
/** Returns the path component of f's URI; used as the key in both file maps. */
private String getPath(Path f) {
return f.toUri().getPath();
}
/** Storage for one in-memory file: a byte array and its fixed size. */
private static class FileAttributes {
  private byte[] data;
  private int size;

  /** Allocates a zero-filled array of exactly the given size. */
  public FileAttributes(int size) {
    this.data = new byte[size];
    this.size = size;
  }
}
/** FileStatus for an in-memory file, built from its FileAttributes. */
private class InMemoryFileStatus extends FileStatus {
// NOTE(review): the super arguments are presumably length, isdir=false,
// replication=1, block size, modification time=0 and path - confirm against
// the FileStatus constructor.
InMemoryFileStatus(Path f, FileAttributes attr) throws IOException {
super(attr.size, false, 1, getDefaultBlockSize(), 0, f);
}
}
}
/** Creates a filesystem on top of a RawInMemoryFileSystem built with its no-argument constructor. */
public InMemoryFileSystem() {
super(new RawInMemoryFileSystem());
}
/** Creates a filesystem on top of a RawInMemoryFileSystem initialized with uri and conf. */
public InMemoryFileSystem(URI uri, Configuration conf) {
super(new RawInMemoryFileSystem(uri, conf));
}
/**
 * Registers a file with its size, and also registers the checksum file that
 * goes with it. This is needed because none of the FileSystem APIs accept
 * the size of the file as an argument, while this filesystem has to know
 * the size up front. The user must therefore call this method for each file
 * to be created. Either both the main file and its checksum file get their
 * space reserved and true is returned, or nothing stays reserved and false
 * is returned.
 */
public boolean reserveSpaceWithCheckSum(Path f, long size) {
  final RawInMemoryFileSystem rawFs = (RawInMemoryFileSystem)getRawFileSystem();
  synchronized (rawFs) {
    if (!rawFs.reserveSpace(f, size)) {
      return false;
    }
    final long checksumSize = getChecksumFileLength(f, size);
    final Path checksumFile = getChecksumFile(f);
    if (rawFs.reserveSpace(checksumFile, checksumSize)) {
      return true;
    }
    // The checksum file did not fit: roll back the main file's reservation.
    rawFs.unreserveSpace(f);
    return false;
  }
}
/** Returns the closed files of the underlying raw filesystem accepted by the filter. */
public Path[] getFiles(PathFilter filter) {
  final RawInMemoryFileSystem rawFs = (RawInMemoryFileSystem)getRawFileSystem();
  return rawFs.getFiles(filter);
}
/** Returns how many closed files of the underlying raw filesystem the filter accepts. */
public int getNumFiles(PathFilter filter) {
  final RawInMemoryFileSystem rawFs = (RawInMemoryFileSystem)getRawFileSystem();
  return rawFs.getNumFiles(filter);
}
/** Returns the capacity in bytes of the underlying raw filesystem. */
public long getFSSize() {
  final RawInMemoryFileSystem rawFs = (RawInMemoryFileSystem)getRawFileSystem();
  return rawFs.getFSSize();
}
/** Returns the value of getPercentUsed() of the underlying raw filesystem. */
public float getPercentUsed() {
  final RawInMemoryFileSystem rawFs = (RawInMemoryFileSystem)getRawFileSystem();
  return rawFs.getPercentUsed();
}
}