BOOKKEEPER-883: Test timeout in bookkeeper-benchmark
Problem:
BenchReadThroughputLatency is tightly coupled to FlatLedgerManager, so it makes many assumptions about how the znodes change when ledgers are created. A later change introduced LedgerIdGenerator, which broke the assumptions made by BenchReadThroughputLatency.
Fix:
- Use a HashSet to cache processed ledgers when reacting to children changes
- Remove the unpredictable test on the next ledger
- Fix erroneous error logging in FlatLedgerManager when processing ledgers
Author: Sijie Guo <sijie@apache.org>
Reviewers: Matteo Merli <mmerli@apache.org>
Closes #10 from sijie/BOOKKEEPER-883
(cherry picked from commit 8729d12be50295086e7440dfa5d0256abb7688d5)
Signed-off-by: Sijie Guo <sijie@apache.org>
diff --git a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
index d5baaa4..1cdd564 100644
--- a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
+++ b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
@@ -19,42 +19,34 @@
*/
package org.apache.bookkeeper.benchmark;
-import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
-
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher.Event;
-
-import java.util.Enumeration;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.regex.Pattern;
-import java.util.regex.Matcher;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.PosixParser;
-import org.apache.commons.cli.ParseException;
-
-import static com.google.common.base.Charsets.UTF_8;
-
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.WatchedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Enumeration;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.google.common.base.Charsets.UTF_8;
+
public class BenchReadThroughputLatency {
static final Logger LOG = LoggerFactory.getLogger(BenchReadThroughputLatency.class);
@@ -90,7 +82,7 @@
try {
bk = new BookKeeper(conf);
while (true) {
- lh = bk.openLedgerNoRecovery(ledgerId, BookKeeper.DigestType.CRC32,
+ lh = bk.openLedgerNoRecovery(ledgerId, BookKeeper.DigestType.CRC32,
passwd);
long lastConfirmed = Math.min(lh.getLastAddConfirmed(), absoluteLimit);
if (lastConfirmed == lastRead) {
@@ -154,7 +146,7 @@
@SuppressWarnings("deprecation")
public static void main(String[] args) throws Exception {
Options options = new Options();
- options.addOption("ledger", true, "Ledger to read. If empty, read all ledgers which come available. "
+ options.addOption("ledger", true, "Ledger to read. If empty, read all ledgers which come available. "
+ " Cannot be used with -listen");
options.addOption("listen", true, "Listen for creation of <arg> ledgers, and read each one fully");
options.addOption("password", true, "Password used to access ledgers (default 'benchPasswd')");
@@ -207,11 +199,12 @@
}
}
});
+ final Set<String> processedLedgers = new HashSet<String>();
try {
zk.register(new Watcher() {
public void process(WatchedEvent event) {
try {
- if (event.getState() == Event.KeeperState.SyncConnected
+ if (event.getState() == Event.KeeperState.SyncConnected
&& event.getType() == Event.EventType.None) {
connectedLatch.countDown();
} else if (event.getType() == Event.EventType.NodeCreated
@@ -229,22 +222,29 @@
ledgers.add(child);
}
}
- Collections.sort(ledgers, ZK_LEDGER_COMPARE);
- String last = ledgers.get(ledgers.size() - 1);
- final Matcher m = LEDGER_PATTERN.matcher(last);
- if (m.find()) {
- int ledgersLeft = numLedgers.decrementAndGet();
- Thread t = new Thread() {
- public void run() {
- readLedger(conf, Long.valueOf(m.group(1)), passwd);
+ for (String ledger : ledgers) {
+ synchronized (processedLedgers) {
+ if (processedLedgers.contains(ledger)) {
+ continue;
+ }
+ final Matcher m = LEDGER_PATTERN.matcher(ledger);
+ if (m.find()) {
+ int ledgersLeft = numLedgers.decrementAndGet();
+ final Long ledgerId = Long.valueOf(m.group(1));
+ processedLedgers.add(ledger);
+ Thread t = new Thread() {
+ public void run() {
+ readLedger(conf, ledgerId, passwd);
+ }
+ };
+ t.start();
+ if (ledgersLeft <= 0) {
+ shutdownLatch.countDown();
}
- };
- t.start();
- if (ledgersLeft <= 0) {
- shutdownLatch.countDown();
+ } else {
+ LOG.error("Cant file ledger id in {}", ledger);
+ }
}
- } else {
- LOG.error("Cant file ledger id in {}", last);
}
} else {
LOG.warn("Unknown event {}", event);
diff --git a/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java b/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java
index ec3cd61..f5108ec 100644
--- a/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java
+++ b/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java
@@ -19,29 +19,16 @@
*/
package org.apache.bookkeeper.benchmark;
-import org.junit.BeforeClass;
-import org.junit.AfterClass;
-import org.junit.Test;
-import org.junit.Assert;
-
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.bookkeeper.util.LocalBookKeeper;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper;
+import org.junit.Test;
+import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class TestBenchmark extends BookKeeperClusterTestCase {
@@ -114,7 +101,7 @@
if (!t.isAlive()) {
break;
}
- Thread.sleep(1000); // wait for 10 seconds for reading to finish
+ Thread.sleep(100);
}
Assert.assertFalse("Thread should be finished", t.isAlive());
@@ -122,40 +109,5 @@
BenchReadThroughputLatency.main(new String[] {
"--zookeeper", zkUtil.getZooKeeperConnectString(),
"--ledger", String.valueOf(lastLedgerId)});
-
- final long nextLedgerId = lastLedgerId+1;
- t = new Thread() {
- public void run() {
- try {
- BenchReadThroughputLatency.main(new String[] {
- "--zookeeper", zkUtil.getZooKeeperConnectString(),
- "--ledger", String.valueOf(nextLedgerId)});
- } catch (Throwable t) {
- LOG.error("Error reading", t);
- threwException.set(true);
- }
- }
- };
- t.start();
-
- Assert.assertTrue("Thread should be running", t.isAlive());
- BookKeeper bk = new BookKeeper(zkUtil.getZooKeeperConnectString());
- LedgerHandle lh = bk.createLedger(BookKeeper.DigestType.CRC32, "benchPasswd".getBytes());
- try {
- for (int j = 0; j < 100; j++) {
- lh.addEntry(data);
- }
- } finally {
- lh.close();
- bk.close();
- }
- for (int i = 0; i < 60; i++) {
- if (!t.isAlive()) {
- break;
- }
- Thread.sleep(1000); // wait for 10 seconds for reading to finish
- }
- Assert.assertFalse("Thread should be finished", t.isAlive());
- Assert.assertFalse("A thread has thrown an exception, check logs", threwException.get());
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
index 6bd3216..3172247 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
@@ -88,6 +88,11 @@
}
@Override
+ protected boolean isSpecialZnode(String znode) {
+ return znode.startsWith(ZkLedgerIdGenerator.LEDGER_ID_GEN_PREFIX) || super.isSpecialZnode(znode);
+ }
+
+ @Override
public LedgerRangeIterator getLedgerRanges() {
return new LedgerRangeIterator() {
// single iterator, can visit only one time
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java
index a6c5b7b..b54c891 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java
@@ -42,6 +42,8 @@
public class ZkLedgerIdGenerator implements LedgerIdGenerator {
static final Logger LOG = LoggerFactory.getLogger(ZkLedgerIdGenerator.class);
+ static final String LEDGER_ID_GEN_PREFIX = "ID-";
+
final ZooKeeper zk;
final String ledgerIdGenPath;
final String ledgerPrefix;
@@ -55,7 +57,7 @@
} else {
this.ledgerIdGenPath = ledgersPath + "/" + idGenZnodeName;
}
- this.ledgerPrefix = this.ledgerIdGenPath + "/ID-";
+ this.ledgerPrefix = this.ledgerIdGenPath + "/" + LEDGER_ID_GEN_PREFIX;
}
@Override