DL-2: DistributedLog should work with the official apache bookkeeper

This change is to upgrade bookkeeper version to BK 4.5.0.

- upgrade bookkeeper version to 4.5.0-SNAPSHOT (still waiting a few pull requests to merge apache/bookkeeper#297 apache/bookkeeper#287
  - change registerSuccessEvent for StatsLogger to add TimeUnit
  - use netty4 eventloop
  - move twitter repository dependencies to proxy related module only. core library will not depend on scala dependency anymore.

This change is a collaboration change with sijie

We will provide a performance comparison between 0.4.0 (using Twitter BK) and 0.5.0 (using BK 4.5.) in a separate pull request or email.

Author: Jia Zhai <zhaijia03@gmail.com>
Author: Jia Zhai <zhaijia@apache.org>
Author: Sijie Guo <sijie@apache.org>

Reviewers: Leigh Stewart <lstewart@apache.org>

This closes #135 from zhaijack/bump_dl_version
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/DLWriterWorker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/DLWriterWorker.java
index 5c9b2a9..015cf46 100644
--- a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/DLWriterWorker.java
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/DLWriterWorker.java
@@ -228,12 +228,14 @@
                 writer.write(new LogRecord(requestMillis, data)).whenComplete(new FutureEventListener<DLSN>() {
                     @Override
                     public void onSuccess(DLSN value) {
-                        requestStat.registerSuccessfulEvent(System.currentTimeMillis() - requestMillis);
+                        requestStat.registerSuccessfulEvent(
+                          System.currentTimeMillis() - requestMillis, TimeUnit.MILLISECONDS);
                     }
 
                     @Override
                     public void onFailure(Throwable cause) {
-                        requestStat.registerFailedEvent(System.currentTimeMillis() - requestMillis);
+                        requestStat.registerFailedEvent(
+                          System.currentTimeMillis() - requestMillis, TimeUnit.MILLISECONDS);
                         LOG.error("Failed to publish, rescue it : ", cause);
                         scheduleRescue(streamIdx, writer, 0);
                     }
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java
index ad95a59..b81dad4 100644
--- a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java
@@ -156,14 +156,14 @@
             long e2eLatency = curTimeMillis - msg.getPublishTime();
             long deliveryLatency = curTimeMillis - record.getTransactionId();
             if (e2eLatency >= 0) {
-                e2eStat.registerSuccessfulEvent(e2eLatency);
+                e2eStat.registerSuccessfulEvent(e2eLatency, TimeUnit.MILLISECONDS);
             } else {
-                negativeE2EStat.registerSuccessfulEvent(-e2eLatency);
+                negativeE2EStat.registerSuccessfulEvent(-e2eLatency, TimeUnit.MILLISECONDS);
             }
             if (deliveryLatency >= 0) {
-                deliveryStat.registerSuccessfulEvent(deliveryLatency);
+                deliveryStat.registerSuccessfulEvent(deliveryLatency, TimeUnit.MILLISECONDS);
             } else {
-                negativeDeliveryStat.registerSuccessfulEvent(-deliveryLatency);
+                negativeDeliveryStat.registerSuccessfulEvent(-deliveryLatency, TimeUnit.MILLISECONDS);
             }
 
             prevDLSN = record.getDlsn();
@@ -200,12 +200,14 @@
                     new FutureEventListener<Boolean>() {
                         @Override
                         public void onSuccess(Boolean value) {
-                            truncationStat.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
+                            truncationStat.registerSuccessfulEvent(
+                              stopwatch.stop().elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
                         }
 
                         @Override
                         public void onFailure(Throwable cause) {
-                            truncationStat.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
+                            truncationStat.registerFailedEvent(
+                              stopwatch.stop().elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
                             LOG.error("Failed to truncate stream {} to {} : ",
                                     new Object[]{streamName, dlsnToTruncate, cause});
                         }
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java
index 46f9dfc..9879e71 100644
--- a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java
@@ -272,10 +272,12 @@
         @Override
         public void run() {
             if (null != dlsn) {
-                requestStat.registerSuccessfulEvent(System.currentTimeMillis() - requestMillis);
+                requestStat.registerSuccessfulEvent(
+                  System.currentTimeMillis() - requestMillis, TimeUnit.MILLISECONDS);
             } else {
                 LOG.error("Failed to publish to {} : ", streamName, cause);
-                requestStat.registerFailedEvent(System.currentTimeMillis() - requestMillis);
+                requestStat.registerFailedEvent(
+                  System.currentTimeMillis() - requestMillis, TimeUnit.MILLISECONDS);
                 exceptionsLogger.getCounter(cause.getClass().getName()).inc();
                 if (cause instanceof DLException) {
                     DLException dle = (DLException) cause;
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java
index 4c8e372..a2dbdd7 100644
--- a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java
@@ -109,11 +109,12 @@
                     reader = FutureUtils.result(dlm.openAsyncLogReader(lastDLSN));
                 }
                 long elapsedMs = stopwatch.elapsed(TimeUnit.MICROSECONDS);
-                openReaderStats.registerSuccessfulEvent(elapsedMs);
+                openReaderStats.registerSuccessfulEvent(elapsedMs, TimeUnit.MICROSECONDS);
                 logger.info("It took {} ms to position the reader to transaction id = {}, dlsn = {}",
                         lastTxId, lastDLSN);
             } catch (Exception ioe) {
-                openReaderStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+                openReaderStats.registerFailedEvent(
+                    stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                 logger.warn("Failed to create reader for stream {} reading from tx id = {}, dlsn = {}.",
                         new Object[] { streamName, lastTxId, lastDLSN });
             }
@@ -133,7 +134,7 @@
                     stopwatch.start();
                     records = FutureUtils.result(reader.readBulk(batchSize));
                     long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS);
-                    blockingReadStats.registerSuccessfulEvent(elapsedMicros);
+                    blockingReadStats.registerSuccessfulEvent(elapsedMicros, TimeUnit.MICROSECONDS);
                     if (!records.isEmpty()) {
                         readCounter.add(records.size());
                         LogRecordWithDLSN lastRecord = records.get(records.size() - 1);
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java
index cbd7f67..3e8c8bb 100644
--- a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java
@@ -98,10 +98,11 @@
             try {
                 reader = dlm.getInputStream(lastTxId);
                 long elapsedMs = stopwatch.elapsed(TimeUnit.MICROSECONDS);
-                openReaderStats.registerSuccessfulEvent(elapsedMs);
+                openReaderStats.registerSuccessfulEvent(elapsedMs, TimeUnit.MICROSECONDS);
                 logger.info("It took {} ms to position the reader to transaction id {}", lastTxId);
             } catch (IOException ioe) {
-                openReaderStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+                openReaderStats.registerFailedEvent(
+                    stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                 logger.warn("Failed to create reader for stream {} reading from {}.", streamName, lastTxId);
             }
             if (null == reader) {
@@ -129,11 +130,11 @@
                     if (null != record) {
                         long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS);
                         if (nonBlocking) {
-                            nonBlockingReadStats.registerSuccessfulEvent(elapsedMicros);
+                            nonBlockingReadStats.registerSuccessfulEvent(elapsedMicros, TimeUnit.MICROSECONDS);
                         } else {
                             numCatchupBytes += record.getPayload().length;
                             ++numCatchupReads;
-                            blockingReadStats.registerSuccessfulEvent(elapsedMicros);
+                            blockingReadStats.registerSuccessfulEvent(elapsedMicros, TimeUnit.MICROSECONDS);
                         }
                         lastTxId = record.getTransactionId();
                     } else {
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java
index 61a20f1..6ae269b 100644
--- a/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java
@@ -19,6 +19,7 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.stats.CachingStatsLogger;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
@@ -63,16 +64,29 @@
             final OpStatsLogger firstLogger = first.getOpStatsLogger(statName);
             final OpStatsLogger secondLogger = second.getOpStatsLogger(statName);
             return new OpStatsLogger() {
+
                 @Override
-                public void registerFailedEvent(long l) {
-                    firstLogger.registerFailedEvent(l);
-                    secondLogger.registerFailedEvent(l);
+                public void registerFailedEvent(long l, TimeUnit timeUnit) {
+                    firstLogger.registerFailedEvent(l, timeUnit);
+                    secondLogger.registerFailedEvent(l, timeUnit);
                 }
 
                 @Override
-                public void registerSuccessfulEvent(long l) {
-                    firstLogger.registerSuccessfulEvent(l);
-                    secondLogger.registerSuccessfulEvent(l);
+                public void registerSuccessfulEvent(long l, TimeUnit timeUnit) {
+                    firstLogger.registerSuccessfulEvent(l, timeUnit);
+                    secondLogger.registerSuccessfulEvent(l, timeUnit);
+                }
+
+                @Override
+                public void registerSuccessfulValue(long l) {
+                    firstLogger.registerSuccessfulValue(l);
+                    secondLogger.registerSuccessfulValue(l);
+                }
+
+                @Override
+                public void registerFailedValue(long l) {
+                    firstLogger.registerFailedValue(l);
+                    secondLogger.registerFailedValue(l);
                 }
 
                 @Override
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/OpStatsListener.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/OpStatsListener.java
index e71a799..4145b39 100644
--- a/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/OpStatsListener.java
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/OpStatsListener.java
@@ -44,11 +44,11 @@
 
     @Override
     public void onSuccess(T value) {
-        opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+        opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
     }
 
     @Override
     public void onFailure(Throwable cause) {
-        opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+        opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
     }
 }
diff --git a/distributedlog-common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java b/distributedlog-common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java
index ddfb7ae..a887c59 100644
--- a/distributedlog-common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java
+++ b/distributedlog-common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java
@@ -312,7 +312,8 @@
             Stopwatch.createStarted());
         underlyFuture.complete(1234L);
         FutureUtils.result(statsFuture);
-        verify(statsLogger, times(1)).registerSuccessfulEvent(anyLong());
+        verify(statsLogger, times(1))
+            .registerSuccessfulEvent(anyLong(), eq(TimeUnit.MICROSECONDS));
     }
 
     @Test
@@ -325,7 +326,8 @@
             Stopwatch.createStarted());
         underlyFuture.completeExceptionally(new TestException());
         FutureUtils.result(FutureUtils.ignore(statsFuture));
-        verify(statsLogger, times(1)).registerFailedEvent(anyLong());
+        verify(statsLogger, times(1))
+            .registerFailedEvent(anyLong(), eq(TimeUnit.MICROSECONDS));
     }
 
     @Test
diff --git a/distributedlog-core-twitter/pom.xml b/distributedlog-core-twitter/pom.xml
index bec9b03..9cf11be 100644
--- a/distributedlog-core-twitter/pom.xml
+++ b/distributedlog-core-twitter/pom.xml
@@ -138,4 +138,11 @@
       </plugin>
     </plugins>
   </build>
+  <repositories>
+    <repository>
+      <id>twitter-repo</id>
+      <name>Twitter Maven Repo</name>
+      <url>http://maven.twttr.com</url>
+    </repository>
+  </repositories>
 </project>
diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java
index a9c7b21..67911f7 100644
--- a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java
+++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java
@@ -45,11 +45,13 @@
 
     @Override
     public void onSuccess(T value) {
-        opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+        opStatsLogger.registerSuccessfulEvent(
+            stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
     }
 
     @Override
     public void onFailure(Throwable cause) {
-        opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+        opStatsLogger.registerFailedEvent(
+            stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
     }
 }
diff --git a/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java b/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
index 39a0f77..dc9d09d 100644
--- a/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
+++ b/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
@@ -17,16 +17,9 @@
  */
 package org.apache.bookkeeper.client;
 
-import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.proto.BookieClient;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
-import org.apache.bookkeeper.proto.BookkeeperProtocol;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferInputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.Unpooled;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -36,6 +29,12 @@
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Reader used for DL tools to read entries
@@ -93,17 +92,21 @@
         final Set<ReadResult<InputStream>> readResults = new HashSet<ReadResult<InputStream>>();
         ReadEntryCallback readEntryCallback = new ReadEntryCallback() {
             @Override
-            public void readEntryComplete(int rc, long lid, long eid, ChannelBuffer buffer, Object ctx) {
+            public void readEntryComplete(int rc, long lid, long eid, ByteBuf buffer, Object ctx) {
                 BookieSocketAddress bookieAddress = (BookieSocketAddress) ctx;
                 ReadResult<InputStream> rr;
                 if (BKException.Code.OK != rc) {
                     rr = new ReadResult<InputStream>(eid, rc, null, bookieAddress.getSocketAddress());
                 } else {
+                    ByteBuf content;
                     try {
-                        ChannelBufferInputStream is = lh.macManager.verifyDigestAndReturnData(eid, buffer);
-                        rr = new ReadResult<InputStream>(eid, BKException.Code.OK, is, bookieAddress.getSocketAddress());
+                        content = lh.macManager.verifyDigestAndReturnData(eid, buffer);
+                        ByteBuf toRet = Unpooled.copiedBuffer(content);
+                        rr = new ReadResult<InputStream>(eid, BKException.Code.OK, new ByteBufInputStream(toRet), bookieAddress.getSocketAddress());
                     } catch (BKException.BKDigestMatchException e) {
                         rr = new ReadResult<InputStream>(eid, BKException.Code.DigestMatchException, null, bookieAddress.getSocketAddress());
+                    } finally {
+                        buffer.release();
                     }
                 }
                 readResults.add(rr);
@@ -184,7 +187,7 @@
         final Set<ReadResult<Long>> readResults = new HashSet<ReadResult<Long>>();
         ReadEntryCallback readEntryCallback = new ReadEntryCallback() {
             @Override
-            public void readEntryComplete(int rc, long lid, long eid, ChannelBuffer buffer, Object ctx) {
+            public void readEntryComplete(int rc, long lid, long eid, ByteBuf buffer, Object ctx) {
                 InetSocketAddress bookieAddress = (InetSocketAddress) ctx;
                 ReadResult<Long> rr;
                 if (BKException.Code.OK != rc) {
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
index 26a4a76..5be9e19 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
@@ -163,8 +163,10 @@
         void completeExceptionally(Throwable throwable) {
             Stopwatch stopwatch = Stopwatch.createStarted();
             if (promise.completeExceptionally(throwable)) {
-                futureSetLatency.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-                delayUntilPromiseSatisfied.registerFailedEvent(enqueueTime.elapsed(TimeUnit.MICROSECONDS));
+                futureSetLatency.registerFailedEvent(
+                    stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
+                delayUntilPromiseSatisfied.registerFailedEvent(
+                    enqueueTime.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
             }
         }
 
@@ -191,10 +193,12 @@
             if (LOG.isTraceEnabled()) {
                 LOG.trace("{} : Satisfied promise with {} records", readHandler.getFullyQualifiedName(), records.size());
             }
-            delayUntilPromiseSatisfied.registerSuccessfulEvent(enqueueTime.stop().elapsed(TimeUnit.MICROSECONDS));
+            delayUntilPromiseSatisfied.registerSuccessfulEvent(
+                enqueueTime.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
             Stopwatch stopwatch = Stopwatch.createStarted();
             promise.complete(records);
-            futureSetLatency.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+            futureSetLatency.registerSuccessfulEvent(
+                stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
         }
     }
 
@@ -396,7 +400,8 @@
     private synchronized CompletableFuture<List<LogRecordWithDLSN>> readInternal(int numEntries,
                                                                       long deadlineTime,
                                                                       TimeUnit deadlineTimeUnit) {
-        timeBetweenReadNexts.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS));
+        timeBetweenReadNexts.registerSuccessfulEvent(
+            readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
         readNextDelayStopwatch.reset().start();
         final PendingReadRequest readRequest = new PendingReadRequest(numEntries, deadlineTime, deadlineTimeUnit);
 
@@ -443,7 +448,8 @@
             }
         }
 
-        readNextExecTime.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS));
+        readNextExecTime.registerSuccessfulEvent(
+            readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
         readNextDelayStopwatch.reset().start();
 
         return readRequest.getPromise();
@@ -553,7 +559,8 @@
     public void run() {
         synchronized(scheduleLock) {
             if (scheduleDelayStopwatch.isRunning()) {
-                scheduleLatency.registerSuccessfulEvent(scheduleDelayStopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+                scheduleLatency.registerSuccessfulEvent(
+                    scheduleDelayStopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
             }
 
             Stopwatch runTime = Stopwatch.createStarted();
@@ -573,7 +580,8 @@
                     if (null == nextRequest) {
                         LOG.trace("{}: Queue Empty waiting for Input", readHandler.getFullyQualifiedName());
                         scheduleCount.set(0);
-                        backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
+                        backgroundReaderRunTime.registerSuccessfulEvent(
+                            runTime.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                         return;
                     }
 
@@ -599,7 +607,8 @@
                     if (!(lastException.get().getCause() instanceof LogNotFoundException)) {
                         LOG.warn("{}: Exception", readHandler.getFullyQualifiedName(), lastException.get());
                     }
-                    backgroundReaderRunTime.registerFailedEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
+                    backgroundReaderRunTime.registerFailedEvent(
+                        runTime.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                     return;
                 }
 
@@ -646,7 +655,8 @@
                 if (nextRequest.hasReadRecords()) {
                     long remainingWaitTime = nextRequest.getRemainingWaitTime();
                     if (remainingWaitTime > 0 && !nextRequest.hasReadEnoughRecords()) {
-                        backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
+                        backgroundReaderRunTime.registerSuccessfulEvent(
+                            runTime.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                         scheduleDelayStopwatch.reset().start();
                         scheduleCount.set(0);
                         // the request could still wait for more records
@@ -681,7 +691,8 @@
                 } else {
                     if (0 == scheduleCountLocal) {
                         LOG.trace("Schedule count dropping to zero", lastException.get());
-                        backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
+                        backgroundReaderRunTime.registerSuccessfulEvent(
+                            runTime.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                         return;
                     }
                     scheduleCountLocal = scheduleCount.decrementAndGet();
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java
index a7d0d25..9f2e750 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java
@@ -426,13 +426,16 @@
         ).whenComplete(new FutureEventListener<LogRecordWithDLSN>() {
             @Override
             public void onSuccess(LogRecordWithDLSN value) {
-                recoverLastEntryStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-                recoverScannedEntriesStats.registerSuccessfulEvent(numRecordsScanned.get());
+                recoverLastEntryStats.registerSuccessfulEvent(
+                    stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
+                recoverScannedEntriesStats.registerSuccessfulValue(numRecordsScanned.get());
             }
 
             @Override
             public void onFailure(Throwable cause) {
-                recoverLastEntryStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+                recoverLastEntryStats.registerFailedEvent(
+                    stopwatch.stop().elapsed(TimeUnit.MICROSECONDS),
+                    TimeUnit.MICROSECONDS);
             }
         });
     }
@@ -508,9 +511,9 @@
                         LOG.warn("{} received inprogress log segment in {} millis: {}",
                                  new Object[] { getFullyQualifiedName(), elapsedMillis, metadata });
                     }
-                    getInprogressSegmentStat.registerSuccessfulEvent(elapsedMicroSec);
+                    getInprogressSegmentStat.registerSuccessfulEvent(elapsedMicroSec, TimeUnit.MICROSECONDS);
                 } else {
-                    negativeGetInprogressSegmentStat.registerSuccessfulEvent(-elapsedMicroSec);
+                    negativeGetInprogressSegmentStat.registerSuccessfulEvent(-elapsedMicroSec, TimeUnit.MICROSECONDS);
                 }
             } else {
                 long elapsedMillis = ts - metadata.getCompletionTime();
@@ -520,9 +523,9 @@
                         LOG.warn("{} received completed log segment in {} millis : {}",
                                  new Object[] { getFullyQualifiedName(), elapsedMillis, metadata });
                     }
-                    getCompletedSegmentStat.registerSuccessfulEvent(elapsedMicroSec);
+                    getCompletedSegmentStat.registerSuccessfulEvent(elapsedMicroSec, TimeUnit.MICROSECONDS);
                 } else {
-                    negativeGetCompletedSegmentStat.registerSuccessfulEvent(-elapsedMicroSec);
+                    negativeGetCompletedSegmentStat.registerSuccessfulEvent(-elapsedMicroSec, TimeUnit.MICROSECONDS);
                 }
             }
         }
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
index a4016c8..6f201a6 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
@@ -1131,8 +1131,10 @@
         final BKTransmitPacket transmitPacket = (BKTransmitPacket) ctx;
 
         // Time from transmit until receipt of addComplete callback
-        addCompleteTime.registerSuccessfulEvent(TimeUnit.MICROSECONDS.convert(
-            System.nanoTime() - transmitPacket.getTransmitTime(), TimeUnit.NANOSECONDS));
+        addCompleteTime.registerSuccessfulEvent(
+            TimeUnit.MICROSECONDS.convert(
+            System.nanoTime() - transmitPacket.getTransmitTime(), TimeUnit.NANOSECONDS),
+            TimeUnit.MICROSECONDS);
 
         if (BKException.Code.OK == rc) {
             EntryBuffer recordSet = transmitPacket.getRecordSet();
@@ -1149,9 +1151,13 @@
                 @Override
                 public Void call() {
                     final Stopwatch deferredTime = Stopwatch.createStarted();
-                    addCompleteQueuedTime.registerSuccessfulEvent(queuedTime.elapsed(TimeUnit.MICROSECONDS));
+                    addCompleteQueuedTime.registerSuccessfulEvent(
+                        queuedTime.elapsed(TimeUnit.MICROSECONDS),
+                        TimeUnit.MICROSECONDS);
                     addCompleteDeferredProcessing(transmitPacket, entryId, effectiveRC.get());
-                    addCompleteDeferredTime.registerSuccessfulEvent(deferredTime.elapsed(TimeUnit.MICROSECONDS));
+                    addCompleteDeferredTime.registerSuccessfulEvent(
+                        deferredTime.elapsed(TimeUnit.MICROSECONDS),
+                        TimeUnit.MILLISECONDS);
                     return null;
                 }
                 @Override
@@ -1199,14 +1205,16 @@
 
             if (transmitResult.get() != BKException.Code.OK) {
                 if (recordSet.hasUserRecords()) {
-                    transmitDataPacketSize.registerFailedEvent(recordSet.getNumBytes());
+                    transmitDataPacketSize.registerFailedEvent(
+                        recordSet.getNumBytes(), TimeUnit.MICROSECONDS);
                 }
             } else {
                 // If we had data that we flushed then we need it to make sure that
                 // background flush in the next pass will make the previous writes
                 // visible by advancing the lastAck
                 if (recordSet.hasUserRecords()) {
-                    transmitDataPacketSize.registerSuccessfulEvent(recordSet.getNumBytes());
+                    transmitDataPacketSize.registerSuccessfulEvent(
+                        recordSet.getNumBytes(), TimeUnit.MICROSECONDS);
                     controlFlushNeeded = true;
                     if (immediateFlushEnabled) {
                         if (0 == minDelayBetweenImmediateFlushMs) {
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
index 1293d00..a310ea5 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
@@ -430,9 +430,11 @@
             return writer;
         } finally {
             if (success) {
-                openOpStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+                openOpStats.registerSuccessfulEvent(
+                    stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
             } else {
-                openOpStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+                openOpStats.registerFailedEvent(
+                    stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
             }
         }
     }
@@ -743,9 +745,13 @@
             return completedLogSegment;
         } finally {
             if (success) {
-                closeOpStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+                closeOpStats.registerSuccessfulEvent(
+                    stopwatch.stop().elapsed(TimeUnit.MICROSECONDS),
+                    TimeUnit.MICROSECONDS);
             } else {
-                closeOpStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+                closeOpStats.registerFailedEvent(
+                    stopwatch.stop().elapsed(TimeUnit.MICROSECONDS),
+                    TimeUnit.MICROSECONDS);
             }
         }
     }
@@ -1196,12 +1202,16 @@
         promise.whenComplete(new FutureEventListener<LogSegmentMetadata>() {
             @Override
             public void onSuccess(LogSegmentMetadata segment) {
-                deleteOpStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+                deleteOpStats.registerSuccessfulEvent(
+                    stopwatch.stop().elapsed(TimeUnit.MICROSECONDS),
+                    TimeUnit.MICROSECONDS);
             }
 
             @Override
             public void onFailure(Throwable cause) {
-                deleteOpStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+                deleteOpStats.registerFailedEvent(
+                    stopwatch.stop().elapsed(TimeUnit.MICROSECONDS),
+                    TimeUnit.MICROSECONDS);
             }
         });
         entryStore.deleteLogSegment(ledgerMetadata)
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
index 2ea3b5d..a50c391 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
@@ -18,6 +18,9 @@
 package org.apache.distributedlog;
 
 import com.google.common.base.Optional;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.client.AsyncCallback;
 import org.apache.bookkeeper.client.BKException;
@@ -40,8 +43,6 @@
 import org.apache.distributedlog.util.ConfUtils;
 import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.zookeeper.KeeperException;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.util.HashedWheelTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,7 +67,7 @@
     private final String zkServers;
     private final String ledgersPath;
     private final byte[] passwd;
-    private final ClientSocketChannelFactory channelFactory;
+    private final EventLoopGroup eventLoopGroup;
     private final HashedWheelTimer requestTimer;
     private final StatsLogger statsLogger;
 
@@ -80,8 +81,10 @@
 
     @SuppressWarnings("deprecation")
     private synchronized void commonInitialization(
-            DistributedLogConfiguration conf, String ledgersPath,
-            ClientSocketChannelFactory channelFactory, StatsLogger statsLogger, HashedWheelTimer requestTimer)
+            DistributedLogConfiguration conf,
+            String ledgersPath,
+            EventLoopGroup eventLoopGroup,
+            StatsLogger statsLogger, HashedWheelTimer requestTimer)
         throws IOException, InterruptedException, KeeperException {
         ClientConfiguration bkConfig = new ClientConfiguration();
         bkConfig.setAddEntryTimeout(conf.getBKClientWriteTimeout());
@@ -106,11 +109,10 @@
         final DNSToSwitchMapping dnsResolver =
                 NetUtils.getDNSResolver(dnsResolverCls, conf.getBkDNSResolverOverrides());
 
-        this.bkc = BookKeeper.newBuilder()
-            .config(bkConfig)
-            .zk(zkc.get())
-            .channelFactory(channelFactory)
-            .statsLogger(statsLogger)
+        this.bkc = BookKeeper.forConfig(bkConfig)
+            .setZookeeper(zkc.get())
+            .setEventLoopGroup(eventLoopGroup)
+            .setStatsLogger(statsLogger)
             .dnsResolver(dnsResolver)
             .requestTimer(requestTimer)
             .featureProvider(featureProvider.orNull())
@@ -122,7 +124,7 @@
                      String zkServers,
                      ZooKeeperClient zkc,
                      String ledgersPath,
-                     ClientSocketChannelFactory channelFactory,
+                     EventLoopGroup eventLoopGroup,
                      HashedWheelTimer requestTimer,
                      StatsLogger statsLogger,
                      Optional<FeatureProvider> featureProvider) {
@@ -131,7 +133,7 @@
         this.zkServers = zkServers;
         this.ledgersPath = ledgersPath;
         this.passwd = conf.getBKDigestPW().getBytes(UTF_8);
-        this.channelFactory = channelFactory;
+        this.eventLoopGroup = eventLoopGroup;
         this.requestTimer = requestTimer;
         this.statsLogger = statsLogger;
         this.featureProvider = featureProvider;
@@ -162,7 +164,7 @@
         }
 
         try {
-            commonInitialization(conf, ledgersPath, channelFactory, statsLogger, requestTimer);
+            commonInitialization(conf, ledgersPath, eventLoopGroup, statsLogger, requestTimer);
         } catch (InterruptedException e) {
             throw new DLInterruptedException("Interrupted on creating bookkeeper client " + name + " : ", e);
         } catch (KeeperException e) {
@@ -216,7 +218,7 @@
                             promise.completeExceptionally(BKException.create(rc));
                         }
                     }
-                }, null);
+                }, null, Collections.emptyMap());
         return promise;
     }
 
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClientBuilder.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClientBuilder.java
index a356f9f..1149ad5 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClientBuilder.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClientBuilder.java
@@ -19,14 +19,12 @@
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.HashedWheelTimer;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.util.HashedWheelTimer;
 import org.apache.bookkeeper.feature.FeatureProvider;
 
-import org.apache.bookkeeper.feature.Feature;
-
 /**
  * Builder to build bookkeeper client.
  */
@@ -55,7 +53,7 @@
     // statsLogger
     private StatsLogger statsLogger = NullStatsLogger.INSTANCE;
     // client channel factory
-    private ClientSocketChannelFactory channelFactory = null;
+    private EventLoopGroup eventLoopGroup = null;
     // request timer
     private HashedWheelTimer requestTimer = null;
     // feature provider
@@ -150,12 +148,12 @@
     /**
      * Build BookKeeper client using existing <i>channelFactory</i>.
      *
-     * @param channelFactory
-     *          Channel Factory used to build bookkeeper client.
+     * @param eventLoopGroup
+     *          event loop group used to build bookkeeper client.
      * @return bookkeeper client builder.
      */
-    public synchronized BookKeeperClientBuilder channelFactory(ClientSocketChannelFactory channelFactory) {
-        this.channelFactory = channelFactory;
+    public synchronized BookKeeperClientBuilder eventLoopGroup(EventLoopGroup eventLoopGroup) {
+        this.eventLoopGroup = eventLoopGroup;
         return this;
     }
 
@@ -204,6 +202,15 @@
 
     private BookKeeperClient buildClient() {
         validateParameters();
-        return new BookKeeperClient(dlConfig, name, zkServers, zkc, ledgersPath, channelFactory, requestTimer, statsLogger, featureProvider);
+        return new BookKeeperClient(
+            dlConfig,
+            name,
+            zkServers,
+            zkc,
+            ledgersPath,
+            eventLoopGroup,
+            requestTimer,
+            statsLogger,
+            featureProvider);
     }
 }
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java b/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java
index f2c510d..bf3a491 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java
@@ -21,7 +21,6 @@
 import org.apache.distributedlog.impl.metadata.BKDLConfig;
 import org.apache.distributedlog.metadata.DLMetadata;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.LocalBookKeeper;
@@ -33,14 +32,12 @@
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.io.File;
 import java.io.IOException;
 import java.net.BindException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -180,36 +177,6 @@
         return uri;
     }
 
-    public BookieServer newBookie() throws Exception {
-        ServerConfiguration bookieConf = new ServerConfiguration();
-        bookieConf.setZkTimeout(zkTimeoutSec * 1000);
-        bookieConf.setBookiePort(0);
-        bookieConf.setAllowLoopback(true);
-        File tmpdir = File.createTempFile("bookie" + UUID.randomUUID() + "_",
-            "test");
-        if (!tmpdir.delete()) {
-            LOG.debug("Fail to delete tmpdir " + tmpdir);
-        }
-        if (!tmpdir.mkdir()) {
-            throw new IOException("Fail to create tmpdir " + tmpdir);
-        }
-        tmpDirs.add(tmpdir);
-
-        bookieConf.setZkServers(zkEnsemble);
-        bookieConf.setJournalDirName(tmpdir.getPath());
-        bookieConf.setLedgerDirNames(new String[]{tmpdir.getPath()});
-
-        BookieServer b = new BookieServer(bookieConf);
-        b.start();
-        for (int i = 0; i < 10 && !b.isRunning(); i++) {
-            Thread.sleep(10000);
-        }
-        if (!b.isRunning()) {
-            throw new IOException("Bookie would not start");
-        }
-        return b;
-    }
-
     /**
      * Check that a number of bookies are available
      *
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
index 21fe227..36ff437 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
@@ -20,6 +20,12 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+import java.util.concurrent.ThreadFactory;
+import org.apache.commons.lang.SystemUtils;
 import org.apache.distributedlog.BookKeeperClient;
 import org.apache.distributedlog.BookKeeperClientBuilder;
 import org.apache.distributedlog.DistributedLogConfiguration;
@@ -58,9 +64,6 @@
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.common.PathUtils;
 import org.apache.zookeeper.data.Stat;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.util.HashedWheelTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,7 +72,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -130,7 +132,7 @@
     //       {@link org.apache.distributedlog.BookKeeperClient#get()}. So it is safe to
     //       keep builders and their client wrappers here, as they will be used when
     //       instantiating readers or writers.
-    private ClientSocketChannelFactory channelFactory;
+    private EventLoopGroup eventLoopGroup;
     private HashedWheelTimer requestTimer;
     private BookKeeperClientBuilder sharedWriterBKCBuilder;
     private BookKeeperClient writerBKC;
@@ -250,11 +252,22 @@
         return bkdlConfig;
     }
 
+    static EventLoopGroup getDefaultEventLoopGroup(int numThreads) {
+        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("DL-io-%s").build();
+        if (SystemUtils.IS_OS_LINUX) {
+            try {
+                return new EpollEventLoopGroup(numThreads, threadFactory);
+            } catch (Throwable t) {
+                LOG.warn("Could not use Netty Epoll event loop for bookie server:", t);
+                return new NioEventLoopGroup(numThreads, threadFactory);
+            }
+        } else {
+            return new NioEventLoopGroup(numThreads, threadFactory);
+        }
+    }
+
     private void initializeBookKeeperClients() throws IOException {
-        this.channelFactory = new NioClientSocketChannelFactory(
-                Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-boss-%d").build()),
-                Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-worker-%d").build()),
-                conf.getBKClientNumberIOThreads());
+        this.eventLoopGroup = getDefaultEventLoopGroup(conf.getBKClientNumberIOThreads());
         this.requestTimer = new HashedWheelTimer(
                 new ThreadFactoryBuilder().setNameFormat("DLFactoryTimer-%d").build(),
                 conf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS,
@@ -265,7 +278,7 @@
                 conf,
                 bkdlConfig.getBkZkServersForWriter(),
                 bkdlConfig.getBkLedgersPath(),
-                channelFactory,
+                eventLoopGroup,
                 requestTimer,
                 Optional.of(featureProvider.scope("bkc")),
                 statsLogger);
@@ -280,7 +293,7 @@
                     conf,
                     bkdlConfig.getBkZkServersForReader(),
                     bkdlConfig.getBkLedgersPath(),
-                    channelFactory,
+                    eventLoopGroup,
                     requestTimer,
                     Optional.<FeatureProvider>absent(),
                     statsLogger);
@@ -393,7 +406,7 @@
         writerZKC.close();
         readerZKC.close();
         // release bookkeeper resources
-        channelFactory.releaseExternalResources();
+        eventLoopGroup.shutdownGracefully();
         LOG.info("Release external resources used by channel factory.");
         requestTimer.stop();
         LOG.info("Stopped request timer");
@@ -582,7 +595,7 @@
                                                      DistributedLogConfiguration conf,
                                                      String zkServers,
                                                      String ledgersPath,
-                                                     ClientSocketChannelFactory channelFactory,
+                                                     EventLoopGroup eventLoopGroup,
                                                      HashedWheelTimer requestTimer,
                                                      Optional<FeatureProvider> featureProviderOptional,
                                                      StatsLogger statsLogger) {
@@ -591,7 +604,7 @@
                 .dlConfig(conf)
                 .zkServers(zkServers)
                 .ledgersPath(ledgersPath)
-                .channelFactory(channelFactory)
+                .eventLoopGroup(eventLoopGroup)
                 .requestTimer(requestTimer)
                 .featureProvider(featureProviderOptional)
                 .statsLogger(statsLogger);
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ChainedRequestLimiter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ChainedRequestLimiter.java
index fdad69b..bbb4f7e 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ChainedRequestLimiter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ChainedRequestLimiter.java
@@ -70,7 +70,7 @@
                 limiter.apply(request);
             }
         } finally {
-            applyTime.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+            applyTime.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
         }
     }
 }
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
index 77151df..664d0bc 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
@@ -147,11 +147,15 @@
         promise.whenComplete(new FutureEventListener<ZKDistributedLock>() {
             @Override
             public void onSuccess(ZKDistributedLock lock) {
-                acquireStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+                acquireStats.registerSuccessfulEvent(
+                    stopwatch.stop().elapsed(TimeUnit.MICROSECONDS),
+                    TimeUnit.MICROSECONDS);
             }
             @Override
             public void onFailure(Throwable cause) {
-                acquireStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+                acquireStats.registerFailedEvent(
+                    stopwatch.stop().elapsed(TimeUnit.MICROSECONDS),
+                    TimeUnit.MICROSECONDS);
                 // release the lock if fail to acquire
                 asyncClose();
             }
@@ -496,7 +500,9 @@
                     synchronized (ZKDistributedLock.this) {
                         lockReacquireFuture = null;
                     }
-                    reacquireStats.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+                    reacquireStats.registerSuccessfulEvent(
+                        stopwatch.elapsed(TimeUnit.MICROSECONDS),
+                        TimeUnit.MICROSECONDS);
                 }
 
                 @Override
@@ -509,7 +515,9 @@
                                     "Exception on re-acquiring lock", cause);
                         }
                     }
-                    reacquireStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+                    reacquireStats.registerFailedEvent(
+                        stopwatch.elapsed(TimeUnit.MICROSECONDS),
+                        TimeUnit.MICROSECONDS);
                 }
             });
         }
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
index 9fdcbf1..1325751 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
@@ -838,9 +838,13 @@
             throw new LockingException(lockPath, message, ex);
         } finally {
             if (success) {
-                tryStats.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+                tryStats.registerSuccessfulEvent(
+                    stopwatch.elapsed(TimeUnit.MICROSECONDS),
+                    TimeUnit.MICROSECONDS);
             } else {
-                tryStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+                tryStats.registerFailedEvent(
+                    stopwatch.elapsed(TimeUnit.MICROSECONDS),
+                    TimeUnit.MICROSECONDS);
             }
             // This can only happen for a Throwable thats not an
             // Exception, i.e. an Error
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java b/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
index 64229d1..88c5d0f 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
@@ -1665,7 +1665,7 @@
                 System.out.println("Skip inprogress log segment " + segment);
                 return;
             }
-            LedgerHandle lh = bkAdmin.openLedger(segment.getLogSegmentId(), true);
+            LedgerHandle lh = bkAdmin.openLedger(segment.getLogSegmentId());
             long lac = lh.getLastAddConfirmed();
             Enumeration<LedgerEntry> entries = lh.readEntries(lac, lac);
             if (!entries.hasMoreElements()) {
@@ -1727,6 +1727,7 @@
         abstract protected int runBKCmd(ZooKeeperClient zkc, BookKeeperClient bkc) throws Exception;
     }
 
+    /**
     static class RecoverCommand extends PerBKCommand {
 
         final List<Long> ledgers = new ArrayList<Long>();
@@ -2035,6 +2036,7 @@
             return "recover [options] <bookiesSrc>";
         }
     }
+    **/
 
     /**
      * Per Ledger Command, which parse common options for per ledger. e.g. ledger id.
@@ -2854,7 +2856,8 @@
         addCommand(new ListCommand());
         addCommand(new ReadLastConfirmedCommand());
         addCommand(new ReadEntriesCommand());
-        addCommand(new RecoverCommand());
+        // TODO: Fix it later, tracking by https://github.com/apache/distributedlog/issues/150
+        // addCommand(new RecoverCommand());
         addCommand(new RecoverLedgerCommand());
         addCommand(new ShowCommand());
         addCommand(new TruncateCommand());
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java
index 3697b3f..35cdfa0 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java
@@ -83,7 +83,7 @@
 
     @Override
     public boolean acquire() {
-        permitsMetric.registerSuccessfulEvent(permits.get());
+        permitsMetric.registerSuccessfulValue(permits.get());
         if (permits.incrementAndGet() <= permitsMax || isDarkmode()) {
             return true;
         } else {
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestDLMTestUtil.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestDLMTestUtil.java
index a4db3ad..e47d020 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestDLMTestUtil.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestDLMTestUtil.java
@@ -32,6 +32,12 @@
 public class TestDLMTestUtil {
     static final Logger LOG = LoggerFactory.getLogger(TestDLMTestUtil.class);
 
+    static {
+        // org.apache.zookeeper.test.ClientBase uses FourLetterWordMain, from 3.5.3 four letter words
+        // are disabled by default due to security reasons
+        System.setProperty("zookeeper.4lw.commands.whitelist", "*");
+    }
+
     @Rule
     public TestName testNames = new TestName();
 
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
index 5e4ba07..5b55cd0 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
@@ -67,6 +67,12 @@
 public class TestDistributedLogBase {
     static final Logger LOG = LoggerFactory.getLogger(TestDistributedLogBase.class);
 
+    static {
+        // org.apache.zookeeper.test.ClientBase uses FourLetterWordMain, from 3.5.3 four letter words
+        // are disabled by default due to security reasons
+        System.setProperty("zookeeper.4lw.commands.whitelist", "*");
+    }
+
     // Num worker threads should be one, since the exec service is used for the ordered
     // future pool in test cases, and setting to > 1 will therefore result in unordered
     // write ops.
diff --git a/distributedlog-core/src/test/resources/bk_server.conf b/distributedlog-core/src/test/resources/bk_server.conf
index bd5ae93..0a746d6 100644
--- a/distributedlog-core/src/test/resources/bk_server.conf
+++ b/distributedlog-core/src/test/resources/bk_server.conf
@@ -122,7 +122,7 @@
 journalFlushWhenQueueEmpty=false
 journalRemoveFromPageCache=true
 journalAdaptiveGroupWrites=true
-journalMaxGroupWaitMSec=15
+journalMaxGroupWaitMSec=2
 journalBufferedEntriesThreshold=180
 journalBufferedWritesThreshold=262144
 journalMaxGroupedEntriesToCommit=200
@@ -138,8 +138,8 @@
 fileInfoMaxIdleTime=3600
 
 # Bookie Threads Settings
-numAddWorkerThreads=24
-numJournalCallbackThreads=48
-numReadWorkerThreads=72
-numLongPollWorkerThreads=72
+numAddWorkerThreads=1
+numJournalCallbackThreads=1
+numReadWorkerThreads=4
+numLongPollWorkerThreads=4
 
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/LZ4CompressionCodec.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/LZ4CompressionCodec.java
index 38d3ba3..b164fef 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/LZ4CompressionCodec.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/LZ4CompressionCodec.java
@@ -58,7 +58,7 @@
 
         Stopwatch watch = Stopwatch.createStarted();
         byte[] compressed = compressor.compress(data, offset, length);
-        compressionStat.registerSuccessfulEvent(watch.elapsed(TimeUnit.MICROSECONDS));
+        compressionStat.registerSuccessfulEvent(watch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
         return compressed;
     }
 
@@ -75,7 +75,7 @@
         while (true) {
             try {
                 byte[] decompressed = safeDecompressor.decompress(data, offset, length, outLength);
-                decompressionStat.registerSuccessfulEvent(watch.elapsed(TimeUnit.MICROSECONDS));
+                decompressionStat.registerSuccessfulEvent(watch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                 return decompressed;
             } catch (LZ4Exception e) {
                 outLength *= 2;
@@ -95,7 +95,7 @@
 
         Stopwatch watch = Stopwatch.createStarted();
         byte[] decompressed = fastDecompressor.decompress(data, offset, decompressedSize);
-        decompressionStat.registerSuccessfulEvent(watch.elapsed(TimeUnit.MICROSECONDS));
+        decompressionStat.registerSuccessfulEvent(watch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
         return decompressed;
     }
 }
diff --git a/distributedlog-proxy-client/pom.xml b/distributedlog-proxy-client/pom.xml
index 3ae7357..718be4d 100644
--- a/distributedlog-proxy-client/pom.xml
+++ b/distributedlog-proxy-client/pom.xml
@@ -169,4 +169,11 @@
       </plugin>
     </plugins>
   </build>
+  <repositories>
+    <repository>
+      <id>twitter-repo</id>
+      <name>Twitter Maven Repo</name>
+      <url>http://maven.twttr.com</url>
+    </repository>
+  </repositories>
 </project>
diff --git a/distributedlog-proxy-protocol/pom.xml b/distributedlog-proxy-protocol/pom.xml
index 468ee09..609be7e 100644
--- a/distributedlog-proxy-protocol/pom.xml
+++ b/distributedlog-proxy-protocol/pom.xml
@@ -127,4 +127,11 @@
       </plugin>
     </plugins>
   </build>
+  <repositories>
+    <repository>
+      <id>twitter-repo</id>
+      <name>Twitter Maven Repo</name>
+      <url>http://maven.twttr.com</url>
+    </repository>
+  </repositories>
 </project>
diff --git a/distributedlog-proxy-server/pom.xml b/distributedlog-proxy-server/pom.xml
index 0e294f3..a80a4d2 100644
--- a/distributedlog-proxy-server/pom.xml
+++ b/distributedlog-proxy-server/pom.xml
@@ -272,4 +272,11 @@
       </dependencies>
     </profile>
   </profiles>
+  <repositories>
+    <repository>
+      <id>twitter-repo</id>
+      <name>Twitter Maven Repo</name>
+      <url>http://maven.twttr.com</url>
+    </repository>
+  </repositories>
 </project>
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/StatsFilter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/StatsFilter.java
index ee64580..872fecb 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/StatsFilter.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/StatsFilter.java
@@ -42,11 +42,13 @@
         final Stopwatch stopwatch = Stopwatch.createStarted();
         try {
             result = service.apply(req);
-            serviceExec.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+            serviceExec.registerSuccessfulEvent(
+                stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
         } finally {
             outstandingAsync.dec();
             if (null == result) {
-                serviceExec.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+                serviceExec.registerFailedEvent(
+                    stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
             }
         }
         return result;
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java
index 1336ddd..2ecaf36 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java
@@ -17,6 +17,7 @@
  */
 package org.apache.distributedlog.service.placement;
 
+import java.util.concurrent.TimeUnit;
 import org.apache.distributedlog.client.routing.RoutingService;
 import org.apache.distributedlog.api.namespace.Namespace;
 import com.twitter.util.Duration;
@@ -175,14 +176,16 @@
         }).onSuccess(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
             @Override
             public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
-                placementCalcStats.registerSuccessfulEvent(System.currentTimeMillis() - startTime);
+                placementCalcStats.registerSuccessfulEvent(
+                    System.currentTimeMillis() - startTime, TimeUnit.MICROSECONDS);
                 return BoxedUnit.UNIT;
             }
         }).onFailure(new Function<Throwable, BoxedUnit>() {
             @Override
             public BoxedUnit apply(Throwable t) {
                 logger.error("Failure calculating loads", t);
-                placementCalcStats.registerFailedEvent(System.currentTimeMillis() - startTime);
+                placementCalcStats.registerFailedEvent(
+                    System.currentTimeMillis() - startTime, TimeUnit.MICROSECONDS);
                 return BoxedUnit.UNIT;
             }
         });
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java
index 7700184..12c4783 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java
@@ -96,7 +96,8 @@
                 .addEventListener(new FutureEventListener<Response>() {
             @Override
             public void onSuccess(Response response) {
-                opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+                opStatsLogger.registerSuccessfulEvent(
+                  stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                 setResponse(response);
             }
             @Override
@@ -119,7 +120,8 @@
             OwnershipAcquireFailedException oafe = (OwnershipAcquireFailedException) cause;
             fail(ResponseUtils.ownerToHeader(oafe.getCurrentOwner()));
         } else {
-            opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+            opStatsLogger.registerFailedEvent(
+              stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
             fail(ResponseUtils.exceptionToHeader(cause));
         }
     }
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java
index 372703a..93dedb6 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java
@@ -122,16 +122,19 @@
             @Override
             public void onSuccess(BulkWriteResponse response) {
                 if (response.getHeader().getCode() == StatusCode.SUCCESS) {
-                    latencyStat.registerSuccessfulEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
+                    latencyStat.registerSuccessfulEvent(
+                      stopwatch().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                     bytes.add(size);
                     bulkWriteBytes.add(size);
                 } else {
-                    latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
+                    latencyStat.registerFailedEvent(
+                      stopwatch().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                 }
             }
             @Override
             public void onFailure(Throwable cause) {
-                latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
+                latencyStat.registerFailedEvent(
+                  stopwatch().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
             }
         });
     }
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
index df3d64f..71a9531 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
@@ -654,9 +654,11 @@
                                             Stopwatch stopwatch,
                                             Promise<Boolean> acquirePromise) {
         if (success) {
-            streamAcquireStat.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+            streamAcquireStat.registerSuccessfulEvent(
+              stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
         } else {
-            streamAcquireStat.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+            streamAcquireStat.registerFailedEvent(
+              stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
         }
         for (StreamOp op : oldPendingOps) {
             executeOp(op, success);
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
index 0a8a2da..e3aa703 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
@@ -102,16 +102,19 @@
             @Override
             public void onSuccess(WriteResponse response) {
                 if (response.getHeader().getCode() == StatusCode.SUCCESS) {
-                    latencyStat.registerSuccessfulEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
+                    latencyStat.registerSuccessfulEvent(
+                      stopwatch().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                     bytes.add(size);
                     writeBytes.add(size);
                 } else {
-                    latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
+                    latencyStat.registerFailedEvent(
+                      stopwatch().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                 }
             }
             @Override
             public void onFailure(Throwable cause) {
-                latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
+                latencyStat.registerFailedEvent(
+                  stopwatch().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
             }
         });
     }
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java
index c3c5d81..d790016 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java
@@ -84,14 +84,14 @@
             @Override
             public WriteResponse map(WriteResponse response) {
                 opStatsLogger.registerSuccessfulEvent(
-                        stopwatch.elapsed(TimeUnit.MICROSECONDS));
+                        stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                 return response;
             }
 
             @Override
             public WriteResponse handle(Throwable cause) {
                 opStatsLogger.registerFailedEvent(
-                        stopwatch.elapsed(TimeUnit.MICROSECONDS));
+                        stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                 return ResponseUtils.write(ResponseUtils.exceptionToHeader(cause));
             }
 
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
index 58b5b2a..a8ed1b2 100644
--- a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
@@ -49,6 +49,12 @@
  */
 public abstract class DistributedLogServerTestCase {
 
+    static {
+        // org.apache.zookeeper.test.ClientBase uses FourLetterWordMain, from 3.5.3 four letter words
+        // are disabled by default due to security reasons
+        System.setProperty("zookeeper.4lw.commands.whitelist", "*");
+    }
+
     protected static DistributedLogConfiguration conf =
             new DistributedLogConfiguration().setLockTimeout(10)
                     .setOutputBufferSize(0).setPeriodicFlushFrequencyMilliSeconds(10);
diff --git a/distributedlog-tutorials/distributedlog-mapreduce/README.md b/distributedlog-tutorials/distributedlog-mapreduce/README.md
deleted file mode 100644
index ab50d28..0000000
--- a/distributedlog-tutorials/distributedlog-mapreduce/README.md
+++ /dev/null
Binary files differ
diff --git a/distributedlog-tutorials/distributedlog-mapreduce/pom.xml b/distributedlog-tutorials/distributedlog-mapreduce/pom.xml
deleted file mode 100644
index e7693df..0000000
--- a/distributedlog-tutorials/distributedlog-mapreduce/pom.xml
+++ /dev/null
@@ -1,56 +0,0 @@
-<?xml version="1.0"?>
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <artifactId>distributedlog</artifactId>
-    <groupId>org.apache.distributedlog</groupId>
-    <version>0.5.0-SNAPSHOT</version>
-    <relativePath>../..</relativePath>
-  </parent>
-  <groupId>org.apache.distributedlog</groupId>
-  <artifactId>distributedlog-mapreduce</artifactId>
-  <name>Apache DistributedLog :: Tutorials :: MapReduce Tutorial</name>
-  <url>http://maven.apache.org</url>
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <project.libdir>${basedir}/lib</project.libdir>
-  </properties>
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.distributedlog</groupId>
-      <artifactId>distributedlog-core</artifactId>
-      <version>${project.parent.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.distributedlog</groupId>
-      <artifactId>distributedlog-proxy-client</artifactId>
-      <version>${project.parent.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <version>2.7.2</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
-      <version>2.7.2</version>
-    </dependency>
-  </dependencies>
-</project>
diff --git a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/DistributedLogInputFormat.java b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/DistributedLogInputFormat.java
deleted file mode 100644
index 6fd017c..0000000
--- a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/DistributedLogInputFormat.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.mapreduce;
-
-import com.google.common.collect.Lists;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.api.DistributedLogManager;
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.LogRecordWithDLSN;
-import org.apache.distributedlog.LogSegmentMetadata;
-import org.apache.distributedlog.api.namespace.Namespace;
-import org.apache.distributedlog.impl.BKNamespaceDriver;
-import org.apache.distributedlog.api.namespace.NamespaceBuilder;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.BookKeeperAccessor;
-import org.apache.bookkeeper.client.LedgerMetadata;
-import org.apache.bookkeeper.meta.LedgerManager;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * InputFormat to read data from a distributedlog stream.
- */
-public class DistributedLogInputFormat
-        extends InputFormat<DLSN, LogRecordWithDLSN> implements Configurable {
-
-    private static final String DL_URI = "distributedlog.uri";
-    private static final String DL_STREAM = "distributedlog.stream";
-
-    protected Configuration conf;
-    protected DistributedLogConfiguration dlConf;
-    protected URI dlUri;
-    protected Namespace namespace;
-    protected String streamName;
-    protected DistributedLogManager dlm;
-
-    /** {@inheritDoc} */
-    @Override
-    public void setConf(Configuration configuration) {
-        this.conf = configuration;
-        dlConf = new DistributedLogConfiguration();
-        dlUri = URI.create(configuration.get(DL_URI, ""));
-        streamName = configuration.get(DL_STREAM, "");
-        try {
-            namespace = NamespaceBuilder.newBuilder()
-                    .conf(dlConf)
-                    .uri(dlUri)
-                    .build();
-            dlm = namespace.openLog(streamName);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public Configuration getConf() {
-        return conf;
-    }
-
-    @Override
-    public List<InputSplit> getSplits(JobContext jobContext)
-            throws IOException, InterruptedException {
-        List<LogSegmentMetadata> segments = dlm.getLogSegments();
-        List<InputSplit> inputSplits = Lists.newArrayListWithCapacity(segments.size());
-        BookKeeper bk = ((BKNamespaceDriver) namespace.getNamespaceDriver()).getReaderBKC().get();
-        LedgerManager lm = BookKeeperAccessor.getLedgerManager(bk);
-        final AtomicInteger rcHolder = new AtomicInteger(0);
-        final AtomicReference<LedgerMetadata> metadataHolder = new AtomicReference<LedgerMetadata>(null);
-        for (LogSegmentMetadata segment : segments) {
-            final CountDownLatch latch = new CountDownLatch(1);
-            lm.readLedgerMetadata(segment.getLogSegmentId(),
-                    new BookkeeperInternalCallbacks.GenericCallback<LedgerMetadata>() {
-                @Override
-                public void operationComplete(int rc, LedgerMetadata ledgerMetadata) {
-                    metadataHolder.set(ledgerMetadata);
-                    rcHolder.set(rc);
-                    latch.countDown();
-                }
-            });
-            latch.await();
-            if (BKException.Code.OK != rcHolder.get()) {
-                throw new IOException("Faild to get log segment metadata for " + segment + " : "
-                        + BKException.getMessage(rcHolder.get()));
-            }
-            inputSplits.add(new LogSegmentSplit(segment, metadataHolder.get()));
-        }
-        return inputSplits;
-    }
-
-    @Override
-    public RecordReader<DLSN, LogRecordWithDLSN> createRecordReader(
-            InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
-            throws IOException, InterruptedException {
-        return new LogSegmentReader(
-                streamName,
-                dlConf,
-                ((BKNamespaceDriver) namespace.getNamespaceDriver()).getReaderBKC().get(),
-                (LogSegmentSplit) inputSplit);
-    }
-}
diff --git a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/LogSegmentReader.java b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/LogSegmentReader.java
deleted file mode 100644
index 541db3b..0000000
--- a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/LogSegmentReader.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.mapreduce;
-
-import org.apache.distributedlog.*;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-import java.util.Enumeration;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * Record Reader to read from a log segment split
- */
-class LogSegmentReader extends RecordReader<DLSN, LogRecordWithDLSN> {
-
-    final String streamName;
-    final BookKeeper bk;
-    final LedgerHandle lh;
-    final LogSegmentMetadata metadata;
-
-    long entryId = -1L;
-    Entry.Reader reader = null;
-    LogRecordWithDLSN currentRecord = null;
-    int readPos = 0;
-
-    LogSegmentReader(String streamName,
-                     DistributedLogConfiguration conf,
-                     BookKeeper bk,
-                     LogSegmentSplit split)
-            throws IOException {
-        this.streamName = streamName;
-        this.bk = bk;
-        this.metadata = split.getMetadata();
-        try {
-            this.lh = bk.openLedgerNoRecovery(
-                    split.getLogSegmentId(),
-                    BookKeeper.DigestType.CRC32,
-                    conf.getBKDigestPW().getBytes(UTF_8));
-        } catch (BKException e) {
-            throw new IOException(e);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new IOException(e);
-        }
-    }
-
-    @Override
-    public void initialize(InputSplit split, TaskAttemptContext context)
-            throws IOException, InterruptedException {
-        // do nothing
-    }
-
-    @Override
-    public boolean nextKeyValue()
-            throws IOException, InterruptedException {
-        LogRecordWithDLSN record;
-        currentRecord = null;
-        if (null != reader) {
-            record = reader.nextRecord();
-            if (null != record) {
-                currentRecord = record;
-                readPos = record.getPositionWithinLogSegment();
-                return true;
-            } else {
-                return false;
-            }
-        }
-        ++entryId;
-        if (entryId > lh.getLastAddConfirmed()) {
-            return false;
-        }
-        try {
-            Enumeration<LedgerEntry> entries =
-                    lh.readEntries(entryId, entryId);
-            if (entries.hasMoreElements()) {
-                LedgerEntry entry = entries.nextElement();
-                reader = Entry.newBuilder()
-                        .setLogSegmentInfo(metadata.getLogSegmentSequenceNumber(),
-                                metadata.getStartSequenceId())
-                        .setEntryId(entry.getEntryId())
-                        .setEnvelopeEntry(
-                                LogSegmentMetadata.supportsEnvelopedEntries(metadata.getVersion()))
-                        .deserializeRecordSet(true)
-                        .setInputStream(entry.getEntryInputStream())
-                        .buildReader();
-            }
-            return nextKeyValue();
-        } catch (BKException e) {
-            throw new IOException(e);
-        }
-    }
-
-    @Override
-    public DLSN getCurrentKey()
-            throws IOException, InterruptedException {
-        return currentRecord.getDlsn();
-    }
-
-    @Override
-    public LogRecordWithDLSN getCurrentValue()
-            throws IOException, InterruptedException {
-        return currentRecord;
-    }
-
-    @Override
-    public float getProgress()
-            throws IOException, InterruptedException {
-        if (metadata.getRecordCount() > 0) {
-            return ((float) (readPos + 1)) / metadata.getRecordCount();
-        }
-        return 1;
-    }
-
-    @Override
-    public void close() throws IOException {
-        try {
-            lh.close();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new IOException(e);
-        } catch (BKException e) {
-            throw new IOException(e);
-        }
-    }
-}
diff --git a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/LogSegmentSplit.java b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/LogSegmentSplit.java
deleted file mode 100644
index 132e24d..0000000
--- a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/LogSegmentSplit.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.mapreduce;
-
-import com.google.common.collect.Sets;
-import org.apache.distributedlog.LogSegmentMetadata;
-import org.apache.bookkeeper.client.LedgerMetadata;
-import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Set;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * A input split that reads from a log segment.
- */
-public class LogSegmentSplit extends InputSplit implements Writable {
-
-    private LogSegmentMetadata logSegmentMetadata;
-    private LedgerMetadata ledgerMetadata;
-
-    public LogSegmentSplit() {}
-
-    public LogSegmentSplit(LogSegmentMetadata logSegmentMetadata,
-                           LedgerMetadata ledgerMetadata) {
-        this.logSegmentMetadata = logSegmentMetadata;
-        this.ledgerMetadata = ledgerMetadata;
-    }
-
-    public LogSegmentMetadata getMetadata() {
-        return logSegmentMetadata;
-    }
-
-    public long getLogSegmentId() {
-        return logSegmentMetadata.getLogSegmentId();
-    }
-
-    @Override
-    public long getLength()
-            throws IOException, InterruptedException {
-        return logSegmentMetadata.getRecordCount();
-    }
-
-    @Override
-    public String[] getLocations()
-            throws IOException, InterruptedException {
-        Set<String> locations = Sets.newHashSet();
-        for (ArrayList<BookieSocketAddress> ensemble : ledgerMetadata.getEnsembles().values()) {
-            for (BookieSocketAddress host : ensemble) {
-                locations.add(host.getHostName());
-            }
-        }
-        return locations.toArray(new String[locations.size()]);
-    }
-
-    @Override
-    public void write(DataOutput dataOutput) throws IOException {
-        String lsMetadataStr = logSegmentMetadata.getFinalisedData();
-        dataOutput.writeUTF(lsMetadataStr);
-        String lhMetadataStr = new String(ledgerMetadata.serialize(), UTF_8);
-        dataOutput.writeUTF(lhMetadataStr);
-    }
-
-    @Override
-    public void readFields(DataInput dataInput) throws IOException {
-        String lsMetadataStr = dataInput.readUTF();
-        logSegmentMetadata = LogSegmentMetadata.parseData("",
-                lsMetadataStr.getBytes(UTF_8));
-        String lhMetadataStr = dataInput.readUTF();
-        ledgerMetadata = LedgerMetadata.parseConfig(lhMetadataStr.getBytes(UTF_8),
-                Version.ANY);
-    }
-}
diff --git a/distributedlog-tutorials/distributedlog-messaging/pom.xml b/distributedlog-tutorials/distributedlog-messaging/pom.xml
index d10e1b5..f346752 100644
--- a/distributedlog-tutorials/distributedlog-messaging/pom.xml
+++ b/distributedlog-tutorials/distributedlog-messaging/pom.xml
@@ -61,7 +61,7 @@
     <dependency>
       <groupId>org.apache.thrift</groupId>
       <artifactId>libthrift</artifactId>
-      <version>0.5.0-1</version>
+      <version>${libthrift.version}</version>
     </dependency>
     <dependency>
       <groupId>com.twitter</groupId>
@@ -114,4 +114,11 @@
       </plugin>
     </plugins>
   </build>
+  <repositories>
+    <repository>
+      <id>twitter-repo</id>
+      <name>Twitter Maven Repo</name>
+      <url>http://maven.twttr.com</url>
+    </repository>
+  </repositories>
 </project>
diff --git a/distributedlog-tutorials/pom.xml b/distributedlog-tutorials/pom.xml
index 1346656..dc718d6 100644
--- a/distributedlog-tutorials/pom.xml
+++ b/distributedlog-tutorials/pom.xml
@@ -29,7 +29,6 @@
     <module>distributedlog-basic</module>
     <module>distributedlog-messaging</module>
     <module>distributedlog-kafka</module>
-    <module>distributedlog-mapreduce</module>
   </modules>
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
diff --git a/pom.xml b/pom.xml
index 7b44cf7..2712b92 100644
--- a/pom.xml
+++ b/pom.xml
@@ -97,13 +97,13 @@
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
     <!-- dependencies -->
-    <bookkeeper.version>4.3.7-TWTTR-OSS</bookkeeper.version>
+    <bookkeeper.version>4.5.0</bookkeeper.version>
     <codahale.metrics.version>3.0.1</codahale.metrics.version>
     <commons-cli.version>1.1</commons-cli.version>
     <commons-codec.version>1.6</commons-codec.version>
     <commons-lang.version>2.6</commons-lang.version>
     <commons-lang3.version>3.3.2</commons-lang3.version>
-    <curator.version>3.2.1</curator.version>
+    <curator.version>4.0.0</curator.version>
     <finagle.version>6.34.0</finagle.version>
     <freebuilder.version>1.12.3</freebuilder.version>
     <guava.version>20.0</guava.version>
@@ -117,7 +117,7 @@
     <scrooge.version>4.6.0</scrooge.version>
     <slf4j.version>1.6.4</slf4j.version>
     <stats-util.version>0.0.58</stats-util.version>
-    <zookeeper.version>3.5.1-alpha</zookeeper.version>
+    <zookeeper.version>3.5.3-beta</zookeeper.version>
     <!-- plugin dependencies -->
     <apache-rat-plugin.version>0.7</apache-rat-plugin.version>
     <cobertura-maven-plugin.version>2.7</cobertura-maven-plugin.version>
@@ -171,7 +171,7 @@
         <artifactId>maven-assembly-plugin</artifactId>
         <version>${maven-assembly-plugin.version}</version>
         <configuration>
-	  <tarLongFileMode>gnu</tarLongFileMode>
+          <tarLongFileMode>gnu</tarLongFileMode>
           <descriptors>
             <descriptor>src/assemble/src.xml</descriptor>
           </descriptors>
@@ -231,13 +231,14 @@
             <exclude>ChangeLog</exclude>
             <exclude>**/README.md</exclude>
             <exclude>**/apidocs/*</exclude>
-	    <exclude>GROUPS</exclude>
-	    <exclude>OWNERS</exclude>
-	    <exclude>CONFIG.ini</exclude>
+            <exclude>GROUPS</exclude>
+            <exclude>OWNERS</exclude>
+            <exclude>CONFIG.ini</exclude>
             <exclude>**/**.md</exclude>
             <exclude>scripts/dev/reviewers</exclude>
             <exclude>**/dependency-reduced-pom.xml</exclude>
             <exclude>**/org/apache/distributedlog/thrift/*</exclude>
+            <exclude>.repository/**</exclude>
           </excludes>
         </configuration>
       </plugin>
@@ -260,19 +261,4 @@
       </plugin>
     </plugins>
   </build>
-  <repositories>
-    <repository>
-      <id>bookkeeper-twitter-mvn-repo</id>
-      <url>https://raw.github.com/twitter/bookkeeper/mvn-repo/${bookkeeper.version}</url>
-      <snapshots>
-        <enabled>true</enabled>
-        <updatePolicy>always</updatePolicy>
-      </snapshots>
-    </repository> 
-    <repository>
-      <id>twitter-repo</id>
-      <name>Twitter Maven Repo</name>
-      <url>http://maven.twttr.com</url>
-    </repository>
-  </repositories>
 </project>