Issue 205: Avoid copying bytebuf for constructing log record to write

Descriptions of the changes in this PR:

- avoid copying memory for writer facing constructor for log records
- construct a ByteBuf by copying ByteBuffer for reader facing constructors

This closes #205, #206 
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
index bc6cc8b..b756feb 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
@@ -298,16 +298,17 @@
 
     /**
      * Close the distributed log manager factory, freeing any resources it may hold.
+     * close the resource in reverse order v.s. in which they are started
      */
     @Override
     public void close() {
         if (!closed.compareAndSet(false, true)) {
             return;
         }
-        // shutdown the driver
-        Utils.close(driver);
         // close the write limiter
         this.writeLimiter.close();
+        // shutdown the driver
+        Utils.close(driver);
         // Shutdown the schedulers
         SchedulerUtils.shutdownScheduler(scheduler, conf.getSchedulerShutdownTimeoutMs(),
                 TimeUnit.MILLISECONDS);
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
index 18ec549..955fafe 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
@@ -83,7 +83,7 @@
  * </ul>
  */
 class BKLogWriteHandler extends BKLogHandler {
-    static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class);
+    static final Logger LOG = LoggerFactory.getLogger(BKLogWriteHandler.class);
 
     private static Transaction.OpListener<LogSegmentEntryWriter> NULL_OP_LISTENER =
             new Transaction.OpListener<LogSegmentEntryWriter>() {
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
index 4a49fdf..1d3fbce 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
@@ -707,6 +707,9 @@
     private boolean updateLogSegmentMetadata(SegmentReader reader,
                                              LogSegmentMetadata newMetadata) {
         if (reader.getSegment().getLogSegmentSequenceNumber() != newMetadata.getLogSegmentSequenceNumber()) {
+            logger.error("Inconsistent state found in entry reader for {} : "
+                + "current segment = {}, new segment = {}",
+                new Object[] { streamName, reader.getSegment(), newMetadata });
             setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for "
                     + streamName + " : current segment = " + reader.getSegment() + ", new segment = " + newMetadata));
             return false;
@@ -746,6 +749,9 @@
             }
         } else {
             if (currentSegmentSequenceNumber != segment.getLogSegmentSequenceNumber()) {
+                logger.error("Inconsistent state found in entry reader for {} : "
+                    + "current segment sn = {}, new segment sn = {}",
+                    new Object[] { streamName, currentSegmentSequenceNumber, segment.getLogSegmentSequenceNumber() });
                 setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for "
                         + streamName + " : current segment sn = " + currentSegmentSequenceNumber
                         + ", new segment sn = " + segment.getLogSegmentSequenceNumber()));
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java
index cad0f97..8766f17 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java
@@ -70,7 +70,7 @@
  */
 @Public
 @Evolving
-public interface Namespace {
+public interface Namespace extends AutoCloseable{
 
     /**
      * Get the namespace driver used by this namespace.
@@ -187,6 +187,7 @@
     /**
      * Close the namespace.
      */
+    @Override
     void close();
 
 }
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
index c133bb8..c44df2f 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
@@ -17,10 +17,13 @@
  */
 package org.apache.distributedlog;
 
-
 import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.*;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
@@ -47,6 +50,7 @@
 import org.apache.distributedlog.callback.LogSegmentListener;
 import org.apache.distributedlog.exceptions.AlreadyTruncatedTransactionException;
 import org.apache.distributedlog.exceptions.BKTransmitException;
+import org.apache.distributedlog.exceptions.DLIllegalStateException;
 import org.apache.distributedlog.exceptions.EndOfStreamException;
 import org.apache.distributedlog.exceptions.InvalidStreamNameException;
 import org.apache.distributedlog.exceptions.LogEmptyException;
@@ -63,15 +67,12 @@
 import org.apache.distributedlog.metadata.MetadataUpdater;
 import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
-import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-
 /**
  * Test Cases for {@link DistributedLogManager}.
  */
@@ -765,7 +766,6 @@
         }
 
         LogReader reader = dlm.getInputStream(1);
-        long numTrans = 1;
         LogRecord record = reader.readNext(false);
         assert (null != record);
         DLMTestUtil.verifyLogRecord(record);
@@ -773,23 +773,20 @@
 
         dlm.delete();
 
-        boolean exceptionEncountered = false;
+        boolean exceptionEncountered;
         try {
             record = reader.readNext(false);
             while (null != record) {
                 DLMTestUtil.verifyLogRecord(record);
                 assert (lastTxId < record.getTransactionId());
                 lastTxId = record.getTransactionId();
-                numTrans++;
                 record = reader.readNext(false);
             }
             // make sure the exception is thrown from readahead
             while (true) {
                 reader.readNext(false);
             }
-        } catch (LogReadException readexc) {
-            exceptionEncountered = true;
-        } catch (LogNotFoundException exc) {
+        } catch (LogReadException | LogNotFoundException | DLIllegalStateException e) {
             exceptionEncountered = true;
         }
         assertTrue(exceptionEncountered);
@@ -1136,26 +1133,26 @@
         Utils.ioResult(updater.setLogSegmentActive(segmentList.get(2L)));
 
         BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
-        Assert.assertTrue(Utils.ioResult(writer.truncate(truncDLSN)));
+        assertTrue(Utils.ioResult(writer.truncate(truncDLSN)));
         BKLogWriteHandler handler = writer.getCachedWriteHandler();
         List<LogSegmentMetadata> cachedSegments = handler.getCachedLogSegments(LogSegmentMetadata.COMPARATOR);
         for (LogSegmentMetadata segment: cachedSegments) {
             if (segment.getLastDLSN().compareTo(truncDLSN) < 0) {
-                Assert.assertTrue(segment.isTruncated());
-                Assert.assertTrue(!segment.isPartiallyTruncated());
+                assertTrue(segment.isTruncated());
+                assertTrue(!segment.isPartiallyTruncated());
             } else if (segment.getFirstDLSN().compareTo(truncDLSN) < 0) {
-                Assert.assertTrue(!segment.isTruncated());
-                Assert.assertTrue(segment.isPartiallyTruncated());
+                assertTrue(!segment.isTruncated());
+                assertTrue(segment.isPartiallyTruncated());
             } else {
-                Assert.assertTrue(!segment.isTruncated());
-                Assert.assertTrue(!segment.isPartiallyTruncated());
+                assertTrue(!segment.isTruncated());
+                assertTrue(!segment.isPartiallyTruncated());
             }
         }
 
         segmentList = DLMTestUtil.readLogSegments(zookeeperClient,
                 LogMetadata.getLogSegmentsPath(uri, name, conf.getUnpartitionedStreamName()));
 
-        Assert.assertTrue(segmentList.get(truncDLSN.getLogSegmentSequenceNo())
+        assertTrue(segmentList.get(truncDLSN.getLogSegmentSequenceNo())
                 .getMinActiveDLSN().compareTo(truncDLSN) == 0);
 
         {