BOOKKEEPER-883: Test timeout in bookkeeper-benchmark

Problem:

BenchReadThroughputLatency is tightly coupled to FlatLedgerManager, so it makes many assumptions about how the znodes change when ledgers are created. A later change introduced LedgerIdGenerator, which broke the assumptions made by BenchReadThroughputLatency.

Fix:

- Use a HashSet to track already-processed ledgers when reacting to children changes
- Remove the unpredictable test that reads the next ledger
- Fix error logging in FlatLedgerManager when processing ledgers

Author: Sijie Guo <sijie@apache.org>

Reviewers: Matteo Merli <mmerli@apache.org>

Closes #10 from sijie/BOOKKEEPER-883

(cherry picked from commit 8729d12be50295086e7440dfa5d0256abb7688d5)
Signed-off-by: Sijie Guo <sijie@apache.org>
diff --git a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
index d5baaa4..1cdd564 100644
--- a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
+++ b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
@@ -19,42 +19,34 @@
  */
 package org.apache.bookkeeper.benchmark;
 
-import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
-
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher.Event;
-
-import java.util.Enumeration;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.regex.Pattern;
-import java.util.regex.Matcher;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.PosixParser;
-import org.apache.commons.cli.ParseException;
-
-import static com.google.common.base.Charsets.UTF_8;
-
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.WatchedEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Enumeration;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.google.common.base.Charsets.UTF_8;
+
 public class BenchReadThroughputLatency {
     static final Logger LOG = LoggerFactory.getLogger(BenchReadThroughputLatency.class);
 
@@ -90,7 +82,7 @@
         try {
             bk = new BookKeeper(conf);
             while (true) {
-                lh = bk.openLedgerNoRecovery(ledgerId, BookKeeper.DigestType.CRC32, 
+                lh = bk.openLedgerNoRecovery(ledgerId, BookKeeper.DigestType.CRC32,
                                              passwd);
                 long lastConfirmed = Math.min(lh.getLastAddConfirmed(), absoluteLimit);
                 if (lastConfirmed == lastRead) {
@@ -154,7 +146,7 @@
     @SuppressWarnings("deprecation")
     public static void main(String[] args) throws Exception {
         Options options = new Options();
-        options.addOption("ledger", true, "Ledger to read. If empty, read all ledgers which come available. " 
+        options.addOption("ledger", true, "Ledger to read. If empty, read all ledgers which come available. "
                           + " Cannot be used with -listen");
         options.addOption("listen", true, "Listen for creation of <arg> ledgers, and read each one fully");
         options.addOption("password", true, "Password used to access ledgers (default 'benchPasswd')");
@@ -207,11 +199,12 @@
                     }
                 }
             });
+        final Set<String> processedLedgers = new HashSet<String>();
         try {
             zk.register(new Watcher() {
                     public void process(WatchedEvent event) {
                         try {
-                            if (event.getState() == Event.KeeperState.SyncConnected 
+                            if (event.getState() == Event.KeeperState.SyncConnected
                                 && event.getType() == Event.EventType.None) {
                                 connectedLatch.countDown();
                             } else if (event.getType() == Event.EventType.NodeCreated
@@ -229,22 +222,29 @@
                                         ledgers.add(child);
                                     }
                                 }
-                                Collections.sort(ledgers, ZK_LEDGER_COMPARE);
-                                String last = ledgers.get(ledgers.size() - 1);
-                                final Matcher m = LEDGER_PATTERN.matcher(last);
-                                if (m.find()) {
-                                    int ledgersLeft = numLedgers.decrementAndGet();
-                                    Thread t = new Thread() {
-                                            public void run() {
-                                                readLedger(conf, Long.valueOf(m.group(1)), passwd);
+                                for (String ledger : ledgers) {
+                                    synchronized (processedLedgers) {
+                                        if (processedLedgers.contains(ledger)) {
+                                            continue;
+                                        }
+                                        final Matcher m = LEDGER_PATTERN.matcher(ledger);
+                                        if (m.find()) {
+                                            int ledgersLeft = numLedgers.decrementAndGet();
+                                            final Long ledgerId = Long.valueOf(m.group(1));
+                                            processedLedgers.add(ledger);
+                                            Thread t = new Thread() {
+                                                public void run() {
+                                                    readLedger(conf, ledgerId, passwd);
+                                                }
+                                            };
+                                            t.start();
+                                            if (ledgersLeft <= 0) {
+                                                shutdownLatch.countDown();
                                             }
-                                        };
-                                    t.start();
-                                    if (ledgersLeft <= 0) {
-                                        shutdownLatch.countDown();
+                                        } else {
+                                            LOG.error("Cant file ledger id in {}", ledger);
+                                        }
                                     }
-                                } else {
-                                    LOG.error("Cant file ledger id in {}", last);
                                 }
                             } else {
                                 LOG.warn("Unknown event {}", event);
diff --git a/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java b/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java
index ec3cd61..f5108ec 100644
--- a/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java
+++ b/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java
@@ -19,29 +19,16 @@
  */
 package org.apache.bookkeeper.benchmark;
 
-import org.junit.BeforeClass;
-import org.junit.AfterClass;
-import org.junit.Test;
-import org.junit.Assert;
-
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.bookkeeper.util.LocalBookKeeper;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper;
+import org.junit.Test;
+import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class TestBenchmark extends BookKeeperClusterTestCase {
@@ -114,7 +101,7 @@
             if (!t.isAlive()) {
                 break;
             }
-            Thread.sleep(1000); // wait for 10 seconds for reading to finish
+            Thread.sleep(100);
         }
 
         Assert.assertFalse("Thread should be finished", t.isAlive());
@@ -122,40 +109,5 @@
         BenchReadThroughputLatency.main(new String[] {
                 "--zookeeper", zkUtil.getZooKeeperConnectString(),
                 "--ledger", String.valueOf(lastLedgerId)});
-
-        final long nextLedgerId = lastLedgerId+1;
-        t = new Thread() {
-                public void run() {
-                    try {
-                        BenchReadThroughputLatency.main(new String[] {
-                                "--zookeeper", zkUtil.getZooKeeperConnectString(),
-                                "--ledger", String.valueOf(nextLedgerId)});
-                    } catch (Throwable t) {
-                        LOG.error("Error reading", t);
-                        threwException.set(true);
-                    }
-                }
-            };
-        t.start();
-
-        Assert.assertTrue("Thread should be running", t.isAlive());
-        BookKeeper bk = new BookKeeper(zkUtil.getZooKeeperConnectString());
-        LedgerHandle lh = bk.createLedger(BookKeeper.DigestType.CRC32, "benchPasswd".getBytes());
-        try {
-            for (int j = 0; j < 100; j++) {
-                lh.addEntry(data);
-            }
-        } finally {
-            lh.close();
-            bk.close();
-        }
-        for (int i = 0; i < 60; i++) {
-            if (!t.isAlive()) {
-                break;
-            }
-            Thread.sleep(1000); // wait for 10 seconds for reading to finish
-        }
-        Assert.assertFalse("Thread should be finished", t.isAlive());
-        Assert.assertFalse("A thread has thrown an exception, check logs", threwException.get());
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
index 6bd3216..3172247 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
@@ -88,6 +88,11 @@
     }
 
     @Override
+    protected boolean isSpecialZnode(String znode) {
+        return znode.startsWith(ZkLedgerIdGenerator.LEDGER_ID_GEN_PREFIX) || super.isSpecialZnode(znode);
+    }
+
+    @Override
     public LedgerRangeIterator getLedgerRanges() {
         return new LedgerRangeIterator() {
             // single iterator, can visit only one time
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java
index a6c5b7b..b54c891 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java
@@ -42,6 +42,8 @@
 public class ZkLedgerIdGenerator implements LedgerIdGenerator {
     static final Logger LOG = LoggerFactory.getLogger(ZkLedgerIdGenerator.class);
 
+    static final String LEDGER_ID_GEN_PREFIX = "ID-";
+
     final ZooKeeper zk;
     final String ledgerIdGenPath;
     final String ledgerPrefix;
@@ -55,7 +57,7 @@
         } else {
             this.ledgerIdGenPath = ledgersPath + "/" + idGenZnodeName;
         }
-        this.ledgerPrefix = this.ledgerIdGenPath + "/ID-";
+        this.ledgerPrefix = this.ledgerIdGenPath + "/" + LEDGER_ID_GEN_PREFIX;
     }
 
     @Override