HBASE-26786 [hboss] Limit synchronization from hot path of HBoss APIs (#32)

diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/ZKTreeLockManager.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/ZKTreeLockManager.java
index 16bd57e..66dcde4 100644
--- a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/ZKTreeLockManager.java
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/ZKTreeLockManager.java
@@ -20,12 +20,13 @@
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -70,7 +71,7 @@
 
   public static final String LOCK_SUB_ZNODE = ".hboss-lock-znode";
 
-  private Map<Path,InterProcessReadWriteLock> lockCache = new HashMap<>();
+  private ConcurrentHashMap<Path,InterProcessReadWriteLock> lockCache = new ConcurrentHashMap<>();
 
   public void initialize(FileSystem fs) throws IOException {
     this.fs = fs;
@@ -196,7 +197,7 @@
   We need to protect this block against potential concurrent calls to close()
    */
   @Override
-  protected synchronized boolean writeLockAbove(Path p) throws IOException {
+  protected boolean writeLockAbove(Path p) throws IOException {
     LOG.debug("Checking for write lock above {}", p);
     while (!p.isRoot()) {
       p = p.getParent();
@@ -242,8 +243,9 @@
     }
   }
 
-  synchronized void removeInMemoryLocks(Path p) {
-    Iterator<Entry<Path,InterProcessReadWriteLock>> iter = lockCache.entrySet().iterator();
+  void removeInMemoryLocks(Path p) {
+    Iterator<Entry<Path, InterProcessReadWriteLock>> iter =
+        lockCache.entrySet().iterator();
     while (iter.hasNext()) {
       Entry<Path,InterProcessReadWriteLock> entry = iter.next();
       if (isBeneath(p, entry.getKey())) {
@@ -380,24 +382,23 @@
     return sb.toString();
   }
 
-  public synchronized Map<Path,InterProcessReadWriteLock> getUnmodifiableCache() {
+  public Map<Path,InterProcessReadWriteLock> getUnmodifiableCache() {
     return Collections.unmodifiableMap(lockCache);
   }
 
-  private synchronized InterProcessReadWriteLock get(Path path) throws IOException {
-    if (!lockCache.containsKey(path)) {
+  private InterProcessReadWriteLock get(Path path) {
+    return lockCache.computeIfAbsent(path, s -> {
       String zkPath = new Path(path, LOCK_SUB_ZNODE).toString();
       try {
-        ZKPaths.mkdirs(curator.getZookeeperClient().getZooKeeper(), zkPath, true);
+        ZKPaths.mkdirs(curator.getZookeeperClient().getZooKeeper(), zkPath,
+            true);
       } catch (KeeperException.NodeExistsException e) {
         // Ignore
       } catch (Exception e) {
-        throw new IOException("Exception while ensuring lock parents exist: " +
-              path, e);
+        throw new RuntimeException(e);
       }
-      lockCache.put(path, new InterProcessReadWriteLock(curator, zkPath));
-    }
-    return lockCache.get(path);
+      return new InterProcessReadWriteLock(curator, zkPath);
+    });
   }
 }