ZOOKEEPER-3531: Synchronization on ACLCache cause cluster to hang whe…

…n network/disk issues happen during datatree serialization

Author: Mcfatealan <louchang_new@163.com>

Reviewers: fangmin@apache.org, andor@apache.org

Closes #1077 from mcfatealan/ZOOKEEPER-3531 and squashes the following commits:

60109185e [Mcfatealan] minor
951dd5cac [Mcfatealan] fix checkstyle issue
96ea4a066 [Mcfatealan] finer-synchronization for deserialize
b79446a6f [Mcfatealan] minor fix for checkstyle
d5f7d04ab [Mcfatealan] ZOOKEEPER-3531: Synchronization on ACLCache cause cluster to hang when network/disk issues happen during datatree serialization
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ReferenceCountedACLCache.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ReferenceCountedACLCache.java
index a278abf..b92ee18 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ReferenceCountedACLCache.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ReferenceCountedACLCache.java
@@ -22,9 +22,9 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.jute.Index;
 import org.apache.jute.InputArchive;
@@ -99,14 +99,14 @@
         return ++aclIndex;
     }
 
-    public synchronized void deserialize(InputArchive ia) throws IOException {
+    public void deserialize(InputArchive ia) throws IOException {
         clear();
         int i = ia.readInt("map");
+
+        LinkedHashMap<Long, List<ACL>> deserializedMap = new LinkedHashMap<>();
+        // keep read operations out of synchronization block
         while (i > 0) {
             Long val = ia.readLong("long");
-            if (aclIndex < val) {
-                aclIndex = val;
-            }
             List<ACL> aclList = new ArrayList<ACL>();
             Index j = ia.startVector("acls");
             if (j == null) {
@@ -118,17 +118,33 @@
                 aclList.add(acl);
                 j.incr();
             }
-            longKeyMap.put(val, aclList);
-            aclKeyMap.put(aclList, val);
-            referenceCounter.put(val, new AtomicLongWithEquals(0));
+
+            deserializedMap.put(val, aclList);
             i--;
         }
+
+        synchronized (this) {
+            for (Map.Entry<Long, List<ACL>> entry : deserializedMap.entrySet()) {
+                Long val = entry.getKey();
+                List<ACL> aclList = entry.getValue();
+                if (aclIndex < val) {
+                    aclIndex = val;
+                }
+
+                longKeyMap.put(val, aclList);
+                aclKeyMap.put(aclList, val);
+                referenceCounter.put(val, new AtomicLongWithEquals(0));
+            }
+        }
     }
 
-    public synchronized void serialize(OutputArchive oa) throws IOException {
-        oa.writeInt(longKeyMap.size(), "map");
-        Set<Map.Entry<Long, List<ACL>>> set = longKeyMap.entrySet();
-        for (Map.Entry<Long, List<ACL>> val : set) {
+    public void serialize(OutputArchive oa) throws IOException {
+        Map<Long, List<ACL>> clonedLongKeyMap;
+        synchronized (this) {
+            clonedLongKeyMap = new HashMap<>(longKeyMap);
+        }
+        oa.writeInt(clonedLongKeyMap.size(), "map");
+        for (Map.Entry<Long, List<ACL>> val : clonedLongKeyMap.entrySet()) {
             oa.writeLong(val.getKey(), "long");
             List<ACL> aclList = val.getValue();
             oa.startVector(aclList, "acls");
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java
index b73e0b7..11eb836 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java
@@ -24,6 +24,7 @@
 import static org.junit.Assert.assertTrue;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
@@ -253,6 +254,106 @@
         assertEquals("/bug is still in pTrie", "/", pTrie.findMaxPrefix("/bug"));
     }
 
+
+    /* ZOOKEEPER-3531 - org.apache.zookeeper.server.DataTree#serialize calls the aclCache.serialize when doing
+     * dataree serialization, however, org.apache.zookeeper.server.ReferenceCountedACLCache#serialize
+     * could get stuck at OutputArchieve.writeInt due to potential network/disk issues.
+     * This can cause the system experiences hanging issues similar to ZooKeeper-2201.
+     * This test verifies the fix that we should not hold ACL cache during dumping aclcache to snapshots
+    */
+    @Test(timeout = 60000)
+    public void testSerializeDoesntLockACLCacheWhileWriting() throws Exception {
+        DataTree tree = new DataTree();
+        tree.createNode("/marker", new byte[]{42}, null, -1, 1, 1, 1);
+        final AtomicBoolean ranTestCase = new AtomicBoolean();
+        DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
+        BinaryOutputArchive oa = new BinaryOutputArchive(out) {
+            @Override
+            public void writeInt(int size, String tag) throws IOException {
+                final Semaphore semaphore = new Semaphore(0);
+
+                new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+
+                        synchronized (tree.getReferenceCountedAclCache()) {
+                            //When we lock ACLCache, allow writeRecord to continue
+                            semaphore.release();
+                        }
+                    }
+                }).start();
+
+                try {
+                    boolean acquired = semaphore.tryAcquire(30, TimeUnit.SECONDS);
+                    //This is the real assertion - could another thread lock
+                    //the ACLCache
+                    assertTrue("Couldn't acquire a lock on the ACLCache while we were calling tree.serialize", acquired);
+                } catch (InterruptedException e1) {
+                    throw new RuntimeException(e1);
+                }
+                ranTestCase.set(true);
+
+                super.writeInt(size, tag);
+            }
+        };
+
+        tree.serialize(oa, "test");
+
+        //Let's make sure that we hit the code that ran the real assertion above
+        assertTrue("Didn't find the expected node", ranTestCase.get());
+    }
+
+    /* ZOOKEEPER-3531 - similarly for aclCache.deserialize, we should not hold lock either
+    */
+    @Test(timeout = 60000)
+    public void testDeserializeDoesntLockACLCacheWhileReading() throws Exception {
+        DataTree tree = new DataTree();
+        tree.createNode("/marker", new byte[]{42}, null, -1, 1, 1, 1);
+        final AtomicBoolean ranTestCase = new AtomicBoolean();
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream out = new DataOutputStream(baos);
+        BinaryOutputArchive oa = new BinaryOutputArchive(out);
+
+        tree.serialize(oa, "test");
+
+        DataTree tree2 = new DataTree();
+        DataInputStream in = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
+        BinaryInputArchive ia = new BinaryInputArchive(in) {
+            @Override
+            public long readLong(String tag) throws IOException {
+                final Semaphore semaphore = new Semaphore(0);
+
+                new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+
+                        synchronized (tree2.getReferenceCountedAclCache()) {
+                            //When we lock ACLCache, allow readLong to continue
+                            semaphore.release();
+                        }
+                    }
+                }).start();
+
+                try {
+                    boolean acquired = semaphore.tryAcquire(30, TimeUnit.SECONDS);
+                    //This is the real assertion - could another thread lock
+                    //the ACLCache
+                    assertTrue("Couldn't acquire a lock on the ACLCache while we were calling tree.deserialize", acquired);
+                } catch (InterruptedException e1) {
+                    throw new RuntimeException(e1);
+                }
+                ranTestCase.set(true);
+
+                return super.readLong(tag);
+            }
+        };
+
+        tree2.deserialize(ia, "test");
+
+        //Let's make sure that we hit the code that ran the real assertion above
+        assertTrue("Didn't find the expected node", ranTestCase.get());
+    }
+
     /*
      * ZOOKEEPER-2201 - OutputArchive.writeRecord can block for long periods of
      * time, we must call it outside of the node lock.