| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.fs.s3native; |
| |
| import java.io.BufferedOutputStream; |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.net.URI; |
| import java.security.DigestOutputStream; |
| import java.security.MessageDigest; |
| import java.security.NoSuchAlgorithmException; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeSet; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.BufferedFSInputStream; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FSInputStream; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.fs.s3.S3Exception; |
| import org.apache.hadoop.io.retry.RetryPolicies; |
| import org.apache.hadoop.io.retry.RetryPolicy; |
| import org.apache.hadoop.io.retry.RetryProxy; |
| import org.apache.hadoop.util.Progressable; |
| |
/**
 * <p>
 * A {@link FileSystem} for reading and writing files stored on
 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
 * Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation
 * stores files on S3 in their
 * native form so they can be read by other S3 tools.
 * </p>
 * <p>
 * A note about directories. S3 of course has no "native" support for them.
 * The idiom we choose then is: for any directory created by this class,
 * we use an empty object "#{dirpath}_$folder$" as a marker.
 * Further, to interoperate with other S3 tools, we also accept the following:
 * </p>
 * <ul>
 * <li>an object "#{dirpath}/" denoting a directory marker;</li>
 * <li>if there exist any objects with the prefix "#{dirpath}/", then the
 * directory is said to exist;</li>
 * <li>if both a file with the name of a directory and a marker for that
 * directory exist, then the <em>file masks the directory</em>, and the
 * directory is never returned.</li>
 * </ul>
 * @see org.apache.hadoop.fs.s3.S3FileSystem
 */
| @InterfaceAudience.Public |
| @InterfaceStability.Stable |
| public class NativeS3FileSystem extends FileSystem { |
| |
| public static final Log LOG = |
| LogFactory.getLog(NativeS3FileSystem.class); |
| |
| private static final String FOLDER_SUFFIX = "_$folder$"; |
| static final String PATH_DELIMITER = Path.SEPARATOR; |
| private static final int S3_MAX_LISTING_LENGTH = 1000; |
| |
| static class NativeS3FsInputStream extends FSInputStream { |
| |
| private NativeFileSystemStore store; |
| private Statistics statistics; |
| private InputStream in; |
| private final String key; |
| private long pos = 0; |
| |
| public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) { |
| this.store = store; |
| this.statistics = statistics; |
| this.in = in; |
| this.key = key; |
| } |
| |
| @Override |
| public synchronized int read() throws IOException { |
| int result = -1; |
| try { |
| result = in.read(); |
| } catch (IOException e) { |
| LOG.info("Received IOException while reading '" + key + "', attempting to reopen."); |
| seek(pos); |
| result = in.read(); |
| } |
| if (result != -1) { |
| pos++; |
| } |
| if (statistics != null && result != -1) { |
| statistics.incrementBytesRead(1); |
| } |
| return result; |
| } |
| @Override |
| public synchronized int read(byte[] b, int off, int len) |
| throws IOException { |
| |
| int result = -1; |
| try { |
| result = in.read(b, off, len); |
| } catch (IOException e) { |
| LOG.info("Received IOException while reading '" + key + "', attempting to reopen."); |
| seek(pos); |
| result = in.read(b, off, len); |
| } |
| if (result > 0) { |
| pos += result; |
| } |
| if (statistics != null && result > 0) { |
| statistics.incrementBytesRead(result); |
| } |
| return result; |
| } |
| |
| @Override |
| public void close() throws IOException { |
| in.close(); |
| } |
| |
| @Override |
| public synchronized void seek(long pos) throws IOException { |
| in.close(); |
| LOG.info("Opening key '" + key + "' for reading at position '" + pos + "'"); |
| in = store.retrieve(key, pos); |
| this.pos = pos; |
| } |
| @Override |
| public synchronized long getPos() throws IOException { |
| return pos; |
| } |
| @Override |
| public boolean seekToNewSource(long targetPos) throws IOException { |
| return false; |
| } |
| } |
| |
| private class NativeS3FsOutputStream extends OutputStream { |
| |
| private Configuration conf; |
| private String key; |
| private File backupFile; |
| private OutputStream backupStream; |
| private MessageDigest digest; |
| private boolean closed; |
| |
| public NativeS3FsOutputStream(Configuration conf, |
| NativeFileSystemStore store, String key, Progressable progress, |
| int bufferSize) throws IOException { |
| this.conf = conf; |
| this.key = key; |
| this.backupFile = newBackupFile(); |
| LOG.info("OutputStream for key '" + key + "' writing to tempfile '" + this.backupFile + "'"); |
| try { |
| this.digest = MessageDigest.getInstance("MD5"); |
| this.backupStream = new BufferedOutputStream(new DigestOutputStream( |
| new FileOutputStream(backupFile), this.digest)); |
| } catch (NoSuchAlgorithmException e) { |
| LOG.warn("Cannot load MD5 digest algorithm," + |
| "skipping message integrity check.", e); |
| this.backupStream = new BufferedOutputStream( |
| new FileOutputStream(backupFile)); |
| } |
| } |
| |
| private File newBackupFile() throws IOException { |
| File dir = new File(conf.get("fs.s3.buffer.dir")); |
| if (!dir.mkdirs() && !dir.exists()) { |
| throw new IOException("Cannot create S3 buffer directory: " + dir); |
| } |
| File result = File.createTempFile("output-", ".tmp", dir); |
| result.deleteOnExit(); |
| return result; |
| } |
| |
| @Override |
| public void flush() throws IOException { |
| backupStream.flush(); |
| } |
| |
| @Override |
| public synchronized void close() throws IOException { |
| if (closed) { |
| return; |
| } |
| |
| backupStream.close(); |
| LOG.info("OutputStream for key '" + key + "' closed. Now beginning upload"); |
| |
| try { |
| byte[] md5Hash = digest == null ? null : digest.digest(); |
| store.storeFile(key, backupFile, md5Hash); |
| } finally { |
| if (!backupFile.delete()) { |
| LOG.warn("Could not delete temporary s3n file: " + backupFile); |
| } |
| super.close(); |
| closed = true; |
| } |
| LOG.info("OutputStream for key '" + key + "' upload complete"); |
| } |
| |
| @Override |
| public void write(int b) throws IOException { |
| backupStream.write(b); |
| } |
| |
| @Override |
| public void write(byte[] b, int off, int len) throws IOException { |
| backupStream.write(b, off, len); |
| } |
| } |
| |
| private URI uri; |
| private NativeFileSystemStore store; |
| private Path workingDir; |
| |
| public NativeS3FileSystem() { |
| // set store in initialize() |
| } |
| |
| public NativeS3FileSystem(NativeFileSystemStore store) { |
| this.store = store; |
| } |
| |
| @Override |
| public void initialize(URI uri, Configuration conf) throws IOException { |
| super.initialize(uri, conf); |
| if (store == null) { |
| store = createDefaultStore(conf); |
| } |
| store.initialize(uri, conf); |
| setConf(conf); |
| this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); |
| this.workingDir = |
| new Path("/user", System.getProperty("user.name")).makeQualified(this); |
| } |
| |
| private static NativeFileSystemStore createDefaultStore(Configuration conf) { |
| NativeFileSystemStore store = new Jets3tNativeFileSystemStore(); |
| |
| RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( |
| conf.getInt("fs.s3.maxRetries", 4), |
| conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS); |
| Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = |
| new HashMap<Class<? extends Exception>, RetryPolicy>(); |
| exceptionToPolicyMap.put(IOException.class, basePolicy); |
| exceptionToPolicyMap.put(S3Exception.class, basePolicy); |
| |
| RetryPolicy methodPolicy = RetryPolicies.retryByException( |
| RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); |
| Map<String, RetryPolicy> methodNameToPolicyMap = |
| new HashMap<String, RetryPolicy>(); |
| methodNameToPolicyMap.put("storeFile", methodPolicy); |
| methodNameToPolicyMap.put("rename", methodPolicy); |
| |
| return (NativeFileSystemStore) |
| RetryProxy.create(NativeFileSystemStore.class, store, |
| methodNameToPolicyMap); |
| } |
| |
| private static String pathToKey(Path path) { |
| if (path.toUri().getScheme() != null && "".equals(path.toUri().getPath())) { |
| // allow uris without trailing slash after bucket to refer to root, |
| // like s3n://mybucket |
| return ""; |
| } |
| if (!path.isAbsolute()) { |
| throw new IllegalArgumentException("Path must be absolute: " + path); |
| } |
| String ret = path.toUri().getPath().substring(1); // remove initial slash |
| if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) { |
| ret = ret.substring(0, ret.length() -1); |
| } |
| return ret; |
| } |
| |
| private static Path keyToPath(String key) { |
| return new Path("/" + key); |
| } |
| |
| private Path makeAbsolute(Path path) { |
| if (path.isAbsolute()) { |
| return path; |
| } |
| return new Path(workingDir, path); |
| } |
| |
| /** This optional operation is not yet supported. */ |
| @Override |
| public FSDataOutputStream append(Path f, int bufferSize, |
| Progressable progress) throws IOException { |
| throw new IOException("Not supported"); |
| } |
| |
| @Override |
| public FSDataOutputStream create(Path f, FsPermission permission, |
| boolean overwrite, int bufferSize, short replication, long blockSize, |
| Progressable progress) throws IOException { |
| |
| if (exists(f) && !overwrite) { |
| throw new IOException("File already exists:"+f); |
| } |
| |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("Creating new file '" + f + "' in S3"); |
| } |
| Path absolutePath = makeAbsolute(f); |
| String key = pathToKey(absolutePath); |
| return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store, |
| key, progress, bufferSize), statistics); |
| } |
| |
| @Override |
| public boolean delete(Path f, boolean recurse) throws IOException { |
| FileStatus status; |
| try { |
| status = getFileStatus(f); |
| } catch (FileNotFoundException e) { |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("Delete called for '" + f + |
| "' but file does not exist, so returning false"); |
| } |
| return false; |
| } |
| Path absolutePath = makeAbsolute(f); |
| String key = pathToKey(absolutePath); |
| if (status.isDirectory()) { |
| if (!recurse && listStatus(f).length > 0) { |
| throw new IOException("Can not delete " + f + " at is a not empty directory and recurse option is false"); |
| } |
| |
| createParent(f); |
| |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("Deleting directory '" + f + "'"); |
| } |
| String priorLastKey = null; |
| do { |
| PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, true); |
| for (FileMetadata file : listing.getFiles()) { |
| store.delete(file.getKey()); |
| } |
| priorLastKey = listing.getPriorLastKey(); |
| } while (priorLastKey != null); |
| |
| try { |
| store.delete(key + FOLDER_SUFFIX); |
| } catch (FileNotFoundException e) { |
| //this is fine, we don't require a marker |
| } |
| } else { |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("Deleting file '" + f + "'"); |
| } |
| createParent(f); |
| store.delete(key); |
| } |
| return true; |
| } |
| |
| @Override |
| public FileStatus getFileStatus(Path f) throws IOException { |
| Path absolutePath = makeAbsolute(f); |
| String key = pathToKey(absolutePath); |
| |
| if (key.length() == 0) { // root always exists |
| return newDirectory(absolutePath); |
| } |
| |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("getFileStatus retrieving metadata for key '" + key + "'"); |
| } |
| FileMetadata meta = store.retrieveMetadata(key); |
| if (meta != null) { |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("getFileStatus returning 'file' for key '" + key + "'"); |
| } |
| return newFile(meta, absolutePath); |
| } |
| if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) { |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("getFileStatus returning 'directory' for key '" + key + |
| "' as '" + key + FOLDER_SUFFIX + "' exists"); |
| } |
| return newDirectory(absolutePath); |
| } |
| |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("getFileStatus listing key '" + key + "'"); |
| } |
| PartialListing listing = store.list(key, 1); |
| if (listing.getFiles().length > 0 || |
| listing.getCommonPrefixes().length > 0) { |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("getFileStatus returning 'directory' for key '" + key + |
| "' as it has contents"); |
| } |
| return newDirectory(absolutePath); |
| } |
| |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("getFileStatus could not find key '" + key + "'"); |
| } |
| throw new FileNotFoundException("No such file or directory '" + absolutePath + "'"); |
| } |
| |
| @Override |
| public URI getUri() { |
| return uri; |
| } |
| |
| /** |
| * <p> |
| * If <code>f</code> is a file, this method will make a single call to S3. |
| * If <code>f</code> is a directory, this method will make a maximum of |
| * (<i>n</i> / 1000) + 2 calls to S3, where <i>n</i> is the total number of |
| * files and directories contained directly in <code>f</code>. |
| * </p> |
| */ |
| @Override |
| public FileStatus[] listStatus(Path f) throws IOException { |
| |
| Path absolutePath = makeAbsolute(f); |
| String key = pathToKey(absolutePath); |
| |
| if (key.length() > 0) { |
| FileMetadata meta = store.retrieveMetadata(key); |
| if (meta != null) { |
| return new FileStatus[] { newFile(meta, absolutePath) }; |
| } |
| } |
| |
| URI pathUri = absolutePath.toUri(); |
| Set<FileStatus> status = new TreeSet<FileStatus>(); |
| String priorLastKey = null; |
| do { |
| PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, false); |
| for (FileMetadata fileMetadata : listing.getFiles()) { |
| Path subpath = keyToPath(fileMetadata.getKey()); |
| String relativePath = pathUri.relativize(subpath.toUri()).getPath(); |
| |
| if (fileMetadata.getKey().equals(key + "/")) { |
| // this is just the directory we have been asked to list |
| } |
| else if (relativePath.endsWith(FOLDER_SUFFIX)) { |
| status.add(newDirectory(new Path( |
| absolutePath, |
| relativePath.substring(0, relativePath.indexOf(FOLDER_SUFFIX))))); |
| } |
| else { |
| status.add(newFile(fileMetadata, subpath)); |
| } |
| } |
| for (String commonPrefix : listing.getCommonPrefixes()) { |
| Path subpath = keyToPath(commonPrefix); |
| String relativePath = pathUri.relativize(subpath.toUri()).getPath(); |
| status.add(newDirectory(new Path(absolutePath, relativePath))); |
| } |
| priorLastKey = listing.getPriorLastKey(); |
| } while (priorLastKey != null); |
| |
| if (status.isEmpty() && |
| key.length() > 0 && |
| store.retrieveMetadata(key + FOLDER_SUFFIX) == null) { |
| throw new FileNotFoundException("File " + f + " does not exist."); |
| } |
| |
| return status.toArray(new FileStatus[status.size()]); |
| } |
| |
| private FileStatus newFile(FileMetadata meta, Path path) { |
| return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(), |
| meta.getLastModified(), path.makeQualified(this)); |
| } |
| |
| private FileStatus newDirectory(Path path) { |
| return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this)); |
| } |
| |
| @Override |
| public boolean mkdirs(Path f, FsPermission permission) throws IOException { |
| Path absolutePath = makeAbsolute(f); |
| List<Path> paths = new ArrayList<Path>(); |
| do { |
| paths.add(0, absolutePath); |
| absolutePath = absolutePath.getParent(); |
| } while (absolutePath != null); |
| |
| boolean result = true; |
| for (Path path : paths) { |
| result &= mkdir(path); |
| } |
| return result; |
| } |
| |
| private boolean mkdir(Path f) throws IOException { |
| try { |
| FileStatus fileStatus = getFileStatus(f); |
| if (fileStatus.isFile()) { |
| throw new IOException(String.format( |
| "Can't make directory for path '%s' since it is a file.", f)); |
| |
| } |
| } catch (FileNotFoundException e) { |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("Making dir '" + f + "' in S3"); |
| } |
| String key = pathToKey(f) + FOLDER_SUFFIX; |
| store.storeEmptyFile(key); |
| } |
| return true; |
| } |
| |
| @Override |
| public FSDataInputStream open(Path f, int bufferSize) throws IOException { |
| FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist |
| if (fs.isDirectory()) { |
| throw new IOException("'" + f + "' is a directory"); |
| } |
| LOG.info("Opening '" + f + "' for reading"); |
| Path absolutePath = makeAbsolute(f); |
| String key = pathToKey(absolutePath); |
| return new FSDataInputStream(new BufferedFSInputStream( |
| new NativeS3FsInputStream(store, statistics, store.retrieve(key), key), bufferSize)); |
| } |
| |
| // rename() and delete() use this method to ensure that the parent directory |
| // of the source does not vanish. |
| private void createParent(Path path) throws IOException { |
| Path parent = path.getParent(); |
| if (parent != null) { |
| String key = pathToKey(makeAbsolute(parent)); |
| if (key.length() > 0) { |
| store.storeEmptyFile(key + FOLDER_SUFFIX); |
| } |
| } |
| } |
| |
| |
| @Override |
| public boolean rename(Path src, Path dst) throws IOException { |
| |
| String srcKey = pathToKey(makeAbsolute(src)); |
| |
| if (srcKey.length() == 0) { |
| // Cannot rename root of file system |
| return false; |
| } |
| |
| final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - "; |
| |
| // Figure out the final destination |
| String dstKey; |
| try { |
| boolean dstIsFile = getFileStatus(dst).isFile(); |
| if (dstIsFile) { |
| if(LOG.isDebugEnabled()) { |
| LOG.debug(debugPreamble + |
| "returning false as dst is an already existing file"); |
| } |
| return false; |
| } else { |
| if(LOG.isDebugEnabled()) { |
| LOG.debug(debugPreamble + "using dst as output directory"); |
| } |
| dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName()))); |
| } |
| } catch (FileNotFoundException e) { |
| if(LOG.isDebugEnabled()) { |
| LOG.debug(debugPreamble + "using dst as output destination"); |
| } |
| dstKey = pathToKey(makeAbsolute(dst)); |
| try { |
| if (getFileStatus(dst.getParent()).isFile()) { |
| if(LOG.isDebugEnabled()) { |
| LOG.debug(debugPreamble + |
| "returning false as dst parent exists and is a file"); |
| } |
| return false; |
| } |
| } catch (FileNotFoundException ex) { |
| if(LOG.isDebugEnabled()) { |
| LOG.debug(debugPreamble + |
| "returning false as dst parent does not exist"); |
| } |
| return false; |
| } |
| } |
| |
| boolean srcIsFile; |
| try { |
| srcIsFile = getFileStatus(src).isFile(); |
| } catch (FileNotFoundException e) { |
| if(LOG.isDebugEnabled()) { |
| LOG.debug(debugPreamble + "returning false as src does not exist"); |
| } |
| return false; |
| } |
| if (srcIsFile) { |
| if(LOG.isDebugEnabled()) { |
| LOG.debug(debugPreamble + |
| "src is file, so doing copy then delete in S3"); |
| } |
| store.copy(srcKey, dstKey); |
| store.delete(srcKey); |
| } else { |
| if(LOG.isDebugEnabled()) { |
| LOG.debug(debugPreamble + "src is directory, so copying contents"); |
| } |
| store.storeEmptyFile(dstKey + FOLDER_SUFFIX); |
| |
| List<String> keysToDelete = new ArrayList<String>(); |
| String priorLastKey = null; |
| do { |
| PartialListing listing = store.list(srcKey, S3_MAX_LISTING_LENGTH, priorLastKey, true); |
| for (FileMetadata file : listing.getFiles()) { |
| keysToDelete.add(file.getKey()); |
| store.copy(file.getKey(), dstKey + file.getKey().substring(srcKey.length())); |
| } |
| priorLastKey = listing.getPriorLastKey(); |
| } while (priorLastKey != null); |
| |
| if(LOG.isDebugEnabled()) { |
| LOG.debug(debugPreamble + |
| "all files in src copied, now removing src files"); |
| } |
| for (String key: keysToDelete) { |
| store.delete(key); |
| } |
| |
| try { |
| store.delete(srcKey + FOLDER_SUFFIX); |
| } catch (FileNotFoundException e) { |
| //this is fine, we don't require a marker |
| } |
| if(LOG.isDebugEnabled()) { |
| LOG.debug(debugPreamble + "done"); |
| } |
| } |
| |
| return true; |
| } |
| |
| @Override |
| public long getDefaultBlockSize() { |
| return getConf().getLong("fs.s3n.block.size", 64 * 1024 * 1024); |
| } |
| |
| /** |
| * Set the working directory to the given directory. |
| */ |
| @Override |
| public void setWorkingDirectory(Path newDir) { |
| workingDir = newDir; |
| } |
| |
| @Override |
| public Path getWorkingDirectory() { |
| return workingDir; |
| } |
| } |