HBASE-26786 [hboss] Limit synchronization from hot path of HBoss APIs (#32)
diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/ZKTreeLockManager.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/ZKTreeLockManager.java
index 16bd57e..66dcde4 100644
--- a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/ZKTreeLockManager.java
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/ZKTreeLockManager.java
@@ -20,12 +20,13 @@
import java.io.IOException;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -70,7 +71,7 @@
public static final String LOCK_SUB_ZNODE = ".hboss-lock-znode";
- private Map<Path,InterProcessReadWriteLock> lockCache = new HashMap<>();
+ private ConcurrentHashMap<Path,InterProcessReadWriteLock> lockCache = new ConcurrentHashMap<>();
public void initialize(FileSystem fs) throws IOException {
this.fs = fs;
@@ -196,7 +197,7 @@
We need to protect this block against potential concurrent calls to close()
*/
@Override
- protected synchronized boolean writeLockAbove(Path p) throws IOException {
+ protected boolean writeLockAbove(Path p) throws IOException {
LOG.debug("Checking for write lock above {}", p);
while (!p.isRoot()) {
p = p.getParent();
@@ -242,8 +243,9 @@
}
}
- synchronized void removeInMemoryLocks(Path p) {
- Iterator<Entry<Path,InterProcessReadWriteLock>> iter = lockCache.entrySet().iterator();
+ void removeInMemoryLocks(Path p) {
+ Iterator<Entry<Path, InterProcessReadWriteLock>> iter =
+ lockCache.entrySet().iterator();
while (iter.hasNext()) {
Entry<Path,InterProcessReadWriteLock> entry = iter.next();
if (isBeneath(p, entry.getKey())) {
@@ -380,24 +382,23 @@
return sb.toString();
}
- public synchronized Map<Path,InterProcessReadWriteLock> getUnmodifiableCache() {
+ public Map<Path,InterProcessReadWriteLock> getUnmodifiableCache() {
return Collections.unmodifiableMap(lockCache);
}
- private synchronized InterProcessReadWriteLock get(Path path) throws IOException {
- if (!lockCache.containsKey(path)) {
+ private InterProcessReadWriteLock get(Path path) {
+ return lockCache.computeIfAbsent(path, s -> {
String zkPath = new Path(path, LOCK_SUB_ZNODE).toString();
try {
- ZKPaths.mkdirs(curator.getZookeeperClient().getZooKeeper(), zkPath, true);
+ ZKPaths.mkdirs(curator.getZookeeperClient().getZooKeeper(), zkPath,
+ true);
} catch (KeeperException.NodeExistsException e) {
// Ignore
} catch (Exception e) {
- throw new IOException("Exception while ensuring lock parents exist: " +
- path, e);
+ throw new RuntimeException(e);
}
- lockCache.put(path, new InterProcessReadWriteLock(curator, zkPath));
- }
- return lockCache.get(path);
+ return new InterProcessReadWriteLock(curator, zkPath);
+ });
}
}