| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p/> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p/> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapred.gridmix; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.Random; |
| import java.net.URI; |
| |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.PositionedReadable; |
| import org.apache.hadoop.fs.Seekable; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.io.BytesWritable; |
| import org.apache.hadoop.util.Progressable; |
| |
| /** |
| * Pseudo local file system that generates random data for any file on the fly |
| * instead of storing files on disk. So opening same file multiple times will |
| * not give same file content. There are no directories in this file system |
| * other than the root and all the files are under root i.e. "/". All file URIs |
| * on pseudo local file system should be of the format <code> |
| * pseudo:///<name>.<fileSize></code> where name is a unique name |
| * and <fileSize> is a number representing the size of the file in bytes. |
| */ |
| class PseudoLocalFs extends FileSystem { |
| Path home; |
| /** |
| * The creation time and modification time of all files in |
| * {@link PseudoLocalFs} is same. |
| */ |
| private static final long TIME = System.currentTimeMillis(); |
| private static final String HOME_DIR = "/"; |
| private static final long BLOCK_SIZE = 4 * 1024 * 1024L; // 4 MB |
| private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024; // 1MB |
| |
| static final URI NAME = URI.create("pseudo:///"); |
| |
| PseudoLocalFs() { |
| this(new Path(HOME_DIR)); |
| } |
| |
| PseudoLocalFs(Path home) { |
| super(); |
| this.home = home; |
| } |
| |
| @Override |
| public URI getUri() { |
| return NAME; |
| } |
| |
| @Override |
| public Path getHomeDirectory() { |
| return home; |
| } |
| |
| @Override |
| public Path getWorkingDirectory() { |
| return getHomeDirectory(); |
| } |
| |
| /** |
| * Generates a valid pseudo local file path from the given <code>fileId</code> |
| * and <code>fileSize</code>. |
| * @param fileId unique file id string |
| * @param fileSize file size |
| * @return the generated relative path |
| */ |
| static Path generateFilePath(String fileId, long fileSize) { |
| return new Path(fileId + "." + fileSize); |
| } |
| |
| /** |
| * Creating a pseudo local file is nothing but validating the file path. |
| * Actual data of the file is generated on the fly when client tries to open |
| * the file for reading. |
| * @param path file path to be created |
| */ |
| @Override |
| public FSDataOutputStream create(Path path) throws IOException { |
| try { |
| validateFileNameFormat(path); |
| } catch (FileNotFoundException e) { |
| throw new IOException("File creation failed for " + path); |
| } |
| return null; |
| } |
| |
| /** |
| * Validate if the path provided is of expected format of Pseudo Local File |
| * System based files. |
| * @param path file path |
| * @return the file size |
| * @throws FileNotFoundException |
| */ |
| long validateFileNameFormat(Path path) throws FileNotFoundException { |
| path = path.makeQualified(this); |
| boolean valid = true; |
| long fileSize = 0; |
| if (!path.toUri().getScheme().equals(getUri().getScheme())) { |
| valid = false; |
| } else { |
| String[] parts = path.toUri().getPath().split("\\."); |
| try { |
| fileSize = Long.valueOf(parts[parts.length - 1]); |
| valid = (fileSize >= 0); |
| } catch (NumberFormatException e) { |
| valid = false; |
| } |
| } |
| if (!valid) { |
| throw new FileNotFoundException("File " + path |
| + " does not exist in pseudo local file system"); |
| } |
| return fileSize; |
| } |
| |
| /** |
| * @See create(Path) for details |
| */ |
| @Override |
| public FSDataInputStream open(Path path, int bufferSize) throws IOException { |
| long fileSize = validateFileNameFormat(path); |
| InputStream in = new RandomInputStream(fileSize, bufferSize); |
| return new FSDataInputStream(in); |
| } |
| |
| /** |
| * @See create(Path) for details |
| */ |
| @Override |
| public FSDataInputStream open(Path path) throws IOException { |
| return open(path, DEFAULT_BUFFER_SIZE); |
| } |
| |
| @Override |
| public FileStatus getFileStatus(Path path) throws IOException { |
| long fileSize = validateFileNameFormat(path); |
| return new FileStatus(fileSize, false, 1, BLOCK_SIZE, TIME, path); |
| } |
| |
| @Override |
| public boolean exists(Path path) { |
| try{ |
| validateFileNameFormat(path); |
| } catch (FileNotFoundException e) { |
| return false; |
| } |
| return true; |
| } |
| |
| @Override |
| public FSDataOutputStream create(Path path, FsPermission permission, |
| boolean overwrite, int bufferSize, short replication, long blockSize, |
| Progressable progress) throws IOException { |
| return create(path); |
| } |
| |
| @Override |
| public FileStatus[] listStatus(Path path) throws FileNotFoundException, |
| IOException { |
| return new FileStatus[] {getFileStatus(path)}; |
| } |
| |
| /** |
| * Input Stream that generates specified number of random bytes. |
| */ |
| static class RandomInputStream extends InputStream |
| implements Seekable, PositionedReadable { |
| |
| private final Random r = new Random(); |
| private BytesWritable val = null; |
| private int positionInVal = 0;// current position in the buffer 'val' |
| |
| private long totalSize = 0;// total number of random bytes to be generated |
| private long curPos = 0;// current position in this stream |
| |
| /** |
| * @param size total number of random bytes to be generated in this stream |
| * @param bufferSize the buffer size. An internal buffer array of length |
| * <code>bufferSize</code> is created. If <code>bufferSize</code> is not a |
| * positive number, then a default value of 1MB is used. |
| */ |
| RandomInputStream(long size, int bufferSize) { |
| totalSize = size; |
| if (bufferSize <= 0) { |
| bufferSize = DEFAULT_BUFFER_SIZE; |
| } |
| val = new BytesWritable(new byte[bufferSize]); |
| } |
| |
| @Override |
| public int read() throws IOException { |
| byte[] b = new byte[1]; |
| if (curPos < totalSize) { |
| if (positionInVal < val.getLength()) {// use buffered byte |
| b[0] = val.getBytes()[positionInVal++]; |
| ++curPos; |
| } else {// generate data |
| int num = read(b); |
| if (num < 0) { |
| return num; |
| } |
| } |
| } else { |
| return -1; |
| } |
| return b[0]; |
| } |
| |
| @Override |
| public int read(byte[] bytes) throws IOException { |
| return read(bytes, 0, bytes.length); |
| } |
| |
| @Override |
| public int read(byte[] bytes, int off, int len) throws IOException { |
| if (curPos == totalSize) { |
| return -1;// EOF |
| } |
| int numBytes = len; |
| if (numBytes > (totalSize - curPos)) {// position in file is close to EOF |
| numBytes = (int)(totalSize - curPos); |
| } |
| if (numBytes > (val.getLength() - positionInVal)) { |
| // need to generate data into val |
| r.nextBytes(val.getBytes()); |
| positionInVal = 0; |
| } |
| |
| System.arraycopy(val.getBytes(), positionInVal, bytes, off, numBytes); |
| curPos += numBytes; |
| positionInVal += numBytes; |
| return numBytes; |
| } |
| |
| @Override |
| public int available() { |
| return (int)(val.getLength() - positionInVal); |
| } |
| |
| @Override |
| public int read(long position, byte[] buffer, int offset, int length) |
| throws IOException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void readFully(long position, byte[] buffer) throws IOException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void readFully(long position, byte[] buffer, int offset, int length) |
| throws IOException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** |
| * Get the current position in this stream/pseudo-file |
| * @return the position in this stream/pseudo-file |
| * @throws IOException |
| */ |
| @Override |
| public long getPos() throws IOException { |
| return curPos; |
| } |
| |
| @Override |
| public void seek(long pos) throws IOException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public boolean seekToNewSource(long targetPos) throws IOException { |
| throw new UnsupportedOperationException(); |
| } |
| } |
| |
| @Override |
| public FSDataOutputStream append(Path path, int bufferSize, |
| Progressable progress) throws IOException { |
| throw new UnsupportedOperationException("Append is not supported" |
| + " in pseudo local file system."); |
| } |
| |
| @Override |
| public boolean mkdirs(Path f, FsPermission permission) throws IOException { |
| throw new UnsupportedOperationException("Mkdirs is not supported" |
| + " in pseudo local file system."); |
| } |
| |
| @Override |
| public boolean rename(Path src, Path dst) throws IOException { |
| throw new UnsupportedOperationException("Rename is not supported" |
| + " in pseudo local file system."); |
| } |
| |
| @Override |
| public boolean delete(Path path, boolean recursive) { |
| throw new UnsupportedOperationException("File deletion is not supported " |
| + "in pseudo local file system."); |
| } |
| |
| @Override |
| public void setWorkingDirectory(Path newDir) { |
| throw new UnsupportedOperationException("SetWorkingDirectory " |
| + "is not supported in pseudo local file system."); |
| } |
| } |