HBASE-22149 HBOSS: A FileSystem implementation to provide HBase's required semantics on object stores.


* Adds top-level project structure
* Adds hbase-oss module with an initial implementation of a wrapper FileSystem that enforces the semantics needed by HBase

Closes #1 
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..061a95c
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+target
+auth-keys.xml
+.idea
+*.iml
diff --git a/LICENSE.txt b/LICENSE.txt
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/LICENSE.txt
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/NOTICE.txt b/NOTICE.txt
new file mode 100644
index 0000000..6e3985d
--- /dev/null
+++ b/NOTICE.txt
@@ -0,0 +1,5 @@
+Apache HBase FileSystem-related modules
+Copyright 2019 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
diff --git a/hbase-oss/README.md b/hbase-oss/README.md
new file mode 100644
index 0000000..1d100af
--- /dev/null
+++ b/hbase-oss/README.md
@@ -0,0 +1,123 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+# HBOSS: HBase / Object Store Semantics adapter
+
+## Introduction
+
+This module provides an implementation of Apache Hadoop's FileSystem interface
+that bridges the gap between Apache HBase, which assumes that many operations
+are atomic, and object-store implementations of FileSystem (such as s3a) that
+cannot natively provide atomic semantics for those operations.
+
+This is implemented separately from s3a so that it can potentially be used for
+other object stores. It is also impractical to provide the required semantics
+for the general case without significant drawbacks in some cases. A separate
+implementation allows all trade-offs to be made on HBase's terms.
+
+## Lock Implementations
+
+TreeLockManager implements generic logic for managing read / write locks on
+branches of filesystem hierarchies, but needs to be extended by an
+implementation that provides individual read / write locks and methods to
+traverse the tree.
+
+The desired implementation must be selected by setting the following property
+to one of the class names listed below:
+
+    fs.hboss.sync.impl
+
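+For example (an illustrative sketch; the property can equally be set in
+core-site.xml), the local lock manager described below could be selected
+programmatically with:
+
+    Configuration conf = new Configuration();
+    conf.set("fs.hboss.sync.impl",
+        "org.apache.hadoop.hbase.oss.sync.LocalTreeLockManager");
+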
+### Null Implementation (org.apache.hadoop.hbase.oss.sync.NullTreeLockManager)
+
+The null implementation just provides no-op methods instead of actual locking
+operations. This functions as an easy way to verify that a test case has
+successfully reproduced a problem that is hidden by the other implementations.
+
+### Local Implementation (org.apache.hadoop.hbase.oss.sync.LocalTreeLockManager)
+
+Primarily intended to help with development and validation, but could possibly
+work for a standalone instance of HBase. This implementation uses Java's
+built-in ReentrantReadWriteLock.
+
+### ZooKeeper Implementation (org.apache.hadoop.hbase.oss.sync.ZKTreeLockManager)
+
+This implementation is intended for production use once it is stable. It uses
+Apache Curator's implementation of read / write locks on Apache ZooKeeper. It
+could share a ZooKeeper ensemble with the HBase cluster.
+
+At a minimum, you must configure the ZooKeeper connection string (including
+root znode):
+
+    fs.hboss.sync.zk.connectionString
+
+You may also want to configure:
+
+    fs.hboss.sync.zk.sleep.base.ms (default 1000)
+    fs.hboss.sync.zk.sleep.max.retries (default 3)
+
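+As an illustrative sketch (the connection string below is only a placeholder,
+not a required value), the ZooKeeper implementation might be configured
+programmatically as:
+
+    Configuration conf = new Configuration();
+    conf.set("fs.hboss.sync.impl",
+        "org.apache.hadoop.hbase.oss.sync.ZKTreeLockManager");
+    conf.set("fs.hboss.sync.zk.connectionString",
+        "zk1:2181,zk2:2181,zk3:2181/hboss");
+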
+### DynamoDB Implementation (not implemented)
+
+An implementation based on Amazon's DynamoDB lock library was considered, but
+it was not completed due to the lack of an efficient way to traverse the tree
+and discover locks on child nodes. The appeal was that S3Guard is already
+required for s3a use, so a dependency on DynamoDB exists anyway.
+
+## Storage Implementations
+
+Currently HBOSS is primarily designed for and exclusively tested with Hadoop's
+s3a client against Amazon S3. S3Guard must be enabled. Both this requirement and
+the use of an external data store for locking have serious implications if any
+other client accesses the same data.
+
+In theory, HBOSS could also work well with Google's cloud storage client (gs)
+or other object storage clients.
+
+## FileSystem Instantiation
+
+There are two ways to get an HBOSS instance. It can be instantiated directly
+and initialized with a URI and a Configuration object, which are used to
+construct the underlying FileSystem:
+
+    Configuration conf = new Configuration();
+    FileSystem hboss = new HBaseObjectStoreSemantics();
+    hboss.initialize(new URI("s3a://bucket/"), conf);
+
+If the application code cannot be changed, you can remap the object store
+client's scheme to the HBOSS implementation, and set
+fs.hboss.fs.&lt;scheme&gt;.impl to the underlying implementation that HBOSS
+should wrap:
+
+    Configuration conf = new Configuration();
+    conf.set("fs.hboss.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+    conf.set("fs.s3a.impl", "org.apache.hadoop.hbase.oss.HBaseObjectStoreSemantics");
+    FileSystem hboss = FileSystem.get(new URI("s3a://bucket/"), conf);
+
+## Testing
+
+You can quickly run HBOSS's tests with any of the implementations:
+
+    mvn verify -Pnull  # reproduce the problems
+    mvn verify -Plocal # useful for debugging
+    mvn verify -Pzk    # the default
+
+If the 'zk' profile is activated, it will start an embedded ZooKeeper process.
+The tests can also be run against a distributed ZooKeeper ensemble by setting
+fs.hboss.sync.zk.connectionString in src/test/resources/core-site.xml.
+
+By default, the tests run against a mock S3 client backed by in-memory data
+structures. fs.hboss.data.uri can also be set in src/test/resources/core-site.xml
+to point the tests at any other storage.
+
+Any required credentials or other individual configuration should be set in
+src/test/resources/auth-keys.xml, which should be ignored by source control.
diff --git a/hbase-oss/pom.xml b/hbase-oss/pom.xml
new file mode 100644
index 0000000..aa4c475
--- /dev/null
+++ b/hbase-oss/pom.xml
@@ -0,0 +1,236 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hbase.filesystem</groupId>
+    <artifactId>hbase-filesystem</artifactId>
+    <version>1.0.0-alpha1-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <artifactId>hbase-oss</artifactId>
+  <name>Apache HBase / Object-Store Semantics Module</name>
+  <description>
+    This module provides atomic file-system semantics required by HBase when
+    running on object-store based FileSystem implementations that cannot
+    natively offer those semantics. It does this by locking, so operations may
+    not be fast, but should be transactional.
+  </description>
+  <packaging>jar</packaging>
+
+  <properties>
+    <fs.hboss.sync.impl>org.apache.hadoop.hbase.oss.sync.ZKTreeLockManager</fs.hboss.sync.impl>
+    <aws-java-sdk.version>1.11.525</aws-java-sdk.version>
+  </properties>
+
+  <profiles>
+    <profile>
+      <id>null</id>
+      <properties>
+        <fs.hboss.sync.impl>org.apache.hadoop.hbase.oss.sync.NullTreeLockManager</fs.hboss.sync.impl>
+      </properties>
+    </profile>
+    <profile>
+      <id>local</id>
+      <properties>
+        <fs.hboss.sync.impl>org.apache.hadoop.hbase.oss.sync.LocalTreeLockManager</fs.hboss.sync.impl>
+      </properties>
+    </profile>
+    <profile>
+      <id>zk</id>
+      <properties>
+        <fs.hboss.sync.impl>org.apache.hadoop.hbase.oss.sync.ZKTreeLockManager</fs.hboss.sync.impl>
+      </properties>
+    </profile>
+  </profiles>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <!-- TODO add parallel tests for everything but TestHBOSSContractRootDirectory-->
+        <configuration>
+          <systemProperties>
+            <fs.hboss.sync.impl>${fs.hboss.sync.impl}</fs.hboss.sync.impl>
+          </systemProperties>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-annotations</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <version>${hbase.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>${commons-lang3.version}</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase.thirdparty</groupId>
+      <artifactId>hbase-shaded-miscellaneous</artifactId>
+      <version>${hbase-thirdparty.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>${slf4j.version}</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.yetus</groupId>
+      <artifactId>audience-annotations</artifactId>
+      <version>${audience-annotations.version}</version>
+      <scope>compile</scope>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <!-- Banned import in HBase -->
+          <groupId>com.google.code.findbugs</groupId>
+          <artifactId>jsr305</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-distcp</artifactId>
+      <version>${hadoop.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-distcp</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <version>${hbase.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- For testing against S3 -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-bundle</artifactId>
+      <version>${aws-java-sdk.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <version>${commons-io.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- For ZooKeeper implementation -->
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>${zookeeper.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-zookeeper</artifactId>
+      <version>${hbase.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-zookeeper</artifactId>
+      <version>${hbase.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-client</artifactId>
+      <version>${curator.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-framework</artifactId>
+      <version>${curator.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-recipes</artifactId>
+      <version>${curator.version}</version>
+    </dependency>
+
+  </dependencies>
+
+</project>
diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/Constants.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/Constants.java
new file mode 100644
index 0000000..8221680
--- /dev/null
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/Constants.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class Constants  {
+  public static final String DATA_URI = "fs.hboss.data.uri";
+  public static final String SYNC_IMPL = "fs.hboss.sync.impl";
+
+  public static final String ZK_CONN_STRING = "fs.hboss.sync.zk.connectionString";
+  public static final String ZK_BASE_SLEEP_MS = "fs.hboss.sync.zk.sleep.base.ms";
+  public static final String ZK_MAX_RETRIES = "fs.hboss.sync.zk.sleep.max.retries";
+
+  public static final String CONTRACT_TEST_SCHEME = "fs.contract.test.fs.scheme";
+}
diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/HBaseObjectStoreSemantics.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/HBaseObjectStoreSemantics.java
new file mode 100644
index 0000000..f72d7e3
--- /dev/null
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/HBaseObjectStoreSemantics.java
@@ -0,0 +1,949 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.oss.sync.AutoLock;
+import org.apache.hadoop.hbase.oss.sync.AutoLock.LockedFSDataOutputStream;
+import org.apache.hadoop.hbase.oss.sync.AutoLock.LockedRemoteIterator;
+import org.apache.hadoop.hbase.oss.sync.TreeLockManager;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A FileSystem implementation that layers locking logic on top of another,
+ * underlying implementation. The appropriate lock on a path is acquired before
+ * passing on the call, and is either released upon returning or the resulting
+ * data stream / iterator takes ownership of the lock until it is closed. Newer
+ * features of FileSystem may either be unimplemented or will not be aware of
+ * the locks. Caveats with existing features include:
+ * <ul>
+ *   <li>
+ *     The deleteOnExit feature has no locking logic beyond the underlying
+ *     exists() and delete() calls it makes. This shouldn't be a problem given
+ *     the documented best-effort nature of the feature.
+ *   </li>
+ *   <li>
+ *     globStatus isn't even atomic on HDFS, as it does a tree-walk and may do
+ *     multiple independent requests to the NameNode. This is potentially worse
+ *     on object-stores where latency is higher, etc. Currently globStatus is
+ *     only used for the CoprocessorClassLoader and listing table directories.
+ *     These operations are not considered sensitive to atomicity. Globbing
+ *     could be made atomic by getting a write lock on the parent of the first
+ *     wildcard.
+ *   </li>
+ *   <li>
+ *     Symlinking is not supported, but not used by HBase at all and not
+ *     supported by mainstream object-stores considered in this design.
+ *   </li>
+ * </ul>
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+@InterfaceStability.Unstable
+public class HBaseObjectStoreSemantics extends FileSystem {
+  private static final Logger LOG =
+        LoggerFactory.getLogger(HBaseObjectStoreSemantics.class);
+
+  private FileSystem fs;
+
+  private TreeLockManager sync;
+
+  public void initialize(URI name, Configuration conf) throws IOException {
+    setConf(conf);
+
+    String scheme = name.getScheme();
+    String schemeImpl = "fs." + scheme + ".impl";
+    String hbossSchemeImpl = "fs.hboss." + schemeImpl;
+    String wrappedImpl = conf.get(hbossSchemeImpl);
+    Configuration internalConf = new Configuration(conf);
+
+    if (wrappedImpl != null) {
+      LOG.info("HBOSS wrapping file-system {} using implementation {}", name,
+            wrappedImpl);
+      String disableCache = "fs." + scheme + ".impl.disable.cache";
+      internalConf.set(schemeImpl, wrappedImpl);
+      internalConf.set(disableCache, "true");
+    }
+
+    fs = FileSystem.get(name, internalConf);
+    sync = TreeLockManager.get(fs);
+  }
+
+  @VisibleForTesting
+  TreeLockManager getLockManager() {
+    return sync;
+  }
+
+  public String getScheme() {
+    return fs.getScheme();
+  }
+
+  public URI getUri() {
+    return fs.getUri();
+  }
+
+  @Override
+  public String getCanonicalServiceName() {
+    return fs.getCanonicalServiceName();
+  }
+
+  @Deprecated
+  public String getName() {
+    return fs.getName();
+  }
+
+  public Path makeQualified(Path path) {
+    return fs.makeQualified(path);
+  }
+
+  public BlockLocation[] getFileBlockLocations(FileStatus file,
+                                               long start, long len) throws IOException {
+    try (AutoLock l = sync.lock(file.getPath())) {
+      return fs.getFileBlockLocations(file, start, len);
+    }
+  }
+
+  public BlockLocation[] getFileBlockLocations(Path p,
+                                               long start, long len) throws IOException {
+    try (AutoLock l = sync.lock(p)) {
+      return fs.getFileBlockLocations(p, start, len);
+    }
+  }
+
+  @Deprecated
+  public FsServerDefaults getServerDefaults() throws IOException {
+    return fs.getServerDefaults();
+  }
+
+  public FsServerDefaults getServerDefaults(Path p) throws IOException {
+    return fs.getServerDefaults(p);
+  }
+
+  public Path resolvePath(final Path p) throws IOException {
+    return fs.resolvePath(p);
+  }
+
+  public FSDataInputStream open(Path f, int bufferSize)
+        throws IOException {
+    try (AutoLock l = sync.lock(f)) {
+      return fs.open(f, bufferSize);
+    }
+  }
+
+  public FSDataInputStream open(Path f) throws IOException {
+    try (AutoLock l = sync.lock(f)) {
+      return fs.open(f);
+    }
+  }
+
+  public FSDataOutputStream create(Path f) throws IOException {
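+    // Ownership of the write lock passes to the LockedFSDataOutputStream,
+    // which releases it when the stream is closed; if creation fails, the
+    // lock must be released here instead.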
+    AutoLock lock = sync.lockWrite(f);
+    try {
+      FSDataOutputStream stream = fs.create(f);
+      return new LockedFSDataOutputStream(stream, lock);
+    } catch (IOException e) {
+      lock.close();
+      throw(e);
+    }
+  }
+
+  public FSDataOutputStream create(Path f, boolean overwrite)
+        throws IOException {
+    AutoLock lock = sync.lockWrite(f);
+    try {
+      FSDataOutputStream stream = fs.create(f, overwrite);
+      return new LockedFSDataOutputStream(stream, lock);
+    } catch (IOException e) {
+      lock.close();
+      throw(e);
+    }
+  }
+
+  public FSDataOutputStream create(Path f, Progressable progress)
+        throws IOException {
+    AutoLock lock = sync.lockWrite(f);
+    try {
+      FSDataOutputStream stream = fs.create(f, progress);
+      return new LockedFSDataOutputStream(stream, lock);
+    } catch (IOException e) {
+      lock.close();
+      throw(e);
+    }
+  }
+
+  public FSDataOutputStream create(Path f, short replication)
+        throws IOException {
+    AutoLock lock = sync.lockWrite(f);
+    try {
+      FSDataOutputStream stream = fs.create(f, replication);
+      return new LockedFSDataOutputStream(stream, lock);
+    } catch (IOException e) {
+      lock.close();
+      throw(e);
+    }
+  }
+
+  public FSDataOutputStream create(Path f, short replication,
+                                   Progressable progress) throws IOException {
+    AutoLock lock = sync.lockWrite(f);
+    try {
+      FSDataOutputStream stream = fs.create(f, replication, progress);
+      return new LockedFSDataOutputStream(stream, lock);
+    } catch (IOException e) {
+      lock.close();
+      throw(e);
+    }
+  }
+
+  public FSDataOutputStream create(Path f,
+                                   boolean overwrite,
+                                   int bufferSize
+  ) throws IOException {
+    AutoLock lock = sync.lockWrite(f);
+    try {
+      FSDataOutputStream stream = fs.create(f, overwrite, bufferSize);
+      return new LockedFSDataOutputStream(stream, lock);
+    } catch (IOException e) {
+      lock.close();
+      throw(e);
+    }
+  }
+
+  public FSDataOutputStream create(Path f,
+                                   boolean overwrite,
+                                   int bufferSize,
+                                   Progressable progress
+  ) throws IOException {
+    AutoLock lock = sync.lockWrite(f);
+    try {
+      FSDataOutputStream stream = fs.create(f, overwrite, bufferSize, progress);
+      return new LockedFSDataOutputStream(stream, lock);
+    } catch (IOException e) {
+      lock.close();
+      throw(e);
+    }
+  }
+
+  public FSDataOutputStream create(Path f,
+                                   boolean overwrite,
+                                   int bufferSize,
+                                   short replication,
+                                   long blockSize) throws IOException {
+    AutoLock lock = sync.lockWrite(f);
+    try {
+      FSDataOutputStream stream = fs.create(f, overwrite, bufferSize,
+            replication, blockSize);
+      return new LockedFSDataOutputStream(stream, lock);
+    } catch (IOException e) {
+      lock.close();
+      throw(e);
+    }
+  }
+
+  public FSDataOutputStream create(Path f,
+                                   boolean overwrite,
+                                   int bufferSize,
+                                   short replication,
+                                   long blockSize,
+                                   Progressable progress
+  ) throws IOException {
+    AutoLock lock = sync.lockWrite(f);
+    try {
+      FSDataOutputStream stream = fs.create(f, overwrite, bufferSize,
+            replication, blockSize, progress);
+      return new LockedFSDataOutputStream(stream, lock);
+    } catch (IOException e) {
+      lock.close();
+      throw(e);
+    }
+  }
+
+  public FSDataOutputStream create(Path f,
+                                   FsPermission permission,
+                                   boolean overwrite,
+                                   int bufferSize,
+                                   short replication,
+                                   long blockSize,
+                                   Progressable progress) throws IOException {
+    AutoLock lock = sync.lockWrite(f);
+    try {
+      FSDataOutputStream stream = fs.create(f, permission, overwrite,
+            bufferSize, replication, blockSize, progress);
+      return new LockedFSDataOutputStream(stream, lock);
+    } catch (IOException e) {
+      lock.close();
+      throw(e);
+    }
+  }
+
+  public FSDataOutputStream create(Path f,
+                                   FsPermission permission,
+                                   EnumSet<CreateFlag> flags,
+                                   int bufferSize,
+                                   short replication,
+                                   long blockSize,
+                                   Progressable progress) throws IOException {
+    AutoLock lock = sync.lockWrite(f);
+    try {
+      FSDataOutputStream stream = fs.create(f, permission, flags, bufferSize,
+            replication, blockSize, progress);
+      return new LockedFSDataOutputStream(stream, lock);
+    } catch (IOException e) {
+      lock.close();
+      throw(e);
+    }
+  }
+
+  public FSDataOutputStream create(Path f,
+                                   FsPermission permission,
+                                   EnumSet<CreateFlag> flags,
+                                   int bufferSize,
+                                   short replication,
+                                   long blockSize,
+                                   Progressable progress,
+                                   ChecksumOpt checksumOpt) throws IOException {
+    AutoLock lock = sync.lockWrite(f);
+    try {
+      FSDataOutputStream stream = fs.create(f, permission, flags, bufferSize,
+            replication, blockSize, progress, checksumOpt);
+      return new LockedFSDataOutputStream(stream, lock);
+    } catch (IOException e) {
+      lock.close();
+      throw(e);
+    }
+  }
+
+  public FSDataOutputStream createNonRecursive(Path f,
+                                               boolean overwrite,
+                                               int bufferSize, short replication, long blockSize,
+                                               Progressable progress) throws IOException {
+    AutoLock lock = sync.lockWrite(f);
+    try {
+      FSDataOutputStream stream = fs.createNonRecursive(f, overwrite,
+            bufferSize, replication, blockSize, progress);
+      return new LockedFSDataOutputStream(stream, lock);
+    } catch (IOException e) {
+      lock.close();
+      throw(e);
+    }
+  }
+
+  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
+                                               boolean overwrite, int bufferSize, short replication, long blockSize,
+                                               Progressable progress) throws IOException {
+    AutoLock lock = sync.lockWrite(f);
+    try {
+      FSDataOutputStream stream = fs.createNonRecursive(f, permission,
+            overwrite, bufferSize, replication, blockSize, progress);
+      return new LockedFSDataOutputStream(stream, lock);
+    } catch (IOException e) {
+      lock.close();
+      throw(e);
+    }
+  }
+
+  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
+                                               EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
+                                               Progressable progress) throws IOException {
+    AutoLock lock = sync.lockWrite(f);
+    try {
+      FSDataOutputStream stream = fs.createNonRecursive(f, permission, flags,
+            bufferSize, replication, blockSize, progress);
+      return new LockedFSDataOutputStream(stream, lock);
+    } catch (IOException e) {
+      lock.close();
+      throw(e);
+    }
+  }
+
+  public boolean createNewFile(Path f) throws IOException {
+    try (AutoLock l = sync.lockWrite(f)) {
+      return fs.createNewFile(f);
+    }
+  }
+
+  public FSDataOutputStream append(Path f) throws IOException {
+    AutoLock lock = sync.lockWrite(f);
+    try {
+      FSDataOutputStream stream = fs.append(f);
+      return new LockedFSDataOutputStream(stream, lock);
+    } catch (IOException e) {
+      lock.close();
+      throw(e);
+    }
+  }
+
+  public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
+    AutoLock lock = sync.lockWrite(f);
+    try {
+      FSDataOutputStream stream = fs.append(f, bufferSize);
+      return new LockedFSDataOutputStream(stream, lock);
+    } catch (IOException e) {
+      lock.close();
+      throw(e);
+    }
+  }
+
+  public FSDataOutputStream append(Path f, int bufferSize,
+                                   Progressable progress) throws IOException {
+    AutoLock lock = sync.lockWrite(f);
+    try {
+      FSDataOutputStream stream = fs.append(f, bufferSize, progress);
+      return new LockedFSDataOutputStream(stream, lock);
+    } catch (IOException e) {
+      lock.close();
+      throw(e);
+    }
+  }
+
+  public void concat(final Path trg, final Path[] psrcs) throws IOException {
+    try (AutoLock l = sync.lock(trg, psrcs)) {
+      fs.concat(trg, psrcs);
+    }
+  }
+
+  @Deprecated
+  public short getReplication(Path src) throws IOException {
+    try (AutoLock l = sync.lock(src)) {
+      return fs.getReplication(src);
+    }
+  }
+
+  public boolean setReplication(Path src, short replication)
+        throws IOException {
+    try (AutoLock l = sync.lock(src)) {
+      return fs.setReplication(src, replication);
+    }
+  }
+
+  public boolean rename(Path src, Path dst) throws IOException {
+    try (AutoLock l = sync.lockRename(src, dst)) {
+      return fs.rename(src, dst);
+    }
+  }
+
+  public boolean truncate(Path f, long newLength) throws IOException {
+    try (AutoLock l = sync.lock(f)) {
+      return fs.truncate(f, newLength);
+    }
+  }
+
+  @Deprecated
+  public boolean delete(Path f) throws IOException {
+    try (AutoLock l = sync.lockDelete(f)) {
+      return fs.delete(f);
+    }
+  }
+
+  public boolean delete(Path f, boolean recursive) throws IOException {
+    try (AutoLock l = sync.lockDelete(f)) {
+      return fs.delete(f, recursive);
+    }
+  }
+
+  public boolean exists(Path f) throws IOException {
+    try (AutoLock l = sync.lock(f)) {
+      return fs.exists(f);
+    }
+  }
+
+  @Deprecated
+  public boolean isDirectory(Path f) throws IOException {
+    try (AutoLock l = sync.lock(f)) {
+      return fs.isDirectory(f);
+    }
+  }
+
+  @Deprecated
+  public boolean isFile(Path f) throws IOException {
+    try (AutoLock l = sync.lock(f)) {
+      return fs.isFile(f);
+    }
+  }
+
+  @Deprecated
+  public long getLength(Path f) throws IOException {
+    try (AutoLock l = sync.lock(f)) {
+      return fs.getLength(f);
+    }
+  }
+
+  public ContentSummary getContentSummary(Path f) throws IOException {
+    try (AutoLock l = sync.lock(f)) {
+      return fs.getContentSummary(f);
+    }
+  }
+
+  public FileStatus[] listStatus(Path f) throws FileNotFoundException,
+        IOException {
+    try (AutoLock l = sync.lockListing(f)) {
+      return fs.listStatus(f);
+    }
+  }
+
+  public RemoteIterator<Path> listCorruptFileBlocks(Path path)
+        throws IOException {
+    try (AutoLock l = sync.lock(path)) {
+      return fs.listCorruptFileBlocks(path);
+    }
+  }
+
+  public FileStatus[] listStatus(Path f, PathFilter filter)
+        throws FileNotFoundException, IOException {
+    try (AutoLock l = sync.lockListing(f)) {
+      return fs.listStatus(f, filter);
+    }
+  }
+
+  public FileStatus[] listStatus(Path[] files)
+        throws FileNotFoundException, IOException {
+    try (AutoLock l = sync.lockListings(files)) {
+      return fs.listStatus(files);
+    }
+  }
+
+  public FileStatus[] listStatus(Path[] files, PathFilter filter)
+        throws FileNotFoundException, IOException {
+    try (AutoLock l = sync.lockListings(files)) {
+      return fs.listStatus(files, filter);
+    }
+  }
+
+  public FileStatus[] globStatus(Path pathPattern) throws IOException {
+    LOG.warn("Globbing is never atomic!");
+    return fs.globStatus(pathPattern);
+  }
+
+  public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
+        throws IOException {
+    LOG.warn("Globbing is never atomic!");
+    return fs.globStatus(pathPattern, filter);
+  }
+
+  public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
+        throws FileNotFoundException, IOException {
+    AutoLock lock = sync.lockListing(f);
+    try {
+      RemoteIterator<LocatedFileStatus> iterator = fs.listLocatedStatus(f);
+      return new LockedRemoteIterator<LocatedFileStatus>(iterator, lock);
+    } catch (Exception e) {
+      lock.close();
+      throw(e);
+    }
+  }
+
+  public RemoteIterator<FileStatus> listStatusIterator(final Path p)
+        throws FileNotFoundException, IOException {
+    AutoLock lock = sync.lockListing(p);
+    try {
+      RemoteIterator<FileStatus> iterator = fs.listStatusIterator(p);
+      return new LockedRemoteIterator<FileStatus>(iterator, lock);
+    } catch (Exception e) {
+      lock.close();
+      throw(e);
+    }
+  }
+
+  public RemoteIterator<LocatedFileStatus> listFiles(
+        final Path f, final boolean recursive)
+        throws FileNotFoundException, IOException {
+    AutoLock lock = sync.lockListing(f);
+    try {
+      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(f, recursive);
+      return new LockedRemoteIterator<LocatedFileStatus>(iterator, lock);
+    } catch (Exception e) {
+      lock.close();
+      throw(e);
+    }
+  }
+
+  public Path getHomeDirectory() {
+    return fs.getHomeDirectory();
+  }
+
+  public void setWorkingDirectory(Path newDir) {
+    fs.setWorkingDirectory(newDir);
+  }
+
+  public Path getWorkingDirectory() {
+    return fs.getWorkingDirectory();
+  }
+
+  public boolean mkdirs(Path f) throws IOException {
+    // TODO this has implications for the parent dirs too
+    try (AutoLock l = sync.lock(f)) {
+      return fs.mkdirs(f);
+    }
+  }
+
+  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    try (AutoLock l = sync.lock(f)) {
+      return fs.mkdirs(f, permission);
+    }
+  }
+
+  public void copyFromLocalFile(Path src, Path dst)
+        throws IOException {
+    try (AutoLock l = sync.lock(dst)) {
+      fs.copyFromLocalFile(src, dst);
+    }
+  }
+
+  public void moveFromLocalFile(Path[] srcs, Path dst)
+        throws IOException {
+    try (AutoLock l = sync.lock(dst)) {
+      fs.moveFromLocalFile(srcs, dst);
+    }
+  }
+
+  public void moveFromLocalFile(Path src, Path dst)
+        throws IOException {
+    try (AutoLock l = sync.lock(dst)) {
+      fs.moveFromLocalFile(src, dst);
+    }
+  }
+
+  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
+        throws IOException {
+    try (AutoLock l = sync.lock(dst)) {
+      fs.copyFromLocalFile(delSrc, src, dst);
+    }
+  }
+
+  public void copyFromLocalFile(boolean delSrc, boolean overwrite,
+                                Path[] srcs, Path dst)
+        throws IOException {
+    try (AutoLock l = sync.lock(dst)) {
+      fs.copyFromLocalFile(delSrc, overwrite, srcs, dst);
+    }
+  }
+
+  public void copyFromLocalFile(boolean delSrc, boolean overwrite,
+                                Path src, Path dst)
+        throws IOException {
+    try (AutoLock l = sync.lock(dst)) {
+      fs.copyFromLocalFile(delSrc, overwrite, src, dst);
+    }
+  }
+
+  public void copyToLocalFile(Path src, Path dst) throws IOException {
+    try (AutoLock l = sync.lock(src)) {
+      fs.copyToLocalFile(src, dst);
+    }
+  }
+
+  public void moveToLocalFile(Path src, Path dst) throws IOException {
+    try (AutoLock l = sync.lockDelete(src)) {
+      fs.moveToLocalFile(src, dst);
+    }
+  }
+
+  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
+        throws IOException {
+    try (AutoLock l = sync.lock(src)) {
+      fs.copyToLocalFile(delSrc, src, dst);
+    }
+  }
+
+  public void copyToLocalFile(boolean delSrc, Path src, Path dst,
+                              boolean useRawLocalFileSystem) throws IOException {
+    try (AutoLock l = sync.lock(src)) {
+      fs.copyToLocalFile(delSrc, src, dst, useRawLocalFileSystem);
+    }
+  }
+
+  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+        throws IOException {
+    return fs.startLocalOutput(fsOutputFile, tmpLocalFile);
+  }
+
+  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+        throws IOException {
+    try (AutoLock l = sync.lockWrite(fsOutputFile)) {
+      fs.completeLocalOutput(fsOutputFile, tmpLocalFile);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    // FS must close first so that FileSystem.processDeleteOnExit() can run
+    // while locking is still available.
+    fs.close();
+    sync.close();
+  }
+
+  public long getUsed() throws IOException {
+    return fs.getUsed();
+  }
+
+  @Deprecated
+  public long getBlockSize(Path f) throws IOException {
+    try (AutoLock l = sync.lock(f)) {
+      return fs.getBlockSize(f);
+    }
+  }
+
+  @Deprecated
+  public long getDefaultBlockSize() {
+    return fs.getDefaultBlockSize();
+  }
+
+  public long getDefaultBlockSize(Path f) {
+    return fs.getDefaultBlockSize(f);
+  }
+
+  @Deprecated
+  public short getDefaultReplication() {
+    return fs.getDefaultReplication();
+  }
+
+  public short getDefaultReplication(Path path) {
+    return fs.getDefaultReplication(path);
+  }
+
+  public FileStatus getFileStatus(Path f) throws IOException {
+    try (AutoLock l = sync.lock(f)) {
+      return fs.getFileStatus(f);
+    }
+  }
+
+  public void access(Path path, FsAction mode) throws AccessControlException,
+        FileNotFoundException, IOException {
+    try (AutoLock l = sync.lock(path)) {
+      fs.access(path, mode);
+    }
+  }
+
+  public void createSymlink(final Path target, final Path link,
+                            final boolean createParent) throws AccessControlException,
+        FileAlreadyExistsException, FileNotFoundException,
+        ParentNotDirectoryException, UnsupportedFileSystemException,
+        IOException {
+    throw new UnsupportedOperationException("HBOSS does not support symlinks");
+  }
+
+  public FileStatus getFileLinkStatus(final Path f)
+        throws AccessControlException, FileNotFoundException,
+        UnsupportedFileSystemException, IOException {
+    throw new UnsupportedOperationException("HBOSS does not support symlinks");
+  }
+
+  public boolean supportsSymlinks() {
+    return false;
+  }
+
+  public Path getLinkTarget(Path f) throws IOException {
+    throw new UnsupportedOperationException("HBOSS does not support symlinks");
+  }
+
+  public FileChecksum getFileChecksum(Path f) throws IOException {
+    try (AutoLock l = sync.lock(f)) {
+      return fs.getFileChecksum(f);
+    }
+  }
+
+  public FileChecksum getFileChecksum(Path f, final long length)
+        throws IOException {
+    try (AutoLock l = sync.lock(f)) {
+      return fs.getFileChecksum(f, length);
+    }
+  }
+
+  public void setVerifyChecksum(boolean verifyChecksum) {
+    fs.setVerifyChecksum(verifyChecksum);
+  }
+
+  public void setWriteChecksum(boolean writeChecksum) {
+    fs.setWriteChecksum(writeChecksum);
+  }
+
+  public FsStatus getStatus() throws IOException {
+    return fs.getStatus();
+  }
+
+  public FsStatus getStatus(Path p) throws IOException {
+    try (AutoLock l = sync.lock(p)) {
+      return fs.getStatus(p);
+    }
+  }
+
+  public void setPermission(Path p, FsPermission permission
+  ) throws IOException {
+    try (AutoLock l = sync.lock(p)) {
+      fs.setPermission(p, permission);
+    }
+  }
+
+  public void setOwner(Path p, String username, String groupname
+  ) throws IOException {
+    try (AutoLock l = sync.lock(p)) {
+      fs.setOwner(p, username, groupname);
+    }
+  }
+
+  public void setTimes(Path p, long mtime, long atime
+  ) throws IOException {
+    try (AutoLock l = sync.lock(p)) {
+      fs.setTimes(p, mtime, atime);
+    }
+  }
+
+  public Path createSnapshot(Path path, String snapshotName)
+        throws IOException {
+    try (AutoLock l = sync.lockListing(path)) {
+      return fs.createSnapshot(path, snapshotName);
+    }
+  }
+
+  public void renameSnapshot(Path path, String snapshotOldName,
+                             String snapshotNewName) throws IOException {
+    fs.renameSnapshot(path, snapshotOldName, snapshotNewName);
+  }
+
+  public void deleteSnapshot(Path path, String snapshotName)
+        throws IOException {
+    fs.deleteSnapshot(path, snapshotName);
+  }
+
+  public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
+        throws IOException {
+    try (AutoLock l = sync.lock(path)) {
+      fs.modifyAclEntries(path, aclSpec);
+    }
+  }
+
+  public void removeAclEntries(Path path, List<AclEntry> aclSpec)
+        throws IOException {
+    try (AutoLock l = sync.lock(path)) {
+      fs.removeAclEntries(path, aclSpec);
+    }
+  }
+
+  public void removeDefaultAcl(Path path)
+        throws IOException {
+    try (AutoLock l = sync.lock(path)) {
+      fs.removeDefaultAcl(path);
+    }
+  }
+
+  public void removeAcl(Path path)
+        throws IOException {
+    try (AutoLock l = sync.lock(path)) {
+      fs.removeAcl(path);
+    }
+  }
+
+  public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
+    try (AutoLock l = sync.lock(path)) {
+      fs.setAcl(path, aclSpec);
+    }
+  }
+
+  public AclStatus getAclStatus(Path path) throws IOException {
+    try (AutoLock l = sync.lock(path)) {
+      return fs.getAclStatus(path);
+    }
+  }
+
+  public void setXAttr(Path path, String name, byte[] value)
+        throws IOException {
+    try (AutoLock l = sync.lock(path)) {
+      fs.setXAttr(path, name, value);
+    }
+  }
+
+  public void setXAttr(Path path, String name, byte[] value,
+                       EnumSet<XAttrSetFlag> flag) throws IOException {
+    try (AutoLock l = sync.lock(path)) {
+      fs.setXAttr(path, name, value, flag);
+    }
+  }
+
+  public byte[] getXAttr(Path path, String name) throws IOException {
+    try (AutoLock l = sync.lock(path)) {
+      return fs.getXAttr(path, name);
+    }
+  }
+
+  public Map<String, byte[]> getXAttrs(Path path) throws IOException {
+    try (AutoLock l = sync.lock(path)) {
+      return fs.getXAttrs(path);
+    }
+  }
+
+  public Map<String, byte[]> getXAttrs(Path path, List<String> names)
+        throws IOException {
+    try (AutoLock l = sync.lock(path)) {
+      return fs.getXAttrs(path, names);
+    }
+  }
+
+  public List<String> listXAttrs(Path path) throws IOException {
+    try (AutoLock l = sync.lock(path)) {
+      return fs.listXAttrs(path);
+    }
+  }
+
+  public void removeXAttr(Path path, String name) throws IOException {
+    try (AutoLock l = sync.lock(path)) {
+      fs.removeXAttr(path, name);
+    }
+  }
+}
diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/package-info.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/package-info.java
new file mode 100644
index 0000000..bf6a61e
--- /dev/null
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/package-info.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * An implementation of org.apache.hadoop.fs.FileSystem that wraps object store
+ * client implementations to provide additional semantics required by HBase.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.hbase.oss;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/AutoLock.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/AutoLock.java
new file mode 100644
index 0000000..3b57d20
--- /dev/null
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/AutoLock.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.sync;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * Provides convenience data structures to help ensure that locks are closed
+ * when and only when the path is no longer in use. The basic AutoLock is simply
+ * an AutoCloseable that will release the lock after a try-with-resources block.
+ * LockedRemoteIterator will release the lock when the stream has been exhausted
+ * or in the event of any exception. LockedFSDataOutputStream will release the
+ * lock when the stream gets closed.
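+ *
+ * <p>A minimal usage sketch (illustrative only; {@code sync} is assumed to be
+ * an initialized {@link TreeLockManager} and {@code fs} the wrapped
+ * FileSystem):
+ * <pre>{@code
+ *   try (AutoLock lock = sync.lock(path)) {
+ *     return fs.getFileStatus(path);
+ *   } // lock released when the block exits
+ * }</pre>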
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface AutoLock extends AutoCloseable {
+  public void close() throws IOException;
+
+  /**
+   * A wrapper for a RemoteIterator that releases a lock only when the
+   * underlying iterator has been exhausted.
+   */
+  public static class LockedRemoteIterator<E> implements RemoteIterator<E> {
+
+    public LockedRemoteIterator(RemoteIterator<E> iterator, AutoLock lock) {
+      this.iterator = iterator;
+      this.lock = lock;
+    }
+
+    private RemoteIterator<E> iterator;
+    private AutoLock lock;
+    private AtomicBoolean closed = new AtomicBoolean(false);
+
+    public void close() throws IOException {
+      if (!closed.getAndSet(true)) {
+        lock.close();
+      }
+    }
+
+    private void checkClosed() throws IOException {
+      if (closed.get()) {
+        throw new IOException(
+              "LockedRemoteIterator was accessed after releasing lock");
+      }
+    }
+
+    @Override
+    public boolean hasNext() throws IOException {
+      checkClosed();
+      try {
+        if (iterator.hasNext()) {
+          return true;
+        }
+        close();
+        return false;
+      } catch (Throwable e) {
+        close();
+        throw e;
+      }
+    }
+
+    /**
+     * Delegates to the wrapped iterator, but closes the lock if the call
+     * throws, including the NoSuchElementException raised at exhaustion. Some
+     * applications do not call hasNext() and simply depend on the
+     * NoSuchElementException.
+     */
+    @Override
+    public E next() throws IOException {
+      checkClosed();
+      try {
+        return iterator.next();
+      } catch (Throwable e) {
+        close();
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * A wrapper for a FSDataOutputStream that releases a lock only when the
+   * underlying output stream is closed.
+   */
+  public class LockedFSDataOutputStream extends FSDataOutputStream {
+
+    public LockedFSDataOutputStream(FSDataOutputStream stream, AutoLock lock) throws IOException {
+      // super() throws IOException, but this constructor can't catch it.
+      // Instantiators must catch the exception and close the lock.
+      super(stream, null);
+      this.stream = stream;
+      this.lock = lock;
+    }
+
+    private final FSDataOutputStream stream;
+    private AutoLock lock;
+    private AtomicBoolean closed = new AtomicBoolean(false);
+
+    private void checkClosed() throws IOException {
+      if (closed.get()) {
+        throw new IOException(
+              "LockedFSDataOutputStream was accessed after releasing lock");
+      }
+    }
+
+    /**
+     * Returns the position in the wrapped stream. This should not be accessed
+     * after the stream has been closed. Unlike most other functions in this
+     * class, this is not enforced because this function shouldn't throw
+     * IOExceptions.
+     */
+    @Override
+    public long getPos() {
+      return stream.getPos();
+    }
+
+    @Override
+    public void close() throws IOException {
+      // Contract tests attempt to close the stream twice
+      if (!closed.getAndSet(true)) {
+        try {
+          stream.close();
+        } finally {
+          lock.close();
+        }
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "LockedFSDataOutputStream:" + stream.toString();
+    }
+
+    /**
+     * Returns the wrapped stream. This should not be accessed after the stream
+     * has been closed. Unlike most other functions in this class, this is not
+     * enforced because this function shouldn't throw IOExceptions.
+     */
+    @Override
+    public OutputStream getWrappedStream() {
+      return stream.getWrappedStream();
+    }
+
+    @Override
+    public void hflush() throws IOException {
+      checkClosed();
+      stream.hflush();
+    }
+
+    @Override
+    public void hsync() throws IOException {
+      checkClosed();
+      stream.hsync();
+    }
+
+    @Override
+    public void setDropBehind(Boolean dropBehind) throws IOException {
+      checkClosed();
+      stream.setDropBehind(dropBehind);
+    }
+  }
+}
diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/TreeLockManager.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/TreeLockManager.java
new file mode 100644
index 0000000..feb5f16
--- /dev/null
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/TreeLockManager.java
@@ -0,0 +1,500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.sync;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.oss.Constants;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Basic logic for synchronizing FileSystem operations. Needs to be extended
+ * with an implementation of read / write locks and the methods to check the
+ * status of locks above and below FileSystem Paths.
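+ *
+ * <p>A hypothetical wiring sketch, assuming the implementation is selected via
+ * the FileSystem's configuration (the key is read in {@code get(FileSystem)}):
+ * <pre>{@code
+ *   Configuration conf = fs.getConf();
+ *   conf.setClass(Constants.SYNC_IMPL, ZKTreeLockManager.class,
+ *       TreeLockManager.class);
+ *   TreeLockManager sync = TreeLockManager.get(fs);
+ * }</pre>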
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class TreeLockManager {
+  private static final Logger LOG =
+        LoggerFactory.getLogger(TreeLockManager.class);
+
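+  /**
+   * Instantiates the TreeLockManager implementation named by
+   * {@link Constants#SYNC_IMPL} in the FileSystem's configuration and
+   * initializes it against that FileSystem.
+   */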
+  public static synchronized TreeLockManager get(FileSystem fs)
+        throws IOException {
+    Configuration conf = fs.getConf();
+    Class<? extends TreeLockManager> impl = conf.getClass(
+        Constants.SYNC_IMPL, TreeLockManager.class, TreeLockManager.class);
+    TreeLockManager instance = null;
+    Exception cause = null;
+    try {
+      instance = impl.newInstance();
+    } catch (Exception e) {
+      cause = e;
+    }
+    if (instance == null) {
+      throw new IOException("Class referred to by "
+            + Constants.SYNC_IMPL + ", " + impl.getName()
+            + ", is not a valid implementation of "
+            + TreeLockManager.class.getName(), cause);
+    }
+    instance.initialize(fs);
+    return instance;
+  }
+
+  protected FileSystem fs;
+
+  private static final String SLASH = "/";
+
+  /**
+   * Returns a normalized logical path that uniquely identifies an object-store
+   * "filesystem" and a path inside it. Assumes a 1:1 mapping between hostnames
+   * and filesystems, and assumes the URI scheme mapping is consistent
+   * everywhere.
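+   * <p>
+   * For example, given this mapping, {@code s3a://bucket/dir/file} maps to the
+   * logical lock path {@code /s3a/bucket/dir/file}.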
+   */
+  private Path norm(Path path) {
+    URI uri = fs.makeQualified(path).toUri();
+    String uriScheme = uri.getScheme();
+    String uriHost = uri.getHost();
+    String uriPath = uri.getPath().substring(1);
+    if (uriPath.length() == 0) {
+      uriPath = SLASH;
+    }
+    // To combine fsRoot and uriPath, fsRoot must start with /, and uriPath
+    // must not.
+    Path fsRoot = new Path(SLASH + uriScheme, uriHost);
+    return new Path(fsRoot, uriPath);
+  }
+
+  /**
+   * Convenience function for calling norm on an array. Returned copy of the
+   * array will also be sorted for deadlock avoidance.
+   */
+  private Path[] norm(Path[] paths) {
+    Path[] newPaths = new Path[paths.length];
+    for (int i = 0; i < paths.length; i++) {
+      newPaths[i] = norm(paths[i]);
+    }
+    Arrays.sort(newPaths);
+    return newPaths;
+  }
+
+  /**
+   * Convenience function for calling norm on an array with one extra arg.
+   * Returned copy of the combined array will also be sorted for deadlock
+   * avoidance.
+   */
+  private Path[] norm(Path[] paths, Path path) {
+    Path[] newPaths = new Path[paths.length + 1];
+    int i;
+    for (i = 0; i < paths.length; i++) {
+      newPaths[i] = norm(paths[i]);
+    }
+    newPaths[i] = norm(path);
+    Arrays.sort(newPaths);
+    return newPaths;
+  }
+
+  /**
+   * In addition to any implementation-specific setup, implementations must set
+   * this.fs = fs in order for path normalization to work.
+   */
+  public abstract void initialize(FileSystem fs) throws IOException;
+
+  /**
+   * Performs any shutdown necessary when a client is exiting. Should be
+   * considered best-effort and for planned shutdowns.
+   */
+  public void close() throws IOException {
+  }
+
+  /**
+   * Acquires a single exclusive (write) lock.
+   *
+   * @param p Path to lock
+   */
+  protected abstract void writeLock(Path p) throws IOException;
+
+  /**
+   * Releases a single exclusive (write) lock.
+   *
+   * @param p Path to unlock
+   */
+  protected abstract void writeUnlock(Path p) throws IOException;
+
+  /**
+   * Acquires a single non-exclusive (read) lock.
+   *
+   * @param p Path to lock
+   */
+  protected abstract void readLock(Path p) throws IOException;
+
+  /**
+   * Releases a single non-exclusive (read) lock.
+   *
+   * @param p Path to unlock
+   */
+  protected abstract void readUnlock(Path p) throws IOException;
+
+  /**
+   * Checks for the presence of a write lock on any parent directory of the
+   * path.
+   *
+   * @param p Path to check
+   * @return True if a lock is found, false otherwise
+   */
+  protected abstract boolean writeLockAbove(Path p) throws IOException;
+
+  /**
+   * Checks for the presence of a write lock on any child of the path.
+   *
+   * @param p Path to check
+   * @return True if a lock is found, false otherwise
+   */
+  protected abstract boolean writeLockBelow(Path p) throws IOException;
+
+  /**
+   * Checks for the presence of a read lock on any child of the path.
+   *
+   * @param p Path to check
+   * @return True if a lock is found, false otherwise
+   */
+  protected abstract boolean readLockBelow(Path p) throws IOException;
+
+  /**
+   * Recursively cleans up locks that won't be used again.
+   *
+   * @param p Parent path of all locks to delete
+   */
+  protected abstract void recursiveDelete(Path p) throws IOException;
+
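+  // Fixed 1 ms sleep between attempts, up to 600,000 attempts (roughly ten
+  // minutes of waiting) before lock acquisition is abandoned with an error.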
+  private RetryPolicy retryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(600000, 1, TimeUnit.MILLISECONDS);
+
+  private boolean retryBackoff(int retries) throws IOException {
+    RetryAction action;
+    try {
+      action = retryPolicy.shouldRetry(null, retries, 0, true);
+    } catch (Exception e) {
+      throw new IOException("Unexpected exception during locking", e);
+    }
+    if (action.action == RetryDecision.FAIL) {
+      throw new IOException("Exceeded " + retries + " retries for locking");
+    }
+    LOG.trace("Sleeping {}ms before next retry", action.delayMillis);
+    try {
+      Thread.sleep(action.delayMillis);
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted during locking", e);
+    }
+    return true;
+  }
+
+  /**
+   * Acquires a write (exclusive) lock on a path. Between this lock being
+   * acquired and being released, we should hold a write lock on this path, no
+   * write locks should be held by anyone on any parent directory, and no read
+   * or write locks should be held by anyone on any child directory.
+   *
+   * @param p Path to lock
+   */
+  protected void treeWriteLock(Path p) throws IOException {
+    int outerRetries = 0;
+    do {
+      int innerRetries = 0;
+      do {
+        // If there's already a write-lock above or below us in the tree, wait for it to leave
+        if (writeLockAbove(p) || writeLockBelow(p)) {
+          LOG.warn("Blocked on some parent write lock, waiting: {}", p);
+          continue;
+        }
+        break;
+      } while (retryBackoff(innerRetries++));
+      // Try to obtain the write lock just for our node
+      writeLock(p);
+      // If there's now a write-lock above or below us in the tree, release and retry
+      if (writeLockAbove(p) || writeLockBelow(p)) {
+        LOG.warn("Blocked on some other write lock, retrying: {}", p);
+        writeUnlock(p);
+        continue;
+      }
+      break;
+    } while (retryBackoff(outerRetries++));
+
+    // Once we know we're the only write-lock in our path, drain all read-locks below
+    int drainReadLocksRetries = 0;
+    do {
+      if (readLockBelow(p)) {
+        LOG.warn("Blocked on some child read lock, writing: {}", p);
+        continue;
+      }
+      break;
+    } while (retryBackoff(drainReadLocksRetries++));
+  }
+
+  /**
+   * Acquires a read (non-exclusive) lock on a path. Between this lock being
+   * acquired and being released, we should hold a read lock on this path, and
+   * no write locks should be held by anyone on any parent directory.
+   *
+   * @param p Path to lock
+   */
+  protected void treeReadLock(Path p) throws IOException {
+    int outerRetries = 0;
+    do {
+      int innerRetries = 0;
+      do {
+        // If there's a write lock above us, wait
+        if (writeLockAbove(p)) {
+          LOG.warn("Blocked waiting for some parent write lock, waiting: {}",
+                p);
+          continue;
+        }
+        break;
+      } while (retryBackoff(innerRetries++));
+      // Try to obtain the read lock just for our node
+      readLock(p);
+      // If there's a write lock above us, release the lock and try again
+      if (writeLockAbove(p)) {
+        LOG.warn("Blocked waiting for some parent write lock, retrying: {}",
+              p);
+        readUnlock(p);
+        continue;
+      }
+      break;
+    } while (retryBackoff(outerRetries++));
+  }
+
+  /**
+   * Acquires an exclusive lock on a single path to create or append to a file.
+   * This is required for createNonRecursive() as well as other operations to be
+   * atomic because the underlying file may not be created until all data has
+   * been written.
+   *
+   * @param rawPath Path of the create operation
+   * @return AutoLock to release this path
+   */
+  public AutoLock lockWrite(Path rawPath) throws IOException {
+    Path path = norm(rawPath);
+    LOG.debug("About to lock for create / write: {}", rawPath);
+    treeWriteLock(path);
+    return new AutoLock() {
+      public void close() throws IOException {
+        LOG.debug("About to unlock after create / write: {}", path);
+        writeUnlock(path);
+      }
+    };
+  }
+
+  /**
+   * Acquires an exclusive lock on a single path; when the returned lock is
+   * closed, the lock and those of all children are cleaned up. The lock
+   * ensures this doesn't interfere with any renames or other listing
+   * operations above this path.
+   *
+   * @param rawPath Path of the delete operation
+   * @return AutoLock to release this path
+   */
+  public AutoLock lockDelete(Path rawPath) throws IOException {
+    Path path = norm(rawPath);
+    LOG.debug("About to lock for delete: {}", path);
+    treeWriteLock(path);
+    return new AutoLock() {
+      public void close() throws IOException {
+        LOG.debug("About to recursively delete locks: {}", path);
+        recursiveDelete(path);
+        writeUnlock(path);
+      }
+    };
+  }
+
+  /**
+   * Acquires a lock on a single path to run a listing operation. We need to
+   * ensure that the listing is not generated mid-rename. This will lock all
+   * children of the root path as well, which is only necessary for recursive
+   * listings. Other listings should only need a read lock on the root and all
+   * children, but that is not implemented.
+   *
+   * @param rawPath Root of the listing operation
+   * @return AutoCloseable to release this path
+   */
+  public AutoLock lockListing(Path rawPath) throws IOException {
+    Path path = norm(rawPath);
+    LOG.debug("About to lock for listing: {}", path);
+    treeWriteLock(path);
+    return new AutoLock() {
+      public void close() throws IOException {
+        LOG.debug("About to unlock after listing: {}", path);
+        writeUnlock(path);
+      }
+    };
+  }
+
+  /**
+   * Same considerations as lockListing, but locks an array of paths in order
+   * and returns an AutoLock that encapsulates all of them.
+   *
+   * @param rawPaths Paths to lock for listings
+   * @return AutoCloseable that encapsulates all paths
+   */
+  public AutoLock lockListings(Path[] rawPaths) throws IOException {
+    Path[] paths = norm(rawPaths);
+    for (int i = 0; i < paths.length; i++) {
+      LOG.debug("About to lock for listings: {}", paths[i]);
+      treeWriteLock(paths[i]);
+    }
+    return new AutoLock() {
+      public void close() throws IOException {
+        Throwable lastThrown = null;
+        for (int i = 0; i < paths.length; i++) {
+          LOG.debug("About to unlock after listings: {}", paths[i]);
+          try {
+            writeUnlock(paths[i]);
+          } catch (Throwable e) {
+            lastThrown = e;
+            LOG.warn("Caught throwable while unlocking: {}", e.getMessage());
+            e.printStackTrace();
+          }
+        }
+        if (lastThrown != null) {
+          throw new IOException("At least one throwable caught while unlocking",
+                lastThrown);
+        }
+      }
+    };
+  }
+
+  /**
+   * Acquires an exclusive (write) lock on 2 paths, for a rename. Any given pair
+   * of paths will always be locked in the same order, regardless of their order
+   * in the method call. This is to avoid deadlocks. In the future this method
+   * may also record the start of the rename in something like a write-ahead log
+   * to recover in-progress renames in the event of a failure.
+   *
+   * @param rawSrc Source of the rename
+   * @param rawDst Destination of the rename
+   * @return AutoCloseable to release both paths
+   * @throws IOException
+   */
+  public AutoLock lockRename(Path rawSrc, Path rawDst) throws IOException {
+    Path src = norm(rawSrc);
+    Path dst = norm(rawDst);
+    LOG.debug("About to lock for rename: from {} to {}", src, dst);
+    if (src.compareTo(dst) < 0) {
+      treeWriteLock(src);
+      treeWriteLock(dst);
+    } else {
+      treeWriteLock(dst);
+      treeWriteLock(src);
+    }
+    return new AutoLock() {
+      public void close() throws IOException {
+        LOG.debug("About to unlock after rename: from {} to {}", src, dst);
+        try {
+          writeUnlock(src);
+        } finally {
+          writeUnlock(dst);
+        }
+      }
+    };
+  }
+
+  /**
+   * Returns a non-exclusive lock on a single path. This is for generic cases
+   * that read or modify the file system but do not need exclusive access as
+   * long as no conflicting operation holds a write lock.
+   *
+   * @param rawPath Path to lock
+   * @return AutoCloseable that will release the path
+   * @throws IOException
+   */
+  public AutoLock lock(Path rawPath) throws IOException {
+    Path path = norm(rawPath);
+    LOG.debug("About to lock: {}", path);
+    treeReadLock(path);
+    return new AutoLock() {
+      public void close() throws IOException {
+        LOG.debug("About to unlock: {}", path);
+        readUnlock(path);
+      }
+    };
+  }
+
+  /**
+   * Returns a non-exclusive lock on an array of paths. This is for generic
+   * cases that read or modify the file system but do not need exclusive access
+   * as long as no conflicting operation holds a write lock.
+   *
+   * @param rawPaths Paths to lock
+   * @return AutoCloseable that will release all the paths
+   * @throws IOException
+   */
+  public AutoLock lock(Path[] rawPaths) throws IOException {
+    return innerLock(norm(rawPaths));
+  }
+
+  /**
+   * Returns a non-exclusive lock on an array of paths and a separate path. No
+   * distinction is made between them in locking: this method is only for
+   * convenience in the FileSystem implementation where there is a distinction.
+   *
+   * @param extraPath Extra path to lock
+   * @param rawPaths Paths to lock
+   * @return AutoCloseable that will release all the paths
+   * @throws IOException
+   */
+  public AutoLock lock(Path extraPath, Path[] rawPaths) throws IOException {
+    return innerLock(norm(rawPaths, extraPath));
+  }
+
+  private AutoLock innerLock(Path[] paths) throws IOException {
+    for (int i = 0; i < paths.length; i++) {
+      LOG.debug("About to lock: {}", paths[i]);
+      treeReadLock(paths[i]);
+    }
+    return new AutoLock() {
+      public void close() throws IOException {
+        Throwable lastThrown = null;
+        for (int i = 0; i < paths.length; i++) {
+          LOG.debug("About to unlock: {}", paths[i]);
+          try {
+            readUnlock(paths[i]);
+          } catch (Throwable e) {
+            lastThrown = e;
+            LOG.warn("Caught throwable while unlocking: {}", e.getMessage());
+            e.printStackTrace();
+          }
+        }
+        if (lastThrown != null) {
+          throw new IOException("At least one throwable caught while unlocking",
+                lastThrown);
+        }
+      }
+    };
+  }
+}
diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/ZKTreeLockManager.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/ZKTreeLockManager.java
new file mode 100644
index 0000000..b1dbcb5
--- /dev/null
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/ZKTreeLockManager.java
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.sync;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.oss.Constants;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation based on Apache Curator and Apache ZooKeeper. This allows
+ * HBOSS to re-use an Apache HBase cluster's ZooKeeper ensemble for file
+ * system locking.
+ *
+ * Can be enabled in JUnit tests with -Pzk. If {@link Constants#ZK_CONN_STRING}
+ * isn't specified, an embedded ZooKeeper process will be spun up for tests.
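+ *
+ * <p>Illustrative configuration sketch (the ensemble address is a placeholder;
+ * {@link Constants#SYNC_IMPL} must also point at this class):
+ * <pre>{@code
+ *   conf.set(Constants.ZK_CONN_STRING, "zk1:2181,zk2:2181,zk3:2181");
+ * }</pre>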
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+@InterfaceStability.Unstable
+public class ZKTreeLockManager extends TreeLockManager {
+  private static final Logger LOG =
+        LoggerFactory.getLogger(ZKTreeLockManager.class);
+
+  private CuratorFramework curator;
+
+  private String root;
+
+  private void setRoot() {
+    root = "/hboss";
+  }
+
+  private static final String lockSubZnode = ".hboss-lock-znode";
+
+  private Map<Path,InterProcessReadWriteLock> lockCache = new HashMap<>();
+
+  public void initialize(FileSystem fs) throws IOException {
+    this.fs = fs;
+    Configuration conf = fs.getConf();
+    int baseSleepTimeMs = conf.getInt(Constants.ZK_BASE_SLEEP_MS, 1000);
+    int maxRetries = conf.getInt(Constants.ZK_MAX_RETRIES, 3);
+    RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
+
+    // Create a temporary connection to ensure the root is created, then create
+    // a new connection 'jailed' inside that root to eliminate the need for
+    // paths to constantly be resolved inside the root.
+
+    String zookeeperConnectionString = conf.get(Constants.ZK_CONN_STRING);
+    curator = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
+    curator.start();
+
+    setRoot();
+    try {
+      ZKPaths.mkdirs(curator.getZookeeperClient().getZooKeeper(), root, true);
+    } catch (Exception e) {
+      throw new IOException("Unable to initialize root znodes", e);
+    }
+    curator.close();
+
+    zookeeperConnectionString += root;
+    curator = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
+    curator.start();
+  }
+
+  @Override
+  public void close() throws IOException {
+    curator.close();
+  }
+
+  @Override
+  protected void writeLock(Path p) throws IOException {
+    try {
+      LOG.debug("writeLock {} acquire", p);
+      get(p).writeLock().acquire();
+    } catch (Exception e) {
+      throw new IOException("Exception during write locking of path " + p, e);
+    }
+  }
+
+  @Override
+  protected void writeUnlock(Path p) throws IOException {
+    try {
+      LOG.debug("writeLock {} release", p);
+      get(p).writeLock().release();
+    } catch(IllegalMonitorStateException e) {
+      // Reentrant locks might be acquired multiple times
+      LOG.error("Tried to release unacquired write lock: {}", p);
+      throw e;
+    } catch (Exception e) {
+      throw new IOException("Exception during write unlocking of path " + p, e);
+    }
+  }
+
+  @Override
+  protected void readLock(Path p) throws IOException {
+    LOG.debug("readLock {} acquire", p);
+    try {
+      get(p).readLock().acquire();
+    } catch (Exception e) {
+      throw new IOException("Exception during read locking of path " + p, e);
+    }
+  }
+
+  @Override
+  protected void readUnlock(Path p) throws IOException {
+    LOG.debug("readLock {} release", p);
+    try {
+      get(p).readLock().release();
+    } catch (IllegalMonitorStateException e) {
+      // Reentrant locks might be acquired multiple times
+      LOG.error("Tried to release unacquired read lock: {}", p);
+      throw e;
+    } catch (Exception e) {
+      throw new IOException("Exception during read unlocking of path " + p, e);
+    }
+  }
+
+  @Override
+  protected boolean writeLockAbove(Path p) throws IOException {
+    LOG.debug("Checking for write lock above {}", p);
+    while (!p.isRoot()) {
+      p = p.getParent();
+      if (isLocked(get(p).writeLock())) {
+        LOG.warn("Parent write lock currently held: {}", p);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  protected boolean writeLockBelow(Path p) throws IOException {
+    return writeLockBelow(p, true);
+  }
+
+  @Override
+  protected boolean readLockBelow(Path p) throws IOException {
+    return readLockBelow(p, true);
+  }
+
+  @Override
+  protected void recursiveDelete(Path p) throws IOException {
+    try {
+      ZKPaths.deleteChildren(curator.getZookeeperClient().getZooKeeper(),
+            p.toString(), !p.isRoot());
+      // Before this method is called, we have a guarantee that 
+      //   1. There are no write locks above or below us
+      //   2. There are no read locks below us
+      // As such, we can just remove locks beneath us as we find them.
+      removeInMemoryLocks(p);
+    } catch (KeeperException.NoNodeException e) {
+      LOG.warn("Lock not found during recursive delete: {}", p);
+    } catch (Exception e) {
+      throw new IOException("Exception while deleting lock " + p, e);
+    }
+  }
+
+  private synchronized void removeInMemoryLocks(Path p) {
+    Iterator<Entry<Path,InterProcessReadWriteLock>> iter = lockCache.entrySet().iterator();
+    while (iter.hasNext()) {
+      Entry<Path,InterProcessReadWriteLock> entry = iter.next();
+      if (isBeneath(p, entry.getKey())) {
+        LOG.trace("Removing lock for {}", entry.getKey());
+        iter.remove();
+      }
+    }
+  }
+
+  private boolean isBeneath(Path parent, Path other) {
+    if (parent.equals(other)) {
+      return false;
+    }
+    // True if `other` falls beneath `parent` (prefix match on the path string).
+    return 0 == other.toString().indexOf(parent.toString());
+  }
+
+  private boolean writeLockBelow(Path p, boolean firstLevel) throws IOException {
+    try {
+      if (!firstLevel && isLocked(get(p).writeLock())) {
+        return true;
+      }
+      List<String> children = curator.getChildren().forPath(p.toString());
+      for (String child : children) {
+        if (child.equals(lockSubZnode)) {
+          continue;
+        }
+        if (writeLockBelow(new Path(p, child), false)) {
+          LOG.warn("Parent write lock currently held: {}", p);
+          return true;
+        }
+      }
+    } catch (KeeperException.NoNodeException e) {
+      // Ignore, means we hit the bottom of the tree
+    } catch (Exception e) {
+      throw new IOException("Error checking parents for write lock: " + p, e);
+    }
+    return false;
+  }
+
+  private boolean readLockBelow(Path p, boolean firstLevel) throws IOException {
+    try {
+      if (!firstLevel && isLocked(get(p).readLock())) {
+        return true;
+      }
+      List<String> children = curator.getChildren().forPath(p.toString());
+      for (String child : children) {
+        if (child.equals(lockSubZnode)) {
+          continue;
+        }
+        if (readLockBelow(new Path(p, child), false)) {
+          LOG.warn("Child read lock currently held: {}", p);
+          return true;
+        }
+      }
+    } catch (KeeperException.NoNodeException e) {
+      // Ignore, means we hit the bottom of the tree
+    } catch (Exception e) {
+      throw new IOException("Error checking children for read lock: " + p, e);
+    }
+    return false;
+  }
+
+  /**
+   * Checks whether the lock is held by another thread or process; locks held
+   * by the current thread do not count.
+   */
+  private boolean isLocked(InterProcessMutex lock) throws IOException {
+    try {
+      if (lock.isOwnedByCurrentThread()) {
+        // First check the current thread, because we allow you to get locks
+        // when parent or child paths are only locked by you.
+        return false;
+      }
+      if (lock.isAcquiredInThisProcess()) {
+        // We know it's not this thread, but this is less expensive
+        // than checking other processes.
+        return true;
+      }
+      // Finally, see if another process holds the lock. This is a terrible way
+      // to check but Curator doesn't expose another way.
+      if (lock.acquire(0, TimeUnit.NANOSECONDS)) {
+        lock.release();
+        return false;
+      }
+    } catch (Exception e) {
+      throw new IOException("Exception while testing a lock", e);
+    }
+    return true;
+  }
+
+  public String summarizeLocks() {
+    StringBuilder sb = new StringBuilder();
+    Map<Path,InterProcessReadWriteLock> cache = getUnmodifiableCache();
+    for (Entry<Path,InterProcessReadWriteLock> entry : cache.entrySet()) {
+      sb.append(entry.getKey()).append("=").append(describeLock(entry.getValue()));
+    }
+    return sb.toString();
+  }
+
+  String describeLock(InterProcessReadWriteLock lock) {
+    if (lock == null) {
+      return "null";
+    }
+    InterProcessMutex rlock = lock.readLock();
+    InterProcessMutex wlock = lock.writeLock();
+    StringBuilder sb = new StringBuilder();
+    sb.append("ReadLock[heldByThisThread=").append(rlock.isOwnedByCurrentThread());
+    sb.append(", heldInThisProcess=").append(rlock.isAcquiredInThisProcess()).append("]");
+    sb.append(" WriteLock[heldByThisThread=").append(wlock.isOwnedByCurrentThread());
+    sb.append(", heldInThisProcess=").append(wlock.isAcquiredInThisProcess()).append("]");
+    return sb.toString();
+  }
+
+  public synchronized Map<Path,InterProcessReadWriteLock> getUnmodifiableCache() {
+    return Collections.unmodifiableMap(lockCache);
+  }
+
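+  /**
+   * Lazily creates and caches the Curator read-write lock for a path, first
+   * ensuring the backing lock znode exists.
+   */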
+  private synchronized InterProcessReadWriteLock get(Path path) throws IOException {
+    if (!lockCache.containsKey(path)) {
+      String zkPath = new Path(path, lockSubZnode).toString();
+      try {
+        ZKPaths.mkdirs(curator.getZookeeperClient().getZooKeeper(), zkPath, true);
+      } catch (KeeperException.NodeExistsException e) {
+        // Ignore
+      } catch (Exception e) {
+        throw new IOException("Exception while ensuring lock parents exist: " +
+              path, e);
+      }
+      lockCache.put(path, new InterProcessReadWriteLock(curator, zkPath));
+    }
+    return lockCache.get(path);
+  }
+}
+
diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/package-info.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/package-info.java
new file mode 100644
index 0000000..de1a27e
--- /dev/null
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/package-info.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Synchronization utilities for use in HBaseObjectStoreSemantics.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.hbase.oss.sync;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/EmbeddedS3.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/EmbeddedS3.java
new file mode 100644
index 0000000..03b8470
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/EmbeddedS3.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.SdkClientException;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.services.s3.AbstractAmazonS3;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.Bucket;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.CopyObjectResult;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsResult;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import java.io.File;
+import java.io.InputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.S3ClientFactory;
+import org.apache.hadoop.fs.s3a.s3guard.LocalMetadataStore;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hbase.oss.Constants.*;
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class EmbeddedS3 {
+
+  public static boolean usingEmbeddedS3 = false;
+
+  private static final String BUCKET = "embedded";
+
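+  /**
+   * Configures an in-memory S3Guard metadata store if none is set and, if no
+   * real bucket is configured via {@code DATA_URI}, points the s3a client
+   * factory at the embedded implementation below so tests can run without an
+   * external S3 endpoint.
+   */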
+  public static void conditionalStart(Configuration conf) throws Exception {
+    if (StringUtils.isEmpty(conf.get(S3_METADATA_STORE_IMPL))) {
+      conf.set(S3_METADATA_STORE_IMPL, LocalMetadataStore.class.getName());
+    }
+
+    boolean notConfigured = StringUtils.isEmpty(conf.get(DATA_URI));
+    if (notConfigured) {
+      usingEmbeddedS3 = true;
+      conf.set(S3_CLIENT_FACTORY_IMPL,
+            EmbeddedS3ClientFactory.class.getName());
+      conf.set(DATA_URI, "s3a://" + BUCKET);
+    } else {
+      usingEmbeddedS3 = false;
+    }
+  }
+
+  /**
+   * Replaces the default S3ClientFactory to inject an EmbeddedAmazonS3
+   * instance. This is currently a private API in Hadoop, but is the same method
+   * used by S3Guard's inconsistency-injection tests. The method signature
+   * defined in the interface varies depending on the Hadoop version.
+   */
+  public static class EmbeddedS3ClientFactory implements S3ClientFactory {
+    public AmazonS3 createS3Client(URI name) {
+      AmazonS3 s3 = new EmbeddedAmazonS3();
+      s3.createBucket(BUCKET);
+      return s3;
+    }
+
+    public AmazonS3 createS3Client(URI name,
+        String bucket,
+        AWSCredentialsProvider credentialSet,
+        String userAgentSuffix) {
+      AmazonS3 s3 = new EmbeddedAmazonS3();
+      s3.createBucket(bucket);
+      return s3;
+    }
+
+    public AmazonS3 createS3Client(URI name,
+        String bucket,
+        AWSCredentialsProvider credentialSet) {
+      return createS3Client(name);
+    }
+  }
+
+  /**
+   * Emulates an S3-connected client. This is the bare minimum implementation
+   * required for s3a to pass the contract tests while continuing to reproduce
+   * such quirks as delayed file creation and non-atomic / slow renames.
+   * Specifically, the following features are not supported:
+   * <ul>
+   *   <li>Multiple buckets</li>
+   *   <li>Requester-pays</li>
+   *   <li>Encryption</li>
+   *   <li>Object versioning</li>
+   *   <li>Multi-part</li>
+   *   <li>ACLs</li>
+   *   <li>Objects larger than Integer.MAX_VALUE</li>
+   * </ul>
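+   *
+   * <p>All object data is held in memory in a single map keyed by object name,
+   * so contents are lost when the JVM exits.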
+   */
+  public static class EmbeddedAmazonS3 extends AbstractAmazonS3 {
+
+    public static final Logger LOG =
+          LoggerFactory.getLogger(EmbeddedAmazonS3.class);
+
+    private String bucketName = null;
+
+    // Randomized contract test datasets are generated byte-by-byte, so we must
+    // ensure they are encoded as 8-bit characters
+    private static final String ISO_8859_1 = "ISO-8859-1";
+    private Charset encoding = Charset.forName(ISO_8859_1);
+
+    private class EmbeddedS3Object extends S3Object {
+      private ObjectMetadata meta;
+      private String data;
+      private long[] range;
+
+      public EmbeddedS3Object() {
+        super();
+      }
+
+      public EmbeddedS3Object(EmbeddedS3Object that, long[] range) {
+        super();
+        this.meta = that.meta;
+        this.data = that.data;
+        this.range = range;
+      }
+
+      public S3ObjectInputStream getObjectContent() {
+        String substring = data.substring((int)range[0], (int)range[1]+1);
+        InputStream in = IOUtils.toInputStream(substring, encoding);
+        return new S3ObjectInputStream(in, null);
+      }
+    }
+
+    private Map<String, EmbeddedS3Object> bucket = new HashMap<>();
+
+    private void simulateServerSideCopy() {
+      try {
+        // For realism, this could be a function of data size, but 1/100s is
+        // more than enough to reliably observe non-atomic renames.
+        Thread.sleep(10);
+      } catch (InterruptedException e) {}
+    }
+
+    // AmazonS3 interface below
+
+    public CopyObjectResult copyObject(CopyObjectRequest request) {
+      String sourceKey = request.getSourceKey();
+      String destinationKey = request.getDestinationKey();
+      LOG.debug("copyObject: {} -> {}", sourceKey, destinationKey);
+      EmbeddedS3Object object = bucket.get(sourceKey);
+      simulateServerSideCopy();
+      bucket.put(destinationKey, object);
+      return new CopyObjectResult();
+    }
+
+    public Bucket createBucket(String bucketName) {
+      LOG.debug("createBucket: {}", bucketName);
+      this.bucketName = bucketName;
+      Bucket bucket = new Bucket(bucketName);
+      return bucket;
+    }
+
+    public void deleteObject(String bucketName, String key) {
+      LOG.debug("deleteObject: {}", key);
+      bucket.remove(key);
+    }
+
+    public DeleteObjectsResult deleteObjects(DeleteObjectsRequest request) {
+      for (DeleteObjectsRequest.KeyVersion keyVersion : request.getKeys()) {
+        String key = keyVersion.getKey();
+        LOG.debug("deleteObjects: {}", key);
+        bucket.remove(key);
+      }
+      return new DeleteObjectsResult(
+            new ArrayList<DeleteObjectsResult.DeletedObject>());
+    }
+
+    public boolean doesBucketExist(String bucketName) {
+      LOG.debug("doesBucketExist: {}", bucketName);
+      if (this.bucketName == null) {
+        this.bucketName = bucketName;
+      }
+      return this.bucketName.equals(bucketName);
+    }
+
+    public boolean doesObjectExist(String bucketName, String objectName)  {
+      LOG.debug("doesObjectExist: {}", objectName);
+      return bucket.containsKey(objectName);
+    }
+
+    public String getBucketLocation(String bucketName) {
+      LOG.debug("getBucketLocation: {}", bucketName);
+      // This is Region.US_Standard, but its toString() returns null
+      return "us-east-1";
+    }
+
+    public S3Object getObject(GetObjectRequest request) {
+      String key = request.getKey();
+      long[] range = request.getRange();
+      if (range.length != 2) {
+        throw new IllegalArgumentException("Range must have 2 elements!");
+      }
+      LOG.debug("getObject: {} [{} - {}]", key, range[0], range[1]);
+      EmbeddedS3Object object = bucket.get(key);
+      return new EmbeddedS3Object(object, range);
+    }
+
+    public ObjectMetadata getObjectMetadata(GetObjectMetadataRequest request) {
+      String key = request.getKey();
+      LOG.debug("getObjectMetadata: {}", key);
+      if (!bucket.containsKey(key)) {
+        AmazonServiceException e = new AmazonServiceException("404");
+        e.setStatusCode(404);
+        throw e;
+      }
+      EmbeddedS3Object object = bucket.get(key);
+      return object.getObjectMetadata();
+    }
+
+    public ObjectListing listObjects(ListObjectsRequest request) {
+      String prefix = request.getPrefix();
+      String delimiter = request.getDelimiter();
+      LOG.debug("listObjects: {} (delimiter = {})", prefix, delimiter);
+
+      ObjectListing result = new ObjectListing();
+      innerListing(prefix, delimiter, result.getObjectSummaries(),
+            result.getCommonPrefixes());
+      return result;
+    }
+
+    public ListObjectsV2Result listObjectsV2(ListObjectsV2Request request) {
+      String prefix = request.getPrefix();
+      String delimiter = request.getDelimiter();
+      LOG.debug("listObjectsV2: {} (delimiter = {})", prefix, delimiter);
+
+      ListObjectsV2Result result = new ListObjectsV2Result();
+      innerListing(prefix, delimiter, result.getObjectSummaries(),
+            result.getCommonPrefixes());
+      return result;
+    }
+
+    private void innerListing(String prefix, String delimiter,
+          List<S3ObjectSummary> summaries, List<String> prefixes) {
+      Set<String> commonPrefixes = new HashSet<>();
+      for (String key : bucket.keySet()) {
+        if (!key.startsWith(prefix)) {
+          continue;
+        }
+        if (delimiter != null) {
+          int index = key.indexOf(delimiter, prefix.length());
+          if (index > 0) {
+            // Finding the delimiter means non-recursive listing
+            // Add the first-level child to common prefixes and continue
+            commonPrefixes.add(key.substring(0, index));
+            continue;
+          }
+        }
+        S3ObjectSummary summary = new S3ObjectSummary();
+        summary.setKey(key);
+        summary.setSize(bucket.get(key).data.length());
+        summaries.add(summary);
+      }
+      prefixes.addAll(commonPrefixes);
+    }
+
+    public PutObjectResult putObject(PutObjectRequest request) {
+      String key = request.getKey();
+      LOG.debug("putObject: {}", key);
+
+      EmbeddedS3Object object = bucket.get(key);
+      if (object == null) {
+        object = new EmbeddedS3Object();
+      }
+
+      try {
+        if (request.getInputStream() != null) {
+          InputStream in = request.getInputStream();
+          object.data = IOUtils.toString(in, encoding);
+          in.close();
+        } else if (request.getFile() != null) {
+          File file = request.getFile();
+          object.data = FileUtils.readFileToString(file, encoding);
+        } else {
+          throw new UnsupportedOperationException("Unknown putObject method");
+        }
+      } catch (IOException e) {
+        throw new SdkClientException("Error reading object to put", e);
+      }
+      // TODO later Hadoop versions will require setting ETags, etc. too
+      object.getObjectMetadata().setContentLength(object.data.length());
+
+      // Object isn't listed in the bucket until after it's written
+      bucket.put(key, object);
+      return new PutObjectResult();
+    }
+
+    public void shutdown() {
+      LOG.debug("shutdown");
+    }
+  }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/EmbeddedZK.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/EmbeddedZK.java
new file mode 100644
index 0000000..c50940c
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/EmbeddedZK.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss;
+
+import java.net.InetAddress;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseZKTestingUtility;
+import org.apache.hadoop.hbase.oss.Constants;
+import org.apache.hadoop.hbase.oss.sync.TreeLockManager;
+import org.apache.hadoop.hbase.oss.sync.ZKTreeLockManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class EmbeddedZK {
+
+  private static HBaseZKTestingUtility util = null;
+
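+  /**
+   * Starts a mini ZooKeeper cluster and points {@link Constants#ZK_CONN_STRING}
+   * at it when the ZK lock manager is selected but no external ensemble is
+   * configured.
+   */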
+  public static synchronized void conditionalStart(Configuration conf) throws Exception {
+    Class implementation = conf.getClass(Constants.SYNC_IMPL, TreeLockManager.class);
+    boolean notConfigured = StringUtils.isEmpty(conf.get(Constants.ZK_CONN_STRING));
+    if (implementation == ZKTreeLockManager.class && notConfigured) {
+      if (util == null) {
+        util = new HBaseZKTestingUtility(conf);
+        util.startMiniZKCluster();
+      }
+      int port = util.getZkCluster().getClientPort();
+      String hostname = InetAddress.getLocalHost().getHostName();
+      String connectionString = hostname + ":" + port;
+      conf.set(Constants.ZK_CONN_STRING, connectionString);
+    }
+  }
+
+  public static synchronized void conditionalStop() throws Exception {
+    if (util != null) {
+      util.shutdownMiniZKCluster();
+      util = null;
+    }
+  }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/HBaseObjectStoreSemanticsTest.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/HBaseObjectStoreSemanticsTest.java
new file mode 100644
index 0000000..1bb3710
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/HBaseObjectStoreSemanticsTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.oss.sync.TreeLockManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class HBaseObjectStoreSemanticsTest {
+  public static final Logger LOG =
+        LoggerFactory.getLogger(HBaseObjectStoreSemanticsTest.class);
+
+  protected HBaseObjectStoreSemantics hboss = null;
+  protected TreeLockManager sync = null;
+
+  public Path testPathRoot() {
+    return TestUtils.testPathRoot(hboss);
+  }
+
+  public Path testPath(String path) {
+    return TestUtils.testPath(hboss, path);
+  }
+
+  public Path testPath(Path path) {
+    return TestUtils.testPath(hboss, path);
+  }
+
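+  // Each test gets a fresh HBOSS instance (backed by whatever store the build
+  // profile configures) and works under a per-fork test root directory.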
+  @Before
+  public void setup() throws Exception {
+    Configuration conf = new Configuration();
+    hboss = TestUtils.getFileSystem(conf);
+    sync = hboss.getLockManager();
+    hboss.mkdirs(testPathRoot());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TestUtils.cleanup(hboss);
+  }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestAtomicRename.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestAtomicRename.java
new file mode 100644
index 0000000..be95dbf
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestAtomicRename.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.oss.sync.TreeLockManager;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestAtomicRename extends HBaseObjectStoreSemanticsTest {
+  public static final Logger LOG =
+        LoggerFactory.getLogger(TestAtomicRename.class);
+
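+  // Builds a small directory tree, renames it from a background thread, and
+  // checks that the source is already gone by the time the target becomes
+  // visible, i.e. the rename is observed as a single atomic transition.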
+  @Test
+  public void testAtomicRename() throws Exception {
+    Path renameSource = testPath("atomicRenameSource");
+    Path renameTarget = testPath("atomicRenameTarget");
+    try {
+      for (int i = 1; i <= 8; i++) {
+        Path dir = new Path(renameSource, "dir" + i);
+        hboss.mkdirs(dir);
+        for (int j = 1; j <= 8; j++) {
+          Path file = new Path(dir, "file" + j);
+          FSDataOutputStream out = hboss.create(file);
+          // Write 4kb
+          for (int k = 0; k < 256; k++) {
+            out.write("0123456789ABCDEF".getBytes());
+          }
+          out.close();
+        }
+      }
+
+      Thread renameThread = new Thread(
+        new Runnable() {
+          public void run() {
+            try {
+              boolean success = hboss.rename(renameSource, renameTarget);
+              Assert.assertTrue("Rename returned false, indicating some error.",
+                    success);
+            } catch(IOException e) {
+              Assert.fail("Unexpected exception during rename: " + e);
+            }
+          }
+        }
+      );
+      renameThread.start();
+
+      // If the rename fails before ever creating the target, this will hang forever
+      while (!hboss.exists(renameTarget)) {
+        Thread.sleep(1);
+      }
+      Assert.assertFalse("Rename source is still visible after the rename target appeared.",
+            hboss.exists(renameSource));
+      renameThread.join();
+    } finally {
+      try {
+        hboss.delete(renameSource);
+        hboss.delete(renameTarget);
+      } catch (Exception e) {
+        e.printStackTrace();
+        throw e;
+      }
+    }
+  }
+
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestCreateNonRecursive.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestCreateNonRecursive.java
new file mode 100644
index 0000000..9208e9f
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestCreateNonRecursive.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestCreateNonRecursive extends HBaseObjectStoreSemanticsTest {
+  public static final Logger LOG =
+        LoggerFactory.getLogger(TestCreateNonRecursive.class);
+
+  @Test
+  public void testCreateNonRecursiveSerial() throws Exception {
+    Path serialPath = testPath("testCreateNonRecursiveSerial");
+    try {
+      FSDataOutputStream out;
+
+      out = hboss.createNonRecursive(serialPath, false, 1024, (short)1, 1024, null);
+      out.close();
+
+      boolean exceptionThrown = false;
+      try {
+        out = hboss.createNonRecursive(serialPath, false, 1024, (short)1, 1024, null);
+      } catch(FileAlreadyExistsException e) {
+        exceptionThrown = true;
+      }
+      if (!exceptionThrown) {
+        Assert.fail("Second call to createNonRecursive should throw FileAlreadyExistsException, but didn't.");
+      }
+    } finally {
+      hboss.delete(serialPath);
+    }
+  }
+
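+  // Races several concurrent createNonRecursive() calls on the same path;
+  // exactly one should succeed and every other call should fail with
+  // FileAlreadyExistsException.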
+  @Test
+  public void testCreateNonRecursiveParallel() throws Exception {
+    int experiments = 10;
+    int experimentSize = 10;
+    for (int e = 0; e < experiments; e++) {
+      ArrayList<Callable<Boolean>> callables = new ArrayList<Callable<Boolean>>(experimentSize);
+      ArrayList<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(experimentSize);
+
+      Path parallelPath = testPath("testCreateNonRecursiveParallel" + e);
+      ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newFixedThreadPool(10);
+      executor.prestartAllCoreThreads();
+      for (int i = 0; i < experimentSize; i++) {
+        callables.add(new Callable<Boolean>() {
+          public Boolean call() throws IOException {
+            FSDataOutputStream out = null;
+            boolean exceptionThrown = false;
+            try {
+              out = hboss.createNonRecursive(parallelPath, false, 1024, (short)1, 1024, null);
+            } catch(FileAlreadyExistsException e) {
+              exceptionThrown = true;
+            } finally {
+              if (out != null) {
+                out.close();
+              }
+            }
+            return exceptionThrown;
+          }
+        });
+      }
+      try {
+        for (Callable<Boolean> callable : callables) {
+          // This is in a separate loop to try and get them all as overlapped as possible
+          futures.add(executor.submit(callable));
+        }
+        int exceptionsThrown = 0;
+        for (Future<Boolean> future : futures) {
+          // Collect results in a second loop so that all submissions happen before any waiting
+          if (future.get()) {
+            exceptionsThrown++;
+          }
+        }
+        Assert.assertEquals("All but exactly 1 call should have thrown exceptions. " +
+              "Experiment " + (e+1) + " of " + experiments + ".",
+              experimentSize - 1, exceptionsThrown);
+      } finally {
+        hboss.delete(parallelPath);
+      }
+    }
+  }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestRecursiveDelete.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestRecursiveDelete.java
new file mode 100644
index 0000000..0d5c6a8
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestRecursiveDelete.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.oss.sync.ZKTreeLockManager;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestRecursiveDelete extends HBaseObjectStoreSemanticsTest {
+  public static final Logger LOG =
+        LoggerFactory.getLogger(TestRecursiveDelete.class);
+
+  @Test
+  public void testCreationAfterDeletion() throws Exception {
+    // This reproduces an observed failure where re-creating a directory
+    // that had just been deleted would deadlock, because the thread that
+    // issued the original deletion still held the lock on the directory
+    // it deleted.
+    ExecutorService svc = Executors.newSingleThreadExecutor();
+    Path tmp = testPath(".tmp");
+    Path lock = testPath(".tmp/hbase-hbck.lock");
+
+    try {
+      hboss.mkdirs(tmp);
+      hboss.delete(tmp, true);
+      LOG.info("After mkdir and delete in test thread");
+      summarizeZKLocks();
+
+      Future<?> fut = svc.submit(new Runnable() {
+        public void run() {
+          try {
+            hboss.mkdirs(tmp);
+            LOG.info("After mkdir in separate thread");
+            summarizeZKLocks();
+            try (FSDataOutputStream out = hboss.create(lock)) {
+              out.write(Bytes.toBytes("localhost"));
+              out.flush();
+            }
+          } catch (Exception e) {
+            LOG.error("Caught exception", e);
+            Assert.fail("Failed to create file");
+          }
+        }
+      });
+      Assert.assertNull(fut.get(15, TimeUnit.SECONDS));
+    } finally {
+      hboss.delete(tmp, true);
+    }
+  }
+
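+  // Logs the current lock tree when the ZooKeeper-based lock manager is in
+  // use, which helps diagnose the deadlock scenario exercised above.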
+  void summarizeZKLocks() {
+    if (sync instanceof ZKTreeLockManager) {
+      LOG.info(((ZKTreeLockManager) sync).summarizeLocks());
+    }
+  }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestUtils.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestUtils.java
new file mode 100644
index 0000000..0cac933
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestUtils.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TestUtils {
+  public static final Logger LOG =
+        LoggerFactory.getLogger(TestUtils.class);
+
+  // This is defined by the Maven Surefire plugin configuration
+  private static final String TEST_UNIQUE_FORK_ID = "test.unique.fork.id";
+
+  public static final String S3A = "s3a";
+
+  public static String getScheme(Configuration conf) {
+    String dataUri = conf.get(Constants.DATA_URI);
+    try {
+      return new URI(dataUri).getScheme();
+    } catch (URISyntaxException e) {
+      return null;
+    }
+  }
+
+  public static boolean fsIs(String scheme, Configuration conf) {
+    return getScheme(conf).equals(scheme);
+  }
+
+  public static void disableFilesystemCaching(Configuration conf) {
+    String property = "fs." + getScheme(conf) + ".impl.disable.cache";
+    conf.setBoolean(property, true);
+  }
+
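+  // Root for all test data, made unique per Surefire fork so that parallel
+  // test JVMs do not collide under the same store.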
+  public static Path testPathRoot(HBaseObjectStoreSemantics hboss) {
+    String testUniqueForkId = System.getProperty(TEST_UNIQUE_FORK_ID);
+    String root = "/hboss-junit";
+    if (testUniqueForkId != null) {
+      root += "-" + testUniqueForkId;
+    }
+    return new Path(hboss.getHomeDirectory(), root);
+  }
+
+  public static Path testPath(HBaseObjectStoreSemantics hboss, String path) {
+    return testPath(hboss, new Path(path));
+  }
+
+  public static Path testPath(HBaseObjectStoreSemantics hboss, Path path) {
+    return new Path(testPathRoot(hboss), path);
+  }
+
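+  // Builds the HBOSS FileSystem under test: starts the embedded S3 and
+  // ZooKeeper fixtures if needed, then initializes HBOSS against the
+  // configured data URI (the test is skipped if none is set).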
+  public static HBaseObjectStoreSemantics getFileSystem(Configuration conf) throws Exception {
+    // Newer versions of Hadoop will do this for us, but older ones won't
+    // This allows Maven properties, profiles, etc. to set the implementation
+    if (StringUtils.isEmpty(conf.get(Constants.SYNC_IMPL))) {
+      conf.set(Constants.SYNC_IMPL, System.getProperty(Constants.SYNC_IMPL));
+    }
+
+    EmbeddedS3.conditionalStart(conf);
+    EmbeddedZK.conditionalStart(conf);
+
+    try {
+      String dataURI = conf.get(Constants.DATA_URI);
+      Assume.assumeNotNull(dataURI);
+      URI name = new URI(dataURI);
+      HBaseObjectStoreSemantics hboss = new HBaseObjectStoreSemantics();
+      hboss.initialize(name, conf);
+      return hboss;
+    } catch (Exception e) {
+      LOG.error(e.getMessage());
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+  public static void cleanup(HBaseObjectStoreSemantics hboss) throws Exception {
+    if (hboss != null) {
+      hboss.close();
+    }
+    EmbeddedZK.conditionalStop();
+  }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/HBOSSContract.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/HBOSSContract.java
new file mode 100644
index 0000000..948b46d
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/HBOSSContract.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.oss.Constants;
+import org.apache.hadoop.hbase.oss.HBaseObjectStoreSemantics;
+import org.apache.hadoop.hbase.oss.TestUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class HBOSSContract extends AbstractFSContract {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(HBOSSContract.class);
+
+  private Configuration conf = null;
+  private HBaseObjectStoreSemantics fs = null;
+
+  /**
+   * Constructor: registers the contract definition of the underlying store
+   * (currently the s3a contract resource).
+   * @param conf configuration to work with
+   */
+  public HBOSSContract(Configuration conf) {
+    super(conf);
+    this.conf = conf;
+    addConfResource("contract/s3a.xml");
+  }
+
+  /**
+   * Any initialisation logic can go here
+   * @throws IOException IO problems
+   */
+  public void init() throws IOException {
+
+  }
+
+  /**
+   * Get the filesystem for these tests
+   * @return the test fs
+   * @throws IOException IO problems
+   */
+  public FileSystem getTestFileSystem() throws IOException {
+    if (fs == null) {
+      try {
+        fs = TestUtils.getFileSystem(conf);
+      } catch (Exception e) {
+        LOG.error(e.getMessage());
+        e.printStackTrace();
+        throw new IOException("Failed to get FS", e);
+      }
+    }
+    return fs;
+  }
+
+  /**
+   * Get the scheme of this FS
+   * @return the scheme this FS supports
+   */
+  public String getScheme() {
+    return conf.get(Constants.CONTRACT_TEST_SCHEME, "s3a");
+  }
+
+  /**
+   * Return the base path used for tests, e.g. <code>file:///tmp</code>
+   * @return a path in the test FS
+   */
+  public Path getTestPath() {
+    return TestUtils.testPath(fs, "contract-tests");
+  }
+
+  @Override
+  public String toString() {
+    return "FSContract for HBOSS/" + getScheme();
+  }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContract.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContract.java
new file mode 100644
index 0000000..1ba31f9
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContract.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystemContractBaseTest;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.oss.HBaseObjectStoreSemantics;
+import org.apache.hadoop.hbase.oss.TestUtils;
+import org.junit.Assume;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestHBOSSContract extends FileSystemContractBaseTest {
+
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(TestHBOSSContract.class);
+
+  private Path basePath;
+  private Configuration conf;
+
+  @Rule
+  public TestName methodName = new TestName();
+
+  private void nameThread() {
+    Thread.currentThread().setName("JUnit-" + methodName.getMethodName());
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    nameThread();
+    conf = new Configuration();
+    fs = TestUtils.getFileSystem(conf);
+    Assume.assumeNotNull(fs);
+    HBaseObjectStoreSemantics hboss = (HBaseObjectStoreSemantics)fs;
+    basePath = fs.makeQualified(TestUtils.testPath(hboss, "ITestHBOSSContract"));
+  }
+
+  @Override
+  public Path getTestBaseDir() {
+    return basePath;
+  }
+
+  @Test
+  public void testMkdirsWithUmask() throws Exception {
+    // Skipped in the hadoop-aws tests
+    Assume.assumeFalse(TestUtils.fsIs(TestUtils.S3A, conf));
+    super.testMkdirsWithUmask();
+  }
+
+  @Test
+  public void testRenameDirectoryAsExistingDirectory() throws Exception {
+    if (!TestUtils.fsIs(TestUtils.S3A, conf)) {
+      super.testRenameDirectoryAsExistingDirectory();
+      return;
+    }
+
+    // Overridden implementation in the hadoop-aws tests
+    Assume.assumeTrue(renameSupported());
+
+    Path src = path("testRenameDirectoryAsExisting/dir");
+    fs.mkdirs(src);
+    createFile(path(src + "/file1"));
+    createFile(path(src + "/subdir/file2"));
+
+    Path dst = path("testRenameDirectoryAsExistingNew/newdir");
+    fs.mkdirs(dst);
+    rename(src, dst, true, false, true);
+    Assert.assertFalse("Nested file1 exists",
+        fs.exists(path(src + "/file1")));
+    Assert.assertFalse("Nested file2 exists",
+        fs.exists(path(src + "/subdir/file2")));
+    Assert.assertTrue("Renamed nested file1 exists",
+        fs.exists(path(dst + "/file1")));
+    Assert.assertTrue("Renamed nested exists",
+        fs.exists(path(dst + "/subdir/file2")));
+  }
+
+  @Test
+  public void testMoveDirUnderParent() throws Throwable {
+    // Skipped in the hadoop-aws tests
+    Assume.assumeFalse(TestUtils.fsIs(TestUtils.S3A, conf));
+    super.testMoveDirUnderParent();
+  }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractAppend.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractAppend.java
new file mode 100644
index 0000000..568ffbb
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractAppend.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractAppendTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Note that this suite has not been exercised against S3A because S3A does not support append.
+ */
+public class TestHBOSSContractAppend extends AbstractContractAppendTest {
+
+  @Override
+  protected Configuration createConfiguration() {
+    return super.createConfiguration();
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new HBOSSContract(conf);
+  }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractConcat.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractConcat.java
new file mode 100644
index 0000000..9639057
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractConcat.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractConcatTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Note that this suite has not been exercised against S3A because S3A does not support concat.
+ */
+public class TestHBOSSContractConcat extends AbstractContractConcatTest {
+
+  @Override
+  protected Configuration createConfiguration() {
+    return super.createConfiguration();
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new HBOSSContract(conf);
+  }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractCreate.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractCreate.java
new file mode 100644
index 0000000..ec088fa
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractCreate.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractContractCreateTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.hbase.oss.EmbeddedS3;
+import org.apache.hadoop.hbase.oss.TestUtils;
+import org.apache.hadoop.hbase.oss.sync.TreeLockManager;
+import org.junit.Assume;
+import org.junit.Test;
+
+public class TestHBOSSContractCreate extends AbstractContractCreateTest {
+
+  @Override
+  protected Configuration createConfiguration() {
+    return super.createConfiguration();
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new HBOSSContract(conf);
+  }
+
+  @Test
+  @Override
+  public void testCreatedFileIsVisibleOnFlush() throws Throwable {
+    Configuration conf = createConfiguration();
+    try {
+      TestUtils.getFileSystem(conf);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail("Exception configuring FS: " + e);
+    }
+    // HBOSS satisfies the contract that this test checks for, but it also
+    // relies on flush, which s3a still does not support.
+    Assume.assumeFalse(TestUtils.fsIs(TestUtils.S3A, conf));
+    super.testCreatedFileIsVisibleOnFlush();
+  }
+
+  @Test
+  @Override
+  public void testCreatedFileIsImmediatelyVisible() throws Throwable {
+    describe("verify that a newly created file exists as soon as open returns");
+    Path path = path("testCreatedFileIsImmediatelyVisible");
+    try(FSDataOutputStream out = getFileSystem().create(path,
+                                   false,
+                                   4096,
+                                   (short) 1,
+                                   1024)) {
+
+      // The original contract test delays close() until after the files appear.
+      // HBOSS only guarantees that create() + close() is atomic, so the
+      // original test still wouldn't work.
+      out.close();
+
+      if (!getFileSystem().exists(path)) {
+
+        if (isSupported(CREATE_VISIBILITY_DELAYED)) {
+          // For file systems that declare delayed visibility, downgrade to a
+          // skip so that the limitation is still visible in test results.
+          ContractTestUtils.skip(
+                "This Filesystem delays visibility of newly created files");
+        }
+        assertPathExists("expected path to be visible before anything written",
+                         path);
+      }
+    }
+  }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractDelete.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractDelete.java
new file mode 100644
index 0000000..3a1a503
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractDelete.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractDeleteTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+public class TestHBOSSContractDelete extends AbstractContractDeleteTest {
+
+  @Override
+  protected Configuration createConfiguration() {
+    return super.createConfiguration();
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new HBOSSContract(conf);
+  }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractDistCp.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractDistCp.java
new file mode 100644
index 0000000..73afb07
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractDistCp.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.hbase.oss.TestUtils;
+import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;
+
+public class TestHBOSSContractDistCp extends AbstractContractDistCpTest {
+
+  @Override
+  protected Configuration createConfiguration() {
+    return super.createConfiguration();
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new HBOSSContract(conf);
+  }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractGetFileStatus.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractGetFileStatus.java
new file mode 100644
index 0000000..3425afc
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractGetFileStatus.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractGetFileStatusTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.hbase.oss.TestUtils;
+
+public class TestHBOSSContractGetFileStatus extends AbstractContractGetFileStatusTest {
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    try {
+      TestUtils.getFileSystem(conf);
+      TestUtils.disableFilesystemCaching(conf);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail("Exception configuring FS: " + e);
+    }
+    return conf;
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new HBOSSContract(conf);
+  }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractMkdir.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractMkdir.java
new file mode 100644
index 0000000..268d8d1
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractMkdir.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractMkdirTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+public class TestHBOSSContractMkdir extends AbstractContractMkdirTest {
+
+  @Override
+  protected Configuration createConfiguration() {
+    return super.createConfiguration();
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new HBOSSContract(conf);
+  }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractOpen.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractOpen.java
new file mode 100644
index 0000000..36adfcf
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractOpen.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractOpenTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+public class TestHBOSSContractOpen extends AbstractContractOpenTest {
+
+  @Override
+  protected Configuration createConfiguration() {
+    return super.createConfiguration();
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new HBOSSContract(conf);
+  }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractRename.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractRename.java
new file mode 100644
index 0000000..f1047c4
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractRename.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.AbstractContractRenameTest;
+import org.apache.hadoop.hbase.oss.EmbeddedS3;
+import org.apache.hadoop.hbase.oss.TestUtils;
+import org.junit.Assume;
+import org.junit.Test;
+
+/**
+ * There is an S3A-specific extension of AbstractContractRenameTest;
+ * TestHBOSSContractRenameS3A applies the same modifications for HBOSS-on-S3A.
+ * This class remains for the general (non-S3A) case.
+ */
+public class TestHBOSSContractRename extends AbstractContractRenameTest {
+
+  @Override
+  protected Configuration createConfiguration() {
+    return super.createConfiguration();
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    HBOSSContract contract = new HBOSSContract(conf);
+    try {
+      TestUtils.getFileSystem(conf);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail("Exception configuring FS: " + e);
+    }
+    Assume.assumeFalse(TestUtils.fsIs(TestUtils.S3A, conf));
+    return contract;
+  }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractRenameS3A.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractRenameS3A.java
new file mode 100644
index 0000000..a441dbf
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractRenameS3A.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.AbstractContractRenameTest;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.hbase.oss.EmbeddedS3;
+import org.apache.hadoop.hbase.oss.TestUtils;
+import org.junit.Assume;
+import org.junit.Test;
+
+/**
+ * There is an S3A-specific extension of AbstractContractRenameTest, and this
+ * class implements the same modifications for HBOSS-on-S3A.
+ * TestHBOSSContractRename remains to be run in the general case.
+ */
+public class TestHBOSSContractRenameS3A extends AbstractContractRenameTest {
+
+  @Override
+  protected Configuration createConfiguration() {
+    return super.createConfiguration();
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    HBOSSContract contract = new HBOSSContract(conf);
+    try {
+      TestUtils.getFileSystem(conf);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail("Exception configuring FS: " + e);
+    }
+    Assume.assumeTrue(TestUtils.fsIs(TestUtils.S3A, conf));
+    return contract;
+  }
+
+  // This is copied from Hadoop's ITestS3AContractRename. Ideally we could
+  // extend that instead of AbstractContractRenameTest, but it is not published.
+  @Override
+  public void testRenameDirIntoExistingDir() throws Throwable {
+    describe("Verify renaming a dir into an existing dir puts the files"
+             +" from the source dir into the existing dir"
+             +" and leaves existing files alone");
+    FileSystem fs = getFileSystem();
+    String sourceSubdir = "source";
+    Path srcDir = path(sourceSubdir);
+    Path srcFilePath = new Path(srcDir, "source-256.txt");
+    byte[] srcDataset = ContractTestUtils.dataset(256, 'a', 'z');
+    ContractTestUtils.writeDataset(fs, srcFilePath, srcDataset, srcDataset.length, 1024, false);
+    Path destDir = path("dest");
+
+    Path destFilePath = new Path(destDir, "dest-512.txt");
+    byte[] destDataset = ContractTestUtils.dataset(512, 'A', 'Z');
+    ContractTestUtils.writeDataset(fs, destFilePath, destDataset, destDataset.length, 1024,
+        false);
+    assertIsFile(destFilePath);
+
+    boolean rename = fs.rename(srcDir, destDir);
+    assertFalse("s3a doesn't support rename to non-empty directory", rename);
+  }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractRootDirectory.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractRootDirectory.java
new file mode 100644
index 0000000..189693a
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractRootDirectory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class TestHBOSSContractRootDirectory extends AbstractContractRootDirectoryTest {
+
+  @Override
+  protected Configuration createConfiguration() {
+    return super.createConfiguration();
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new HBOSSContract(conf);
+  }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractSeek.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractSeek.java
new file mode 100644
index 0000000..136b056
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractSeek.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+public class TestHBOSSContractSeek extends AbstractContractSeekTest {
+
+  @Override
+  protected Configuration createConfiguration() {
+    return super.createConfiguration();
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new HBOSSContract(conf);
+  }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractSetTimes.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractSetTimes.java
new file mode 100644
index 0000000..b5223cb
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractSetTimes.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractSetTimesTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Note that this suite has not been exercised against S3A because S3A does not support setTimes.
+ */
+public class TestHBOSSContractSetTimes extends AbstractContractSetTimesTest {
+
+  @Override
+  protected Configuration createConfiguration() {
+    return super.createConfiguration();
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new HBOSSContract(conf);
+  }
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/LocalTreeLockManager.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/LocalTreeLockManager.java
new file mode 100644
index 0000000..60b213c
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/LocalTreeLockManager.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.sync;
+
+import java.io.IOException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation based on standard Java concurrency primitives. This makes it
+ * useless for anything but testing, development, and maybe a single-node HBase
+ * instance. It is however much faster and easier to debug, despite including
+ * logic very similar to what other implementations would need.
+ *
+ * Can be enabled in JUnit tests with -Plocal (although it's the default).
+ */
+public class LocalTreeLockManager extends TreeLockManager {
+  private static final Logger LOG =
+        LoggerFactory.getLogger(LocalTreeLockManager.class);
+
+  public void initialize(FileSystem fs) {
+    this.fs = fs;
+  }
+
+  @Override
+  protected void writeLock(Path p) throws IOException {
+    createLocksIfNeeded(p);
+    index.get(p).lock.writeLock().lock();
+  }
+
+  @Override
+  protected void writeUnlock(Path p) throws IOException {
+    try {
+      LockNode node = index.get(p);
+      // Node to unlock may already be gone after deletes
+      if (node != null) {
+        node.lock.writeLock().unlock();
+      }
+    } catch(IllegalMonitorStateException e) {
+      // Reentrant locks might be acquired multiple times
+      LOG.error("Tried to release unacquired write lock: {}", p);
+      throw e;
+    }
+  }
+
+  @Override
+  protected void readLock(Path p) throws IOException {
+    createLocksIfNeeded(p);
+    index.get(p).lock.readLock().lock();
+  }
+
+  @Override
+  protected void readUnlock(Path p) throws IOException {
+    try {
+      index.get(p).lock.readLock().unlock();
+    } catch(IllegalMonitorStateException e) {
+      // Reentrant locks might be acquired multiple times
+      LOG.error("Tried to release unacquired read lock: {}", p);
+      throw e;
+    }
+  }
+
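+  // Returns true if any ancestor of the given path is currently write-locked
+  // by another thread.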
+  @Override
+  protected boolean writeLockAbove(Path p) throws IOException {
+    createLocksIfNeeded(p);
+    while (!p.isRoot()) {
+      p = p.getParent();
+      LockNode currentNode = index.get(p);
+      if (currentNode.lock.isWriteLocked() &&
+            !currentNode.lock.isWriteLockedByCurrentThread()) {
+        LOG.warn("Parent write lock currently held: {}", p);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  protected boolean writeLockBelow(Path p) throws IOException {
+    createLocksIfNeeded(p);
+    return writeLockBelow(p, true);
+  }
+
+  @Override
+  protected boolean readLockBelow(Path p) throws IOException {
+    createLocksIfNeeded(p);
+    return readLockBelow(p, true);
+  }
+
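+  // Unlinks the deleted subtree from its parent, drops all of its nodes from
+  // the index, and releases the write lock that was taken on the deleted root.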
+  @Override
+  protected void recursiveDelete(Path p) {
+    if (!p.isRoot()) {
+      index.get(p.getParent()).children.remove(p);
+    }
+    LockNode currentNode = index.get(p);
+    innerRecursiveDelete(p);
+    currentNode.lock.writeLock().unlock();
+  }
+
+  private void innerRecursiveDelete(Path p) {
+    LockNode currentNode = index.remove(p);
+    Set<Path> childPaths = currentNode.children.keySet();
+    for (Path child : childPaths) {
+      innerRecursiveDelete(child);
+    }
+  }
+
+  private static class LockNode {
+    public final Path path;
+    public final Map<Path, LockNode> children = new HashMap<>();
+    public final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public LockNode(Path path) {
+      this.path = path;
+    }
+  }
+
+  private Map<Path, LockNode> index = new HashMap<>();
+
+  private synchronized void createLocksIfNeeded(Path p) {
+    if (index.containsKey(p)) {
+      return;
+    }
+    Path lastPath = null;
+    while (p != null) {
+      LockNode currentNode = index.get(p);
+      if (currentNode != null) {
+        if (lastPath != null) {
+          currentNode.children.put(lastPath, index.get(lastPath));
+        }
+        return;
+      }
+      currentNode = new LockNode(p);
+      if (lastPath != null) {
+        currentNode.children.put(lastPath, index.get(lastPath));
+      }
+      index.put(p, currentNode);
+      lastPath = p;
+      p = p.getParent();
+    }
+  }
+
+  private synchronized boolean writeLockBelow(Path p, boolean firstLevel) {
+    LockNode currentNode = index.get(p);
+    if (!firstLevel && currentNode.lock.isWriteLocked() &&
+          !currentNode.lock.isWriteLockedByCurrentThread()) {
+      LOG.warn("Child write lock current held: {}", p);
+      return true;
+    }
+    Set<Path> childPaths = currentNode.children.keySet();
+    for (Path child : childPaths) {
+      if (writeLockBelow(child, false)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  // TODO will return true even if current thread has a read lock below...
+  private synchronized boolean readLockBelow(Path p, boolean firstLevel) {
+    LockNode currentNode = index.get(p);
+    if (!firstLevel && currentNode.lock.getReadLockCount() > 0) {
+      LOG.warn("Child read lock currently held: {}", p);
+      return true;
+    }
+    Set<Path> childPaths = currentNode.children.keySet();
+    for (Path child : childPaths) {
+      if (readLockBelow(child, false)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
+
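For reference, the locking checks above lean on a few documented ReentrantReadWriteLock properties: write locks are exclusive but reentrant for the owning thread, read-lock holds are only visible as an aggregate count, and unlocking a lock the thread does not hold throws IllegalMonitorStateException. A minimal, self-contained sketch of those properties (illustration only, not part of the patch):

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class RwLockSemanticsSketch {
  public static void main(String[] args) {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // Write locks are exclusive but reentrant for the owning thread.
    lock.writeLock().lock();
    lock.writeLock().lock();
    System.out.println(lock.isWriteLockedByCurrentThread()); // true
    lock.writeLock().unlock();
    System.out.println(lock.isWriteLocked());                // still true, one hold remains
    lock.writeLock().unlock();

    // Read-lock holds are visible only as a count, with no record of which
    // thread holds them (hence the TODO on readLockBelow() above).
    lock.readLock().lock();
    System.out.println(lock.getReadLockCount());             // 1
    lock.readLock().unlock();

    // Unlocking without holding the lock fails fast, which is what the
    // catch blocks in writeUnlock()/readUnlock() log and rethrow.
    try {
      lock.writeLock().unlock();
    } catch (IllegalMonitorStateException e) {
      System.out.println("unlock without hold: " + e);
    }
  }
}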
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/NullTreeLockManager.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/NullTreeLockManager.java
new file mode 100644
index 0000000..9d94077
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/NullTreeLockManager.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss.sync;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Bypasses all synchronization, effectively making HBOSS's locking a no-op.
+ *
+ * Can be enabled in JUnit tests with -Pnull to reproduce the problems HBOSS is
+ * intended to fix.
+ */
+public class NullTreeLockManager extends TreeLockManager {
+
+  public void initialize(FileSystem fs) {
+    this.fs = fs;
+  }
+
+  @Override
+  protected void writeLock(Path p) {
+  }
+
+  @Override
+  protected void writeUnlock(Path p) {
+  }
+
+  @Override
+  protected void readLock(Path p) {
+  }
+
+  @Override
+  protected void readUnlock(Path p) {
+  }
+
+  @Override
+  protected boolean writeLockAbove(Path p) {
+    return false;
+  }
+
+  @Override
+  protected boolean writeLockBelow(Path p) {
+    return false;
+  }
+
+  @Override
+  protected boolean readLockBelow(Path p) {
+    return false;
+  }
+
+  @Override
+  protected void recursiveDelete(Path p) {
+  }
+}
+
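The -Plocal and -Pnull profiles mentioned in the Javadocs presumably just point HBOSS at one of these TreeLockManager implementations. A hedged sketch of selecting an implementation programmatically for a test, assuming a hypothetical configuration key (the real key is defined by a constant in the hbase-oss module):

import org.apache.hadoop.conf.Configuration;

public class SyncImplSelectionSketch {

  // "fs.hboss.sync.impl" is used here purely for illustration; consult the
  // hbase-oss configuration constants for the actual key name.
  private static final String SYNC_IMPL_KEY = "fs.hboss.sync.impl";

  public static Configuration localLocking() {
    Configuration conf = new Configuration();
    conf.set(SYNC_IMPL_KEY,
        "org.apache.hadoop.hbase.oss.sync.LocalTreeLockManager");
    return conf;
  }

  public static Configuration noLocking() {
    Configuration conf = new Configuration();
    // NullTreeLockManager disables locking entirely, which reproduces the
    // raw object-store behaviour HBOSS is meant to paper over.
    conf.set(SYNC_IMPL_KEY,
        "org.apache.hadoop.hbase.oss.sync.NullTreeLockManager");
    return conf;
  }
}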
diff --git a/hbase-oss/src/test/resources/contract/s3a.xml b/hbase-oss/src/test/resources/contract/s3a.xml
new file mode 100644
index 0000000..72aac03
--- /dev/null
+++ b/hbase-oss/src/test/resources/contract/s3a.xml
@@ -0,0 +1,128 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~  or more contributor license agreements.  See the NOTICE file
+  ~  distributed with this work for additional information
+  ~  regarding copyright ownership.  The ASF licenses this file
+  ~  to you under the Apache License, Version 2.0 (the
+  ~  "License"); you may not use this file except in compliance
+  ~  with the License.  You may obtain a copy of the License at
+  ~
+  ~       http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<configuration>
+
+  <!--
+    This is S3A's contract with HBOSS guarantees added. Specifically:
+
+      fs.contract.is-blobstore = false
+      fs.contract.create-visibility-delayed = false
+      fs.contract.supports-atomic-directory-delete = true
+      fs.contract.supports-atomic-rename = true
+
+    Note that fs.contract.is-blobstore appears to be identical in meaning to
+    fs.contract.create-visibility-delayed.
+  -->
+
+  <property>
+    <name>fs.contract.test.root-tests-enabled</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.test.random-seek-count</name>
+    <value>10</value>
+  </property>
+
+  <property>
+    <name>fs.contract.is-blobstore</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.create-visibility-delayed</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.is-case-sensitive</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rename-returns-false-if-source-missing</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rename-remove-dest-if-empty-dir</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-append</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-atomic-directory-delete</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-atomic-rename</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-block-locality</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-concat</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-getfilestatus</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-seek</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-seek-on-closed-file</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rejects-seek-past-eof</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-strict-exceptions</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-unix-permissions</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rename-overwrites-dest</name>
+    <value>false</value>
+  </property>
+
+</configuration>
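Hadoop's filesystem contract tests read these flags as plain Configuration booleans, so the HBOSS-added guarantees can be checked directly. A minimal sketch (resource loading simplified; the real contract-test harness in hadoop-common resolves the file for you):

import org.apache.hadoop.conf.Configuration;

public class ContractOptionsSketch {
  public static void main(String[] args) {
    Configuration contract = new Configuration(false);
    // The file above lives on the test classpath as contract/s3a.xml.
    contract.addResource("contract/s3a.xml");

    boolean atomicRename =
        contract.getBoolean("fs.contract.supports-atomic-rename", false);
    boolean delayedVisibility =
        contract.getBoolean("fs.contract.create-visibility-delayed", true);

    // With HBOSS in front of S3A these report true and false respectively,
    // unlike the stock S3A contract.
    System.out.println("atomic rename: " + atomicRename);
    System.out.println("delayed create visibility: " + delayedVisibility);
  }
}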
diff --git a/hbase-oss/src/test/resources/core-site.xml b/hbase-oss/src/test/resources/core-site.xml
new file mode 100644
index 0000000..a46594f
--- /dev/null
+++ b/hbase-oss/src/test/resources/core-site.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~  or more contributor license agreements.  See the NOTICE file
+  ~  distributed with this work for additional information
+  ~  regarding copyright ownership.  The ASF licenses this file
+  ~  to you under the Apache License, Version 2.0 (the
+  ~  "License"); you may not use this file except in compliance
+  ~  with the License.  You may obtain a copy of the License at
+  ~
+  ~       http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<configuration>
+
+  <!--
+    By default the tests run against a mocked S3 client. To run these against
+    the real Amazon S3 service or another object store, set fs.hboss.data.uri
+    and any required credentials in auth-keys.xml. e.g.:
+
+      <configuration>
+        <property>
+          <name>fs.hboss.data.uri</name>
+          <value>s3a://.../</value>
+        </property>
+
+        <property>
+          <name>fs.s3a.secret.key</name>
+          <value>...</value>
+        </property>
+
+        <property>
+          <name>fs.s3a.access.key</name>
+          <value>...</value>
+        </property>
+      </configuration>
+
+    S3Guard is also configured to use the LocalMetadataStore implementation by
+    default. You may wish to configure DynamoDBMetadataStore for integration
+    testing. If S3Guard is not needed (for an S3-compatible appliance, for
+    instance), you can disable it by explicitly configuring NullMetadataStore
+    (see below).
+  -->
+
+  <!--
+  <property>
+    <name>fs.s3a.metadatastore.impl</name>
+    <value>org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore</value>
+    <description>Uncomment to disable S3Guard entirely</description>
+  </property>
+  -->
+
+  <include xmlns="http://www.w3.org/2001/XInclude" href="auth-keys.xml">
+    <fallback/>
+  </include>
+
+</configuration>
diff --git a/hbase-oss/src/test/resources/log4j.properties b/hbase-oss/src/test/resources/log4j.properties
new file mode 100644
index 0000000..a59c82e
--- /dev/null
+++ b/hbase-oss/src/test/resources/log4j.properties
@@ -0,0 +1,19 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) - %m%n
+log4j.logger.org.apache.hadoop=DEBUG
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..ec62ba5
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>org.apache.hbase.filesystem</groupId>
+  <artifactId>hbase-filesystem</artifactId>
+  <version>1.0.0-alpha1-SNAPSHOT</version>
+
+  <name>Apache HBase FileSystem-related Modules</name>
+  <description>
+    This project houses modules that work at the Apache Hadoop FileSystem layer
+    to meet Apache HBase's needs. This work is experimental, and may eventually
+    be replaced by other initiatives in HBase. It is therefore released
+    separately.
+  </description>
+  <packaging>pom</packaging>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <maven.compiler.source>1.8</maven.compiler.source>
+    <maven.compiler.target>1.8</maven.compiler.target>
+
+    <audience-annotations.version>0.5.0</audience-annotations.version>
+    <aws-java-sdk.version>1.11.525</aws-java-sdk.version>
+    <commons-io.version>2.5</commons-io.version>
+    <commons-lang3.version>3.6</commons-lang3.version>
+    <curator.version>4.0.0</curator.version>
+    <hadoop.version>3.2.0</hadoop.version>
+    <hbase.version>2.1.4</hbase.version>
+    <hbase-thirdparty.version>2.2.0</hbase-thirdparty.version>
+    <junit.version>4.12</junit.version>
+    <slf4j.version>1.7.25</slf4j.version>
+    <zookeeper.version>3.4.10</zookeeper.version>
+  </properties>
+
+  <modules>
+    <module>hbase-oss</module>
+  </modules>
+
+  <build>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <artifactId>maven-surefire-plugin</artifactId>
+          <version>2.22.1</version>
+          <configuration>
+            <redirectTestOutputToFile>true</redirectTestOutputToFile>
+            <reuseForks>false</reuseForks>
+            <systemProperties>
+              <test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
+            </systemProperties>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+</project>
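The surefire configuration above runs each test fork in a fresh JVM and hands it a unique test.unique.fork.id system property. A hedged sketch of how a test could use that property to keep parallel forks from colliding on a shared bucket (the helper name is illustrative, not part of the patch):

import org.apache.hadoop.fs.Path;

public class ForkIsolationSketch {

  // Illustrative helper: derive a per-fork working directory so parallel
  // surefire forks never operate on the same object-store paths.
  public static Path uniqueTestDir(Path root, String testName) {
    String forkId = System.getProperty("test.unique.fork.id", "fork-0000");
    return new Path(root, forkId + "/" + testName);
  }
}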