HBASE-22149 HBOSS: A FileSystem implementation to provide HBase's required semantics on object stores.
* Adds top level project structure
* Adds module for hbase-oss with initial implementation of wrapper FileSystem to enforce semantics needed by hbase
Closes #1
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..061a95c
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+target
+auth-keys.xml
+.idea
+*.iml
diff --git a/LICENSE.txt b/LICENSE.txt
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/LICENSE.txt
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/NOTICE.txt b/NOTICE.txt
new file mode 100644
index 0000000..6e3985d
--- /dev/null
+++ b/NOTICE.txt
@@ -0,0 +1,5 @@
+Apache HBase FileSystem-related modules
+Copyright 2019 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
diff --git a/hbase-oss/README.md b/hbase-oss/README.md
new file mode 100644
index 0000000..1d100af
--- /dev/null
+++ b/hbase-oss/README.md
@@ -0,0 +1,123 @@
+<!---
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+# HBOSS: HBase / Object Store Semantics adapter
+
+## Introduction
+
+This module provides an implementation of Apache Hadoop's FileSystem interface
+that bridges the gap between Apache HBase, which assumes that many operations
+are atomic, and object-store implementations of FileSystem (such as s3a) which
+inherently cannot provide atomic semantics to those operations natively.
+
+This is implemented separately from s3a so that it can potentially be used for
+other object stores. It is also impractical to provide the required semantics
+for the general case without significant drawbacks in some cases. A separate
+implementation allows all trade-offs to be made on HBase's terms.
+
+## Lock Implementations
+
+TreeLockManager implements generic logic for managing read / write locks on
+branches of filesystem hierarchies, but needs to be extended by an
+implementation that provides individual read / write locks and methods to
+traverse the tree.
+
+The desired implementation must be configured by setting to one of the values
+below:
+
+ fs.hboss.sync.impl
+
+### Null Implementation (org.apache.hadoop.hbase.oss.sync.NullTreeLockManager)
+
+The null implementation just provides no-op methods instead of actual locking
+operations. This functions as an easy way to verify that a test case has
+successfully reproduced a problem that is hidden by the other implementations.
+
+### Local Implementation (org.apache.hadoop.hbase.oss.sync.LocalTreeLockManager)
+
+Primarily intended to help with development and validation, but could possibly
+work for a standalone instance of HBase. This implementation uses Java's
+built-in ReentrantReadWriteLock.
+
+### ZooKeeper Implementation (org.apache.hadoop.hbase.oss.sync.ZKTreeLockManager)
+
+This implementation is intended for production use once it is stable. It uses
+Apache Curator's implementation of read / write locks on Apache ZooKeeper. It
+could share a ZooKeeper ensemble with the HBase cluster.
+
+At a minimum, you must configure the ZooKeeper connection string (including
+root znode):
+
+ fs.hboss.sync.zk.connectionString
+
+You may also want to configure:
+
+ fs.hboss.sync.zk.sleep.base.ms (default 1000)
+ fs.hboss.sync.zk.sleep.max.retries (default 3)
+
+### DynamoDB Implementation (not implemented)
+
+An implementation based on Amazon's DynamoDB lock library was considered but
+was not completed due to the lack of an efficient way to traverse the tree and
+discover locks on child nodes. The benefit is that S3Guard is required for s3a
+use and as such there's a dependency on DynamoDB anyway.
+
+## Storage Implementations
+
+Currently HBOSS is primarily designed for and exclusively tested with Hadoop's
+s3a client against Amazon S3. S3Guard must be enabled. Both this requirement and
+the use of an external data store for locking have serious implications if any
+other client accesses the same data.
+
+In theory, HBOSS could also work well with Google's cloud storage client (gs)
+or other object storage clients.
+
+## FileSystem Instantiation
+
+There are 2 ways to get an HBOSS instance. It can be instantiated directly and
+given a URI and Configuration object that can be used to get the underlying
+FileSystem:
+
+ Configuration conf = new Configuration();
+ FileSystem hboss = new HBaseObjectStoreSemantics();
+ hboss.initialize("s3a://bucket/", conf);
+
+If the application code cannot be changed, you can remap the object store
+client's schema to the HBOSS implementation, and set
+fs.hboss.fs.<scheme>.impl to the underlying implementation that HBOSS
+should wrap:
+
+ Configuration conf = new Configuration();
+ conf.set("fs.hboss.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+ conf.set("fs.s3a.impl", "org.apache.hadoop.hbase.oss.HBaseObjectStoreSemantics");
+ FileSystem hboss = FileSystem.get("s3a://bucket/", conf);
+
+## Testing
+
+You can quickly run HBOSS's tests with any of the implementations:
+
+ mvn verify -Pnull # reproduce the problems
+ mvn verify -Plocal # useful for debugging
+ mvn verify -Pzk # the default
+
+If the 'zk' profile is activated, it will start an embedded ZooKeeper process.
+The tests can also be run against a distributed ZooKeeper ensemble by setting
+fs.hboss.sync.zk.connectionString in src/test/resources/core-site.xml.
+
+By default, the tests will also be run against a mock S3 client that works on
+in-memory data structures. One can also set fs.hboss.data.uri to point to any
+other storage in src/test/resources/core-site.xml.
+
+Any required credentials or other individal configuration should be set in
+src/test/resources/auth-keys.xml, which should be ignored by source control.
diff --git a/hbase-oss/pom.xml b/hbase-oss/pom.xml
new file mode 100644
index 0000000..aa4c475
--- /dev/null
+++ b/hbase-oss/pom.xml
@@ -0,0 +1,236 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.hbase.filesystem</groupId>
+ <artifactId>hbase-filesystem</artifactId>
+ <version>1.0.0-alpha1-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <artifactId>hbase-oss</artifactId>
+ <name>Apache HBase / Object-Store Semantics Module</name>
+ <description>
+ This module provides atomic file-system semantics required by HBase when
+ running on object-store based FileSystem implementations that can not
+ natively offer those semantics. It does this by locking, so operations may
+ not be fast, but should be transactional.
+ </description>
+ <packaging>jar</packaging>
+
+ <properties>
+ <fs.hboss.sync.impl>org.apache.hadoop.hbase.oss.sync.ZKTreeLockManager</fs.hboss.sync.impl>
+ <aws-java-sdk.version>1.11.525</aws-java-sdk.version>
+ </properties>
+
+ <profiles>
+ <profile>
+ <id>null</id>
+ <properties>
+ <fs.hboss.sync.impl>org.apache.hadoop.hbase.oss.sync.NullTreeLockManager</fs.hboss.sync.impl>
+ </properties>
+ </profile>
+ <profile>
+ <id>local</id>
+ <properties>
+ <fs.hboss.sync.impl>org.apache.hadoop.hbase.oss.sync.LocalTreeLockManager</fs.hboss.sync.impl>
+ </properties>
+ </profile>
+ <profile>
+ <id>zk</id>
+ <properties>
+ <fs.hboss.sync.impl>org.apache.hadoop.hbase.oss.sync.ZKTreeLockManager</fs.hboss.sync.impl>
+ </properties>
+ </profile>
+ </profiles>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <!-- TODO add parallel tests for everything but TestHBOSSContractRootDirectory-->
+ <configuration>
+ <systemProperties>
+ <fs.hboss.sync.impl>${fs.hboss.sync.impl}</fs.hboss.sync.impl>
+ </systemProperties>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <version>${hbase.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>${commons-lang3.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase.thirdparty</groupId>
+ <artifactId>hbase-shaded-miscellaneous</artifactId>
+ <version>${hbase-thirdparty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.yetus</groupId>
+ <artifactId>audience-annotations</artifactId>
+ <version>${audience-annotations.version}</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <!-- Banned import in HBase -->
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-distcp</artifactId>
+ <version>${hadoop.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-distcp</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <version>${hbase.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- For testing against S3 -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-bundle</artifactId>
+ <version>${aws-java-sdk.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>${commons-io.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- For ZooKeeper implementation -->
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>${zookeeper.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-zookeeper</artifactId>
+ <version>${hbase.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-zookeeper</artifactId>
+ <version>${hbase.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-client</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
+
+ </dependencies>
+
+</project>
diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/Constants.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/Constants.java
new file mode 100644
index 0000000..8221680
--- /dev/null
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/Constants.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class Constants {
+ public static final String DATA_URI = "fs.hboss.data.uri";
+ public static final String SYNC_IMPL = "fs.hboss.sync.impl";
+
+ public static final String ZK_CONN_STRING = "fs.hboss.sync.zk.connectionString";
+ public static final String ZK_BASE_SLEEP_MS = "fs.hboss.sync.zk.sleep.base.ms";
+ public static final String ZK_MAX_RETRIES = "fs.hboss.sync.zk.sleep.max.retries";
+
+ public static final String CONTRACT_TEST_SCHEME = "fs.contract.test.fs.scheme";
+}
diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/HBaseObjectStoreSemantics.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/HBaseObjectStoreSemantics.java
new file mode 100644
index 0000000..f72d7e3
--- /dev/null
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/HBaseObjectStoreSemantics.java
@@ -0,0 +1,949 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.oss.sync.AutoLock;
+import org.apache.hadoop.hbase.oss.sync.AutoLock.LockedFSDataOutputStream;
+import org.apache.hadoop.hbase.oss.sync.AutoLock.LockedRemoteIterator;
+import org.apache.hadoop.hbase.oss.sync.TreeLockManager;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A FileSystem implementation that layers locking logic on top of another,
+ * underlying implementation. The appropriate lock on a path is acquired before
+ * passing on the call, and is either released upon returning or the resulting
+ * data stream / iterator takes ownership of the lock until it is closed. Newer
+ * features of FileSystem may either be unimplemented or will not be aware of
+ * the locks. Caveats with existing features include:
+ * <ul>
+ * <li>
+ * The deleteOnExit feature has no locking logic beyond the underlying
+ * exists() and delete() calls they make. Shouldn't be a problem due to the
+ * documented best-effort nature of the feature.
+ * </li>
+ * <li>
+ * globStatus isn't even atomic on HDFS, as it does a tree-walk and may do
+ * multiple independent requests to the NameNode. This is potentially worse
+ * on object-stores where latency is higher, etc. Currently globStatus is
+ * only used for the CoprocessorClassLoader and listing table directories.
+ * These operations are not considered sensitive to atomicity. Globbing
+ * could be made atomic by getting a write lock on the parent of the first
+ * wildcard.
+ * </li>
+ * </li>
+ * <li>
+ * Symlinking is not supported, but not used by HBase at all and not
+ * supported by mainstream object-stores considered in this design.
+ * </li>
+ * </ul>
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+@InterfaceStability.Unstable
+public class HBaseObjectStoreSemantics extends FileSystem {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(HBaseObjectStoreSemantics.class);
+
+ private FileSystem fs;
+
+ private TreeLockManager sync;
+
+ public void initialize(URI name, Configuration conf) throws IOException {
+ setConf(conf);
+
+ String scheme = name.getScheme();
+ String schemeImpl = "fs." + scheme + ".impl";
+ String hbossSchemeImpl = "fs.hboss." + schemeImpl;
+ String wrappedImpl = conf.get(hbossSchemeImpl);
+ Configuration internalConf = new Configuration(conf);
+
+ if (wrappedImpl != null) {
+ LOG.info("HBOSS wrapping file-system {} using implementation {}", name,
+ wrappedImpl);
+ String disableCache = "fs." + scheme + ".impl.disable.cache";
+ internalConf.set(schemeImpl, wrappedImpl);
+ internalConf.set(disableCache, "true");
+ }
+
+ fs = FileSystem.get(name, internalConf);
+ sync = TreeLockManager.get(fs);
+ }
+
+ @VisibleForTesting
+ TreeLockManager getLockManager() {
+ return sync;
+ }
+
+ public String getScheme() {
+ return fs.getScheme();
+ }
+
+ public URI getUri() {
+ return fs.getUri();
+ }
+
+ @Override
+ public String getCanonicalServiceName() {
+ return fs.getCanonicalServiceName();
+ }
+
+ @Deprecated
+ public String getName() {
+ return fs.getName();
+ }
+
+ public Path makeQualified(Path path) {
+ return fs.makeQualified(path);
+ }
+
+ public BlockLocation[] getFileBlockLocations(FileStatus file,
+ long start, long len) throws IOException {
+ try (AutoLock l = sync.lock(file.getPath())) {
+ return fs.getFileBlockLocations(file, start, len);
+ }
+ }
+
+ public BlockLocation[] getFileBlockLocations(Path p,
+ long start, long len) throws IOException {
+ try (AutoLock l = sync.lock(p)) {
+ return fs.getFileBlockLocations(p, start, len);
+ }
+ }
+
+ @Deprecated
+ public FsServerDefaults getServerDefaults() throws IOException {
+ return fs.getServerDefaults();
+ }
+
+ public FsServerDefaults getServerDefaults(Path p) throws IOException {
+ return fs.getServerDefaults(p);
+ }
+
+ public Path resolvePath(final Path p) throws IOException {
+ return fs.resolvePath(p);
+ }
+
+ public FSDataInputStream open(Path f, int bufferSize)
+ throws IOException {
+ try (AutoLock l = sync.lock(f)) {
+ return fs.open(f, bufferSize);
+ }
+ }
+
+ public FSDataInputStream open(Path f) throws IOException {
+ try (AutoLock l = sync.lock(f)) {
+ return fs.open(f);
+ }
+ }
+
+ public FSDataOutputStream create(Path f) throws IOException {
+ AutoLock lock = sync.lockWrite(f);
+ try {
+ FSDataOutputStream stream = fs.create(f);
+ return new LockedFSDataOutputStream(stream, lock);
+ } catch (IOException e) {
+ lock.close();
+ throw(e);
+ }
+ }
+
+ public FSDataOutputStream create(Path f, boolean overwrite)
+ throws IOException {
+ AutoLock lock = sync.lockWrite(f);
+ try {
+ FSDataOutputStream stream = fs.create(f, overwrite);
+ return new LockedFSDataOutputStream(stream, lock);
+ } catch (IOException e) {
+ lock.close();
+ throw(e);
+ }
+ }
+
+ public FSDataOutputStream create(Path f, Progressable progress)
+ throws IOException {
+ AutoLock lock = sync.lockWrite(f);
+ try {
+ FSDataOutputStream stream = fs.create(f, progress);
+ return new LockedFSDataOutputStream(stream, lock);
+ } catch (IOException e) {
+ lock.close();
+ throw(e);
+ }
+ }
+
+ public FSDataOutputStream create(Path f, short replication)
+ throws IOException {
+ AutoLock lock = sync.lockWrite(f);
+ try {
+ FSDataOutputStream stream = fs.create(f, replication);
+ return new LockedFSDataOutputStream(stream, lock);
+ } catch (IOException e) {
+ lock.close();
+ throw(e);
+ }
+ }
+
+ public FSDataOutputStream create(Path f, short replication,
+ Progressable progress) throws IOException {
+ AutoLock lock = sync.lockWrite(f);
+ try {
+ FSDataOutputStream stream = fs.create(f, replication, progress);
+ return new LockedFSDataOutputStream(stream, lock);
+ } catch (IOException e) {
+ lock.close();
+ throw(e);
+ }
+ }
+
+ public FSDataOutputStream create(Path f,
+ boolean overwrite,
+ int bufferSize
+ ) throws IOException {
+ AutoLock lock = sync.lockWrite(f);
+ try {
+ FSDataOutputStream stream = fs.create(f, overwrite, bufferSize);
+ return new LockedFSDataOutputStream(stream, lock);
+ } catch (IOException e) {
+ lock.close();
+ throw(e);
+ }
+ }
+
+ public FSDataOutputStream create(Path f,
+ boolean overwrite,
+ int bufferSize,
+ Progressable progress
+ ) throws IOException {
+ AutoLock lock = sync.lockWrite(f);
+ try {
+ FSDataOutputStream stream = fs.create(f, overwrite, bufferSize, progress);
+ return new LockedFSDataOutputStream(stream, lock);
+ } catch (IOException e) {
+ lock.close();
+ throw(e);
+ }
+ }
+
+ public FSDataOutputStream create(Path f,
+ boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize) throws IOException {
+ AutoLock lock = sync.lockWrite(f);
+ try {
+ FSDataOutputStream stream = fs.create(f, overwrite, bufferSize,
+ replication, blockSize);
+ return new LockedFSDataOutputStream(stream, lock);
+ } catch (IOException e) {
+ lock.close();
+ throw(e);
+ }
+ }
+
+ public FSDataOutputStream create(Path f,
+ boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progress
+ ) throws IOException {
+ AutoLock lock = sync.lockWrite(f);
+ try {
+ FSDataOutputStream stream = fs.create(f, overwrite, bufferSize,
+ replication, blockSize, progress);
+ return new LockedFSDataOutputStream(stream, lock);
+ } catch (IOException e) {
+ lock.close();
+ throw(e);
+ }
+ }
+
+ public FSDataOutputStream create(Path f,
+ FsPermission permission,
+ boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progress) throws IOException {
+ AutoLock lock = sync.lockWrite(f);
+ try {
+ FSDataOutputStream stream = fs.create(f, permission, overwrite,
+ bufferSize, replication, blockSize, progress);
+ return new LockedFSDataOutputStream(stream, lock);
+ } catch (IOException e) {
+ lock.close();
+ throw(e);
+ }
+ }
+
+ public FSDataOutputStream create(Path f,
+ FsPermission permission,
+ EnumSet<CreateFlag> flags,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progress) throws IOException {
+ AutoLock lock = sync.lockWrite(f);
+ try {
+ FSDataOutputStream stream = fs.create(f, permission, flags, bufferSize,
+ replication, blockSize, progress);
+ return new LockedFSDataOutputStream(stream, lock);
+ } catch (IOException e) {
+ lock.close();
+ throw(e);
+ }
+ }
+
+ public FSDataOutputStream create(Path f,
+ FsPermission permission,
+ EnumSet<CreateFlag> flags,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progress,
+ ChecksumOpt checksumOpt) throws IOException {
+ AutoLock lock = sync.lockWrite(f);
+ try {
+ FSDataOutputStream stream = fs.create(f, permission, flags, bufferSize,
+ replication, blockSize, progress, checksumOpt);
+ return new LockedFSDataOutputStream(stream, lock);
+ } catch (IOException e) {
+ lock.close();
+ throw(e);
+ }
+ }
+
+ public FSDataOutputStream createNonRecursive(Path f,
+ boolean overwrite,
+ int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException {
+ AutoLock lock = sync.lockWrite(f);
+ try {
+ FSDataOutputStream stream = fs.create(f, overwrite, bufferSize,
+ replication, blockSize, progress);
+ return new LockedFSDataOutputStream(stream, lock);
+ } catch (IOException e) {
+ lock.close();
+ throw(e);
+ }
+ }
+
+ public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
+ boolean overwrite, int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException {
+ AutoLock lock = sync.lockWrite(f);
+ try {
+ FSDataOutputStream stream = fs.create(f, permission, overwrite,
+ bufferSize, replication, blockSize, progress);
+ return new LockedFSDataOutputStream(stream, lock);
+ } catch (IOException e) {
+ lock.close();
+ throw(e);
+ }
+ }
+
+ public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
+ EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException {
+ AutoLock lock = sync.lockWrite(f);
+ try {
+ FSDataOutputStream stream = fs.create(f, permission, flags, bufferSize,
+ replication, blockSize, progress);
+ return new LockedFSDataOutputStream(stream, lock);
+ } catch (IOException e) {
+ lock.close();
+ throw(e);
+ }
+ }
+
+ public boolean createNewFile(Path f) throws IOException {
+ try (AutoLock l = sync.lockWrite(f)) {
+ return fs.createNewFile(f);
+ }
+ }
+
+ public FSDataOutputStream append(Path f) throws IOException {
+ AutoLock lock = sync.lockWrite(f);
+ try {
+ FSDataOutputStream stream = fs.append(f);
+ return new LockedFSDataOutputStream(stream, lock);
+ } catch (IOException e) {
+ lock.close();
+ throw(e);
+ }
+ }
+
+ public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
+ AutoLock lock = sync.lockWrite(f);
+ try {
+ FSDataOutputStream stream = fs.append(f, bufferSize);
+ return new LockedFSDataOutputStream(stream, lock);
+ } catch (IOException e) {
+ lock.close();
+ throw(e);
+ }
+ }
+
+ public FSDataOutputStream append(Path f, int bufferSize,
+ Progressable progress) throws IOException {
+ AutoLock lock = sync.lockWrite(f);
+ try {
+ FSDataOutputStream stream = fs.append(f, bufferSize, progress);
+ return new LockedFSDataOutputStream(stream, lock);
+ } catch (IOException e) {
+ lock.close();
+ throw(e);
+ }
+ }
+
+ public void concat(final Path trg, final Path[] psrcs) throws IOException {
+ try (AutoLock l = sync.lock(trg, psrcs)) {
+ fs.concat(trg, psrcs);
+ }
+ }
+
+ @Deprecated
+ public short getReplication(Path src) throws IOException {
+ try (AutoLock l = sync.lock(src)) {
+ return fs.getReplication(src);
+ }
+ }
+
+ public boolean setReplication(Path src, short replication)
+ throws IOException {
+ try (AutoLock l = sync.lock(src)) {
+ return fs.setReplication(src, replication);
+ }
+ }
+
+ public boolean rename(Path src, Path dst) throws IOException {
+ try (AutoLock l = sync.lockRename(src, dst)) {
+ return fs.rename(src, dst);
+ }
+ }
+
+ public boolean truncate(Path f, long newLength) throws IOException {
+ try (AutoLock l = sync.lock(f)) {
+ return fs.truncate(f, newLength);
+ }
+ }
+
+ @Deprecated
+ public boolean delete(Path f) throws IOException {
+ try (AutoLock l = sync.lockDelete(f)) {
+ return fs.delete(f);
+ }
+ }
+
+ public boolean delete(Path f, boolean recursive) throws IOException {
+ try (AutoLock l = sync.lockDelete(f)) {
+ return fs.delete(f, recursive);
+ }
+ }
+
+ public boolean exists(Path f) throws IOException {
+ try (AutoLock l = sync.lock(f)) {
+ try {
+ return fs.exists(f);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+ }
+
+ @Deprecated
+ public boolean isDirectory(Path f) throws IOException {
+ try (AutoLock l = sync.lock(f)) {
+ return fs.isDirectory(f);
+ }
+ }
+
+ @Deprecated
+ public boolean isFile(Path f) throws IOException {
+ try (AutoLock l = sync.lock(f)) {
+ return fs.isFile(f);
+ }
+ }
+
+ @Deprecated
+ public long getLength(Path f) throws IOException {
+ try (AutoLock l = sync.lock(f)) {
+ return fs.getLength(f);
+ }
+ }
+
+ public ContentSummary getContentSummary(Path f) throws IOException {
+ try (AutoLock l = sync.lock(f)) {
+ return fs.getContentSummary(f);
+ }
+ }
+
+ public FileStatus[] listStatus(Path f) throws FileNotFoundException,
+ IOException {
+ try (AutoLock l = sync.lockListing(f)) {
+ return fs.listStatus(f);
+ }
+ }
+
+ public RemoteIterator<Path> listCorruptFileBlocks(Path path)
+ throws IOException {
+ try (AutoLock l = sync.lock(path)) {
+ return fs.listCorruptFileBlocks(path);
+ }
+ }
+
+ public FileStatus[] listStatus(Path f, PathFilter filter)
+ throws FileNotFoundException, IOException {
+ try (AutoLock l = sync.lockListing(f)) {
+ return fs.listStatus(f, filter);
+ }
+ }
+
+ public FileStatus[] listStatus(Path[] files)
+ throws FileNotFoundException, IOException {
+ try (AutoLock l = sync.lockListings(files)) {
+ return fs.listStatus(files);
+ }
+ }
+
+ public FileStatus[] listStatus(Path[] files, PathFilter filter)
+ throws FileNotFoundException, IOException {
+ try (AutoLock l = sync.lockListings(files)) {
+ return fs.listStatus(files, filter);
+ }
+ }
+
+ public FileStatus[] globStatus(Path pathPattern) throws IOException {
+ LOG.warn("Globbing is never atomic!");
+ return fs.globStatus(pathPattern);
+ }
+
+ public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
+ throws IOException {
+ LOG.warn("Globbing is never atomic!");
+ return fs.globStatus(pathPattern, filter);
+ }
+
+ public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
+ throws FileNotFoundException, IOException {
+ AutoLock lock = sync.lockListing(f);
+ try {
+ RemoteIterator<LocatedFileStatus> iterator = fs.listLocatedStatus(f);
+ return new LockedRemoteIterator<LocatedFileStatus>(iterator, lock);
+ } catch (Exception e) {
+ lock.close();
+ throw(e);
+ }
+ }
+
+ public RemoteIterator<FileStatus> listStatusIterator(final Path p)
+ throws FileNotFoundException, IOException {
+ AutoLock lock = sync.lockListing(p);
+ try {
+ RemoteIterator<FileStatus> iterator = fs.listStatusIterator(p);
+ return new LockedRemoteIterator<FileStatus>(iterator, lock);
+ } catch (Exception e) {
+ lock.close();
+ throw(e);
+ }
+ }
+
+ public RemoteIterator<LocatedFileStatus> listFiles(
+ final Path f, final boolean recursive)
+ throws FileNotFoundException, IOException {
+ AutoLock lock = sync.lockListing(f);
+ try {
+ RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(f, recursive);
+ return new LockedRemoteIterator<LocatedFileStatus>(iterator, lock);
+ } catch (Exception e) {
+ lock.close();
+ throw(e);
+ }
+ }
+
+ public Path getHomeDirectory() {
+ return fs.getHomeDirectory();
+ }
+
+ public void setWorkingDirectory(Path newDir) {
+ fs.setWorkingDirectory(newDir);
+ }
+
+ public Path getWorkingDirectory() {
+ return fs.getWorkingDirectory();
+ }
+
+ public boolean mkdirs(Path f) throws IOException {
+ // TODO this has implications for the parent dirs too
+ try (AutoLock l = sync.lock(f)) {
+ return fs.mkdirs(f);
+ }
+ }
+
+ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+ try (AutoLock l = sync.lock(f)) {
+ return fs.mkdirs(f, permission);
+ }
+ }
+
+ public void copyFromLocalFile(Path src, Path dst)
+ throws IOException {
+ try (AutoLock l = sync.lock(dst)) {
+ fs.copyFromLocalFile(src, dst);
+ }
+ }
+
+ public void moveFromLocalFile(Path[] srcs, Path dst)
+ throws IOException {
+ try (AutoLock l = sync.lock(dst)) {
+ fs.moveFromLocalFile(srcs, dst);
+ }
+ }
+
+ public void moveFromLocalFile(Path src, Path dst)
+ throws IOException {
+ try (AutoLock l = sync.lock(dst)) {
+ fs.moveFromLocalFile(src, dst);
+ }
+ }
+
+ public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
+ throws IOException {
+ try (AutoLock l = sync.lock(dst)) {
+ fs.copyFromLocalFile(delSrc, src, dst);
+ }
+ }
+
+ public void copyFromLocalFile(boolean delSrc, boolean overwrite,
+ Path[] srcs, Path dst)
+ throws IOException {
+ try (AutoLock l = sync.lock(dst)) {
+ fs.copyFromLocalFile(delSrc, overwrite, srcs, dst);
+ }
+ }
+
+ public void copyFromLocalFile(boolean delSrc, boolean overwrite,
+ Path src, Path dst)
+ throws IOException {
+ try (AutoLock l = sync.lock(dst)) {
+ fs.copyFromLocalFile(delSrc, overwrite, src, dst);
+ }
+ }
+
+ public void copyToLocalFile(Path src, Path dst) throws IOException {
+ try (AutoLock l = sync.lock(src)) {
+ fs.copyToLocalFile(src, dst);
+ }
+ }
+
+ public void moveToLocalFile(Path src, Path dst) throws IOException {
+ try (AutoLock l = sync.lockDelete(src)) {
+ fs.moveToLocalFile(src, dst);
+ }
+ }
+
+ public void copyToLocalFile(boolean delSrc, Path src, Path dst)
+ throws IOException {
+ try (AutoLock l = sync.lock(src)) {
+ fs.copyToLocalFile(delSrc, src, dst);
+ }
+ }
+
+ public void copyToLocalFile(boolean delSrc, Path src, Path dst,
+ boolean useRawLocalFileSystem) throws IOException {
+ try (AutoLock l = sync.lock(src)) {
+ fs.copyToLocalFile(delSrc, src, dst, useRawLocalFileSystem);
+ }
+ }
+
+ public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+ throws IOException {
+ return fs.startLocalOutput(fsOutputFile, tmpLocalFile);
+ }
+
+ public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+ throws IOException {
+ try (AutoLock l = sync.lockWrite(fsOutputFile)) {
+ fs.completeLocalOutput(fsOutputFile, tmpLocalFile);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ // FS must close first so that FileSystem.processDeleteOnExit() can run
+ // while locking is still available.
+ fs.close();
+ sync.close();
+ }
+
+ public long getUsed() throws IOException {
+ return fs.getUsed();
+ }
+
+ @Deprecated
+ public long getBlockSize(Path f) throws IOException {
+ try (AutoLock l = sync.lock(f)) {
+ return fs.getBlockSize(f);
+ }
+ }
+
+ @Deprecated
+ public long getDefaultBlockSize() {
+ return fs.getDefaultBlockSize();
+ }
+
+ public long getDefaultBlockSize(Path f) {
+ return fs.getDefaultBlockSize(f);
+ }
+
+ @Deprecated
+ public short getDefaultReplication() {
+ return fs.getDefaultReplication();
+ }
+
+ public short getDefaultReplication(Path path) {
+ return fs.getDefaultReplication(path);
+ }
+
+ public FileStatus getFileStatus(Path f) throws IOException {
+ try (AutoLock l = sync.lock(f)) {
+ return fs.getFileStatus(f);
+ }
+ }
+
+ public void access(Path path, FsAction mode) throws AccessControlException,
+ FileNotFoundException, IOException {
+ try (AutoLock l = sync.lock(path)) {
+ fs.access(path, mode);
+ }
+ }
+
+ public void createSymlink(final Path target, final Path link,
+ final boolean createParent) throws AccessControlException,
+ FileAlreadyExistsException, FileNotFoundException,
+ ParentNotDirectoryException, UnsupportedFileSystemException,
+ IOException {
+ throw new UnsupportedOperationException("HBOSS does not support symlinks");
+ }
+
+ public FileStatus getFileLinkStatus(final Path f)
+ throws AccessControlException, FileNotFoundException,
+ UnsupportedFileSystemException, IOException {
+ throw new UnsupportedOperationException("HBOSS does not support symlinks");
+ }
+
+ public boolean supportsSymlinks() {
+ return false;
+ }
+
+ public Path getLinkTarget(Path f) throws IOException {
+ throw new UnsupportedOperationException("HBOSS does not support symlinks");
+ }
+
+ public FileChecksum getFileChecksum(Path f) throws IOException {
+ try (AutoLock l = sync.lock(f)) {
+ return fs.getFileChecksum(f);
+ }
+ }
+
+ public FileChecksum getFileChecksum(Path f, final long length)
+ throws IOException {
+ try (AutoLock l = sync.lock(f)) {
+ return fs.getFileChecksum(f, length);
+ }
+ }
+
+ public void setVerifyChecksum(boolean verifyChecksum) {
+ fs.setVerifyChecksum(verifyChecksum);
+ }
+
+ public void setWriteChecksum(boolean writeChecksum) {
+ fs.setWriteChecksum(writeChecksum);
+ }
+
+ public FsStatus getStatus() throws IOException {
+ return fs.getStatus();
+ }
+
+ public FsStatus getStatus(Path p) throws IOException {
+ try (AutoLock l = sync.lock(p)) {
+ return fs.getStatus(p);
+ }
+ }
+
+ public void setPermission(Path p, FsPermission permission
+ ) throws IOException {
+ try (AutoLock l = sync.lock(p)) {
+ fs.setPermission(p, permission);
+ }
+ }
+
+ public void setOwner(Path p, String username, String groupname
+ ) throws IOException {
+ try (AutoLock l = sync.lock(p)) {
+ fs.setOwner(p, username, groupname);
+ }
+ }
+
+ public void setTimes(Path p, long mtime, long atime
+ ) throws IOException {
+ try (AutoLock l = sync.lock(p)) {
+ fs.setTimes(p, mtime, atime);
+ }
+ }
+
+ public Path createSnapshot(Path path, String snapshotName)
+ throws IOException {
+ try (AutoLock l = sync.lockListing(path)) {
+ return fs.createSnapshot(path, snapshotName);
+ }
+ }
+
+ public void renameSnapshot(Path path, String snapshotOldName,
+ String snapshotNewName) throws IOException {
+ fs.renameSnapshot(path, snapshotOldName, snapshotNewName);
+ }
+
+ public void deleteSnapshot(Path path, String snapshotName)
+ throws IOException {
+ fs.deleteSnapshot(path, snapshotName);
+ }
+
+ public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
+ throws IOException {
+ try (AutoLock l = sync.lock(path)) {
+ fs.modifyAclEntries(path, aclSpec);
+ }
+ }
+
+ public void removeAclEntries(Path path, List<AclEntry> aclSpec)
+ throws IOException {
+ try (AutoLock l = sync.lock(path)) {
+ fs.removeAclEntries(path, aclSpec);
+ }
+ }
+
+ public void removeDefaultAcl(Path path)
+ throws IOException {
+ try (AutoLock l = sync.lock(path)) {
+ fs.removeDefaultAcl(path);
+ }
+ }
+
+ public void removeAcl(Path path)
+ throws IOException {
+ try (AutoLock l = sync.lock(path)) {
+ fs.removeAcl(path);
+ }
+ }
+
+ public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
+ try (AutoLock l = sync.lock(path)) {
+ fs.setAcl(path, aclSpec);
+ }
+ }
+
+ public AclStatus getAclStatus(Path path) throws IOException {
+ try (AutoLock l = sync.lock(path)) {
+ return fs.getAclStatus(path);
+ }
+ }
+
+ public void setXAttr(Path path, String name, byte[] value)
+ throws IOException {
+ try (AutoLock l = sync.lock(path)) {
+ fs.setXAttr(path, name, value);
+ }
+ }
+
+ public void setXAttr(Path path, String name, byte[] value,
+ EnumSet<XAttrSetFlag> flag) throws IOException {
+ try (AutoLock l = sync.lock(path)) {
+ fs.setXAttr(path, name, value, flag);
+ }
+ }
+
+ public byte[] getXAttr(Path path, String name) throws IOException {
+ try (AutoLock l = sync.lock(path)) {
+ return fs.getXAttr(path, name);
+ }
+ }
+
+ public Map<String, byte[]> getXAttrs(Path path) throws IOException {
+ try (AutoLock l = sync.lock(path)) {
+ return fs.getXAttrs(path);
+ }
+ }
+
+ public Map<String, byte[]> getXAttrs(Path path, List<String> names)
+ throws IOException {
+ try (AutoLock l = sync.lock(path)) {
+ return fs.getXAttrs(path, names);
+ }
+ }
+
+ public List<String> listXAttrs(Path path) throws IOException {
+ try (AutoLock l = sync.lock(path)) {
+ return fs.listXAttrs(path);
+ }
+ }
+
+ public void removeXAttr(Path path, String name) throws IOException {
+ try (AutoLock l = sync.lock(path)) {
+ fs.removeXAttr(path, name);
+ }
+ }
+}
diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/package-info.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/package-info.java
new file mode 100644
index 0000000..bf6a61e
--- /dev/null
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/package-info.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * An implementation of org.apache.hadoop.fs.FileSystem that wraps object store
+ * client implementations to provide additional semantics required by HBase.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.hbase.oss;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/AutoLock.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/AutoLock.java
new file mode 100644
index 0000000..3b57d20
--- /dev/null
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/AutoLock.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.sync;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * Provides convenience data structures to help ensure that locks are closed
+ * when and only when the path is no longer in use. The basic AutoLock is simply
+ * an AutoCloseable that will release the lock after a try-with-resources block.
+ * LockedRemoteIterator will release the lock when the stream has been exhausted
+ * or in the event of any exception. LockedFSDataOutputStream will release the
+ * lock when the stream gets closed.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface AutoLock extends AutoCloseable {
+ public void close() throws IOException;
+
+ /**
+ * A wrapper for a RemoteIterator that releases a lock only when the
+ * underlying iterator has been exhausted.
+ */
+ public static class LockedRemoteIterator<E> implements RemoteIterator<E> {
+
+ public LockedRemoteIterator(RemoteIterator<E> iterator, AutoLock lock)
+ {
+ this.iterator = iterator;
+ this.lock = lock;
+ }
+
+ private RemoteIterator<E> iterator;
+ private AutoLock lock;
+ private AtomicBoolean closed = new AtomicBoolean(false);
+
+ public void close() throws IOException {
+ if (!closed.getAndSet(true)) {
+ lock.close();
+ }
+ }
+
+ private void checkClosed() throws IOException {
+ if (closed.get()) {
+ throw new IOException(
+ "LockedRemoteIterator was accessed after releasing lock");
+ }
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ checkClosed();
+ try {
+ if (iterator.hasNext()) {
+ return true;
+ }
+ close();
+ return false;
+ } catch (Throwable e) {
+ close();
+ throw e;
+ }
+ }
+
+ /**
+ * Delegates to the wrapped iterator, but will close the lock in the event
+ * of a NoSuchElementException. Some applications do not call hasNext() and
+ * simply depend on the NoSuchElementException.
+ */
+ @Override
+ public E next() throws IOException {
+ checkClosed();
+ try {
+ return iterator.next();
+ } catch (Throwable e) {
+ close();
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * A wrapper for a FSDataOutputStream that releases a lock only when the
+ * underlying output stream is closed.
+ */
+ public class LockedFSDataOutputStream extends FSDataOutputStream {
+
+ public LockedFSDataOutputStream(FSDataOutputStream stream, AutoLock lock) throws IOException {
+ // super() throws IOException, but this constructor can't catch it.
+ // Instantiators must catch the exception and close the lock.
+ super(stream, null);
+ this.stream = stream;
+ this.lock = lock;
+ }
+
+ private final FSDataOutputStream stream;
+ private AutoLock lock;
+ private AtomicBoolean closed = new AtomicBoolean(false);
+
+ private void checkClosed() throws IOException {
+ if (closed.get()) {
+ throw new IOException(
+ "LockedFSDataOutputStream was accessed after releasing lock");
+ }
+ }
+
+ @Override
+ /**
+ * Returns the position in the wrapped stream. This should not be accessed
+ * after the stream has been closed. Unlike most other functions in this
+ * class, this is not enforced because this function shouldn't throw
+ * IOExceptions.
+ */
+ public long getPos() {
+ return stream.getPos();
+ }
+
+ @Override
+ public void close() throws IOException {
+ // Contract tests attempt to close the stream twice
+ if (!closed.getAndSet(true)) {
+ try {
+ stream.close();
+ } finally {
+ lock.close();
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "LockedFSDataOutputStream:" + stream.toString();
+ }
+
+ @Override
+ /**
+ * Returns the wrapped stream. This should not be accessed after the stream
+ * has been closed. Unlike most other functions in this class, this is not
+ * enforced because this function shouldn't throw IOExceptions.
+ */
+ public OutputStream getWrappedStream() {
+ return stream.getWrappedStream();
+ }
+
+ @Override
+ public void hflush() throws IOException {
+ checkClosed();
+ stream.hflush();
+ }
+
+ @Override
+ public void hsync() throws IOException {
+ checkClosed();
+ stream.hsync();
+ }
+
+ @Override
+ public void setDropBehind(Boolean dropBehind) throws IOException {
+ checkClosed();
+ stream.setDropBehind(dropBehind);
+ }
+ }
+}
diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/TreeLockManager.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/TreeLockManager.java
new file mode 100644
index 0000000..feb5f16
--- /dev/null
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/TreeLockManager.java
@@ -0,0 +1,500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.sync;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.oss.Constants;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Basic logic for synchronizing FileSystem operations. Needs to be extended
+ * with an implementation of read / write locks and the methods to check the
+ * status of locks above and below FileSystem Paths.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class TreeLockManager {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TreeLockManager.class);
+
+ public static synchronized TreeLockManager get(FileSystem fs)
+ throws IOException {
+ Configuration conf = fs.getConf();
+ Class<? extends TreeLockManager> impl = conf.getClass(
+ Constants.SYNC_IMPL, TreeLockManager.class, TreeLockManager.class);
+ TreeLockManager instance = null;
+ Exception cause = null;
+ try {
+ instance = impl.newInstance();
+ } catch (Exception e) {
+ cause = e;
+ }
+ if (instance == null) {
+ throw new IOException("Class referred to by "
+ + Constants.SYNC_IMPL + ", " + impl.getName()
+ + ", is not a valid implementation of "
+ + TreeLockManager.class.getName(), cause);
+ }
+ instance.initialize(fs);
+ return instance;
+ }
+
+ protected FileSystem fs;
+
+ private static final String SLASH = "/";
+
+ /**
+ * Returns a normalized logical path that uniquely identifies an object-store
+ * "filesystem" and a path inside it. Assumes a 1:1 mapping between hostnames
+ * and filesystems, and assumes the URI scheme mapping is consistent
+ * everywhere.
+ */
+ private Path norm(Path path) {
+ URI uri = fs.makeQualified(path).toUri();
+ String uriScheme = uri.getScheme();
+ String uriHost = uri.getHost();
+ String uriPath = uri.getPath().substring(1);
+ if (uriPath.length() == 0) {
+ uriPath = SLASH;
+ }
+ // To combine fsRoot and uriPath, fsRoot must start with /, and uriPath
+ // must not.
+ Path fsRoot = new Path(SLASH + uriScheme, uriHost);
+ return new Path(fsRoot, uriPath);
+ }
+
+ /**
+ * Convenience function for calling norm on an array. Returned copy of the
+ * array will also be sorted for deadlock avoidance.
+ */
+ private Path[] norm(Path[] paths) {
+ Path[] newPaths = new Path[paths.length];
+ for (int i = 0; i < paths.length; i++) {
+ newPaths[i] = norm(paths[i]);
+ }
+ Arrays.sort(newPaths);
+ return newPaths;
+ }
+
+ /**
+ * Convenience function for calling norm on an array with one extra arg.
+ * Returned copy of the combined array will also be sorted for deadlock
+ * avoidance.
+ */
+ private Path[] norm(Path[] paths, Path path) {
+ Path[] newPaths = new Path[paths.length + 1];
+ int i;
+ for (i = 0; i < paths.length; i++) {
+ newPaths[i] = norm(paths[i]);
+ }
+ newPaths[i] = path;
+ Arrays.sort(newPaths);
+ return newPaths;
+ }
+
+ /**
+ * In addition to any implementation-specific setup, implementations must set
+ * this.fs = fs in order for path normalization to work.
+ */
+ public abstract void initialize(FileSystem fs) throws IOException;
+
+ /**
+ * Performs any shutdown necessary when a client is exiting. Should be
+ * considered best-effort and for planned shut downs.
+ */
+ public void close() throws IOException {
+ }
+
+ /**
+ * Acquires a single exclusive (write) lock.
+ *
+ * @param p Path to lock
+ */
+ protected abstract void writeLock(Path p) throws IOException;
+
+ /**
+ * Releases a single exclusive (write) lock.
+ *
+ * @param p Path to unlock
+ */
+ protected abstract void writeUnlock(Path p) throws IOException;
+
+ /**
+ * Acquires a single non-exclusive (read) lock.
+ *
+ * @param p Path to lock
+ */
+ protected abstract void readLock(Path p) throws IOException;
+
+ /**
+ * Releases a single non-exclusive (read) lock.
+ *
+ * @param p Path to unlock
+ */
+ protected abstract void readUnlock(Path p) throws IOException;
+
+ /**
+ * Checks for the presence of a write lock on all parent directories of the
+ * path.
+ *
+ * @param p Path to check
+ * @return True if a lock is found, false otherwise
+ */
+ protected abstract boolean writeLockAbove(Path p) throws IOException;
+
+ /**
+ * Checks for the presence of a write lock on all children of the path.
+ *
+ * @param p Path to check
+ * @return True if a lock is found, false otherwise
+ */
+ protected abstract boolean writeLockBelow(Path p) throws IOException;
+
+ /**
+ * Checks for the presence of a write lock on all child directories of the
+ * path.
+ *
+ * @param p Path to check
+ * @return True if a lock is found, false otherwise
+ */
+ protected abstract boolean readLockBelow(Path p) throws IOException;
+
+ /**
+ * Recursively cleans up locks that won't be used again.
+ *
+ * @param p Parent path of all locks to delete
+ */
+ protected abstract void recursiveDelete(Path p) throws IOException;
+
+ private RetryPolicy retryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(600000, 1, TimeUnit.MILLISECONDS);
+
+ private boolean retryBackoff(int retries) throws IOException {
+ RetryAction action;
+ try {
+ action = retryPolicy.shouldRetry(null, retries, 0, true);
+ } catch (Exception e) {
+ throw new IOException("Unexpected exception during locking", e);
+ }
+ if (action.action == RetryDecision.FAIL) {
+ throw new IOException("Exceeded " + retries + " retries for locking");
+ }
+ LOG.trace("Sleeping {}ms before next retry", action.delayMillis);
+ try {
+ Thread.sleep(action.delayMillis);
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted during locking", e);
+ }
+ return true;
+ }
+
+ /**
+ * Acquires a write (exclusive) lock on a path. Between this lock being
+ * acquired and being released, we should hold a write lock on this path, no
+ * write locks should be held by anyone on any parent directory, and no read
+ * or write locks should be held by anyone on any child directory.
+ *
+ * @param path Path to lock
+ */
+ protected void treeWriteLock(Path p) throws IOException {
+ int outerRetries = 0;
+ do {
+ int innerRetries = 0;
+ do {
+ // If there's already a write-lock above or below us in the tree, wait for it to leave
+ if (writeLockAbove(p) || writeLockBelow(p)) {
+ LOG.warn("Blocked on some parent write lock, waiting: {}", p);
+ continue;
+ }
+ break;
+ } while (retryBackoff(innerRetries++));
+ // Try obtain the write lock just for our node
+ writeLock(p);
+ // If there's now a write-lock above or below us in the tree, release and retry
+ if (writeLockAbove(p) || writeLockBelow(p)) {
+ LOG.warn("Blocked on some other write lock, retrying: {}", p);
+ writeUnlock(p);
+ continue;
+ }
+ break;
+ } while (retryBackoff(outerRetries++));
+
+ // Once we know we're the only write-lock in our path, drain all read-locks below
+ int drainReadLocksRetries = 0;
+ do {
+ if (readLockBelow(p)) {
+ LOG.warn("Blocked on some child read lock, writing: {}", p);
+ continue;
+ }
+ break;
+ } while (retryBackoff(drainReadLocksRetries++));
+ }
+
+ /**
+ * Acquires a read (non-exclusive) lock on a path. Between this lock being
+ * acquired and being released, we should hold a read lock on this path, and
+ * no write locks should be held by anyone on any parent directory.
+ *
+ * @param path Path to lock
+ */
+ protected void treeReadLock(Path p) throws IOException {
+ int outerRetries = 0;
+ do {
+ int innerRetries = 0;
+ do {
+ // If there's a write lock above us, wait
+ if (writeLockAbove(p)) {
+ LOG.warn("Blocked waiting for some parent write lock, waiting: {}",
+ p);
+ continue;
+ }
+ break;
+ } while (retryBackoff(innerRetries++));
+ // Try obtain the read-lock just for our node
+ readLock(p);
+ // If there's a write lock above us, release the lock and try again
+ if (writeLockAbove(p)) {
+ LOG.warn("Blocked waiting for some parent write lock, retrying: {}",
+ p);
+ readUnlock(p);
+ continue;
+ }
+ break;
+ } while (retryBackoff(outerRetries++));
+ }
+
+ /**
+ * Acquires an exclusive lock on a single path to create or append to a file.
+ * This is required for createNonRecursive() as well as other operations to be
+ * atomic because the underlying file may not be created until all data has
+ * been written.
+ *
+ * @param path Path of the create operation
+ * @return AutoLock to release this path
+ */
+ public AutoLock lockWrite(Path rawPath) throws IOException {
+ Path path = norm(rawPath);
+ LOG.debug("About to lock for create / write: {}", rawPath);
+ treeWriteLock(path);
+ return new AutoLock() {
+ public void close() throws IOException {
+ LOG.debug("About to unlock after create / write: {}", path);
+ writeUnlock(path);
+ }
+ };
+ }
+
+ /**
+ * Acquires an exclusive lock on a single path and then cleans up the lock
+ * and those of all children. The lock ensures this doesn't interfere with any
+ * renames or other listing operations above this path.
+ *
+ * @param path Path of the create operation
+ * @return AutoLock to release this path
+ */
+ public AutoLock lockDelete(Path rawPath) throws IOException {
+ Path path = norm(rawPath);
+ LOG.debug("About to lock for delete: {}", path);
+ treeWriteLock(path);
+ return new AutoLock() {
+ public void close() throws IOException {
+ LOG.debug("About to recursively delete locks: {}", path);
+ recursiveDelete(path);
+ writeUnlock(path);
+ }
+ };
+ }
+
+ /**
+ * Acquires a lock on a single path to run a listing operation. We need to
+ * ensure that the listing is not generated mid-rename. This will lock all
+ * children of the root path as well, which is only necessary for recursive
+ * listings. Other listings should only need a read lock on the root and all
+ * children, but that is not implemented.
+ *
+ * @param path Root of the listing operation
+ * @return AutoCloseable to release this path
+ */
+ public AutoLock lockListing(Path rawPath) throws IOException {
+ Path path = norm(rawPath);
+ LOG.debug("About to lock for listing: {}", path);
+ treeWriteLock(path);
+ return new AutoLock() {
+ public void close() throws IOException {
+ LOG.debug("About to unlock after listing: {}", path);
+ writeUnlock(path);
+ }
+ };
+ }
+
+ /**
+ * Same considerations of lockListing, but locks an array of paths in order
+ * and returns an AutoLock that encapsulates all of them.
+ *
+ * @param paths
+ * @return AutoCloseable that encapsulate all paths
+ */
+ public AutoLock lockListings(Path[] rawPaths) throws IOException {
+ Path[] paths = norm(rawPaths);
+ for (int i = 0; i < paths.length; i++) {
+ LOG.debug("About to lock for listings: {}", paths[i]);
+ treeWriteLock(paths[i]);
+ }
+ return new AutoLock() {
+ public void close() throws IOException {
+ Throwable lastThrown = null;
+ for (int i = 0; i < paths.length; i++) {
+ LOG.debug("About to unlock after listings: {}", paths[i]);
+ try {
+ writeUnlock(paths[i]);
+ } catch (Throwable e) {
+ lastThrown = e;
+ LOG.warn("Caught throwable while unlocking: {}", e.getMessage());
+ e.printStackTrace();
+ }
+ }
+ if (lastThrown != null) {
+ throw new IOException("At least one throwable caught while unlocking",
+ lastThrown);
+ }
+ }
+ };
+ }
+
+ /**
+ * Acquires an exclusive (write) lock on 2 paths, for a rename. Any given pair
+ * of paths will always be locked in the same order, regardless of their order
+ * in the method call. This is to avoid deadlocks. In the future this method
+ * may also record the start of the rename in something like a write-ahead log
+ * to recover in-progress renames in the event of a failure.
+ *
+ * @param src Source of the rename
+ * @param dst Destination of the rename
+ * @return AutoCloseable to release both paths
+ * @throws IOException
+ */
+ public AutoLock lockRename(Path rawSrc, Path rawDst) throws IOException {
+ Path src = norm(rawSrc);
+ Path dst = norm(rawDst);
+ LOG.debug("About to lock for rename: from {} to {}", src, dst);
+ if (src.compareTo(dst) < 0) {
+ treeWriteLock(src);
+ treeWriteLock(dst);
+ } else {
+ treeWriteLock(dst);
+ treeWriteLock(src);
+ }
+ return new AutoLock() {
+ public void close() throws IOException {
+ LOG.debug("About to unlock after rename: from {} to {}", src, dst);
+ try {
+ writeUnlock(src);
+ } finally {
+ writeUnlock(dst);
+ }
+ }
+ };
+ }
+
+ /**
+ * Returns a non-exclusive lock on a single path. This is for generic cases
+ * that read or modify the file-system but that don't necessarily need
+ * exclusive access if no other concurrent operations do.
+ *
+ * @param path Path to lock
+ * @return AutoCloseable that will release the path
+ * @throws IOException
+ */
+ public AutoLock lock(Path rawPath) throws IOException {
+ Path path = norm(rawPath);
+ LOG.debug("About to lock: {}", path);
+ treeReadLock(path);
+ return new AutoLock() {
+ public void close() throws IOException {
+ LOG.debug("About to unlock: {}", path);
+ readUnlock(path);
+ }
+ };
+ }
+
+ /**
+ * Returns a non-exclusive lock on an array of paths. This is for generic
+ * cases that read or modify the file-system but that don't necessarily need
+ * exclusive access if no other concurrent operations do.
+ *
+ * @param paths Path to lock
+ * @return AutoCloseable that will release all the paths
+ * @throws IOException
+ */
+ public AutoLock lock(Path[] rawPaths) throws IOException {
+ return innerLock(norm(rawPaths));
+ }
+
+ /**
+ * Returns a non-exclusive lock on an array of paths and a separate path. No
+ * distinction is made between them in locking: this method is only for
+ * convenience in the FileSystem implementation where there is a distinction.
+ *
+ * @param extraPath Extra path to lock
+ * @param paths Paths to lock
+ * @return AutoCloseable that will release all the paths
+ * @throws IOException
+ */
+ public AutoLock lock(Path extraPath, Path[] rawPaths) throws IOException {
+ return innerLock(norm(rawPaths, extraPath));
+ }
+
+ private AutoLock innerLock(Path[] paths) throws IOException {
+ for (int i = 0; i < paths.length; i++) {
+ LOG.debug("About to lock: {}", paths[i]);
+ treeReadLock(paths[i]);
+ }
+ return new AutoLock() {
+ public void close() throws IOException {
+ Throwable lastThrown = null;
+ for (int i = 0; i < paths.length; i++) {
+ LOG.debug("About to unlock: {}", paths[i]);
+ try {
+ readUnlock(paths[i]);
+ } catch (Throwable e) {
+ lastThrown = e;
+ LOG.warn("Caught throwable while unlocking: {}", e.getMessage());
+ e.printStackTrace();
+ }
+ }
+ if (lastThrown != null) {
+ throw new IOException("At least one throwable caught while unlocking",
+ lastThrown);
+ }
+ }
+ };
+ }
+}
diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/ZKTreeLockManager.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/ZKTreeLockManager.java
new file mode 100644
index 0000000..b1dbcb5
--- /dev/null
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/ZKTreeLockManager.java
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.sync;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.oss.Constants;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation based on Apache Curator and Apache ZooKeeper. This allows
+ * HBOSS to re-use an Apache HBase cluster's ZooKeeper ensemble for file
+ * system locking.
+ *
+ * Can be enabled in JUnit tests with -Pzk. If {@link Constants.ZK_CONN_STRING}
+ * isn't specified, an embedded ZooKeeper process will be spun up for tests.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+@InterfaceStability.Unstable
+public class ZKTreeLockManager extends TreeLockManager {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ZKTreeLockManager.class);
+
+ private CuratorFramework curator;
+
+ private String root;
+
+ private void setRoot() {
+ root = "/hboss";
+ }
+
+ private static final String lockSubZnode = ".hboss-lock-znode";
+
+ private Map<Path,InterProcessReadWriteLock> lockCache = new HashMap<>();
+
+ public void initialize(FileSystem fs) throws IOException {
+ this.fs = fs;
+ Configuration conf = fs.getConf();
+ int baseSleepTimeMs = conf.getInt(Constants.ZK_BASE_SLEEP_MS, 1000);
+ int maxRetries = conf.getInt(Constants.ZK_MAX_RETRIES, 3);
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
+
+ // Create a temporary connection to ensure the root is created, then create
+ // a new connection 'jailed' inside that root to eliminate the need for
+ // paths to constantly be resolved inside the root.
+
+ String zookeeperConnectionString = conf.get(Constants.ZK_CONN_STRING);
+ curator = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
+ curator.start();
+
+ setRoot();
+ try {
+ ZKPaths.mkdirs(curator.getZookeeperClient().getZooKeeper(), root, true);
+ } catch (Exception e) {
+ throw new IOException("Unable to initialize root znodes", e);
+ }
+ curator.close();
+
+ zookeeperConnectionString += root;
+ curator = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
+ curator.start();
+ }
+
+ @Override
+ public void close() throws IOException {
+ curator.close();
+ }
+
+ @Override
+ protected void writeLock(Path p) throws IOException {
+ try {
+ LOG.debug("writeLock {} acquire", p);
+ get(p).writeLock().acquire();
+ } catch (Exception e) {
+ throw new IOException("Exception during write locking of path " + p, e);
+ }
+ }
+
+ @Override
+ protected void writeUnlock(Path p) throws IOException {
+ try {
+ LOG.debug("writeLock {} release", p);
+ get(p).writeLock().release();
+ } catch(IllegalMonitorStateException e) {
+ // Reentrant locks might be acquired multiple times
+ LOG.error("Tried to release unacquired write lock: {}", p);
+ throw e;
+ } catch (Exception e) {
+ throw new IOException("Exception during write unlocking of path " + p, e);
+ }
+ }
+
+ @Override
+ protected void readLock(Path p) throws IOException {
+ LOG.debug("readLock {} acquire", p);
+ try {
+ get(p).readLock().acquire();
+ } catch (Exception e) {
+ throw new IOException("Exception during read locking of path " + p, e);
+ }
+ }
+
+ @Override
+ protected void readUnlock(Path p) throws IOException {
+ LOG.debug("readLock {} release", p);
+ try {
+ get(p).readLock().release();
+ } catch(IllegalMonitorStateException e) {
+ // Reentrant locks might be acquired multiple times
+ LOG.error("Tried to release unacquired write lock: {}", p);
+ throw e;
+ } catch (Exception e) {
+ throw new IOException("Exception during read unlocking of path " + p, e);
+ }
+ }
+
+ @Override
+ protected boolean writeLockAbove(Path p) throws IOException {
+ LOG.debug("Checking for write lock above {}", p);
+ while (!p.isRoot()) {
+ p = p.getParent();
+ if (isLocked(get(p).writeLock())) {
+ LOG.warn("Parent write lock currently held: {}", p);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ protected boolean writeLockBelow(Path p) throws IOException {
+ boolean b = writeLockBelow(p, true);
+ return b;
+ }
+
+ @Override
+ protected boolean readLockBelow(Path p) throws IOException {
+ boolean b = readLockBelow(p, true);
+ return b;
+ }
+
+ @Override
+ protected void recursiveDelete(Path p) throws IOException {
+ try {
+ ZKPaths.deleteChildren(curator.getZookeeperClient().getZooKeeper(),
+ p.toString(), !p.isRoot());
+ // Before this method is called, we have a guarantee that
+ // 1. There are no write locks above or below us
+ // 2. There are no read locks below us
+ // As such, we can just remove locks beneath us as we find them.
+ removeInMemoryLocks(p);
+ } catch (KeeperException.NoNodeException e) {
+ LOG.warn("Lock not found during recursive delete: {}", p);
+ } catch (Exception e) {
+ throw new IOException("Exception while deleting lock " + p, e);
+ }
+ }
+
+ private synchronized void removeInMemoryLocks(Path p) {
+ Iterator<Entry<Path,InterProcessReadWriteLock>> iter = lockCache.entrySet().iterator();
+ while (iter.hasNext()) {
+ Entry<Path,InterProcessReadWriteLock> entry = iter.next();
+ if (isBeneath(p, entry.getKey())) {
+ LOG.trace("Removing lock for {}", entry.getKey());
+ iter.remove();
+ }
+ }
+ }
+
+ private boolean isBeneath(Path parent, Path other) {
+ if (parent.equals(other)) {
+ return false;
+ }
+ // Is `other` fully contained in some path beneath the parent.
+ return 0 == other.toString().indexOf(parent.toString());
+ }
+
+ private boolean writeLockBelow(Path p, boolean firstLevel) throws IOException {
+ try {
+ if (!firstLevel && isLocked(get(p).writeLock())) {
+ return true;
+ }
+ List<String> children = curator.getChildren().forPath(p.toString());
+ for (String child : children) {
+ if (child.equals(lockSubZnode)) {
+ continue;
+ }
+ if (writeLockBelow(new Path(p, child), false)) {
+ LOG.warn("Parent write lock currently held: {}", p);
+ return true;
+ }
+ }
+ } catch (KeeperException.NoNodeException e) {
+ // Ignore, means we hit the bottom of the tree
+ } catch (Exception e) {
+ throw new IOException("Error checking parents for write lock: " + p, e);
+ }
+ return false;
+ }
+
+ private boolean readLockBelow(Path p, boolean firstLevel) throws IOException {
+ try {
+ if (!firstLevel && isLocked(get(p).readLock())) {
+ return true;
+ }
+ List<String> children = curator.getChildren().forPath(p.toString());
+ for (String child : children) {
+ if (child.equals(lockSubZnode)) {
+ continue;
+ }
+ if (readLockBelow(new Path(p, child), false)) {
+ LOG.warn("Child read lock currently held: {}", p);
+ return true;
+ }
+ }
+ } catch (KeeperException.NoNodeException e) {
+ // Ignore, means we hit the bottom of the tree
+ } catch (Exception e) {
+ throw new IOException("Error checking children for read lock: " + p, e);
+ }
+ return false;
+ }
+
+ /**
+ * Specifically, if this is lock by another thread.
+ */
+ private boolean isLocked(InterProcessMutex lock) throws IOException {
+ try {
+ if (lock.isOwnedByCurrentThread()) {
+ // First check the current thread, because we allow you to get locks
+ // when parent or child paths are only locked by you.
+ return false;
+ }
+ if (lock.isAcquiredInThisProcess()) {
+ // We know it's not this thread, but this is less expensive
+ // than checking other processes.
+ return true;
+ }
+ // Finally, see if another process holds the lock. This is a terrible way
+ // to check but Curator doesn't expose another way.
+ if (lock.acquire(0, TimeUnit.NANOSECONDS)) {
+ lock.release();
+ return false;
+ }
+ } catch (Exception e) {
+ throw new IOException("Exception while testing a lock", e);
+ }
+ return true;
+ }
+
+ public String summarizeLocks() {
+ StringBuilder sb = new StringBuilder();
+ Map<Path,InterProcessReadWriteLock> cache = getUnmodifiableCache();
+ for (Entry<Path,InterProcessReadWriteLock> entry : cache.entrySet()) {
+ sb.append(entry.getKey()).append("=").append(describeLock(entry.getValue()));
+ }
+ return sb.toString();
+ }
+
+ String describeLock(InterProcessReadWriteLock lock) {
+ if (lock == null) {
+ return "null";
+ }
+ InterProcessMutex rlock = lock.readLock();
+ InterProcessMutex wlock = lock.writeLock();
+ StringBuilder sb = new StringBuilder();
+ sb.append("ReadLock[heldByThisThread=").append(rlock.isOwnedByCurrentThread());
+ sb.append(", heldInThisProcess=").append(rlock.isAcquiredInThisProcess()).append("]");
+ sb.append(" WriteLock[heldByThisThread=").append(wlock.isOwnedByCurrentThread());
+ sb.append(", heldInThisProcess=").append(wlock.isAcquiredInThisProcess()).append("]");
+ return sb.toString();
+ }
+
+ public synchronized Map<Path,InterProcessReadWriteLock> getUnmodifiableCache() {
+ return Collections.unmodifiableMap(lockCache);
+ }
+
+ private synchronized InterProcessReadWriteLock get(Path path) throws IOException {
+ if (!lockCache.containsKey(path)) {
+ String zkPath = new Path(path, lockSubZnode).toString();
+ try {
+ ZKPaths.mkdirs(curator.getZookeeperClient().getZooKeeper(), zkPath, true);
+ } catch (KeeperException.NodeExistsException e) {
+ // Ignore
+ } catch (Exception e) {
+ throw new IOException("Exception while ensuring lock parents exist: " +
+ path, e);
+ }
+ lockCache.put(path, new InterProcessReadWriteLock(curator, zkPath));
+ }
+ return lockCache.get(path);
+ }
+}
+
diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/package-info.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/package-info.java
new file mode 100644
index 0000000..de1a27e
--- /dev/null
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/package-info.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Synchronization utilities for use in HBaseObjectStoreSemantics.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.hbase.oss.sync;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/EmbeddedS3.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/EmbeddedS3.java
new file mode 100644
index 0000000..03b8470
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/EmbeddedS3.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.SdkClientException;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.services.s3.AbstractAmazonS3;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.Bucket;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.CopyObjectResult;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsResult;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import java.io.File;
+import java.io.InputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.S3ClientFactory;
+import org.apache.hadoop.fs.s3a.s3guard.LocalMetadataStore;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hbase.oss.Constants.*;
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class EmbeddedS3 {
+
+ public static boolean usingEmbeddedS3 = false;
+
+ private static final String BUCKET = "embedded";
+
+ public static void conditionalStart(Configuration conf) throws Exception {
+ if (StringUtils.isEmpty(conf.get(S3_METADATA_STORE_IMPL))) {
+ conf.set(S3_METADATA_STORE_IMPL, LocalMetadataStore.class.getName());
+ }
+
+ boolean notConfigured = StringUtils.isEmpty(conf.get(DATA_URI));
+ if (notConfigured) {
+ usingEmbeddedS3 = true;
+ conf.set(S3_CLIENT_FACTORY_IMPL,
+ EmbeddedS3ClientFactory.class.getName());
+ conf.set(DATA_URI, "s3a://" + BUCKET);
+ } else {
+ usingEmbeddedS3 = false;
+ }
+ }
+
+ /**
+ * Replaces the default S3ClientFactory to inject an EmbeddedAmazonS3
+ * instance. This is currently a private API in Hadoop, but is the same method
+ * used by S3Guard's inconsistency-injection tests. The method signature
+ * defined in the interface varies depending on the Hadoop version.
+ */
+ public static class EmbeddedS3ClientFactory implements S3ClientFactory {
+ public AmazonS3 createS3Client(URI name) {
+ AmazonS3 s3 = new EmbeddedAmazonS3();
+ s3.createBucket(BUCKET);
+ return s3;
+ }
+
+ public AmazonS3 createS3Client(URI name,
+ String bucket,
+ AWSCredentialsProvider credentialSet,
+ String userAgentSuffix) {
+ AmazonS3 s3 = new EmbeddedAmazonS3();
+ s3.createBucket(bucket);
+ return s3;
+ }
+
+ public AmazonS3 createS3Client(URI name,
+ String bucket,
+ AWSCredentialsProvider credentialSet) {
+ return createS3Client(name);
+ }
+ }
+
+ /**
+ * Emulates an S3-connected client. This is the bare minimum implementation
+ * required for s3a to pass the contract tests while continuing to reproduce
+ * such quirks as delayed file creation and non-atomic / slow renames.
+ * Specifically, the following features are not supported:
+ * <ul>
+ * <li>Multiple buckets</li>
+ * <li>Requester-pays</li>
+ * <li>Encryption</li>
+ * <li>Object versioning</li>
+ * <li>Multi-part</li>
+ * <li>ACLs</li>
+ * <li>Objects larger than Integer.MAX_VALUE</li>
+ * </ul>
+ */
+ public static class EmbeddedAmazonS3 extends AbstractAmazonS3 {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(EmbeddedAmazonS3.class);
+
+ private String bucketName = null;
+
+ // Randomized contract test datasets are generated byte-by-byte, so we must
+ // ensure it's encoded as 8-bit characters
+ private static final String ISO_8859_1 = "ISO-8859-1";
+ private Charset encoding = Charset.forName(ISO_8859_1);
+
+ private class EmbeddedS3Object extends S3Object {
+ private ObjectMetadata meta;
+ private String data;
+ private long[] range;
+
+ public EmbeddedS3Object() {
+ super();
+ }
+
+ public EmbeddedS3Object(EmbeddedS3Object that, long[] range) {
+ super();
+ this.meta = that.meta;
+ this.data = that.data;
+ this.range = range;
+ }
+
+ public S3ObjectInputStream getObjectContent() {
+ String substring = data.substring((int)range[0], (int)range[1]+1);
+ InputStream in = IOUtils.toInputStream(substring, encoding);
+ return new S3ObjectInputStream(in, null);
+ }
+ }
+
+ private Map<String, EmbeddedS3Object> bucket = new HashMap<>();
+
+ private void simulateServerSideCopy() {
+ try {
+ // For realism, this could be a function of data size, but 1/100s is
+ // more than enough to reliably observe non-atomic renames.
+ Thread.sleep(10);
+ } catch (InterruptedException e) {}
+ }
+
+ // AmazonS3 interface below
+
+ public CopyObjectResult copyObject(CopyObjectRequest request) {
+ String sourceKey = request.getSourceKey();
+ String destinationKey = request.getDestinationKey();
+ LOG.debug("copyObject: {} -> {}", sourceKey, destinationKey);
+ EmbeddedS3Object object = bucket.get(sourceKey);
+ simulateServerSideCopy();
+ bucket.put(destinationKey, object);
+ return new CopyObjectResult();
+ }
+
+ public Bucket createBucket(String bucketName) {
+ LOG.debug("createBucket: {}", bucketName);
+ this.bucketName = bucketName;
+ Bucket bucket = new Bucket(bucketName);
+ return bucket;
+ }
+
+ public void deleteObject(String bucketName, String key) {
+ LOG.debug("deleteObject: {}", key);
+ bucket.remove(key);
+ }
+
+ public DeleteObjectsResult deleteObjects(DeleteObjectsRequest request) {
+ for (DeleteObjectsRequest.KeyVersion keyVersion : request.getKeys()) {
+ String key = keyVersion.getKey();
+ LOG.debug("deleteObjects: {}", key);
+ bucket.remove(key);
+ }
+ return new DeleteObjectsResult(
+ new ArrayList<DeleteObjectsResult.DeletedObject>());
+ }
+
+ public boolean doesBucketExist(String bucketName) {
+ LOG.debug("doesBucketExist: {}", bucketName);
+ if (this.bucketName == null) {
+ this.bucketName = bucketName;
+ }
+ return this.bucketName.equals(bucketName);
+ }
+
+ public boolean doesObjectExist(String bucketName, String objectName) {
+ LOG.debug("doesObjectExist: {}", objectName);
+ return bucket.containsKey(objectName);
+ }
+
+ public String getBucketLocation(String bucketName) {
+ LOG.debug("getBucketLocation: {}", bucketName);
+ // This is Region.US_Standard, but it's .toString() returns null
+ return "us-east-1";
+ }
+
+ public S3Object getObject(GetObjectRequest request) {
+ String key = request.getKey();
+ long[] range = request.getRange();
+ if (range.length != 2) {
+ throw new IllegalArgumentException("Range must have 2 elements!");
+ }
+ LOG.debug("getObject: {} [{} - {}]", key, range[0], range[1]);
+ EmbeddedS3Object object = bucket.get(key);
+ return new EmbeddedS3Object(object, range);
+ }
+
+ public ObjectMetadata getObjectMetadata(GetObjectMetadataRequest request) {
+ String key = request.getKey();
+ LOG.debug("getObjectMetadata: {}", key);
+ if (!bucket.containsKey(key)) {
+ AmazonServiceException e = new AmazonServiceException("404");
+ e.setStatusCode(404);
+ throw e;
+ }
+ EmbeddedS3Object object = bucket.get(key);
+ return object.getObjectMetadata();
+ }
+
+ public ObjectListing listObjects(ListObjectsRequest request) {
+ String prefix = request.getPrefix();
+ String delimiter = request.getDelimiter();
+ LOG.debug("listObjects: {} (delimiter = {})", prefix, delimiter);
+
+ ObjectListing result = new ObjectListing();
+ innerListing(prefix, delimiter, result.getObjectSummaries(),
+ result.getCommonPrefixes());
+ return result;
+ }
+
+ public ListObjectsV2Result listObjectsV2(ListObjectsV2Request request) {
+ String prefix = request.getPrefix();
+ String delimiter = request.getDelimiter();
+ LOG.debug("listObjectsV2: {} (delimiter = {})", prefix, delimiter);
+
+ ListObjectsV2Result result = new ListObjectsV2Result();
+ innerListing(prefix, delimiter, result.getObjectSummaries(),
+ result.getCommonPrefixes());
+ return result;
+ }
+
+ private void innerListing(String prefix, String delimiter,
+ List<S3ObjectSummary> summaries, List<String> prefixes) {
+ Set<String> commonPrefixes = new HashSet<>();
+ for (String key : bucket.keySet()) {
+ if (!key.startsWith(prefix)) {
+ continue;
+ }
+ if (delimiter != null) {
+ int index = key.indexOf(delimiter, prefix.length());
+ if (index > 0) {
+ // Finding the delimiter means non-recursive listing
+ // Add the first-level child to common prefixes and continue
+ commonPrefixes.add(key.substring(0, index));
+ continue;
+ }
+ }
+ S3ObjectSummary summary = new S3ObjectSummary();
+ summary.setKey(key);
+ summary.setSize(bucket.get(key).data.length());
+ summaries.add(summary);
+ }
+ prefixes.addAll(commonPrefixes);
+ }
+
+ public PutObjectResult putObject(PutObjectRequest request) {
+ String key = request.getKey();
+ LOG.debug("putObject: {}", key);
+
+ EmbeddedS3Object object = bucket.get(key);
+ if (object == null) {
+ object = new EmbeddedS3Object();
+ }
+
+ try {
+ if (request.getInputStream() != null) {
+ InputStream in = request.getInputStream();
+ object.data = IOUtils.toString(in, encoding);
+ in.close();
+ } else if (request.getFile() != null) {
+ File file = request.getFile();
+ object.data = FileUtils.readFileToString(file, encoding);
+ } else {
+ throw new UnsupportedOperationException("Unknown putObject method");
+ }
+ } catch (IOException e) {
+ throw new SdkClientException("Error reading object to put", e);
+ }
+ // TODO later Hadoop versions will require setting ETags, etc. too
+ object.getObjectMetadata().setContentLength(object.data.length());
+
+ // Object isn't listed in the bucket until after it's written
+ bucket.put(key, object);
+ return new PutObjectResult();
+ }
+
+ public void shutdown() {
+ LOG.debug("shutdown");
+ }
+ }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/EmbeddedZK.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/EmbeddedZK.java
new file mode 100644
index 0000000..c50940c
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/EmbeddedZK.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss;
+
+import java.net.InetAddress;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseZKTestingUtility;
+import org.apache.hadoop.hbase.oss.Constants;
+import org.apache.hadoop.hbase.oss.sync.TreeLockManager;
+import org.apache.hadoop.hbase.oss.sync.ZKTreeLockManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class EmbeddedZK {
+
+ private static HBaseZKTestingUtility util = null;
+
+ public static synchronized void conditionalStart(Configuration conf) throws Exception {
+ Class implementation = conf.getClass(Constants.SYNC_IMPL, TreeLockManager.class);
+ boolean notConfigured = StringUtils.isEmpty(conf.get(Constants.ZK_CONN_STRING));
+ if (implementation == ZKTreeLockManager.class && notConfigured) {
+ if (util == null) {
+ util = new HBaseZKTestingUtility(conf);
+ util.startMiniZKCluster();
+ }
+ int port = util.getZkCluster().getClientPort();
+ String hostname = InetAddress.getLocalHost().getHostName();
+ String connectionString = hostname + ":" + port;
+ conf.set(Constants.ZK_CONN_STRING, connectionString);
+ }
+ }
+
+ public static synchronized void conditionalStop() throws Exception {
+ if (util != null) {
+ util.shutdownMiniZKCluster();
+ util = null;
+ }
+ }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/HBaseObjectStoreSemanticsTest.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/HBaseObjectStoreSemanticsTest.java
new file mode 100644
index 0000000..1bb3710
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/HBaseObjectStoreSemanticsTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.oss.sync.TreeLockManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class HBaseObjectStoreSemanticsTest {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(HBaseObjectStoreSemanticsTest.class);
+
+ protected HBaseObjectStoreSemantics hboss = null;
+ protected TreeLockManager sync = null;
+
+ public Path testPathRoot() {
+ return TestUtils.testPathRoot(hboss);
+ }
+
+ public Path testPath(String path) {
+ return TestUtils.testPath(hboss, path);
+ }
+
+ public Path testPath(Path path) {
+ return TestUtils.testPath(hboss, path);
+ }
+
+ @Before
+ public void setup() throws Exception {
+ Configuration conf = new Configuration();
+ hboss = TestUtils.getFileSystem(conf);
+ sync = hboss.getLockManager();
+ hboss.mkdirs(testPathRoot());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ TestUtils.cleanup(hboss);
+ }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestAtomicRename.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestAtomicRename.java
new file mode 100644
index 0000000..be95dbf
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestAtomicRename.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.oss.sync.TreeLockManager;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestAtomicRename extends HBaseObjectStoreSemanticsTest {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(TestAtomicRename.class);
+
+ @Test
+ public void testAtomicRename() throws Exception {
+ Path renameSource = testPath("atomicRenameSource");
+ Path renameTarget = testPath("atomicRenameTarget");
+ try {
+ for (int i = 1; i <= 8; i++) {
+ Path dir = new Path(renameSource, "dir" + i);
+ hboss.mkdirs(dir);
+ for (int j = 1; j <= 8; j++) {
+ Path file = new Path(dir, "file" + j);
+ FSDataOutputStream out = hboss.create(file);
+ // Write 4kb
+ for (int k = 0; k < 256; k++) {
+ out.write("0123456789ABCDEF".getBytes());
+ }
+ out.close();
+ }
+ }
+
+ Thread renameThread = new Thread(
+ new Runnable() {
+ public void run() {
+ try {
+ boolean success = hboss.rename(renameSource, renameTarget);
+ Assert.assertTrue("Rename returned false, indicating some error.",
+ success);
+ } catch(IOException e) {
+ Assert.fail("Unexpected exception during rename: " + e);
+ }
+ }
+ }
+ );
+ renameThread.start();
+
+ // If the rename fails before ever creating the target, this will hang forever
+ while (!hboss.exists(renameTarget)) {
+ Thread.sleep(1);
+ }
+ Assert.assertFalse("Rename source is still visible after rename finished or target showed up.",
+ hboss.exists(renameSource));
+ renameThread.join();
+ } finally {
+ try {
+ hboss.delete(renameSource);
+ hboss.delete(renameTarget);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+ }
+
+
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestCreateNonRecursive.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestCreateNonRecursive.java
new file mode 100644
index 0000000..9208e9f
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestCreateNonRecursive.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestCreateNonRecursive extends HBaseObjectStoreSemanticsTest {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(TestCreateNonRecursive.class);
+
+ @Test
+ public void testCreateNonRecursiveSerial() throws Exception {
+ Path serialPath = testPath("testCreateNonRecursiveSerial");
+ try {
+ FSDataOutputStream out;
+
+ out = hboss.createNonRecursive(serialPath, false, 1024, (short)1, 1024, null);
+ out.close();
+
+ boolean exceptionThrown = false;
+ try {
+ out = hboss.createNonRecursive(serialPath, false, 1024, (short)1, 1024, null);
+ } catch(FileAlreadyExistsException e) {
+ exceptionThrown = true;
+ }
+ if (!exceptionThrown) {
+ Assert.fail("Second call to createNonRecursive should throw FileAlreadyExistsException, but didn't.");
+ }
+ } finally {
+ hboss.delete(serialPath);
+ }
+ }
+
+ @Test
+ public void testCreateNonRecursiveParallel() throws Exception {
+ int experiments = 10;
+ int experimentSize = 10;
+ for (int e = 0; e < experiments; e++) {
+ ArrayList<Callable<Boolean>> callables = new ArrayList<Callable<Boolean>>(experimentSize);
+ ArrayList<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(experimentSize);
+
+ Path parallelPath = testPath("testCreateNonRecursiveParallel" + e);
+ ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newFixedThreadPool(10);
+ executor.prestartAllCoreThreads();
+ for (int i = 0; i < experimentSize; i++) {
+ callables.add(new Callable<Boolean>() {
+ public Boolean call() throws IOException {
+ FSDataOutputStream out = null;
+ boolean exceptionThrown = false;
+ try {
+ out = hboss.createNonRecursive(parallelPath, false, 1024, (short)1, 1024, null);
+ } catch(FileAlreadyExistsException e) {
+ exceptionThrown = true;
+ } finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+ return exceptionThrown;
+ }
+ });
+ }
+ try {
+ for (Callable callable : callables) {
+ // This is in a separate loop to try and get them all as overlapped as possible
+ futures.add(executor.submit(callable));
+ }
+ int exceptionsThrown = 0;
+ for (Future<Boolean> future : futures) {
+ // This is in a separate loop to try and get them all as overlapped as possible
+ if (future.get()) {
+ exceptionsThrown++;
+ }
+ }
+ Assert.assertEquals("All but exactly 1 call should have thrown exceptions. " +
+ "Experiment " + (e+1) + " of " + experiments + ".",
+ experimentSize - 1, exceptionsThrown);
+ } finally {
+ hboss.delete(parallelPath);
+ }
+ }
+ }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestRecursiveDelete.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestRecursiveDelete.java
new file mode 100644
index 0000000..0d5c6a8
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestRecursiveDelete.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.oss.sync.ZKTreeLockManager;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestRecursiveDelete extends HBaseObjectStoreSemanticsTest {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(TestRecursiveDelete.class);
+
+ @Test
+ public void testCreationAfterDeletion() throws Exception {
+ // This was an observed failure condition where creating a
+ // directory that we had just removed would be deadlocked
+ // because the thread that issued the original deletion still
+ // held the lock on the directory it deleted.
+ ExecutorService svc = Executors.newSingleThreadExecutor();
+ Path tmp = testPath(".tmp");
+ Path lock = testPath(".tmp/hbase-hbck.lock");
+
+ try {
+ hboss.mkdirs(tmp);
+ hboss.delete(tmp, true);
+ LOG.info("After mkdir and delete in test thread");
+ summarizeZKLocks();
+
+ Future<?> fut = svc.submit(new Runnable() {
+ public void run() {
+ try {
+ hboss.mkdirs(tmp);
+ LOG.info("After mkdir in separate thread");
+ summarizeZKLocks();
+ try (FSDataOutputStream out = hboss.create(lock)) {
+ out.write(Bytes.toBytes("localhost"));
+ out.flush();
+ }
+ } catch (Exception e) {
+ LOG.error("Caught exception", e);
+ Assert.fail("Failed to create file");
+ }
+ }
+ });
+ Assert.assertNull(fut.get(15, TimeUnit.SECONDS));
+ } finally {
+ hboss.delete(tmp, true);
+ }
+ }
+
+ void summarizeZKLocks() {
+ if (sync instanceof ZKTreeLockManager) {
+ LOG.info(((ZKTreeLockManager) sync).summarizeLocks());
+ }
+ }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestUtils.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestUtils.java
new file mode 100644
index 0000000..0cac933
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestUtils.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TestUtils {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(TestUtils.class);
+
+ // This is defined by the Maven Surefire plugin configuration
+ private static final String TEST_UNIQUE_FORK_ID = "test.unique.fork.id";
+
+ public static final String S3A = "s3a";
+
+ public static String getScheme(Configuration conf) {
+ String dataUri = conf.get(Constants.DATA_URI);
+ try {
+ return new URI(dataUri).getScheme();
+ } catch (URISyntaxException e) {
+ return null;
+ }
+ }
+
+ public static boolean fsIs(String scheme, Configuration conf) {
+ return getScheme(conf).equals(scheme);
+ }
+
+ public static void disableFilesystemCaching(Configuration conf) {
+ String property = "fs." + getScheme(conf) + ".impl.disable.cache";
+ conf.setBoolean(property, true);
+ }
+
+ public static Path testPathRoot(HBaseObjectStoreSemantics hboss) {
+ String testUniqueForkId = System.getProperty(TEST_UNIQUE_FORK_ID);
+ String root = "/hboss-junit";
+ if (testUniqueForkId != null) {
+ root += "-" + testUniqueForkId;
+ }
+ return new Path(hboss.getHomeDirectory(), root);
+ }
+
+ public static Path testPath(HBaseObjectStoreSemantics hboss, String path) {
+ return testPath(hboss, new Path(path));
+ }
+
+ public static Path testPath(HBaseObjectStoreSemantics hboss, Path path) {
+ return new Path(testPathRoot(hboss), path);
+ }
+
+ public static HBaseObjectStoreSemantics getFileSystem(Configuration conf) throws Exception {
+ // Newer versions of Hadoop will do this for us, but older ones won't
+ // This allows Maven properties, profiles, etc. to set the implementation
+ if (StringUtils.isEmpty(conf.get(Constants.SYNC_IMPL))) {
+ conf.set(Constants.SYNC_IMPL, System.getProperty(Constants.SYNC_IMPL));
+ }
+
+ EmbeddedS3.conditionalStart(conf);
+ EmbeddedZK.conditionalStart(conf);
+
+ try {
+ String dataURI = conf.get(Constants.DATA_URI);
+ Assume.assumeNotNull(dataURI);
+ URI name = new URI(dataURI);
+ HBaseObjectStoreSemantics hboss = new HBaseObjectStoreSemantics();
+ hboss.initialize(name, conf);
+ return hboss;
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ public static void cleanup(HBaseObjectStoreSemantics hboss) throws Exception {
+ if (hboss != null) {
+ hboss.close();
+ }
+ EmbeddedZK.conditionalStop();
+ }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/HBOSSContract.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/HBOSSContract.java
new file mode 100644
index 0000000..948b46d
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/HBOSSContract.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.oss.Constants;
+import org.apache.hadoop.hbase.oss.HBaseObjectStoreSemantics;
+import org.apache.hadoop.hbase.oss.TestUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class HBOSSContract extends AbstractFSContract {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(HBOSSContract.class);
+
+ private Configuration conf = null;
+ private HBaseObjectStoreSemantics fs = null;
+
+ /**
+ * Constructor: loads the authentication keys if found
+ * @param conf configuration to work with
+ */
+ public HBOSSContract(Configuration conf) {
+ super(conf);
+ this.conf = conf;
+ addConfResource("contract/s3a.xml");
+ }
+
+ /**
+ * Any initialisation logic can go here
+ * @throws IOException IO problems
+ */
+ public void init() throws IOException {
+
+ }
+
+ /**
+ * Get the filesystem for these tests
+ * @return the test fs
+ * @throws IOException IO problems
+ */
+ public FileSystem getTestFileSystem() throws IOException {
+ if (fs == null) {
+ try {
+ fs = TestUtils.getFileSystem(conf);
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ e.printStackTrace();
+ throw new IOException("Failed to get FS", e);
+ }
+ }
+ return fs;
+ }
+
+ /**
+ * Get the scheme of this FS
+ * @return the scheme this FS supports
+ */
+ public String getScheme() {
+ return conf.get(Constants.CONTRACT_TEST_SCHEME, "s3a");
+ }
+
+ /**
+ * Return the path string for tests, e.g. <code>file:///tmp</code>
+ * @return a path in the test FS
+ */
+ public Path getTestPath() {
+ return TestUtils.testPath(fs, "contract-tests");
+ }
+
+ @Override
+ public String toString() {
+ return "FSContract for HBOSS/" + getScheme();
+ }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContract.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContract.java
new file mode 100644
index 0000000..1ba31f9
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContract.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystemContractBaseTest;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.oss.HBaseObjectStoreSemantics;
+import org.apache.hadoop.hbase.oss.TestUtils;
+import org.junit.Assume;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestHBOSSContract extends FileSystemContractBaseTest {
+
+ protected static final Logger LOG =
+ LoggerFactory.getLogger(TestHBOSSContract.class);
+
+ private Path basePath;
+ private Configuration conf;
+
+ @Rule
+ public TestName methodName = new TestName();
+
+ private void nameThread() {
+ Thread.currentThread().setName("JUnit-" + methodName.getMethodName());
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ nameThread();
+ conf = new Configuration();
+ fs = TestUtils.getFileSystem(conf);
+ Assume.assumeNotNull(fs);
+ HBaseObjectStoreSemantics hboss = (HBaseObjectStoreSemantics)fs;
+ basePath = fs.makeQualified(TestUtils.testPath(hboss, "ITestHBOSSContract"));
+ }
+
+ @Override
+ public Path getTestBaseDir() {
+ return basePath;
+ }
+
+ @Test
+ public void testMkdirsWithUmask() throws Exception {
+ // Skipped in the hadoop-aws tests
+ Assume.assumeFalse(TestUtils.fsIs(TestUtils.S3A, conf));
+ super.testMkdirsWithUmask();
+ }
+
+ @Test
+ public void testRenameDirectoryAsExistingDirectory() throws Exception {
+ if (!TestUtils.fsIs(TestUtils.S3A, conf)) {
+ super.testRenameDirectoryAsExistingDirectory();
+ return;
+ }
+
+ // Overridden implementation in the hadoop-aws tests
+ Assume.assumeTrue(renameSupported());
+
+ Path src = path("testRenameDirectoryAsExisting/dir");
+ fs.mkdirs(src);
+ createFile(path(src + "/file1"));
+ createFile(path(src + "/subdir/file2"));
+
+ Path dst = path("testRenameDirectoryAsExistingNew/newdir");
+ fs.mkdirs(dst);
+ rename(src, dst, true, false, true);
+ Assert.assertFalse("Nested file1 exists",
+ fs.exists(path(src + "/file1")));
+ Assert.assertFalse("Nested file2 exists",
+ fs.exists(path(src + "/subdir/file2")));
+ Assert.assertTrue("Renamed nested file1 exists",
+ fs.exists(path(dst + "/file1")));
+ Assert.assertTrue("Renamed nested exists",
+ fs.exists(path(dst + "/subdir/file2")));
+ }
+
+ @Test
+ public void testMoveDirUnderParent() throws Throwable {
+ // Skipped in the hadoop-aws tests
+ Assume.assumeFalse(TestUtils.fsIs(TestUtils.S3A, conf));
+ super.testMoveDirUnderParent();
+ }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractAppend.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractAppend.java
new file mode 100644
index 0000000..568ffbb
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractAppend.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractAppendTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Note that this has not been tested because S3A does not support it.
+ */
+public class TestHBOSSContractAppend extends AbstractContractAppendTest {
+
+ @Override
+ protected Configuration createConfiguration() {
+ return super.createConfiguration();
+ }
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new HBOSSContract(conf);
+ }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractConcat.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractConcat.java
new file mode 100644
index 0000000..9639057
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractConcat.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractConcatTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Note that this has not been tested because S3A does not support it.
+ */
+public class TestHBOSSContractConcat extends AbstractContractConcatTest {
+
+ @Override
+ protected Configuration createConfiguration() {
+ return super.createConfiguration();
+ }
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new HBOSSContract(conf);
+ }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractCreate.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractCreate.java
new file mode 100644
index 0000000..ec088fa
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractCreate.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractContractCreateTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.hbase.oss.EmbeddedS3;
+import org.apache.hadoop.hbase.oss.TestUtils;
+import org.apache.hadoop.hbase.oss.sync.TreeLockManager;
+import org.junit.Assume;
+import org.junit.Test;
+
+public class TestHBOSSContractCreate extends AbstractContractCreateTest {
+
+ @Override
+ protected Configuration createConfiguration() {
+ return super.createConfiguration();
+ }
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new HBOSSContract(conf);
+ }
+
+ @Test
+ @Override
+ public void testCreatedFileIsVisibleOnFlush() throws Throwable {
+ Configuration conf = createConfiguration();
+ try {
+ TestUtils.getFileSystem(conf);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Exception configuring FS: " + e);
+ }
+ // HBOSS satisfies the contract that this test checks for, but it also
+ // relies on flush, which s3a still does not support.
+ Assume.assumeFalse(TestUtils.fsIs(TestUtils.S3A, conf));
+ super.testCreatedFileIsVisibleOnFlush();
+ }
+
+ @Test
+ @Override
+ public void testCreatedFileIsImmediatelyVisible() throws Throwable {
+ describe("verify that a newly created file exists as soon as open returns");
+ Path path = path("testCreatedFileIsImmediatelyVisible");
+ try(FSDataOutputStream out = getFileSystem().create(path,
+ false,
+ 4096,
+ (short) 1,
+ 1024)) {
+
+ // The original contract test delays close() until after the files appear.
+ // HBOSS only guarantees that create() + close() is atomic, so the
+ // original test still wouldn't work.
+ out.close();
+
+ if (!getFileSystem().exists(path)) {
+
+ if (isSupported(CREATE_VISIBILITY_DELAYED)) {
+ // For some file systems, downgrade to a skip so that the failure is
+ // visible in test results.
+ ContractTestUtils.skip(
+ "This Filesystem delays visibility of newly created files");
+ }
+ assertPathExists("expected path to be visible before anything written",
+ path);
+ }
+ }
+ }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractDelete.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractDelete.java
new file mode 100644
index 0000000..3a1a503
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractDelete.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractDeleteTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+public class TestHBOSSContractDelete extends AbstractContractDeleteTest {
+
+ @Override
+ protected Configuration createConfiguration() {
+ return super.createConfiguration();
+ }
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new HBOSSContract(conf);
+ }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractDistCp.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractDistCp.java
new file mode 100644
index 0000000..73afb07
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractDistCp.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.hbase.oss.TestUtils;
+import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;
+
+public class TestHBOSSContractDistCp extends AbstractContractDistCpTest {
+
+ @Override
+ protected Configuration createConfiguration() {
+ return super.createConfiguration();
+ }
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new HBOSSContract(conf);
+ }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractGetFileStatus.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractGetFileStatus.java
new file mode 100644
index 0000000..3425afc
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractGetFileStatus.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractGetFileStatusTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.hbase.oss.TestUtils;
+
+public class TestHBOSSContractGetFileStatus extends AbstractContractGetFileStatusTest {
+
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ try {
+ TestUtils.getFileSystem(conf);
+ TestUtils.disableFilesystemCaching(conf);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Exception configuring FS: " + e);
+ }
+ return conf;
+ }
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new HBOSSContract(conf);
+ }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractMkdir.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractMkdir.java
new file mode 100644
index 0000000..268d8d1
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractMkdir.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractMkdirTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+public class TestHBOSSContractMkdir extends AbstractContractMkdirTest {
+
+ @Override
+ protected Configuration createConfiguration() {
+ return super.createConfiguration();
+ }
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new HBOSSContract(conf);
+ }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractOpen.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractOpen.java
new file mode 100644
index 0000000..36adfcf
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractOpen.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractOpenTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+public class TestHBOSSContractOpen extends AbstractContractOpenTest {
+
+ @Override
+ protected Configuration createConfiguration() {
+ return super.createConfiguration();
+ }
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new HBOSSContract(conf);
+ }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractRename.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractRename.java
new file mode 100644
index 0000000..f1047c4
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractRename.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.AbstractContractRenameTest;
+import org.apache.hadoop.hbase.oss.EmbeddedS3;
+import org.apache.hadoop.hbase.oss.TestUtils;
+import org.junit.Assume;
+import org.junit.Test;
+
+/**
+ * There is an S3A-specific extension of AbstractContractRenameTest
+ * that is similarly extended for HBOSS-on-S3A. This class remains to be run in
+ * the general case.
+ */
+public class TestHBOSSContractRename extends AbstractContractRenameTest {
+
+ @Override
+ protected Configuration createConfiguration() {
+ return super.createConfiguration();
+ }
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ HBOSSContract contract = new HBOSSContract(conf);
+ try {
+ TestUtils.getFileSystem(conf);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Exception configuring FS: " + e);
+ }
+ Assume.assumeFalse(TestUtils.fsIs(TestUtils.S3A, conf));
+ return contract;
+ }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractRenameS3A.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractRenameS3A.java
new file mode 100644
index 0000000..a441dbf
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractRenameS3A.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.AbstractContractRenameTest;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.hbase.oss.EmbeddedS3;
+import org.apache.hadoop.hbase.oss.TestUtils;
+import org.junit.Assume;
+import org.junit.Test;
+
+/**
+ * There is an S3A-specific extension of AbstractContractRenameTest, and this
+ * class implements the same modifications for HBOSS-on-S3A.
+ * TestHBOSSContractRename remains to be run in the general case.
+ */
+public class TestHBOSSContractRenameS3A extends AbstractContractRenameTest {
+
+ @Override
+ protected Configuration createConfiguration() {
+ return super.createConfiguration();
+ }
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ HBOSSContract contract = new HBOSSContract(conf);
+ try {
+ TestUtils.getFileSystem(conf);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Exception configuring FS: " + e);
+ }
+ Assume.assumeTrue(TestUtils.fsIs(TestUtils.S3A, conf));
+ return contract;
+ }
+
+ // This is copied from Hadoop's ITestS3AContractRename. Ideally we could
+ // extend that instead of AbstractContractRenameTest, but it is not published.
+ @Override
+ public void testRenameDirIntoExistingDir() throws Throwable {
+ describe("Verify renaming a dir into an existing dir puts the files"
+ +" from the source dir into the existing dir"
+ +" and leaves existing files alone");
+ FileSystem fs = getFileSystem();
+ String sourceSubdir = "source";
+ Path srcDir = path(sourceSubdir);
+ Path srcFilePath = new Path(srcDir, "source-256.txt");
+ byte[] srcDataset = ContractTestUtils.dataset(256, 'a', 'z');
+ ContractTestUtils.writeDataset(fs, srcFilePath, srcDataset, srcDataset.length, 1024, false);
+ Path destDir = path("dest");
+
+ Path destFilePath = new Path(destDir, "dest-512.txt");
+ byte[] destDateset = ContractTestUtils.dataset(512, 'A', 'Z');
+ ContractTestUtils.writeDataset(fs, destFilePath, destDateset, destDateset.length, 1024,
+ false);
+ assertIsFile(destFilePath);
+
+ boolean rename = fs.rename(srcDir, destDir);
+ assertFalse("s3a doesn't support rename to non-empty directory", rename);
+ }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractRootDirectory.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractRootDirectory.java
new file mode 100644
index 0000000..189693a
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractRootDirectory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class TestHBOSSContractRootDirectory extends AbstractContractRootDirectoryTest {
+
+ @Override
+ protected Configuration createConfiguration() {
+ return super.createConfiguration();
+ }
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new HBOSSContract(conf);
+ }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractSeek.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractSeek.java
new file mode 100644
index 0000000..136b056
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractSeek.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+public class TestHBOSSContractSeek extends AbstractContractSeekTest {
+
+ @Override
+ protected Configuration createConfiguration() {
+ return super.createConfiguration();
+ }
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new HBOSSContract(conf);
+ }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractSetTimes.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractSetTimes.java
new file mode 100644
index 0000000..b5223cb
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractSetTimes.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractSetTimesTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Note that this has not been tested because S3A does not support it.
+ */
+public class TestHBOSSContractSetTimes extends AbstractContractSetTimesTest {
+
+ @Override
+ protected Configuration createConfiguration() {
+ return super.createConfiguration();
+ }
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new HBOSSContract(conf);
+ }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/LocalTreeLockManager.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/LocalTreeLockManager.java
new file mode 100644
index 0000000..60b213c
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/LocalTreeLockManager.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.sync;
+
+import java.io.IOException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation based on standard Java concurrency primitives. This makes it
+ * useless for anything but testing, development, and maybe a single-node HBase
+ * instance. It is however much faster and easier to debug, despite including
+ * logic very similar to what other implementations would need.
+ *
+ * Can be enabled in JUnit tests with -Plocal (although it's the default).
+ */
+public class LocalTreeLockManager extends TreeLockManager {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(LocalTreeLockManager.class);
+
+ public void initialize(FileSystem fs) {
+ this.fs = fs;
+ }
+
+ @Override
+ protected void writeLock(Path p) throws IOException {
+ createLocksIfNeeded(p);
+ index.get(p).lock.writeLock().lock();
+ }
+
+ @Override
+ protected void writeUnlock(Path p) throws IOException {
+ try {
+ LockNode node = index.get(p);
+ // Node to unlock may already be gone after deletes
+ if (node != null) {
+ node.lock.writeLock().unlock();
+ }
+ } catch(IllegalMonitorStateException e) {
+ // Reentrant locks might be acquired multiple times
+ LOG.error("Tried to release unacquired write lock: {}", p);
+ throw e;
+ }
+ }
+
+ @Override
+ protected void readLock(Path p) throws IOException {
+ createLocksIfNeeded(p);
+ index.get(p).lock.readLock().lock();
+ }
+
+ @Override
+ protected void readUnlock(Path p) throws IOException {
+ try {
+ index.get(p).lock.readLock().unlock();
+ } catch(IllegalMonitorStateException e) {
+ // Reentrant locks might be acquired multiple times
+ LOG.error("Tried to release unacquired read lock: {}", p);
+ throw e;
+ }
+ }
+
+ @Override
+ protected boolean writeLockAbove(Path p) throws IOException {
+ createLocksIfNeeded(p);
+ while (!p.isRoot()) {
+ p = p.getParent();
+ LockNode currentNode = index.get(p);
+ if (currentNode.lock.isWriteLocked() &&
+ !currentNode.lock.isWriteLockedByCurrentThread()) {
+ LOG.warn("Parent write lock currently held: {}", p);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ protected boolean writeLockBelow(Path p) throws IOException {
+ createLocksIfNeeded(p);
+ return writeLockBelow(p, true);
+ }
+
+ @Override
+ protected boolean readLockBelow(Path p) throws IOException {
+ createLocksIfNeeded(p);
+ return readLockBelow(p, true);
+ }
+
+ @Override
+ protected void recursiveDelete(Path p) {
+ if (!p.isRoot()) {
+ index.get(p.getParent()).children.remove(p);
+ }
+ LockNode currentNode = index.get(p);
+ innerRecursiveDelete(p);
+ currentNode.lock.writeLock().unlock();
+ }
+
+ private void innerRecursiveDelete(Path p) {
+ LockNode currentNode = index.remove(p);
+ Set<Path> childPaths = currentNode.children.keySet();
+ for (Path child : childPaths) {
+ innerRecursiveDelete(child);
+ }
+ }
+
+ private class LockNode {
+ public Path path;
+
+ public LockNode(Path path) {
+ this.path = path;
+ }
+
+ public Map<Path, LockNode> children = new HashMap<>();
+ public ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+ }
+
+ private Map<Path, LockNode> index = new HashMap<>();
+
+ private synchronized void createLocksIfNeeded(Path p) {
+ if (index.containsKey(p)) {
+ return;
+ }
+ Path lastPath = null;
+ while (p != null) {
+ LockNode currentNode = index.get(p);
+ if (currentNode != null) {
+ if (lastPath != null) {
+ currentNode.children.put(lastPath, index.get(lastPath));
+ }
+ return;
+ }
+ currentNode = new LockNode(p);
+ if (lastPath != null) {
+ currentNode.children.put(lastPath, index.get(lastPath));
+ }
+ index.put(p, currentNode);
+ lastPath = p;
+ p = p.getParent();
+ }
+ }
+
+ private synchronized boolean writeLockBelow(Path p, boolean firstLevel) {
+ LockNode currentNode = index.get(p);
+ if (!firstLevel && currentNode.lock.isWriteLocked() &&
+ !currentNode.lock.isWriteLockedByCurrentThread()) {
+ LOG.warn("Child write lock current held: {}", p);
+ return true;
+ }
+ Set<Path> childPaths = currentNode.children.keySet();
+ for (Path child : childPaths) {
+ if (writeLockBelow(child, false)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ // TODO will return true even if current thread has a read lock below...
+ private synchronized boolean readLockBelow(Path p, boolean firstLevel) {
+ LockNode currentNode = index.get(p);
+ if (!firstLevel && currentNode.lock.getReadLockCount() > 0) {
+ LOG.warn("Child read lock currently held: {}", p);
+ return true;
+ }
+ Set<Path> childPaths = index.get(p).children.keySet();
+ for (Path child : childPaths) {
+ if (readLockBelow(child, false)) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
+
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/NullTreeLockManager.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/NullTreeLockManager.java
new file mode 100644
index 0000000..9d94077
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/NullTreeLockManager.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.sync;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Bypasses all synchronization to effectively make HBOSS operations no-ops.
+ *
+ * Can be enabled in JUnit tests with -Pnull to reproduce the problems HBOSS is
+ * intended to fix.
+ */
+public class NullTreeLockManager extends TreeLockManager {
+
+ public void initialize(FileSystem fs) {
+ this.fs = fs;
+ }
+
+ @Override
+ protected void writeLock(Path p) {
+ }
+
+ @Override
+ protected void writeUnlock(Path p) {
+ }
+
+ @Override
+ protected void readLock(Path p) {
+ }
+
+ @Override
+ protected void readUnlock(Path p) {
+ }
+
+ @Override
+ protected boolean writeLockAbove(Path p) {
+ return false;
+ }
+
+ @Override
+ protected boolean writeLockBelow(Path p) {
+ return false;
+ }
+
+ @Override
+ protected boolean readLockBelow(Path p) {
+ return false;
+ }
+
+ @Override
+ protected void recursiveDelete(Path p) {
+ }
+}
+
diff --git a/hbase-oss/src/test/resources/contract/s3a.xml b/hbase-oss/src/test/resources/contract/s3a.xml
new file mode 100644
index 0000000..72aac03
--- /dev/null
+++ b/hbase-oss/src/test/resources/contract/s3a.xml
@@ -0,0 +1,128 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+
+ <!--
+ This is S3A's contract with HBOSS guarantees added. Specifically:
+
+ fs.contract.is-blobstore = false
+ fs.contract.create-visibility-delayed = false
+ fs.contract.supports-atomic-directory-delete = true
+ fs.contract.supports-atomic-rename = true
+
+ Note that fs.contract.is-blobstore appears to be identical in meaning to
+ fs.contract.create-visibility-delayed.
+ -->
+
+ <property>
+ <name>fs.contract.test.root-tests-enabled</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.test.random-seek-count</name>
+ <value>10</value>
+ </property>
+
+ <property>
+ <name>fs.contract.is-blobstore</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>fs.contract.create-visibility-delayed</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>fs.contract.is-case-sensitive</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.rename-returns-false-if-source-missing</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.rename-remove-dest-if-empty-dir</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-append</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-atomic-directory-delete</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-atomic-rename</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-block-locality</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-concat</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-getfilestatus</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-seek</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-seek-on-closed-file</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.rejects-seek-past-eof</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-strict-exceptions</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-unix-permissions</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>fs.contract.rename-overwrites-dest</name>
+ <value>false</value>
+ </property>
+
+</configuration>
diff --git a/hbase-oss/src/test/resources/core-site.xml b/hbase-oss/src/test/resources/core-site.xml
new file mode 100644
index 0000000..a46594f
--- /dev/null
+++ b/hbase-oss/src/test/resources/core-site.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+
+ <!--
+ By default the tests run against a mocked S3 client. To run these against
+ the real Amazon S3 service or another object store, set fs.hboss.data.uri
+ and any required credentials in auth-keys.xml. e.g.:
+
+ <configuration>
+ <property>
+ <name>fs.hboss.data.uri</name>
+ <value>s3a://.../</value>
+ </property>
+
+ <property>
+ <name>fs.s3a.secret.key</name>
+ <value>...</value>
+ </property>
+
+ <property>
+ <name>fs.s3a.access.key</name>
+ <value>...</value>
+ </property>
+ </configuration>
+
+ S3Guard will also be configured to the LocalMetadataStore implementation
+ by default. You may wish to configure DynamoDBMetadataStore for integration
+ or if S3Guard is not needed (S3-compatible appliance, for instance), you
+ can disable it by configuring NullMetadataStore explicitly (see below).
+ -->
+
+ <!--
+ <property>
+ <name>fs.s3a.metadatastore.impl</name>
+ <value>org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore</value>
+ <description>Uncomment to disable S3Guard entirely</description>
+ </property>
+ -->
+
+ <include xmlns="http://www.w3.org/2001/XInclude" href="auth-keys.xml">
+ <fallback/>
+ </include>
+
+</configuration>
diff --git a/hbase-oss/src/test/resources/log4j.properties b/hbase-oss/src/test/resources/log4j.properties
new file mode 100644
index 0000000..a59c82e
--- /dev/null
+++ b/hbase-oss/src/test/resources/log4j.properties
@@ -0,0 +1,19 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) - %m%n
+log4j.logger.org.apache.hadoop=DEBUG
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..ec62ba5
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.hbase.filesystem</groupId>
+ <artifactId>hbase-filesystem</artifactId>
+ <version>1.0.0-alpha1-SNAPSHOT</version>
+
+ <name>Apache HBase FileSystem-related Modules</name>
+ <description>
+ This project houses projects that work at the Apache Hadoop FileSystem layer
+ to meet Apache HBase's needs. This work is experimental, and may eventually
+ be replaced by other initiatives in HBase. It is therefore released
+ separately.
+ </description>
+ <packaging>pom</packaging>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <maven.compiler.target>1.8</maven.compiler.target>
+
+ <audience-annotations.version>0.5.0</audience-annotations.version>
+ <aws-java-sdk.version>1.11.525</aws-java-sdk.version>
+ <commons-io.version>2.5</commons-io.version>
+ <commons-lang3.version>3.6</commons-lang3.version>
+ <curator.version>4.0.0</curator.version>
+ <hadoop.version>3.2.0</hadoop.version>
+ <hbase.version>2.1.4</hbase.version>
+ <hbase-thirdparty.version>2.2.0</hbase-thirdparty.version>
+ <junit.version>4.12</junit.version>
+ <slf4j.version>1.7.25</slf4j.version>
+ <zookeeper.version>3.4.10</zookeeper.version>
+ </properties>
+
+ <modules>
+ <module>hbase-oss</module>
+ </modules>
+
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.22.1</version>
+ <configuration>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
+ <reuseForks>false</reuseForks>
+ <systemProperties>
+ <test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
+ </systemProperties>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+</project>