blob: a9b3d72ff04629352005cd80ff76b86064b2cfab [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hbase.oss;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.oss.sync.AutoLock;
import org.apache.hadoop.hbase.oss.sync.AutoLock.LockedFSDataOutputStream;
import org.apache.hadoop.hbase.oss.sync.AutoLock.LockedRemoteIterator;
import org.apache.hadoop.hbase.oss.sync.TreeLockManager;
import org.apache.hadoop.hbase.oss.sync.TreeLockManager.Depth;
import org.apache.hadoop.util.Progressable;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* A FileSystem implementation that layers locking logic on top of another,
* underlying implementation. The appropriate lock on a path is acquired before
* passing on the call, and is either released upon returning or the resulting
* data stream / iterator takes ownership of the lock until it is closed. Newer
* features of FileSystem may either be unimplemented or will not be aware of
* the locks. Caveats with existing features include:
* <ul>
* <li>
* The deleteOnExit feature has no locking logic beyond the underlying
* exists() and delete() calls they make. Shouldn't be a problem due to the
* documented best-effort nature of the feature.
* </li>
* <li>
* globStatus isn't even atomic on HDFS, as it does a tree-walk and may do
* multiple independent requests to the NameNode. This is potentially worse
* on object-stores where latency is higher, etc. Currently globStatus is
* only used for the CoprocessorClassLoader and listing table directories.
* These operations are not considered sensitive to atomicity. Globbing
* could be made atomic by getting a write lock on the parent of the first
* wildcard.
* </li>
* </li>
* <li>
* Symlinking is not supported, but not used by HBase at all and not
* supported by mainstream object-stores considered in this design.
* </li>
* </ul>
public class HBaseObjectStoreSemantics extends FilterFileSystem {
private static final Logger LOG =
private TreeLockManager sync;
public void initialize(URI name, Configuration conf) throws IOException {
String scheme = name.getScheme();
String schemeImpl = "fs." + scheme + ".impl";
String hbossSchemeImpl = "fs.hboss." + schemeImpl;
String wrappedImpl = conf.get(hbossSchemeImpl);
Configuration internalConf = new Configuration(conf);
if (wrappedImpl != null) {"HBOSS wrapping file-system {} using implementation {}", name,
String disableCache = "fs." + scheme + ".impl.disable.cache";
internalConf.set(schemeImpl, wrappedImpl);
internalConf.set(disableCache, "true");
fs = FileSystem.get(name, internalConf);
sync = TreeLockManager.get(fs);
TreeLockManager getLockManager() {
return sync;
public String getScheme() {
return fs.getScheme();
public String getCanonicalServiceName() {
return fs.getCanonicalServiceName();
public String getName() {
return fs.getName();
public BlockLocation[] getFileBlockLocations(FileStatus file,
long start, long len) throws IOException {
try (AutoLock l = sync.lock(file.getPath())) {
return fs.getFileBlockLocations(file, start, len);
public BlockLocation[] getFileBlockLocations(Path p,
long start, long len) throws IOException {
try (AutoLock l = sync.lock(p)) {
return fs.getFileBlockLocations(p, start, len);
public FSDataInputStream open(Path f, int bufferSize)
throws IOException {
try (AutoLock l = sync.lock(f)) {
return, bufferSize);
public FSDataInputStream open(Path f) throws IOException {
try (AutoLock l = sync.lock(f)) {
public FSDataOutputStream create(Path f) throws IOException {
AutoLock lock = sync.lockWrite(f);
try {
FSDataOutputStream stream = fs.create(f);
return new LockedFSDataOutputStream(stream, lock);
} catch (IOException e) {
public FSDataOutputStream create(Path f, boolean overwrite)
throws IOException {
AutoLock lock = sync.lockWrite(f);
try {
FSDataOutputStream stream = fs.create(f, overwrite);
return new LockedFSDataOutputStream(stream, lock);
} catch (IOException e) {
public FSDataOutputStream create(Path f, Progressable progress)
throws IOException {
AutoLock lock = sync.lockWrite(f);
try {
FSDataOutputStream stream = fs.create(f, progress);
return new LockedFSDataOutputStream(stream, lock);
} catch (IOException e) {
public FSDataOutputStream create(Path f, short replication)
throws IOException {
AutoLock lock = sync.lockWrite(f);
try {
FSDataOutputStream stream = fs.create(f, replication);
return new LockedFSDataOutputStream(stream, lock);
} catch (IOException e) {
public FSDataOutputStream create(Path f, short replication,
Progressable progress) throws IOException {
AutoLock lock = sync.lockWrite(f);
try {
FSDataOutputStream stream = fs.create(f, replication, progress);
return new LockedFSDataOutputStream(stream, lock);
} catch (IOException e) {
public FSDataOutputStream create(Path f,
boolean overwrite,
int bufferSize
) throws IOException {
AutoLock lock = sync.lockWrite(f);
try {
FSDataOutputStream stream = fs.create(f, overwrite, bufferSize);
return new LockedFSDataOutputStream(stream, lock);
} catch (IOException e) {
public FSDataOutputStream create(Path f,
boolean overwrite,
int bufferSize,
Progressable progress
) throws IOException {
AutoLock lock = sync.lockWrite(f);
try {
FSDataOutputStream stream = fs.create(f, overwrite, bufferSize, progress);
return new LockedFSDataOutputStream(stream, lock);
} catch (IOException e) {
public FSDataOutputStream create(Path f,
boolean overwrite,
int bufferSize,
short replication,
long blockSize) throws IOException {
AutoLock lock = sync.lockWrite(f);
try {
FSDataOutputStream stream = fs.create(f, overwrite, bufferSize,
replication, blockSize);
return new LockedFSDataOutputStream(stream, lock);
} catch (IOException e) {
public FSDataOutputStream create(Path f,
boolean overwrite,
int bufferSize,
short replication,
long blockSize,
Progressable progress
) throws IOException {
AutoLock lock = sync.lockWrite(f);
try {
FSDataOutputStream stream = fs.create(f, overwrite, bufferSize,
replication, blockSize, progress);
return new LockedFSDataOutputStream(stream, lock);
} catch (IOException e) {
public FSDataOutputStream create(Path f,
FsPermission permission,
boolean overwrite,
int bufferSize,
short replication,
long blockSize,
Progressable progress) throws IOException {
AutoLock lock = sync.lockWrite(f);
try {
FSDataOutputStream stream = fs.create(f, permission, overwrite,
bufferSize, replication, blockSize, progress);
return new LockedFSDataOutputStream(stream, lock);
} catch (IOException e) {
public FSDataOutputStream create(Path f,
FsPermission permission,
EnumSet<CreateFlag> flags,
int bufferSize,
short replication,
long blockSize,
Progressable progress) throws IOException {
AutoLock lock = sync.lockWrite(f);
try {
FSDataOutputStream stream = fs.create(f, permission, flags, bufferSize,
replication, blockSize, progress);
return new LockedFSDataOutputStream(stream, lock);
} catch (IOException e) {
public FSDataOutputStream create(Path f,
FsPermission permission,
EnumSet<CreateFlag> flags,
int bufferSize,
short replication,
long blockSize,
Progressable progress,
ChecksumOpt checksumOpt) throws IOException {
AutoLock lock = sync.lockWrite(f);
try {
FSDataOutputStream stream = fs.create(f, permission, flags, bufferSize,
replication, blockSize, progress, checksumOpt);
return new LockedFSDataOutputStream(stream, lock);
} catch (IOException e) {
public FSDataOutputStream createNonRecursive(Path f,
boolean overwrite,
int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
AutoLock lock = sync.lockWrite(f);
try {
FSDataOutputStream stream = fs.create(f, overwrite, bufferSize,
replication, blockSize, progress);
return new LockedFSDataOutputStream(stream, lock);
} catch (IOException e) {
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
AutoLock lock = sync.lockWrite(f);
try {
FSDataOutputStream stream = fs.create(f, permission, overwrite,
bufferSize, replication, blockSize, progress);
return new LockedFSDataOutputStream(stream, lock);
} catch (IOException e) {
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
AutoLock lock = sync.lockWrite(f);
try {
FSDataOutputStream stream = fs.create(f, permission, flags, bufferSize,
replication, blockSize, progress);
return new LockedFSDataOutputStream(stream, lock);
} catch (IOException e) {
public boolean createNewFile(Path f) throws IOException {
try (AutoLock l = sync.lockWrite(f)) {
return fs.createNewFile(f);
public FSDataOutputStream append(Path f) throws IOException {
AutoLock lock = sync.lockWrite(f);
try {
FSDataOutputStream stream = fs.append(f);
return new LockedFSDataOutputStream(stream, lock);
} catch (IOException e) {
public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
AutoLock lock = sync.lockWrite(f);
try {
FSDataOutputStream stream = fs.append(f, bufferSize);
return new LockedFSDataOutputStream(stream, lock);
} catch (IOException e) {
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
AutoLock lock = sync.lockWrite(f);
try {
FSDataOutputStream stream = fs.append(f, bufferSize, progress);
return new LockedFSDataOutputStream(stream, lock);
} catch (IOException e) {
public void concat(final Path trg, final Path[] psrcs) throws IOException {
try (AutoLock l = sync.lock(trg, psrcs)) {
fs.concat(trg, psrcs);
public short getReplication(Path src) throws IOException {
try (AutoLock l = sync.lock(src)) {
return fs.getReplication(src);
public boolean setReplication(Path src, short replication)
throws IOException {
try (AutoLock l = sync.lock(src)) {
return fs.setReplication(src, replication);
public boolean rename(Path src, Path dst) throws IOException {
try (AutoLock l = sync.lockRename(src, dst)) {
return fs.rename(src, dst);
public boolean truncate(Path f, long newLength) throws IOException {
try (AutoLock l = sync.lock(f)) {
return fs.truncate(f, newLength);
public boolean delete(Path f) throws IOException {
try (AutoLock l = sync.lockDelete(f)) {
return fs.delete(f);
public boolean delete(Path f, boolean recursive) throws IOException {
try (AutoLock l = sync.lockDelete(f)) {
return fs.delete(f, recursive);
public boolean exists(Path f) throws IOException {
try (AutoLock l = sync.lock(f)) {
try {
return fs.exists(f);
} catch (Exception e) {
throw e;
public boolean isDirectory(Path f) throws IOException {
try (AutoLock l = sync.lock(f)) {
return fs.isDirectory(f);
public boolean isFile(Path f) throws IOException {
try (AutoLock l = sync.lock(f)) {
return fs.isFile(f);
public long getLength(Path f) throws IOException {
try (AutoLock l = sync.lock(f)) {
return fs.getLength(f);
public ContentSummary getContentSummary(Path f) throws IOException {
try (AutoLock l = sync.lock(f)) {
return fs.getContentSummary(f);
public FileStatus[] listStatus(Path f) throws FileNotFoundException,
IOException {
try (AutoLock l = sync.lockListing(f, Depth.DIRECTORY)) {
return fs.listStatus(f);
public RemoteIterator<Path> listCorruptFileBlocks(Path path)
throws IOException {
try (AutoLock l = sync.lock(path)) {
return fs.listCorruptFileBlocks(path);
public FileStatus[] listStatus(Path f, PathFilter filter)
throws FileNotFoundException, IOException {
try (AutoLock l = sync.lockListing(f, Depth.DIRECTORY)) {
return fs.listStatus(f, filter);
public FileStatus[] listStatus(Path[] files)
throws FileNotFoundException, IOException {
try (AutoLock l = sync.lockListings(files, Depth.DIRECTORY)) {
return fs.listStatus(files);
public FileStatus[] listStatus(Path[] files, PathFilter filter)
throws FileNotFoundException, IOException {
try (AutoLock l = sync.lockListings(files, Depth.DIRECTORY)) {
return fs.listStatus(files, filter);
public FileStatus[] globStatus(Path pathPattern) throws IOException {
LOG.warn("Globbing is never atomic!");
return fs.globStatus(pathPattern);
public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
throws IOException {
LOG.warn("Globbing is never atomic!");
return fs.globStatus(pathPattern, filter);
public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
throws FileNotFoundException, IOException {
AutoLock lock = sync.lockListing(f, Depth.DIRECTORY);
try {
RemoteIterator<LocatedFileStatus> iterator = fs.listLocatedStatus(f);
return new LockedRemoteIterator<LocatedFileStatus>(iterator, lock);
} catch (Exception e) {
public RemoteIterator<FileStatus> listStatusIterator(final Path p)
throws FileNotFoundException, IOException {
AutoLock lock = sync.lockListing(p, Depth.DIRECTORY);
try {
RemoteIterator<FileStatus> iterator = fs.listStatusIterator(p);
return new LockedRemoteIterator<FileStatus>(iterator, lock);
} catch (Exception e) {
public RemoteIterator<LocatedFileStatus> listFiles(
final Path f, final boolean recursive)
throws FileNotFoundException, IOException {
Depth depth = recursive ? Depth.RECURSIVE : Depth.DIRECTORY;
AutoLock lock = sync.lockListing(f, depth);
try {
RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(f, recursive);
return new LockedRemoteIterator<LocatedFileStatus>(iterator, lock);
} catch (Exception e) {
public boolean mkdirs(Path f) throws IOException {
// TODO this has implications for the parent dirs too
try (AutoLock l = sync.lock(f)) {
return fs.mkdirs(f);
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
try (AutoLock l = sync.lock(f)) {
return fs.mkdirs(f, permission);
public void copyFromLocalFile(Path src, Path dst)
throws IOException {
try (AutoLock l = sync.lock(dst)) {
fs.copyFromLocalFile(src, dst);
public void moveFromLocalFile(Path[] srcs, Path dst)
throws IOException {
try (AutoLock l = sync.lock(dst)) {
fs.moveFromLocalFile(srcs, dst);
public void moveFromLocalFile(Path src, Path dst)
throws IOException {
try (AutoLock l = sync.lock(dst)) {
fs.moveFromLocalFile(src, dst);
public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
throws IOException {
try (AutoLock l = sync.lock(dst)) {
fs.copyFromLocalFile(delSrc, src, dst);
public void copyFromLocalFile(boolean delSrc, boolean overwrite,
Path[] srcs, Path dst)
throws IOException {
try (AutoLock l = sync.lock(dst)) {
fs.copyFromLocalFile(delSrc, overwrite, srcs, dst);
public void copyFromLocalFile(boolean delSrc, boolean overwrite,
Path src, Path dst)
throws IOException {
try (AutoLock l = sync.lock(dst)) {
fs.copyFromLocalFile(delSrc, overwrite, src, dst);
public void copyToLocalFile(Path src, Path dst) throws IOException {
try (AutoLock l = sync.lock(src)) {
fs.copyToLocalFile(src, dst);
public void moveToLocalFile(Path src, Path dst) throws IOException {
try (AutoLock l = sync.lockDelete(src)) {
fs.moveToLocalFile(src, dst);
public void copyToLocalFile(boolean delSrc, Path src, Path dst)
throws IOException {
try (AutoLock l = sync.lock(src)) {
fs.copyToLocalFile(delSrc, src, dst);
public void copyToLocalFile(boolean delSrc, Path src, Path dst,
boolean useRawLocalFileSystem) throws IOException {
try (AutoLock l = sync.lock(src)) {
fs.copyToLocalFile(delSrc, src, dst, useRawLocalFileSystem);
public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
throws IOException {
return fs.startLocalOutput(fsOutputFile, tmpLocalFile);
public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
throws IOException {
try (AutoLock l = sync.lockWrite(fsOutputFile)) {
fs.completeLocalOutput(fsOutputFile, tmpLocalFile);
public void close() throws IOException {
// FS must close first so that FileSystem.processDeleteOnExit() can run
// while locking is still available.
public long getBlockSize(Path f) throws IOException {
try (AutoLock l = sync.lock(f)) {
return fs.getBlockSize(f);
public FileStatus getFileStatus(Path f) throws IOException {
try (AutoLock l = sync.lock(f)) {
return fs.getFileStatus(f);
public void access(Path path, FsAction mode) throws AccessControlException,
FileNotFoundException, IOException {
try (AutoLock l = sync.lock(path)) {
fs.access(path, mode);
public void createSymlink(final Path target, final Path link,
final boolean createParent) throws AccessControlException,
FileAlreadyExistsException, FileNotFoundException,
ParentNotDirectoryException, UnsupportedFileSystemException,
IOException {
throw new UnsupportedOperationException("HBOSS does not support symlinks");
public FileStatus getFileLinkStatus(final Path f)
throws AccessControlException, FileNotFoundException,
UnsupportedFileSystemException, IOException {
throw new UnsupportedOperationException("HBOSS does not support symlinks");
public boolean supportsSymlinks() {
return false;
public Path getLinkTarget(Path f) throws IOException {
throw new UnsupportedOperationException("HBOSS does not support symlinks");
public FileChecksum getFileChecksum(Path f) throws IOException {
try (AutoLock l = sync.lock(f)) {
return fs.getFileChecksum(f);
public FileChecksum getFileChecksum(Path f, final long length)
throws IOException {
try (AutoLock l = sync.lock(f)) {
return fs.getFileChecksum(f, length);
public void setVerifyChecksum(boolean verifyChecksum) {
public void setWriteChecksum(boolean writeChecksum) {
public FsStatus getStatus() throws IOException {
return fs.getStatus();
public FsStatus getStatus(Path p) throws IOException {
try (AutoLock l = sync.lock(p)) {
return fs.getStatus(p);
public void setPermission(Path p, FsPermission permission
) throws IOException {
try (AutoLock l = sync.lock(p)) {
fs.setPermission(p, permission);
public void setOwner(Path p, String username, String groupname
) throws IOException {
try (AutoLock l = sync.lock(p)) {
fs.setOwner(p, username, groupname);
public void setTimes(Path p, long mtime, long atime
) throws IOException {
try (AutoLock l = sync.lock(p)) {
fs.setTimes(p, mtime, atime);
public Path createSnapshot(Path path, String snapshotName)
throws IOException {
try (AutoLock l = sync.lockListing(path, Depth.RECURSIVE)) {
return fs.createSnapshot(path, snapshotName);
public void renameSnapshot(Path path, String snapshotOldName,
String snapshotNewName) throws IOException {
fs.renameSnapshot(path, snapshotOldName, snapshotNewName);
public void deleteSnapshot(Path path, String snapshotName)
throws IOException {
fs.deleteSnapshot(path, snapshotName);
public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
throws IOException {
try (AutoLock l = sync.lock(path)) {
fs.modifyAclEntries(path, aclSpec);
public void removeAclEntries(Path path, List<AclEntry> aclSpec)
throws IOException {
try (AutoLock l = sync.lock(path)) {
fs.removeAclEntries(path, aclSpec);
public void removeDefaultAcl(Path path)
throws IOException {
try (AutoLock l = sync.lock(path)) {
public void removeAcl(Path path)
throws IOException {
try (AutoLock l = sync.lock(path)) {
public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
try (AutoLock l = sync.lock(path)) {
fs.setAcl(path, aclSpec);
public AclStatus getAclStatus(Path path) throws IOException {
try (AutoLock l = sync.lock(path)) {
return fs.getAclStatus(path);
public void setXAttr(Path path, String name, byte[] value)
throws IOException {
try (AutoLock l = sync.lock(path)) {
fs.setXAttr(path, name, value);
public void setXAttr(Path path, String name, byte[] value,
EnumSet<XAttrSetFlag> flag) throws IOException {
try (AutoLock l = sync.lock(path)) {
fs.setXAttr(path, name, value, flag);
public byte[] getXAttr(Path path, String name) throws IOException {
try (AutoLock l = sync.lock(path)) {
return fs.getXAttr(path, name);
public Map<String, byte[]> getXAttrs(Path path) throws IOException {
try (AutoLock l = sync.lock(path)) {
return fs.getXAttrs(path);
public Map<String, byte[]> getXAttrs(Path path, List<String> names)
throws IOException {
try (AutoLock l = sync.lock(path)) {
return fs.getXAttrs(path, names);
public List<String> listXAttrs(Path path) throws IOException {
try (AutoLock l = sync.lock(path)) {
return fs.listXAttrs(path);
public void removeXAttr(Path path, String name) throws IOException {
try (AutoLock l = sync.lock(path)) {
fs.removeXAttr(path, name);