DL-45: DL should allow ByteBuf based API to avoid copying bytes array

Descriptions of the changes in this PR:

This change leverages the `ByteBuf` api introduced in bookkeeper 4.5.0. It will avoid copying bytes array between LogRecord and LogRecordSet/Entry, and avoid copying bytes from DL to BK client.

This change also bump the lz4 library to `1.3.0` to leverage the `ByteBuffer` binding.

Author: Sijie Guo <sijie@apache.org>
Author: Jia Zhai <zhaijia03@gmail.com>
Author: Jia Zhai <zhaijia@apache.org>

Reviewers: Jia Zhai <None>, Enrico Olivelli <eolivelli@gmail.com>, Leigh Stewart <None>

This closes #151 from sijie/zero_copy
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java
index b81dad4..4ac948a 100644
--- a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java
@@ -140,6 +140,7 @@
                 processRecord(nextRecord);
                 nextRecord = reader.nextRecord();
             }
+            reader.release();
         }
 
         public void processRecord(final LogRecordWithDLSN record) {
diff --git a/distributedlog-common/pom.xml b/distributedlog-common/pom.xml
index e768574..2129755 100644
--- a/distributedlog-common/pom.xml
+++ b/distributedlog-common/pom.xml
@@ -69,6 +69,11 @@
       <version>${commons-codec.version}</version>
     </dependency>
     <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-buffer</artifactId>
+      <version>${netty.version}</version>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <version>${junit.version}</version>
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/ByteBufUtils.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/ByteBufUtils.java
new file mode 100644
index 0000000..934d8a7
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/ByteBufUtils.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog.common.util;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ * Utils to handle with {@link io.netty.buffer.ByteBuf}.
+ */
+public class ByteBufUtils {
+
+    public static byte[] getArray(ByteBuf buffer) {
+        if (buffer.hasArray() && buffer.arrayOffset() == 0 && buffer.writableBytes() == 0) {
+            return buffer.array();
+        }
+        byte[] data = new byte[buffer.readableBytes()];
+        buffer.getBytes(buffer.readerIndex(), data);
+        return data;
+    }
+
+}
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/ReferenceCounted.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/ReferenceCounted.java
new file mode 100644
index 0000000..2df0226
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/ReferenceCounted.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.util;
+
+/**
+ * An interface for indicating an object is reference counted.
+ */
+public interface ReferenceCounted {
+
+    /**
+     * Retain the reference.
+     */
+    void retain();
+
+    /**
+     * Release the reference.
+     */
+    void release();
+
+}
diff --git a/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java b/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
index dc9d09d..2ab2bca 100644
--- a/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
+++ b/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
@@ -18,9 +18,7 @@
 package org.apache.bookkeeper.client;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
 import io.netty.buffer.Unpooled;
-import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Enumeration;
@@ -86,25 +84,25 @@
     }
 
     public void readEntriesFromAllBookies(final LedgerHandle lh, long eid,
-                                          final GenericCallback<Set<ReadResult<InputStream>>> callback) {
+                                          final GenericCallback<Set<ReadResult<ByteBuf>>> callback) {
         List<Integer> writeSet = lh.distributionSchedule.getWriteSet(eid);
         final AtomicInteger numBookies = new AtomicInteger(writeSet.size());
-        final Set<ReadResult<InputStream>> readResults = new HashSet<ReadResult<InputStream>>();
+        final Set<ReadResult<ByteBuf>> readResults = new HashSet<>();
         ReadEntryCallback readEntryCallback = new ReadEntryCallback() {
             @Override
             public void readEntryComplete(int rc, long lid, long eid, ByteBuf buffer, Object ctx) {
                 BookieSocketAddress bookieAddress = (BookieSocketAddress) ctx;
-                ReadResult<InputStream> rr;
+                ReadResult<ByteBuf> rr;
                 if (BKException.Code.OK != rc) {
-                    rr = new ReadResult<InputStream>(eid, rc, null, bookieAddress.getSocketAddress());
+                    rr = new ReadResult<>(eid, rc, null, bookieAddress.getSocketAddress());
                 } else {
                     ByteBuf content;
                     try {
                         content = lh.macManager.verifyDigestAndReturnData(eid, buffer);
                         ByteBuf toRet = Unpooled.copiedBuffer(content);
-                        rr = new ReadResult<InputStream>(eid, BKException.Code.OK, new ByteBufInputStream(toRet), bookieAddress.getSocketAddress());
+                        rr = new ReadResult<>(eid, BKException.Code.OK, toRet, bookieAddress.getSocketAddress());
                     } catch (BKException.BKDigestMatchException e) {
-                        rr = new ReadResult<InputStream>(eid, BKException.Code.DigestMatchException, null, bookieAddress.getSocketAddress());
+                        rr = new ReadResult<>(eid, BKException.Code.DigestMatchException, null, bookieAddress.getSocketAddress());
                     } finally {
                         buffer.release();
                     }
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
index 5be9e19..d80555a 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
@@ -240,6 +240,13 @@
         this.lastProcessTime = Stopwatch.createStarted();
     }
 
+    synchronized void releaseCurrentEntry() {
+        if (null != currentEntry) {
+            currentEntry.release();
+            currentEntry = null;
+        }
+    }
+
     private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() {
         if (idleErrorThresholdMillis < Integer.MAX_VALUE) {
             // Dont run the task more than once every seconds (for sanity)
@@ -480,6 +487,7 @@
             closePromise = closeFuture = new CompletableFuture<Void>();
             exception = new ReadCancelledException(readHandler.getFullyQualifiedName(), "Reader was closed");
             setLastException(exception);
+            releaseCurrentEntry();
         }
 
         // Do this after we have checked that the reader was not previously closed
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
index 6f201a6..48a0daf 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
@@ -17,6 +17,13 @@
  */
 package org.apache.distributedlog;
 
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
+import static org.apache.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -27,9 +34,6 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Stopwatch;
 import java.util.function.Function;
 import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
 import org.apache.distributedlog.exceptions.BKTransmitException;
@@ -44,13 +48,11 @@
 import org.apache.distributedlog.feature.CoreFeatureKeys;
 import org.apache.distributedlog.injector.FailureInjector;
 import org.apache.distributedlog.injector.RandomDelayFailureInjector;
-import org.apache.distributedlog.io.Buffer;
 import org.apache.distributedlog.io.CompressionCodec;
 import org.apache.distributedlog.io.CompressionUtils;
 import org.apache.distributedlog.lock.DistributedLock;
 import org.apache.distributedlog.logsegment.LogSegmentEntryWriter;
 import org.apache.distributedlog.logsegment.LogSegmentWriter;
-import org.apache.distributedlog.common.stats.BroadCastStatsLogger;
 import org.apache.distributedlog.common.stats.OpStatsListener;
 import org.apache.distributedlog.util.FailpointUtils;
 import org.apache.distributedlog.common.concurrent.FutureEventListener;
@@ -75,10 +77,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static com.google.common.base.Charsets.UTF_8;
-import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
-import static org.apache.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE;
-
 /**
  * BookKeeper Based Log Segment Writer.
  *
@@ -152,7 +150,6 @@
     private final OrderedScheduler scheduler;
 
     // stats
-    private final StatsLogger envelopeStatsLogger;
     private final StatsLogger transmitOutstandingLogger;
     private final Counter transmitDataSuccesses;
     private final Counter transmitDataMisses;
@@ -220,7 +217,6 @@
         }
         this.writeLimiter = new WriteLimiter(streamName, streamWriteLimiter, globalWriteLimiter);
         this.alertStatsLogger = alertStatsLogger;
-        this.envelopeStatsLogger = BroadCastStatsLogger.masterslave(statsLogger, perLogStatsLogger);
 
         StatsLogger flushStatsLogger = statsLogger.scope("flush");
         StatsLogger pFlushStatsLogger = flushStatsLogger.scope("periodic");
@@ -279,8 +275,7 @@
                 streamName,
                 Math.max(transmissionThreshold, 1024),
                 envelopeBeforeTransmit(),
-                compressionType,
-                envelopeStatsLogger);
+                compressionType);
         this.packetPrevious = null;
         this.startTxId = startTxId;
         this.lastTxId = startTxId;
@@ -440,8 +435,7 @@
                 streamName,
                 Math.max(transmissionThreshold, getAverageTransmitSize()),
                 envelopeBeforeTransmit(),
-                compressionType,
-                envelopeStatsLogger);
+                compressionType);
     }
 
     private boolean envelopeBeforeTransmit() {
@@ -1049,7 +1043,7 @@
                 }
             }
 
-            Buffer toSend;
+            ByteBuf toSend;
             try {
                 toSend = recordSetToTransmit.getBuffer();
                 FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_TransmitFailGetBuffer);
@@ -1076,8 +1070,7 @@
 
                 BKTransmitPacket packet = new BKTransmitPacket(recordSetToTransmit);
                 packetPrevious = packet;
-                entryWriter.asyncAddEntry(toSend.getData(), 0, toSend.size(),
-                                          this, packet);
+                entryWriter.asyncAddEntry(toSend, this, packet);
 
                 if (recordSetToTransmit.hasUserRecords()) {
                     transmitDataSuccesses.inc();
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java
index 47301b5..fa56cfb 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java
@@ -106,6 +106,22 @@
                 });
     }
 
+    synchronized void releaseCurrentEntry() {
+        if (null != currentEntry) {
+            currentEntry.release();
+            currentEntry = null;
+        }
+    }
+
+    synchronized void checkClosedOrException() throws IOException {
+        if (null != closeFuture) {
+            throw new IOException("Reader is closed");
+        }
+        if (null != readerException.get()) {
+            throw readerException.get();
+        }
+    }
+
     @VisibleForTesting
     ReadAheadEntryReader getReadAheadReader() {
         return readAheadReader;
@@ -150,9 +166,8 @@
     @Override
     public synchronized LogRecordWithDLSN readNext(boolean nonBlocking)
             throws IOException {
-        if (null != readerException.get()) {
-            throw readerException.get();
-        }
+        checkClosedOrException();
+
         LogRecordWithDLSN record = doReadNext(nonBlocking);
         // no record is returned, check if the reader becomes idle
         if (null == record && shouldCheckIdleReader) {
@@ -236,6 +251,7 @@
                 return closeFuture;
             }
             closeFuture = closePromise = new CompletableFuture<Void>();
+            releaseCurrentEntry();
         }
         readHandler.unregisterListener(readAheadReader);
         readAheadReader.removeStateChangeNotification(this);
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java b/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java
index 30cd499..1a4583d 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java
@@ -17,19 +17,14 @@
  */
 package org.apache.distributedlog;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.exceptions.LogRecordTooLongException;
 import org.apache.distributedlog.exceptions.WriteException;
 import org.apache.distributedlog.io.CompressionCodec;
 import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-
-import javax.annotation.Nullable;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
 
 /**
  * A set of {@link LogRecord}s.
@@ -47,22 +42,18 @@
      *          if envelope the buffer before transmit
      * @param codec
      *          compression codec
-     * @param statsLogger
-     *          stats logger to receive stats
      * @return writer to build a log record set.
      */
     public static Writer newEntry(
             String logName,
             int initialBufferSize,
             boolean envelopeBeforeTransmit,
-            CompressionCodec.Type codec,
-            StatsLogger statsLogger) {
+            CompressionCodec.Type codec) {
         return new EnvelopedEntryWriter(
                 logName,
                 initialBufferSize,
                 envelopeBeforeTransmit,
-                codec,
-                statsLogger);
+                codec);
     }
 
     public static Builder newBuilder() {
@@ -78,40 +69,12 @@
         private long entryId = -1;
         private long startSequenceId = Long.MIN_VALUE;
         private boolean envelopeEntry = true;
-        // input stream
-        private InputStream in = null;
-        // or bytes array
-        private byte[] data = null;
-        private int offset = -1;
-        private int length = -1;
-        private Optional<Long> txidToSkipTo = Optional.absent();
-        private Optional<DLSN> dlsnToSkipTo = Optional.absent();
+        private ByteBuf buffer;
         private boolean deserializeRecordSet = true;
 
         private Builder() {}
 
         /**
-         * Reset the builder.
-         *
-         * @return builder
-         */
-        public Builder reset() {
-            logSegmentSequenceNumber = -1;
-            entryId = -1;
-            startSequenceId = Long.MIN_VALUE;
-            envelopeEntry = true;
-            // input stream
-            in = null;
-            // or bytes array
-            data = null;
-            offset = -1;
-            length = -1;
-            txidToSkipTo = Optional.absent();
-            dlsnToSkipTo = Optional.absent();
-            return this;
-        }
-
-        /**
          * Set the segment info of the log segment that this record
          * set belongs to.
          *
@@ -152,56 +115,14 @@
         }
 
         /**
-         * Set the serialized bytes data of this record set.
+         * Set the entry buffer of the serialized bytes data of this record set.
          *
-         * @param data
-         *          serialized bytes data of this record set.
-         * @param offset
-         *          offset of the bytes data
-         * @param length
-         *          length of the bytes data
-         * @return builder
-         */
-        public Builder setData(byte[] data, int offset, int length) {
-            this.data = data;
-            this.offset = offset;
-            this.length = length;
-            return this;
-        }
-
-        /**
-         * Set the input stream of the serialized bytes data of this record set.
-         *
-         * @param in
+         * @param buffer
          *          input stream
          * @return builder
          */
-        public Builder setInputStream(InputStream in) {
-            this.in = in;
-            return this;
-        }
-
-        /**
-         * Set the record set starts from <code>dlsn</code>.
-         *
-         * @param dlsn
-         *          dlsn to skip to
-         * @return builder
-         */
-        public Builder skipTo(@Nullable DLSN dlsn) {
-            this.dlsnToSkipTo = Optional.fromNullable(dlsn);
-            return this;
-        }
-
-        /**
-         * Set the record set starts from <code>txid</code>.
-         *
-         * @param txid
-         *          txid to skip to
-         * @return builder
-         */
-        public Builder skipTo(long txid) {
-            this.txidToSkipTo = Optional.of(txid);
+        public Builder setEntry(ByteBuf buffer) {
+            this.buffer = buffer.retainedSlice();
             return this;
         }
 
@@ -217,41 +138,14 @@
             return this;
         }
 
-        public Entry build() {
-            Preconditions.checkNotNull(data, "Serialized data isn't provided");
-            Preconditions.checkArgument(offset >= 0 && length >= 0
-                    && (offset + length) <= data.length,
-                    "Invalid offset or length of serialized data");
-            return new Entry(
-                    logSegmentSequenceNumber,
-                    entryId,
-                    startSequenceId,
-                    envelopeEntry,
-                    deserializeRecordSet,
-                    data,
-                    offset,
-                    length,
-                    txidToSkipTo,
-                    dlsnToSkipTo);
-        }
-
         public Entry.Reader buildReader() throws IOException {
-            Preconditions.checkArgument(data != null || in != null,
+            Preconditions.checkNotNull(buffer,
                     "Serialized data or input stream isn't provided");
-            InputStream in;
-            if (null != this.in) {
-                in = this.in;
-            } else {
-                Preconditions.checkArgument(offset >= 0 && length >= 0
-                                && (offset + length) <= data.length,
-                        "Invalid offset or length of serialized data");
-                in = new ByteArrayInputStream(data, offset, length);
-            }
             return new EnvelopedEntryReader(
                     logSegmentSequenceNumber,
                     entryId,
                     startSequenceId,
-                    in,
+                    buffer,
                     envelopeEntry,
                     deserializeRecordSet,
                     NullStatsLogger.INSTANCE);
@@ -259,73 +153,6 @@
 
     }
 
-    private final long logSegmentSequenceNumber;
-    private final long entryId;
-    private final long startSequenceId;
-    private final boolean envelopedEntry;
-    private final boolean deserializeRecordSet;
-    private final byte[] data;
-    private final int offset;
-    private final int length;
-    private final Optional<Long> txidToSkipTo;
-    private final Optional<DLSN> dlsnToSkipTo;
-
-    private Entry(long logSegmentSequenceNumber,
-                  long entryId,
-                  long startSequenceId,
-                  boolean envelopedEntry,
-                  boolean deserializeRecordSet,
-                  byte[] data,
-                  int offset,
-                  int length,
-                  Optional<Long> txidToSkipTo,
-                  Optional<DLSN> dlsnToSkipTo) {
-        this.logSegmentSequenceNumber = logSegmentSequenceNumber;
-        this.entryId = entryId;
-        this.startSequenceId = startSequenceId;
-        this.envelopedEntry = envelopedEntry;
-        this.deserializeRecordSet = deserializeRecordSet;
-        this.data = data;
-        this.offset = offset;
-        this.length = length;
-        this.txidToSkipTo = txidToSkipTo;
-        this.dlsnToSkipTo = dlsnToSkipTo;
-    }
-
-    /**
-     * Get raw data of this record set.
-     *
-     * @return raw data representation of this record set.
-     */
-    public byte[] getRawData() {
-        return data;
-    }
-
-    /**
-     * Create reader to iterate over this record set.
-     *
-     * @return reader to iterate over this record set.
-     * @throws IOException if the record set is invalid record set.
-     */
-    public Reader reader() throws IOException {
-        InputStream in = new ByteArrayInputStream(data, offset, length);
-        Reader reader = new EnvelopedEntryReader(
-                logSegmentSequenceNumber,
-                entryId,
-                startSequenceId,
-                in,
-                envelopedEntry,
-                deserializeRecordSet,
-                NullStatsLogger.INSTANCE);
-        if (txidToSkipTo.isPresent()) {
-            reader.skipTo(txidToSkipTo.get());
-        }
-        if (dlsnToSkipTo.isPresent()) {
-            reader.skipTo(dlsnToSkipTo.get());
-        }
-        return reader;
-    }
-
     /**
      * Writer to append {@link LogRecord}s to {@link Entry}.
      */
@@ -345,11 +172,6 @@
         void writeRecord(LogRecord record, CompletableFuture<DLSN> transmitPromise)
                 throws LogRecordTooLongException, WriteException;
 
-        /**
-         * Reset the writer to write records.
-         */
-        void reset();
-
     }
 
     /**
@@ -398,6 +220,11 @@
          */
         boolean skipTo(DLSN dlsn) throws IOException;
 
+        /**
+         * Release the resources held by the entry reader.
+         */
+        void release();
+
     }
 
 }
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EntryBuffer.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EntryBuffer.java
index c08fbeb..a881df8 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/EntryBuffer.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EntryBuffer.java
@@ -17,11 +17,10 @@
  */
 package org.apache.distributedlog;
 
-import org.apache.distributedlog.exceptions.InvalidEnvelopedEntryException;
-import org.apache.distributedlog.io.Buffer;
-import org.apache.distributedlog.io.TransmitListener;
-
+import io.netty.buffer.ByteBuf;
 import java.io.IOException;
+import org.apache.distributedlog.exceptions.InvalidEnvelopedEntryException;
+import org.apache.distributedlog.io.TransmitListener;
 
 /**
  * Write representation of a {@link Entry}.
@@ -61,10 +60,12 @@
     /**
      * Get the buffer to transmit.
      *
+     * <p>The caller is responsible for releasing the reference of the returned bytebuf object.
+     *
      * @return the buffer to transmit.
      * @throws InvalidEnvelopedEntryException if the record set buffer is invalid
      * @throws IOException when encountered IOException during serialization
      */
-    Buffer getBuffer() throws InvalidEnvelopedEntryException, IOException;
+    ByteBuf getBuffer() throws InvalidEnvelopedEntryException, IOException;
 
 }
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
index 09301aa..89eb6e9 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
@@ -17,23 +17,12 @@
  */
 package org.apache.distributedlog;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import io.netty.buffer.ByteBuf;
 import java.io.IOException;
-import java.io.InputStream;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.distributedlog.exceptions.InvalidEnvelopedEntryException;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-
-import org.apache.distributedlog.common.annotations.DistributedLogAnnotations.Compression;
 import org.apache.distributedlog.io.CompressionCodec;
+import org.apache.distributedlog.io.CompressionCodec.Type;
 import org.apache.distributedlog.io.CompressionUtils;
-import org.apache.distributedlog.common.util.BitMaskUtils;
 
 /**
  * An enveloped entry written to BookKeeper.
@@ -69,212 +58,23 @@
  */
 class EnvelopedEntry {
 
-    public static final int VERSION_LENGTH = 1; // One byte long
     public static final byte VERSION_ONE = 1;
+    public static final int HEADER_LENGTH =
+              Byte.BYTES // version
+            + Integer.BYTES // flags
+            + Integer.BYTES // decompressed size
+            + Integer.BYTES; // compressed size
 
-    public static final byte LOWEST_SUPPORTED_VERSION = VERSION_ONE;
-    public static final byte HIGHEST_SUPPORTED_VERSION = VERSION_ONE;
+    static final int VERSION_OFFSET = 0;
+    static final int FLAGS_OFFSET = VERSION_OFFSET + Byte.BYTES;
+    static final int DECOMPRESSED_SIZE_OFFSET = FLAGS_OFFSET + Integer.BYTES;
+    static final int COMPRESSED_SIZE_OFFSET = DECOMPRESSED_SIZE_OFFSET + Integer.BYTES;
+
     public static final byte CURRENT_VERSION = VERSION_ONE;
-
-    private final OpStatsLogger compressionStat;
-    private final OpStatsLogger decompressionStat;
-    private final Counter compressedEntryBytes;
-    private final Counter decompressedEntryBytes;
-    private final byte version;
-
-    private Header header = new Header();
-    private Payload payloadCompressed = new Payload();
-    private Payload payloadDecompressed = new Payload();
-
-    public EnvelopedEntry(byte version,
-                          StatsLogger statsLogger) throws InvalidEnvelopedEntryException {
-        Preconditions.checkNotNull(statsLogger);
-        if (version < LOWEST_SUPPORTED_VERSION || version > HIGHEST_SUPPORTED_VERSION) {
-            throw new InvalidEnvelopedEntryException("Invalid enveloped entry version " + version + ", expected to be in [ "
-                    + LOWEST_SUPPORTED_VERSION + " ~ " + HIGHEST_SUPPORTED_VERSION + " ]");
-        }
-        this.version = version;
-        this.compressionStat = statsLogger.getOpStatsLogger("compression_time");
-        this.decompressionStat = statsLogger.getOpStatsLogger("decompression_time");
-        this.compressedEntryBytes = statsLogger.getCounter("compressed_bytes");
-        this.decompressedEntryBytes = statsLogger.getCounter("decompressed_bytes");
-    }
+    public static final int COMPRESSION_CODEC_MASK = 0x3;
 
     /**
-     * @param statsLogger
-     *          Used for getting stats for (de)compression time
-     * @param compressionType
-     *          The compression type to use
-     * @param decompressed
-     *          The decompressed payload
-     *          NOTE: The size of the byte array passed as the decompressed payload can be larger
-     *                than the actual contents to be compressed.
-     */
-    public EnvelopedEntry(byte version,
-                          CompressionCodec.Type compressionType,
-                          byte[] decompressed,
-                          int length,
-                          StatsLogger statsLogger)
-            throws InvalidEnvelopedEntryException {
-        this(version, statsLogger);
-        Preconditions.checkNotNull(compressionType);
-        Preconditions.checkNotNull(decompressed);
-        Preconditions.checkArgument(length >= 0, "Invalid bytes length " + length);
-
-        this.header = new Header(compressionType, length);
-        this.payloadDecompressed = new Payload(length, decompressed);
-    }
-
-    private boolean isReady() {
-        return (header.ready && payloadDecompressed.ready);
-    }
-
-    @Compression
-    public void writeFully(DataOutputStream out) throws IOException {
-        Preconditions.checkNotNull(out);
-        if (!isReady()) {
-            throw new IOException("Entry not writable");
-        }
-        // Version
-        out.writeByte(version);
-        // Header
-        header.write(out);
-        // Compress
-        CompressionCodec codec = CompressionUtils.getCompressionCodec(header.compressionType);
-        byte[] compressed = codec.compress(
-                payloadDecompressed.payload,
-                0,
-                payloadDecompressed.length,
-                compressionStat);
-        this.payloadCompressed = new Payload(compressed.length, compressed);
-        this.compressedEntryBytes.add(payloadCompressed.length);
-        this.decompressedEntryBytes.add(payloadDecompressed.length);
-        payloadCompressed.write(out);
-    }
-
-    @Compression
-    public void readFully(DataInputStream in) throws IOException {
-        Preconditions.checkNotNull(in);
-        // Make sure we're reading the right versioned entry.
-        byte version = in.readByte();
-        if (version != this.version) {
-            throw new IOException(String.format("Version mismatch while reading. Received: %d," +
-                    " Required: %d", version, this.version));
-        }
-        header.read(in);
-        payloadCompressed.read(in);
-        // Decompress
-        CompressionCodec codec = CompressionUtils.getCompressionCodec(header.compressionType);
-        byte[] decompressed = codec.decompress(
-                payloadCompressed.payload,
-                0,
-                payloadCompressed.length,
-                header.decompressedSize,
-                decompressionStat);
-        this.payloadDecompressed = new Payload(decompressed.length, decompressed);
-        this.compressedEntryBytes.add(payloadCompressed.length);
-        this.decompressedEntryBytes.add(payloadDecompressed.length);
-    }
-
-    public byte[] getDecompressedPayload() throws IOException {
-        if (!isReady()) {
-            throw new IOException("Decompressed payload is not initialized");
-        }
-        return payloadDecompressed.payload;
-    }
-
-    public static class Header {
-        public static final int COMPRESSION_CODEC_MASK = 0x3;
-        public static final int COMPRESSION_CODEC_NONE = 0x0;
-        public static final int COMPRESSION_CODEC_LZ4 = 0x1;
-
-        private int flags = 0;
-        private int decompressedSize = 0;
-        private CompressionCodec.Type compressionType = CompressionCodec.Type.UNKNOWN;
-
-        // Whether this struct is ready for reading/writing.
-        private boolean ready = false;
-
-        // Used while reading.
-        public Header() {
-        }
-
-        public Header(CompressionCodec.Type compressionType,
-                      int decompressedSize) {
-            this.compressionType = compressionType;
-            this.decompressedSize = decompressedSize;
-            this.flags = 0;
-            switch (compressionType) {
-                case NONE:
-                    this.flags = (int) BitMaskUtils.set(flags, COMPRESSION_CODEC_MASK,
-                                                        COMPRESSION_CODEC_NONE);
-                    break;
-                case LZ4:
-                    this.flags = (int) BitMaskUtils.set(flags, COMPRESSION_CODEC_MASK,
-                                                        COMPRESSION_CODEC_LZ4);
-                    break;
-                default:
-                    throw new RuntimeException(String.format("Unknown Compression Type: %s",
-                                                             compressionType));
-            }
-            // This can now be written.
-            this.ready = true;
-        }
-
-        private void write(DataOutputStream out) throws IOException {
-            out.writeInt(flags);
-            out.writeInt(decompressedSize);
-        }
-
-        private void read(DataInputStream in) throws IOException {
-            this.flags = in.readInt();
-            int compressionType = (int) BitMaskUtils.get(flags, COMPRESSION_CODEC_MASK);
-            if (compressionType == COMPRESSION_CODEC_NONE) {
-                this.compressionType = CompressionCodec.Type.NONE;
-            } else if (compressionType == COMPRESSION_CODEC_LZ4) {
-                this.compressionType = CompressionCodec.Type.LZ4;
-            } else {
-                throw new IOException(String.format("Unsupported Compression Type: %s",
-                                                    compressionType));
-            }
-            this.decompressedSize = in.readInt();
-            // Values can now be read.
-            this.ready = true;
-        }
-    }
-
-    public static class Payload {
-        private int length = 0;
-        private byte[] payload = null;
-
-        // Whether this struct is ready for reading/writing.
-        private boolean ready = false;
-
-        // Used for reading
-        Payload() {
-        }
-
-        Payload(int length, byte[] payload) {
-            this.length = length;
-            this.payload = payload;
-            this.ready = true;
-        }
-
-        private void write(DataOutputStream out) throws IOException {
-            out.writeInt(length);
-            out.write(payload, 0, length);
-        }
-
-        private void read(DataInputStream in) throws IOException {
-            this.length = in.readInt();
-            this.payload = new byte[length];
-            in.readFully(payload);
-            this.ready = true;
-        }
-    }
-
-    /**
-     * Return an InputStream that reads from the provided InputStream, decompresses the data
+     * Return an {@link ByteBuf} that reads from the provided {@link ByteBuf}, decompresses the data
      * and returns a new InputStream wrapping the underlying payload.
      *
      * Note that src is modified by this call.
@@ -283,14 +83,30 @@
      *      New Input stream with the underlying payload.
      * @throws Exception
      */
-    public static InputStream fromInputStream(InputStream src,
-                                              StatsLogger statsLogger) throws IOException {
-        src.mark(VERSION_LENGTH);
-        byte version = new DataInputStream(src).readByte();
-        src.reset();
-        EnvelopedEntry entry = new EnvelopedEntry(version, statsLogger);
-        entry.readFully(new DataInputStream(src));
-        return new ByteArrayInputStream(entry.getDecompressedPayload());
+    public static ByteBuf fromEnvelopedBuf(ByteBuf src, StatsLogger statsLogger)
+            throws IOException {
+        byte version = src.readByte();
+        if (version != CURRENT_VERSION) {
+            throw new IOException(String.format("Version mismatch while reading. Received: %d,"
+                + " Required: %d", version, CURRENT_VERSION));
+        }
+        int flags = src.readInt();
+        int codecCode = flags & COMPRESSION_CODEC_MASK;
+        int originDataLen = src.readInt();
+        int actualDataLen = src.readInt();
+        ByteBuf compressedBuf = src.slice(src.readerIndex(), actualDataLen);
+        ByteBuf decompressedBuf;
+        try {
+            if (Type.NONE.code() == codecCode && originDataLen != actualDataLen) {
+                throw new IOException("Inconsistent data length found for a non-compressed entry : compressed = "
+                        + originDataLen + ", actual = " + actualDataLen);
+            }
+            CompressionCodec codec = CompressionUtils.getCompressionCodec(Type.of(codecCode));
+            decompressedBuf = codec.decompress(compressedBuf, originDataLen);
+        } finally {
+            compressedBuf.release();
+        }
+        return decompressedBuf;
     }
 
 }
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java
index 1761de5..3924f4b 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java
@@ -17,45 +17,72 @@
  */
 package org.apache.distributedlog;
 
-import org.apache.bookkeeper.stats.StatsLogger;
-
-import java.io.DataInputStream;
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.ByteBuf;
 import java.io.IOException;
-import java.io.InputStream;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.bookkeeper.stats.StatsLogger;
 
 /**
  * Record reader to read records from an enveloped entry buffer.
  */
+@NotThreadSafe
 class EnvelopedEntryReader implements Entry.Reader, RecordStream {
 
     private final long logSegmentSeqNo;
     private final long entryId;
+    private final ByteBuf src;
     private final LogRecord.Reader reader;
 
     // slot id
     private long slotId = 0;
+    // error lag
+    private IOException lastException = null;
+    private boolean isExhausted = false;
 
     EnvelopedEntryReader(long logSegmentSeqNo,
                          long entryId,
                          long startSequenceId,
-                         InputStream in,
+                         ByteBuf in,
                          boolean envelopedEntry,
                          boolean deserializeRecordSet,
                          StatsLogger statsLogger)
             throws IOException {
         this.logSegmentSeqNo = logSegmentSeqNo;
         this.entryId = entryId;
-        InputStream src = in;
         if (envelopedEntry) {
-            src = EnvelopedEntry.fromInputStream(in, statsLogger);
+            this.src = EnvelopedEntry.fromEnvelopedBuf(in, statsLogger);
+        } else {
+            this.src = in;
         }
         this.reader = new LogRecord.Reader(
                 this,
-                new DataInputStream(src),
+                src,
                 startSequenceId,
                 deserializeRecordSet);
     }
 
+    @VisibleForTesting
+    boolean isExhausted() {
+        return isExhausted;
+    }
+
+    @VisibleForTesting
+    ByteBuf getSrcBuf() {
+        return src;
+    }
+
+    private void checkLastException() throws IOException {
+        if (null != lastException) {
+            throw lastException;
+        }
+    }
+
+    private void releaseBuffer() {
+        isExhausted = true;
+        this.src.release();
+    }
+
     @Override
     public long getLSSN() {
         return logSegmentSeqNo;
@@ -68,16 +95,44 @@
 
     @Override
     public LogRecordWithDLSN nextRecord() throws IOException {
-        return reader.readOp();
+        checkLastException();
+
+        if (isExhausted) {
+            return null;
+        }
+
+        LogRecordWithDLSN record;
+        try {
+            record = reader.readOp();
+        } catch (IOException ioe) {
+            lastException = ioe;
+            releaseBuffer();
+            throw ioe;
+        }
+        if (null == record) {
+            releaseBuffer();
+        }
+        return record;
+    }
+
+    public void release() {
+        if (isExhausted) {
+            return;
+        }
+        releaseBuffer();;
     }
 
     @Override
     public boolean skipTo(long txId) throws IOException {
+        checkLastException();
+
         return reader.skipTo(txId, true);
     }
 
     @Override
     public boolean skipTo(DLSN dlsn) throws IOException {
+        checkLastException();
+
         return reader.skipTo(dlsn);
     }
 
@@ -99,4 +154,5 @@
     public String getName() {
         return "EnvelopedReader";
     }
+
 }
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
index 18645d4..cc6e941 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
@@ -17,25 +17,33 @@
  */
 package org.apache.distributedlog;
 
+import static org.apache.distributedlog.EnvelopedEntry.COMPRESSED_SIZE_OFFSET;
+import static org.apache.distributedlog.EnvelopedEntry.COMPRESSION_CODEC_MASK;
+import static org.apache.distributedlog.EnvelopedEntry.CURRENT_VERSION;
+import static org.apache.distributedlog.EnvelopedEntry.DECOMPRESSED_SIZE_OFFSET;
+import static org.apache.distributedlog.EnvelopedEntry.FLAGS_OFFSET;
+import static org.apache.distributedlog.EnvelopedEntry.HEADER_LENGTH;
+import static org.apache.distributedlog.EnvelopedEntry.VERSION_OFFSET;
+import static org.apache.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE;
+import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.ReferenceCountUtil;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.Entry.Writer;
 import org.apache.distributedlog.exceptions.InvalidEnvelopedEntryException;
 import org.apache.distributedlog.exceptions.LogRecordTooLongException;
-import org.apache.distributedlog.exceptions.WriteCancelledException;
 import org.apache.distributedlog.exceptions.WriteException;
-import org.apache.distributedlog.io.Buffer;
 import org.apache.distributedlog.io.CompressionCodec;
-import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.distributedlog.io.CompressionCodec.Type;
+import org.apache.distributedlog.io.CompressionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
-
 /**
  * {@link org.apache.distributedlog.io.Buffer} based log record set writer.
  */
@@ -56,12 +64,13 @@
     }
 
     private final String logName;
-    private final Buffer buffer;
+    private final ByteBuf buffer;
+    private ByteBuf finalizedBuffer = null;
     private final LogRecord.Writer writer;
     private final List<WriteRequest> writeRequests;
     private final boolean envelopeBeforeTransmit;
     private final CompressionCodec.Type codec;
-    private final StatsLogger statsLogger;
+    private final int flags;
     private int count = 0;
     private boolean hasUserData = false;
     private long maxTxId = Long.MIN_VALUE;
@@ -69,22 +78,19 @@
     EnvelopedEntryWriter(String logName,
                          int initialBufferSize,
                          boolean envelopeBeforeTransmit,
-                         CompressionCodec.Type codec,
-                         StatsLogger statsLogger) {
+                         CompressionCodec.Type codec) {
         this.logName = logName;
-        this.buffer = new Buffer(initialBufferSize * 6 / 5);
-        this.writer = new LogRecord.Writer(new DataOutputStream(buffer));
+        this.buffer = PooledByteBufAllocator.DEFAULT.buffer(
+                Math.min(Math.max(initialBufferSize * 6 / 5, HEADER_LENGTH), MAX_LOGRECORDSET_SIZE),
+                MAX_LOGRECORDSET_SIZE);
+        this.writer = new LogRecord.Writer(buffer);
         this.writeRequests = new LinkedList<WriteRequest>();
         this.envelopeBeforeTransmit = envelopeBeforeTransmit;
         this.codec = codec;
-        this.statsLogger = statsLogger;
-    }
-
-    @Override
-    public synchronized void reset() {
-        cancelPromises(new WriteCancelledException(logName, "Record Set is reset"));
-        count = 0;
-        this.buffer.reset();
+        this.flags = codec.code() & COMPRESSION_CODEC_MASK;
+        if (envelopeBeforeTransmit) {
+            this.buffer.writerIndex(HEADER_LENGTH);
+        }
     }
 
     @Override
@@ -146,7 +152,7 @@
 
     @Override
     public int getNumBytes() {
-        return buffer.size();
+        return buffer.readableBytes();
     }
 
     @Override
@@ -155,24 +161,45 @@
     }
 
     @Override
-    public synchronized Buffer getBuffer() throws InvalidEnvelopedEntryException, IOException {
-        if (!envelopeBeforeTransmit) {
-            return buffer;
+    public synchronized ByteBuf getBuffer() throws InvalidEnvelopedEntryException, IOException {
+        if (null == finalizedBuffer) {
+            finalizedBuffer = finalizeBuffer();
         }
-        // We can't escape this allocation because things need to be read from one byte array
-        // and then written to another. This is the destination.
-        Buffer toSend = new Buffer(buffer.size());
-        byte[] decompressed = buffer.getData();
-        int length = buffer.size();
-        EnvelopedEntry entry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION,
-                                                  codec,
-                                                  decompressed,
-                                                  length,
-                                                  statsLogger);
-        // This will cause an allocation of a byte[] for compression. This can be avoided
-        // but we can do that later only if needed.
-        entry.writeFully(new DataOutputStream(toSend));
-        return toSend;
+        return finalizedBuffer.slice();
+    }
+
+    private ByteBuf finalizeBuffer() {
+       if (!envelopeBeforeTransmit) {
+            return buffer.retain();
+        }
+
+        int dataOffset = HEADER_LENGTH;
+        int dataLen = buffer.readableBytes() - HEADER_LENGTH;
+
+        if (Type.NONE == codec) {
+            // update version
+            buffer.setByte(VERSION_OFFSET, CURRENT_VERSION);
+            // update the flags
+            buffer.setInt(FLAGS_OFFSET, flags);
+            // update data len
+            buffer.setInt(DECOMPRESSED_SIZE_OFFSET, dataLen);
+            buffer.setInt(COMPRESSED_SIZE_OFFSET, dataLen);
+            return buffer.retain();
+        }
+
+        // compression
+        CompressionCodec compressor =
+                CompressionUtils.getCompressionCodec(codec);
+        ByteBuf uncompressedBuf = buffer.slice(dataOffset, dataLen);
+        ByteBuf compressedBuf = compressor.compress(uncompressedBuf, HEADER_LENGTH);
+        // update version
+        compressedBuf.setByte(VERSION_OFFSET, CURRENT_VERSION);
+        // update the flags
+        compressedBuf.setInt(FLAGS_OFFSET, flags);
+        // update data len
+        compressedBuf.setInt(DECOMPRESSED_SIZE_OFFSET, dataLen);
+        compressedBuf.setInt(COMPRESSED_SIZE_OFFSET, compressedBuf.readableBytes() - HEADER_LENGTH);
+        return compressedBuf;
     }
 
     @Override
@@ -183,10 +210,18 @@
     @Override
     public void completeTransmit(long lssn, long entryId) {
         satisfyPromises(lssn, entryId);
+        buffer.release();
+        synchronized (this) {
+            ReferenceCountUtil.release(finalizedBuffer);
+        }
     }
 
     @Override
     public void abortTransmit(Throwable reason) {
         cancelPromises(reason);
+        buffer.release();
+        synchronized (this) {
+            ReferenceCountUtil.release(finalizedBuffer);
+        }
     }
 }
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java b/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java
index bf3a491..d897274 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java
@@ -154,6 +154,10 @@
             throw new Exception("Error starting zookeeper/bookkeeper");
         }
         int bookiesUp = checkBookiesUp(numBookies, zkTimeoutSec);
+        if (numBookies != bookiesUp) {
+            LOG.info("Only {} bookies are up, expected {} bookies to be there.",
+                bookiesUp, numBookies);
+        }
         assert (numBookies == bookiesUp);
         // Provision "/messaging/distributedlog" namespace
         DLMetadata.create(new BKDLConfig(zkEnsemble, "/ledgers")).create(uri);
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
index 386a9a1..8183703 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
@@ -431,6 +431,10 @@
         }
     }
 
+    synchronized boolean isClosed() {
+        return null != closePromise;
+    }
+
     @Override
     public CompletableFuture<Void> asyncClose() {
         final CompletableFuture<Void> closeFuture;
@@ -556,6 +560,14 @@
 
     @Override
     public void onSuccess(List<Entry.Reader> entries) {
+        if (isClosed()) {
+            // if readahead is closing, don't put the entries to the queue
+            for (Entry.Reader entry : entries) {
+                entry.release();
+            }
+            return;
+        }
+
         lastEntryAddedTime.reset().start();
         for (Entry.Reader entry : entries) {
             entryQueue.add(entry);
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
index 0bb91ae..b1dbf40 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
@@ -17,9 +17,22 @@
  */
 package org.apache.distributedlog.impl.logsegment;
 
+import static com.google.common.base.Charsets.UTF_8;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.Entry;
 import org.apache.distributedlog.LogSegmentMetadata;
@@ -42,20 +55,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static com.google.common.base.Charsets.UTF_8;
-
 /**
  * BookKeeper ledger based log segment entry reader.
  */
@@ -86,7 +85,24 @@
             return done;
         }
 
+        synchronized void release() {
+            if (null != this.entry) {
+                this.entry.getEntryBuffer().release();
+                this.entry = null;
+            }
+        }
+
+        void release(LedgerEntry entry) {
+            if (null != entry) {
+                entry.getEntryBuffer().release();
+            }
+        }
+
         void complete(LedgerEntry entry) {
+            // the reader is already closed
+            if (isClosed()) {
+                release(entry);
+            }
             synchronized (this) {
                 if (done) {
                     return;
@@ -119,6 +135,8 @@
         }
 
         synchronized LedgerEntry getEntry() {
+            // retain reference for the caller
+            this.entry.getEntryBuffer().retain();
             return this.entry;
         }
 
@@ -513,6 +531,16 @@
         }
     }
 
+    private void releaseAllCachedEntries() {
+        synchronized (this) {
+            CacheEntry entry = readAheadEntries.poll();
+            while (null != entry) {
+                entry.release();
+                entry = readAheadEntries.poll();
+            }
+        }
+    }
+
     //
     // Background Read Operations
     //
@@ -624,7 +652,7 @@
                 .setEntryId(entry.getEntryId())
                 .setEnvelopeEntry(envelopeEntries)
                 .deserializeRecordSet(deserializeRecordSet)
-                .setInputStream(entry.getEntryInputStream())
+                .setEntry(entry.getEntryBuffer())
                 .buildReader();
     }
 
@@ -747,22 +775,28 @@
             }
             if (entry.isSuccess()) {
                 CacheEntry removedEntry = readAheadEntries.poll();
-                if (entry != removedEntry) {
-                    DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at reading from "
-                            + getSegment());
-                    completeExceptionally(ise, false);
-                    return;
-                }
                 try {
-                    nextRequest.addEntry(processReadEntry(entry.getEntry()));
-                } catch (IOException e) {
-                    completeExceptionally(e, false);
-                    return;
+                    if (entry != removedEntry) {
+                        DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at reading from "
+                            + getSegment());
+                        completeExceptionally(ise, false);
+                        return;
+                    }
+                    try {
+                        // the reference is retained on `entry.getEntry()`. Entry.Reader is responsible for releasing it.
+                        nextRequest.addEntry(processReadEntry(entry.getEntry()));
+                    } catch (IOException e) {
+                        completeExceptionally(e, false);
+                        return;
+                    }
+                } finally {
+                    removedEntry.release();
                 }
             } else if (skipBrokenEntries && BKException.Code.DigestMatchException == entry.getRc()) {
                 // skip this entry and move forward
                 skippedBrokenEntriesCounter.inc();
-                readAheadEntries.poll();
+                CacheEntry removedEntry = readAheadEntries.poll();
+                removedEntry.release();
                 continue;
             } else {
                 completeExceptionally(new BKTransmitException("Encountered issue on reading entry " + entry.getEntryId()
@@ -826,6 +860,9 @@
             completeExceptionally(exception, false);
         }
 
+        // release the cached entries
+        releaseAllCachedEntries();
+
         // cancel all pending reads
         cancelAllPendingReads(exception);
 
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java
index b333e96..1f05359 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java
@@ -18,6 +18,7 @@
 package org.apache.distributedlog.impl.logsegment;
 
 import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.ByteBuf;
 import org.apache.distributedlog.logsegment.LogSegmentEntryWriter;
 import org.apache.bookkeeper.client.AsyncCallback;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -49,9 +50,9 @@
     }
 
     @Override
-    public void asyncAddEntry(byte[] data, int offset, int length,
+    public void asyncAddEntry(ByteBuf entry,
                               AsyncCallback.AddCallback callback, Object ctx) {
-        lh.asyncAddEntry(data, offset, length, callback, ctx);
+        lh.asyncAddEntry(entry, callback, ctx);
     }
 
     @Override
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
index 254345e..99ae6ad 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
@@ -79,7 +79,7 @@
                 .setEntryId(entry.getEntryId())
                 .setEnvelopeEntry(envelopeEntries)
                 .deserializeRecordSet(deserializeRecordSet)
-                .setInputStream(entry.getEntryInputStream())
+                .setEntry(entry.getEntryBuffer())
                 .buildReader();
     }
 
@@ -89,11 +89,19 @@
         if (BKException.Code.OK == rc) {
             List<Entry.Reader> entryList = Lists.newArrayList();
             while (entries.hasMoreElements()) {
+                LedgerEntry entry = entries.nextElement();
                 try {
-                    entryList.add(processReadEntry(entries.nextElement()));
+                    entryList.add(processReadEntry(entry));
                 } catch (IOException ioe) {
+                    // release the buffers
+                    while (entries.hasMoreElements()) {
+                        LedgerEntry le = entries.nextElement();
+                        le.getEntryBuffer().release();
+                    }
                     FutureUtils.completeExceptionally(promise, ioe);
                     return;
+                } finally {
+                    entry.getEntryBuffer().release();
                 }
             }
             FutureUtils.complete(promise, entryList);
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryWriter.java
index 70f0da0..5ded597 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryWriter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryWriter.java
@@ -18,6 +18,7 @@
 package org.apache.distributedlog.logsegment;
 
 import com.google.common.annotations.Beta;
+import io.netty.buffer.ByteBuf;
 import org.apache.distributedlog.Entry;
 import org.apache.distributedlog.common.util.Sizable;
 import org.apache.bookkeeper.client.AsyncCallback;
@@ -54,12 +55,8 @@
      * {@link org.apache.bookkeeper.client.LedgerHandle#asyncAddEntry(
      * byte[], int, int, AsyncCallback.AddCallback, Object)}
      *
-     * @param data
+     * @param entry
      *          data to add
-     * @param offset
-     *          offset in the data
-     * @param length
-     *          length of the data
      * @param callback
      *          callback
      * @param ctx
@@ -67,6 +64,6 @@
      * @see org.apache.bookkeeper.client.LedgerHandle#asyncAddEntry(
      * byte[], int, int, AsyncCallback.AddCallback, Object)
      */
-    void asyncAddEntry(byte[] data, int offset, int length,
+    void asyncAddEntry(ByteBuf entry,
                        AsyncCallback.AddCallback callback, Object ctx);
 }
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java b/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
index 88c5d0f..e50a172 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
@@ -17,13 +17,13 @@
  */
 package org.apache.distributedlog.tools;
 
+import io.netty.buffer.ByteBuf;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
@@ -46,7 +46,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
@@ -1676,8 +1675,9 @@
                     .setLogSegmentInfo(segment.getLogSegmentSequenceNumber(), segment.getStartSequenceId())
                     .setEntryId(lastEntry.getEntryId())
                     .setEnvelopeEntry(LogSegmentMetadata.supportsEnvelopedEntries(segment.getVersion()))
-                    .setInputStream(lastEntry.getEntryInputStream())
+                    .setEntry(lastEntry.getEntryBuffer())
                     .buildReader();
+            lastEntry.getEntryBuffer().release();
             LogRecordWithDLSN record = reader.nextRecord();
             LogRecordWithDLSN lastRecord = null;
             while (null != record) {
@@ -2245,26 +2245,22 @@
                 throws Exception {
             for (long eid = fromEntryId; eid <= untilEntryId; ++eid) {
                 final CountDownLatch doneLatch = new CountDownLatch(1);
-                final AtomicReference<Set<LedgerReader.ReadResult<InputStream>>> resultHolder =
-                        new AtomicReference<Set<LedgerReader.ReadResult<InputStream>>>();
-                ledgerReader.readEntriesFromAllBookies(lh, eid, new BookkeeperInternalCallbacks.GenericCallback<Set<LedgerReader.ReadResult<InputStream>>>() {
-                    @Override
-                    public void operationComplete(int rc, Set<LedgerReader.ReadResult<InputStream>> readResults) {
-                        if (BKException.Code.OK == rc) {
-                            resultHolder.set(readResults);
-                        } else {
-                            resultHolder.set(null);
-                        }
-                        doneLatch.countDown();
+                final AtomicReference<Set<LedgerReader.ReadResult<ByteBuf>>> resultHolder = new AtomicReference<>();
+                ledgerReader.readEntriesFromAllBookies(lh, eid, (rc, readResults) -> {
+                    if (BKException.Code.OK == rc) {
+                        resultHolder.set(readResults);
+                    } else {
+                        resultHolder.set(null);
                     }
+                    doneLatch.countDown();
                 });
                 doneLatch.await();
-                Set<LedgerReader.ReadResult<InputStream>> readResults = resultHolder.get();
+                Set<LedgerReader.ReadResult<ByteBuf>> readResults = resultHolder.get();
                 if (null == readResults) {
                     throw new IOException("Failed to read entry " + eid);
                 }
                 boolean printHeader = true;
-                for (LedgerReader.ReadResult<InputStream> rr : readResults) {
+                for (LedgerReader.ReadResult<ByteBuf> rr : readResults) {
                     if (corruptOnly) {
                         if (BKException.Code.DigestMatchException == rr.getResultCode()) {
                             if (printHeader) {
@@ -2287,9 +2283,10 @@
                             Entry.Reader reader = Entry.newBuilder()
                                     .setLogSegmentInfo(lh.getId(), 0L)
                                     .setEntryId(eid)
-                                    .setInputStream(rr.getValue())
+                                    .setEntry(rr.getValue())
                                     .setEnvelopeEntry(LogSegmentMetadata.supportsEnvelopedEntries(metadataVersion))
                                     .buildReader();
+                            rr.getValue().release();
                             printEntry(reader);
                         } else {
                             System.out.println("status = " + BKException.getMessage(rr.getResultCode()));
@@ -2346,9 +2343,10 @@
                 Entry.Reader reader = Entry.newBuilder()
                         .setLogSegmentInfo(0L, 0L)
                         .setEntryId(entry.getEntryId())
-                        .setInputStream(entry.getEntryInputStream())
+                        .setEntry(entry.getEntryBuffer())
                         .setEnvelopeEntry(LogSegmentMetadata.supportsEnvelopedEntries(metadataVersion))
                         .buildReader();
+                entry.getEntryBuffer().release();
                 printEntry(reader);
                 ++i;
             }
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java
index 5123178..781e8d0 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.distributedlog;
 
+import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
@@ -2039,10 +2040,8 @@
             recordSetWriter.writeRecord(ByteBuffer.wrap(record.getPayload()), writePromise);
             recordSetFutures.add(writePromise);
         }
-        final ByteBuffer recordSetBuffer = recordSetWriter.getBuffer();
-        byte[] data = new byte[recordSetBuffer.remaining()];
-        recordSetBuffer.get(data);
-        LogRecord setRecord = new LogRecord(6L, data);
+        final ByteBuf recordSetBuffer = recordSetWriter.getBuffer();
+        LogRecord setRecord = new LogRecord(6L, recordSetBuffer);
         setRecord.setRecordSet();
         CompletableFuture<DLSN> writeRecordSetFuture = writer.write(setRecord);
         writeRecordSetFuture.whenComplete(new FutureEventListener<DLSN>() {
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestEntry.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestEntry.java
index 30ef481..5ea971e 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestEntry.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestEntry.java
@@ -17,27 +17,29 @@
  */
 package org.apache.distributedlog;
 
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.distributedlog.EnvelopedEntry.HEADER_LENGTH;
+import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.Entry.Reader;
 import org.apache.distributedlog.Entry.Writer;
 import org.apache.distributedlog.exceptions.LogRecordTooLongException;
-import org.apache.distributedlog.io.Buffer;
 import org.apache.distributedlog.io.CompressionCodec;
-import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.distributedlog.common.concurrent.FutureEventListener;
 import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.Utils;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
-import static org.junit.Assert.*;
-
 /**
  * Test Case of {@link Entry}
  */
@@ -49,20 +51,28 @@
                 "test-empty-record-set",
                 1024,
                 true,
-                CompressionCodec.Type.NONE,
-                NullStatsLogger.INSTANCE);
-        assertEquals("zero bytes", 0, writer.getNumBytes());
+                CompressionCodec.Type.NONE);
+        assertEquals("zero bytes", HEADER_LENGTH, writer.getNumBytes());
         assertEquals("zero records", 0, writer.getNumRecords());
 
-        Buffer buffer = writer.getBuffer();
-        Entry recordSet = Entry.newBuilder()
-                .setData(buffer.getData(), 0, buffer.size())
+        ByteBuf buffer = writer.getBuffer();
+        EnvelopedEntryReader reader = (EnvelopedEntryReader) Entry.newBuilder()
+                .setEntry(buffer)
                 .setLogSegmentInfo(1L, 0L)
                 .setEntryId(0L)
-                .build();
-        Reader reader = recordSet.reader();
+                .buildReader();
+        int refCnt = reader.getSrcBuf().refCnt();
+        assertFalse(reader.isExhausted());
         Assert.assertNull("Empty record set should return null",
                 reader.nextRecord());
+        assertTrue(reader.isExhausted());
+        assertEquals(refCnt - 1, reader.getSrcBuf().refCnt());
+
+        // read next record again (to test release buffer)
+        Assert.assertNull("Empty record set should return null",
+            reader.nextRecord());
+        assertEquals(refCnt - 1, reader.getSrcBuf().refCnt());
+        buffer.release();
     }
 
     @Test(timeout = 20000)
@@ -70,10 +80,9 @@
         Writer writer = Entry.newEntry(
                 "test-write-too-long-record",
                 1024,
-                false,
-                CompressionCodec.Type.NONE,
-                NullStatsLogger.INSTANCE);
-        assertEquals("zero bytes", 0, writer.getNumBytes());
+                true,
+                CompressionCodec.Type.NONE);
+        assertEquals("zero bytes", HEADER_LENGTH, writer.getNumBytes());
         assertEquals("zero records", 0, writer.getNumRecords());
 
         LogRecord largeRecord = new LogRecord(1L, new byte[MAX_LOGRECORD_SIZE + 1]);
@@ -83,11 +92,12 @@
         } catch (LogRecordTooLongException lrtle) {
             // expected
         }
-        assertEquals("zero bytes", 0, writer.getNumBytes());
+        assertEquals("zero bytes", HEADER_LENGTH, writer.getNumBytes());
         assertEquals("zero records", 0, writer.getNumRecords());
 
-        Buffer buffer = writer.getBuffer();
-        Assert.assertEquals("zero bytes", 0, buffer.size());
+        ByteBuf buffer = writer.getBuffer();
+        assertEquals("zero bytes", HEADER_LENGTH, buffer.readableBytes());
+        buffer.release();
     }
 
     @Test(timeout = 20000)
@@ -96,9 +106,8 @@
                 "test-write-records",
                 1024,
                 true,
-                CompressionCodec.Type.NONE,
-                NullStatsLogger.INSTANCE);
-        assertEquals("zero bytes", 0, writer.getNumBytes());
+                CompressionCodec.Type.NONE);
+        assertEquals("zero bytes", HEADER_LENGTH, writer.getNumBytes());
         assertEquals("zero records", 0, writer.getNumRecords());
 
         List<CompletableFuture<DLSN>> writePromiseList = Lists.newArrayList();
@@ -132,34 +141,38 @@
             assertEquals((i + 6) + " records", (i + 6), writer.getNumRecords());
         }
 
-        Buffer buffer = writer.getBuffer();
+        ByteBuf buffer = writer.getBuffer();
+        buffer.retain();
 
         // Test transmit complete
         writer.completeTransmit(1L, 1L);
         List<DLSN> writeResults = Utils.ioResult(FutureUtils.collect(writePromiseList));
         for (int i = 0; i < 10; i++) {
-            Assert.assertEquals(new DLSN(1L, 1L, i), writeResults.get(i));
+            assertEquals(new DLSN(1L, 1L, i), writeResults.get(i));
         }
 
         // Test reading from buffer
-        Entry recordSet = Entry.newBuilder()
-                .setData(buffer.getData(), 0, buffer.size())
+        Reader reader = Entry.newBuilder()
+                .setEntry(buffer)
                 .setLogSegmentInfo(1L, 1L)
                 .setEntryId(0L)
-                .build();
-        Reader reader = recordSet.reader();
+                .setEnvelopeEntry(true)
+                .buildReader();
+        buffer.release();
         LogRecordWithDLSN record = reader.nextRecord();
         int numReads = 0;
         long expectedTxid = 0L;
         while (null != record) {
-            Assert.assertEquals(expectedTxid, record.getTransactionId());
-            Assert.assertEquals(expectedTxid, record.getSequenceId());
-            Assert.assertEquals(new DLSN(1L, 0L, expectedTxid), record.getDlsn());
+            assertEquals(expectedTxid, record.getTransactionId());
+            assertEquals(expectedTxid, record.getSequenceId());
+            assertEquals(new DLSN(1L, 0L, expectedTxid), record.getDlsn());
             ++numReads;
             ++expectedTxid;
             record = reader.nextRecord();
         }
-        Assert.assertEquals(10, numReads);
+        assertEquals(10, numReads);
+
+        reader.release();
     }
 
     @Test(timeout = 20000)
@@ -168,9 +181,8 @@
                 "test-write-recordset",
                 1024,
                 true,
-                CompressionCodec.Type.NONE,
-                NullStatsLogger.INSTANCE);
-        assertEquals("zero bytes", 0, writer.getNumBytes());
+                CompressionCodec.Type.NONE);
+        assertEquals("zero bytes", HEADER_LENGTH, writer.getNumBytes());
         assertEquals("zero records", 0, writer.getNumRecords());
 
         List<CompletableFuture<DLSN>> writePromiseList = Lists.newArrayList();
@@ -194,10 +206,8 @@
             recordSetPromiseList.add(writePromise);
             assertEquals((i + 1) + " records", (i + 1), recordSetWriter.getNumRecords());
         }
-        final ByteBuffer recordSetBuffer = recordSetWriter.getBuffer();
-        byte[] data = new byte[recordSetBuffer.remaining()];
-        recordSetBuffer.get(data);
-        LogRecord setRecord = new LogRecord(5L, data);
+        final ByteBuf recordSetBuffer = recordSetWriter.getBuffer();
+        LogRecord setRecord = new LogRecord(5L, recordSetBuffer);
         setRecord.setPositionWithinLogSegment(5);
         setRecord.setRecordSet();
         CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>();
@@ -228,21 +238,22 @@
             assertEquals((i + 11) + " records", (i + 11), writer.getNumRecords());
         }
 
-        Buffer buffer = writer.getBuffer();
+        ByteBuf buffer = writer.getBuffer();
+        buffer.retain();
 
         // Test transmit complete
         writer.completeTransmit(1L, 1L);
         List<DLSN> writeResults = Utils.ioResult(FutureUtils.collect(writePromiseList));
         for (int i = 0; i < 5; i++) {
-            Assert.assertEquals(new DLSN(1L, 1L, i), writeResults.get(i));
+            assertEquals(new DLSN(1L, 1L, i), writeResults.get(i));
         }
-        Assert.assertEquals(new DLSN(1L, 1L, 5), writeResults.get(5));
+        assertEquals(new DLSN(1L, 1L, 5), writeResults.get(5));
         for (int i = 0; i < 5; i++) {
-            Assert.assertEquals(new DLSN(1L, 1L, (10 + i)), writeResults.get(6 + i));
+            assertEquals(new DLSN(1L, 1L, (10 + i)), writeResults.get(6 + i));
         }
         List<DLSN> recordSetWriteResults = Utils.ioResult(FutureUtils.collect(recordSetPromiseList));
         for (int i = 0; i < 5; i++) {
-            Assert.assertEquals(new DLSN(1L, 1L, (5 + i)), recordSetWriteResults.get(i));
+            assertEquals(new DLSN(1L, 1L, (5 + i)), recordSetWriteResults.get(i));
         }
 
         // Test reading from buffer
@@ -264,9 +275,11 @@
         verifyReadResult(buffer, 1L, 1L, 1L, false,
                 new DLSN(1L, 1L, 12L), 0, 0, 3,
                 new DLSN(1L, 1L, 12L), 12L);
+
+        buffer.release();
     }
 
-    void verifyReadResult(Buffer data,
+    void verifyReadResult(ByteBuf data,
                           long lssn, long entryId, long startSequenceId,
                           boolean deserializeRecordSet,
                           DLSN skipTo,
@@ -275,14 +288,13 @@
                           int lastNumRecords,
                           DLSN expectedDLSN,
                           long expectedTxId) throws Exception {
-        Entry recordSet = Entry.newBuilder()
-                .setData(data.getData(), 0, data.size())
+        Reader reader = Entry.newBuilder()
+                .setEntry(data)
                 .setLogSegmentInfo(lssn, startSequenceId)
                 .setEntryId(entryId)
                 .deserializeRecordSet(deserializeRecordSet)
-                .skipTo(skipTo)
-                .build();
-        Reader reader = recordSet.reader();
+                .buildReader();
+        reader.skipTo(skipTo);
 
         LogRecordWithDLSN record;
         for (int i = 0; i < firstNumRecords; i++) { // first
@@ -291,7 +303,7 @@
             assertEquals(expectedDLSN, record.getDlsn());
             assertEquals(expectedTxId, record.getTransactionId());
             assertNotNull("record " + record + " payload is null",
-                    record.getPayload());
+                    record.getPayloadBuf());
             assertEquals("record-" + expectedTxId, new String(record.getPayload(), UTF_8));
             expectedDLSN = expectedDLSN.getNextDLSN();
             ++expectedTxId;
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestEnvelopedEntry.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestEnvelopedEntry.java
deleted file mode 100644
index 6d78f03..0000000
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestEnvelopedEntry.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-
-import org.apache.distributedlog.io.Buffer;
-import org.apache.distributedlog.io.CompressionCodec;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestEnvelopedEntry {
-
-    static final Logger LOG = LoggerFactory.getLogger(TestEnvelopedEntry.class);
-
-    private String getString(boolean compressible) {
-        if (compressible) {
-            StringBuilder builder = new StringBuilder();
-            for(int i = 0; i < 1000; i++) {
-                builder.append('A');
-            }
-            return builder.toString();
-        }
-        return "DistributedLogEnvelopedEntry";
-    }
-
-    @Test(timeout = 20000)
-    public void testEnvelope() throws Exception {
-        byte[] data = getString(false).getBytes();
-        EnvelopedEntry writeEntry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION,
-                                                  CompressionCodec.Type.NONE,
-                                                  data,
-                                                  data.length,
-                                                  new NullStatsLogger());
-        Buffer outBuf = new Buffer(2 * data.length);
-        writeEntry.writeFully(new DataOutputStream(outBuf));
-        EnvelopedEntry readEntry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION,
-                                                      new NullStatsLogger());
-        readEntry.readFully(new DataInputStream(new ByteArrayInputStream(outBuf.getData())));
-        byte[] newData = readEntry.getDecompressedPayload();
-        Assert.assertEquals("Written data should equal read data", new String(data), new String(newData));
-    }
-
-    @Test(timeout = 20000)
-    public void testLZ4Compression() throws Exception {
-        byte[] data = getString(true).getBytes();
-        EnvelopedEntry writeEntry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION,
-                                                       CompressionCodec.Type.LZ4,
-                                                       data,
-                                                       data.length,
-                                                       new NullStatsLogger());
-        Buffer outBuf = new Buffer(data.length);
-        writeEntry.writeFully(new DataOutputStream(outBuf));
-        Assert.assertTrue(data.length > outBuf.size());
-        EnvelopedEntry readEntry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION,
-                                                      new NullStatsLogger());
-        readEntry.readFully(new DataInputStream(new ByteArrayInputStream(outBuf.getData())));
-        byte[] newData = readEntry.getDecompressedPayload();
-        Assert.assertEquals("Written data should equal read data", new String(data), new String(newData));
-    }
-}
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
index eda8eb2..1e8ed84 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
@@ -343,6 +343,7 @@
                 readAheadEntryReader.getNextReadAheadEntry(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
         assertEquals(2L, entryReader.getLSSN());
         assertEquals(1L, entryReader.getEntryId());
+        entryReader.release();
         Utils.close(readAheadEntryReader);
 
         // positioning on a partially truncated log segment (segment 2) before min active dlsn
@@ -355,6 +356,7 @@
                 readAheadEntryReader.getNextReadAheadEntry(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
         assertEquals(2L, entryReader.getLSSN());
         assertEquals(1L, entryReader.getEntryId());
+        entryReader.release();
         Utils.close(readAheadEntryReader);
 
         // positioning on a partially truncated log segment (segment 2) after min active dlsn
@@ -367,6 +369,7 @@
                 readAheadEntryReader.getNextReadAheadEntry(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
         assertEquals(2L, entryReader.getLSSN());
         assertEquals(2L, entryReader.getEntryId());
+        entryReader.release();
         Utils.close(readAheadEntryReader);
 
         Utils.close(writer);
diff --git a/distributedlog-protocol/pom.xml b/distributedlog-protocol/pom.xml
index e81450d..010f5b2 100644
--- a/distributedlog-protocol/pom.xml
+++ b/distributedlog-protocol/pom.xml
@@ -37,6 +37,11 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-buffer</artifactId>
+      <version>${netty.version}</version>
+    </dependency>
+    <dependency>
       <groupId>net.jpountz.lz4</groupId>
       <artifactId>lz4</artifactId>
       <version>${lz4.version}</version>
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java
index 1648c6d..83a950e 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java
@@ -17,23 +17,21 @@
  */
 package org.apache.distributedlog;
 
-import static org.apache.distributedlog.LogRecordSet.COMPRESSION_CODEC_LZ4;
 import static org.apache.distributedlog.LogRecordSet.METADATA_COMPRESSION_MASK;
 import static org.apache.distributedlog.LogRecordSet.METADATA_VERSION_MASK;
-import static org.apache.distributedlog.LogRecordSet.NULL_OP_STATS_LOGGER;
 import static org.apache.distributedlog.LogRecordSet.VERSION;
 
-import org.apache.distributedlog.io.CompressionCodec;
-import org.apache.distributedlog.io.CompressionUtils;
-import java.io.DataInputStream;
+import io.netty.buffer.ByteBuf;
 import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
+import lombok.extern.slf4j.Slf4j;
+import org.apache.distributedlog.io.CompressionCodec;
+import org.apache.distributedlog.io.CompressionCodec.Type;
+import org.apache.distributedlog.io.CompressionUtils;
 
 /**
  * Record reader to read records from an enveloped entry buffer.
  */
+@Slf4j
 class EnvelopedRecordSetReader implements LogRecordSet.Reader {
 
     private final long logSegmentSeqNo;
@@ -41,7 +39,7 @@
     private final long transactionId;
     private final long startSequenceId;
     private int numRecords;
-    private final ByteBuffer reader;
+    private final ByteBuf reader;
 
     // slot id
     private long slotId;
@@ -53,7 +51,7 @@
                              long startSlotId,
                              int startPositionWithinLogSegment,
                              long startSequenceId,
-                             InputStream in)
+                             ByteBuf src)
             throws IOException {
         this.logSegmentSeqNo = logSegmentSeqNo;
         this.entryId = entryId;
@@ -63,7 +61,6 @@
         this.startSequenceId = startSequenceId;
 
         // read data
-        DataInputStream src = new DataInputStream(in);
         int metadata = src.readInt();
         int version = metadata & METADATA_VERSION_MASK;
         if (version != VERSION) {
@@ -72,22 +69,21 @@
         }
         int codecCode = metadata & METADATA_COMPRESSION_MASK;
         this.numRecords = src.readInt();
-        int originDataLen = src.readInt();
-        int actualDataLen = src.readInt();
-        byte[] compressedData = new byte[actualDataLen];
-        src.readFully(compressedData);
-
-        if (COMPRESSION_CODEC_LZ4 == codecCode) {
-            CompressionCodec codec = CompressionUtils.getCompressionCodec(CompressionCodec.Type.LZ4);
-            byte[] decompressedData = codec.decompress(compressedData, 0, actualDataLen,
-                    originDataLen, NULL_OP_STATS_LOGGER);
-            this.reader = ByteBuffer.wrap(decompressedData);
-        } else {
-            if (originDataLen != actualDataLen) {
-                throw new IOException("Inconsistent data length found for a non-compressed record set : original = "
-                        + originDataLen + ", actual = " + actualDataLen);
+        int decompressedDataLen = src.readInt();
+        int compressedDataLen = src.readInt();
+        ByteBuf compressedBuf = src.slice(src.readerIndex(), compressedDataLen);
+        try {
+            if (Type.NONE.code() == codecCode && decompressedDataLen != compressedDataLen) {
+                throw new IOException("Inconsistent data length found for a non-compressed record set : decompressed = "
+                        + decompressedDataLen + ", actual = " + compressedDataLen);
             }
-            this.reader = ByteBuffer.wrap(compressedData);
+            CompressionCodec codec = CompressionUtils.getCompressionCodec(Type.of(codecCode));
+            this.reader = codec.decompress(compressedBuf, decompressedDataLen);
+        } finally {
+            compressedBuf.release();
+        }
+        if (numRecords == 0) {
+            this.reader.release();
         }
     }
 
@@ -97,22 +93,34 @@
             return null;
         }
 
-        int recordLen = reader.getInt();
-        byte[] recordData = new byte[recordLen];
-        reader.get(recordData);
-        DLSN dlsn = new DLSN(logSegmentSeqNo, entryId, slotId);
+        int recordLen = reader.readInt();
+        ByteBuf recordBuf = reader.slice(reader.readerIndex(), recordLen);
+        reader.readerIndex(reader.readerIndex() + recordLen);
 
+        DLSN dlsn = new DLSN(logSegmentSeqNo, entryId, slotId);
         LogRecordWithDLSN record =
                 new LogRecordWithDLSN(dlsn, startSequenceId);
         record.setPositionWithinLogSegment(position);
         record.setTransactionId(transactionId);
-        record.setPayload(recordData);
+        record.setPayloadBuf(recordBuf, true);
 
         ++slotId;
         ++position;
         --numRecords;
 
+        // release the record set buffer when exhausting the reader
+        if (0 == numRecords) {
+            this.reader.release();
+        }
+
         return record;
     }
 
+    @Override
+    public void release() {
+        if (0 != numRecords) {
+            numRecords = 0;
+            reader.release();
+        }
+    }
 }
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
index 2a60ff3..0bfd06f 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
@@ -17,71 +17,59 @@
  */
 package org.apache.distributedlog;
 
+import static org.apache.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE;
 import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
-import static org.apache.distributedlog.LogRecordSet.COMPRESSION_CODEC_LZ4;
-import static org.apache.distributedlog.LogRecordSet.COMPRESSION_CODEC_NONE;
+import static org.apache.distributedlog.LogRecordSet.COMPRESSED_SIZE_OFFSET;
+import static org.apache.distributedlog.LogRecordSet.COUNT_OFFSET;
+import static org.apache.distributedlog.LogRecordSet.DECOMPRESSED_SIZE_OFFSET;
 import static org.apache.distributedlog.LogRecordSet.HEADER_LEN;
 import static org.apache.distributedlog.LogRecordSet.METADATA_COMPRESSION_MASK;
+import static org.apache.distributedlog.LogRecordSet.METADATA_OFFSET;
 import static org.apache.distributedlog.LogRecordSet.METADATA_VERSION_MASK;
-import static org.apache.distributedlog.LogRecordSet.NULL_OP_STATS_LOGGER;
 import static org.apache.distributedlog.LogRecordSet.VERSION;
 
-import java.io.DataOutputStream;
-import java.io.IOException;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.ReferenceCountUtil;
 import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.exceptions.LogRecordTooLongException;
 import org.apache.distributedlog.exceptions.WriteException;
 import org.apache.distributedlog.io.Buffer;
 import org.apache.distributedlog.io.CompressionCodec;
+import org.apache.distributedlog.io.CompressionCodec.Type;
 import org.apache.distributedlog.io.CompressionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * {@link Buffer} based log record set writer.
  */
+@Slf4j
 class EnvelopedRecordSetWriter implements LogRecordSet.Writer {
 
-    private static final Logger logger = LoggerFactory.getLogger(EnvelopedRecordSetWriter.class);
-
-    private final Buffer buffer;
-    private final DataOutputStream writer;
-    private final WritableByteChannel writeChannel;
+    private final ByteBuf buffer;
     private final List<CompletableFuture<DLSN>> promiseList;
     private final CompressionCodec.Type codec;
+    private final int metadata;
     private final int codecCode;
     private int count = 0;
-    private ByteBuffer recordSetBuffer = null;
+    private ByteBuf recordSetBuffer = null;
 
     EnvelopedRecordSetWriter(int initialBufferSize,
                              CompressionCodec.Type codec) {
-        this.buffer = new Buffer(Math.max(initialBufferSize, HEADER_LEN));
+        this.buffer = PooledByteBufAllocator.DEFAULT.buffer(
+                Math.max(initialBufferSize, HEADER_LEN),
+                MAX_LOGRECORDSET_SIZE);
         this.promiseList = new LinkedList<CompletableFuture<DLSN>>();
         this.codec = codec;
-        switch (codec) {
-            case LZ4:
-                this.codecCode = COMPRESSION_CODEC_LZ4;
-                break;
-            default:
-                this.codecCode = COMPRESSION_CODEC_NONE;
-                break;
-        }
-        this.writer = new DataOutputStream(buffer);
-        try {
-            this.writer.writeInt((VERSION & METADATA_VERSION_MASK)
-                    | (codecCode & METADATA_COMPRESSION_MASK));
-            this.writer.writeInt(0); // count
-            this.writer.writeInt(0); // original len
-            this.writer.writeInt(0); // actual len
-        } catch (IOException e) {
-            logger.warn("Failed to serialize the header to an enveloped record set", e);
-        }
-        this.writeChannel = Channels.newChannel(writer);
+        this.codecCode = codec.code();
+        this.metadata = (VERSION & METADATA_VERSION_MASK) | (codecCode & METADATA_COMPRESSION_MASK);
+        this.buffer.writeInt(metadata);
+        this.buffer.writeInt(0); // count
+        this.buffer.writeInt(0); // original len
+        this.buffer.writeInt(0); // actual len
     }
 
     synchronized List<CompletableFuture<DLSN>> getPromiseList() {
@@ -98,15 +86,10 @@
                     "Log Record of size " + logRecordSize + " written when only "
                             + MAX_LOGRECORD_SIZE + " is allowed");
         }
-        try {
-            writer.writeInt(record.remaining());
-            writeChannel.write(record);
-            ++count;
-            promiseList.add(transmitPromise);
-        } catch (IOException e) {
-            logger.error("Failed to append record to record set", e);
-            throw new WriteException("", "Failed to append record to record set");
-        }
+        buffer.writeInt(logRecordSize);
+        buffer.writeBytes(record);
+        ++count;
+        promiseList.add(transmitPromise);
     }
 
     private synchronized void satisfyPromises(long lssn, long entryId, long startSlotId) {
@@ -127,7 +110,7 @@
 
     @Override
     public int getNumBytes() {
-        return buffer.size();
+        return buffer.readableBytes();
     }
 
     @Override
@@ -136,62 +119,53 @@
     }
 
     @Override
-    public synchronized ByteBuffer getBuffer() {
+    public synchronized ByteBuf getBuffer() {
         if (null == recordSetBuffer) {
             recordSetBuffer = createBuffer();
         }
-        return recordSetBuffer.duplicate();
+        return recordSetBuffer.retainedSlice();
     }
 
-    ByteBuffer createBuffer() {
-        byte[] data = buffer.getData();
+    ByteBuf createBuffer() {
         int dataOffset = HEADER_LEN;
-        int dataLen = buffer.size() - HEADER_LEN;
+        int dataLen = buffer.readableBytes() - HEADER_LEN;
 
-        if (COMPRESSION_CODEC_LZ4 != codecCode) {
-            ByteBuffer recordSetBuffer = ByteBuffer.wrap(data, 0, buffer.size());
+        if (Type.NONE.code() == codecCode) {
             // update count
-            recordSetBuffer.putInt(4, count);
+            buffer.setInt(COUNT_OFFSET, count);
             // update data len
-            recordSetBuffer.putInt(8, dataLen);
-            recordSetBuffer.putInt(12, dataLen);
-            return recordSetBuffer;
+            buffer.setInt(DECOMPRESSED_SIZE_OFFSET, dataLen);
+            buffer.setInt(COMPRESSED_SIZE_OFFSET, dataLen);
+            return buffer.retain();
         }
 
         // compression
 
         CompressionCodec compressor =
                     CompressionUtils.getCompressionCodec(codec);
-        byte[] compressed =
-                compressor.compress(data, dataOffset, dataLen, NULL_OP_STATS_LOGGER);
-
-        ByteBuffer recordSetBuffer;
-        if (compressed.length > dataLen) {
-            byte[] newData = new byte[HEADER_LEN + compressed.length];
-            System.arraycopy(data, 0, newData, 0, HEADER_LEN + dataLen);
-            recordSetBuffer = ByteBuffer.wrap(newData);
-        } else {
-            recordSetBuffer = ByteBuffer.wrap(data);
-        }
-        // version
-        recordSetBuffer.position(4);
+        ByteBuf uncompressedBuf = buffer.slice(dataOffset, dataLen);
+        ByteBuf compressedBuf = compressor.compress(uncompressedBuf, HEADER_LEN);
+        compressedBuf.setInt(METADATA_OFFSET, metadata);
         // update count
-        recordSetBuffer.putInt(count);
+        compressedBuf.setInt(COUNT_OFFSET, count);
         // update data len
-        recordSetBuffer.putInt(dataLen);
-        recordSetBuffer.putInt(compressed.length);
-        recordSetBuffer.put(compressed);
-        recordSetBuffer.flip();
-        return recordSetBuffer;
+        compressedBuf.setInt(DECOMPRESSED_SIZE_OFFSET, dataLen);
+        compressedBuf.setInt(COMPRESSED_SIZE_OFFSET, compressedBuf.readableBytes() - HEADER_LEN);
+
+        return compressedBuf;
     }
 
     @Override
-    public void completeTransmit(long lssn, long entryId, long startSlotId) {
+    public synchronized void completeTransmit(long lssn, long entryId, long startSlotId) {
         satisfyPromises(lssn, entryId, startSlotId);
+        buffer.release();
+        ReferenceCountUtil.release(recordSetBuffer);
     }
 
     @Override
-    public void abortTransmit(Throwable reason) {
+    public synchronized void abortTransmit(Throwable reason) {
         cancelPromises(reason);
+        buffer.release();
+        ReferenceCountUtil.release(recordSetBuffer);
     }
 }
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecord.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecord.java
index f2a19fc..01d9e9b 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecord.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecord.java
@@ -18,12 +18,15 @@
 package org.apache.distributedlog;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.Unpooled;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.distributedlog.common.util.ByteBufUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,6 +106,7 @@
  *
  * @see LogRecordWithDLSN
  */
+@NotThreadSafe
 public class LogRecord {
 
     private static final Logger LOG = LoggerFactory.getLogger(LogRecord.class);
@@ -128,7 +132,7 @@
 
     private long metadata;
     private long txid;
-    private byte[] payload;
+    private ByteBuf payload;
 
     /**
      * Construct an uninitialized log record.
@@ -152,7 +156,32 @@
      */
     public LogRecord(long txid, byte[] payload) {
         this.txid = txid;
-        this.payload = payload;
+        this.payload = Unpooled.wrappedBuffer(payload);
+    }
+
+    /**
+     * Construct a log record with <i>txid</i> and payload <i>buffer</i>.
+     *
+     * @param txid application defined transaction id.
+     * @param buffer payload buffer.
+     */
+    public LogRecord(long txid, ByteBuffer buffer) {
+        this.txid = txid;
+        this.payload = Unpooled.wrappedBuffer(buffer);
+    }
+
+    /**
+     * Used by {@link LogRecordWithDLSN} to construct a log record read by readers.
+     *
+     * @param txid transaction id
+     * @param payload playload
+     */
+    protected LogRecord(long txid, ByteBuf payload) {
+        this.txid = txid;
+        // Need to make a copy since the passed payload is using a ref-count buffer that we don't know when could
+        // release, since the record is passed to the user. Also, the passed ByteBuf is coming from network and is
+        // backed by a direct buffer which we could not expose as a byte[]
+        this.payload = Unpooled.copiedBuffer(payload);
         this.metadata = 0;
     }
 
@@ -184,16 +213,22 @@
      * @return payload of this log record.
      */
     public byte[] getPayload() {
-        return payload;
+        return ByteBufUtils.getArray(payload);
     }
 
-    /**
-     * Set payload for this log record.
-     *
-     * @param payload payload of this log record
-     */
-    void setPayload(byte[] payload) {
-        this.payload = payload;
+    public ByteBuf getPayloadBuf() {
+        return payload.slice();
+    }
+
+    void setPayloadBuf(ByteBuf payload, boolean copyData) {
+        if (null != this.payload) {
+            this.payload.release();
+        }
+        if (copyData) {
+            this.payload = Unpooled.copiedBuffer(payload);
+        } else {
+            this.payload = payload;
+        }
     }
 
     /**
@@ -202,7 +237,7 @@
      * @return payload as input stream
      */
     public InputStream getPayLoadInputStream() {
-        return new ByteArrayInputStream(payload);
+        return new ByteBufInputStream(payload.retainedSlice(), true);
     }
 
     //
@@ -340,21 +375,25 @@
     // Serialization & Deserialization
     //
 
-    protected void readPayload(DataInputStream in) throws IOException {
+    protected void readPayload(ByteBuf in, boolean copyData) throws IOException {
         int length = in.readInt();
         if (length < 0) {
             throw new EOFException("Log Record is corrupt: Negative length " + length);
         }
-        payload = new byte[length];
-        in.readFully(payload);
+        if (copyData) {
+            setPayloadBuf(in.slice(in.readerIndex(), length), true);
+        } else {
+            setPayloadBuf(in.retainedSlice(in.readerIndex(), length), false);
+        }
+        in.skipBytes(length);
     }
 
-    private void writePayload(DataOutputStream out) throws IOException {
-        out.writeInt(payload.length);
-        out.write(payload);
+    private void writePayload(ByteBuf out) {
+        out.writeInt(payload.readableBytes());
+        out.writeBytes(payload, payload.readerIndex(), payload.readableBytes());
     }
 
-    private void writeToStream(DataOutputStream out) throws IOException {
+    private void writeToStream(ByteBuf out) {
         out.writeLong(metadata);
         out.writeLong(txid);
         writePayload(out);
@@ -369,16 +408,16 @@
      */
     int getPersistentSize() {
         // Flags + TxId + Payload-length + payload
-        return 2 * (Long.SIZE / 8) + Integer.SIZE / 8 + payload.length;
+        return 2 * (Long.SIZE / 8) + Integer.SIZE / 8 + payload.readableBytes();
     }
 
     /**
      * Writer class to write log records into an output {@code stream}.
      */
     public static class Writer {
-        private final DataOutputStream buf;
+        private final ByteBuf buf;
 
-        public Writer(DataOutputStream out) {
+        public Writer(ByteBuf out) {
             this.buf = out;
         }
 
@@ -386,14 +425,13 @@
          * Write an operation to the output stream.
          *
          * @param record The operation to write
-         * @throws IOException if an error occurs during writing.
          */
-        public void writeOp(LogRecord record) throws IOException {
+        public void writeOp(LogRecord record) {
             record.writeToStream(buf);
         }
 
         public int getPendingBytes() {
-            return buf.size();
+            return buf.readableBytes();
         }
     }
 
@@ -402,7 +440,7 @@
       */
     public static class Reader {
         private final RecordStream recordStream;
-        private final DataInputStream in;
+        private final ByteBuf in;
         private final long startSequenceId;
         private final boolean deserializeRecordSet;
         private static final int SKIP_BUFFER_SIZE = 512;
@@ -417,13 +455,13 @@
          * @param startSequenceId the start sequence id.
          */
         public Reader(RecordStream recordStream,
-                      DataInputStream in,
+                      ByteBuf in,
                       long startSequenceId) {
             this(recordStream, in, startSequenceId, true);
         }
 
         public Reader(RecordStream recordStream,
-                      DataInputStream in,
+                      ByteBuf in,
                       long startSequenceId,
                       boolean deserializeRecordSet) {
             this.recordStream = recordStream;
@@ -460,6 +498,10 @@
                     }
                 }
 
+                if (in.readableBytes() <= 0) {
+                    return null;
+                }
+
                 try {
                     long metadata = in.readLong();
                     // Reading the first 8 bytes positions the record stream on the correct log record
@@ -470,7 +512,14 @@
                     nextRecordInStream = new LogRecordWithDLSN(recordStream.getCurrentPosition(), startSequenceId);
                     nextRecordInStream.setMetadata(metadata);
                     nextRecordInStream.setTransactionId(in.readLong());
-                    nextRecordInStream.readPayload(in);
+
+                    // 1) if it is simple record, copy the data
+                    // 2) if it is record set and deserializeRecordSet is true, we don't need to copy data,
+                    //    defer data copying to deserializing record from record set.
+                    // 3) if it is record set and deserializeRecordSet is false, we copy the data, so applications
+                    //    don't have to deal with reference count.
+                    boolean copyData = !isRecordSet(metadata) || !deserializeRecordSet;
+                    nextRecordInStream.readPayload(in, copyData);
                     if (LOG.isTraceEnabled()) {
                         if (nextRecordInStream.isControl()) {
                             LOG.trace("Reading {} Control DLSN {}",
@@ -509,8 +558,6 @@
         }
 
         private boolean skipTo(Long txId, DLSN dlsn, boolean skipControl) throws IOException {
-            LOG.debug("SkipTo");
-            byte[] skipBuffer = null;
             boolean found = false;
             while (true) {
                 try {
@@ -519,7 +566,7 @@
 
                     // if there is not record set, read next record
                     if (null == recordSetReader) {
-                        in.mark(INPUTSTREAM_MARK_LIMIT);
+                        in.markReaderIndex();
                         flags = in.readLong();
                         currTxId = in.readLong();
                     } else {
@@ -539,7 +586,7 @@
                             LOG.trace("Found position {} beyond {}", recordStream.getCurrentPosition(), dlsn);
                         }
                         if (null == lastRecordSkipTo) {
-                            in.reset();
+                            in.resetReaderIndex();
                         }
                         found = true;
                         break;
@@ -550,7 +597,7 @@
                                 LOG.trace("Found position {} beyond {}", currTxId, txId);
                             }
                             if (null == lastRecordSkipTo) {
-                                in.reset();
+                                in.resetReaderIndex();
                             }
                             found = true;
                             break;
@@ -569,7 +616,7 @@
                             new LogRecordWithDLSN(recordStream.getCurrentPosition(), startSequenceId);
                         record.setMetadata(flags);
                         record.setTransactionId(currTxId);
-                        record.readPayload(in);
+                        record.readPayload(in, false);
                         recordSetReader = LogRecordSet.of(record);
                     } else {
                         int length = in.readInt();
@@ -580,15 +627,7 @@
                             break;
                         }
                         // skip single record
-                        if (null == skipBuffer) {
-                            skipBuffer = new byte[SKIP_BUFFER_SIZE];
-                        }
-                        int read = 0;
-                        while (read < length) {
-                            int bytesToRead = Math.min(length - read, SKIP_BUFFER_SIZE);
-                            in.readFully(skipBuffer, 0, bytesToRead);
-                            read += bytesToRead;
-                        }
+                        in.skipBytes(length);
                         if (LOG.isTraceEnabled()) {
                             LOG.trace("Skipped Record with TxId {} DLSN {}",
                                 currTxId, recordStream.getCurrentPosition());
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java
index 55b20ff..8db6019 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java
@@ -19,12 +19,10 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import java.io.ByteArrayInputStream;
+import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.distributedlog.exceptions.LogRecordTooLongException;
 import org.apache.distributedlog.exceptions.WriteException;
 import org.apache.distributedlog.io.CompressionCodec;
@@ -57,42 +55,35 @@
  */
 public class LogRecordSet {
 
-    static final OpStatsLogger NULL_OP_STATS_LOGGER =
-            NullStatsLogger.INSTANCE.getOpStatsLogger("");
-
     public static final int HEADER_LEN =
-            4 /* Metadata */
-          + 4 /* Count */
-          + 8 /* Lengths */
+            Integer.BYTES /* Metadata */
+          + Integer.BYTES /* Count */
+          + Integer.BYTES + Integer.BYTES /* Lengths: (decompressed + compressed) */
             ;
 
     // Version
     static final int VERSION = 0x1000;
 
+    static final int METADATA_OFFSET = 0;
+    static final int COUNT_OFFSET = METADATA_OFFSET + Integer.BYTES;
+    static final int DECOMPRESSED_SIZE_OFFSET = COUNT_OFFSET + Integer.BYTES;
+    static final int COMPRESSED_SIZE_OFFSET = DECOMPRESSED_SIZE_OFFSET + Integer.BYTES;
+
     // Metadata
     static final int METADATA_VERSION_MASK = 0xf000;
     static final int METADATA_COMPRESSION_MASK = 0x3;
 
-    // Compression Codec
-    static final int COMPRESSION_CODEC_NONE = 0x0;
-    static final int COMPRESSION_CODEC_LZ4 = 0X1;
-
     public static int numRecords(LogRecord record) throws IOException {
         checkArgument(record.isRecordSet(),
                 "record is not a recordset");
-        byte[] data = record.getPayload();
-        return numRecords(data);
-    }
-
-    public static int numRecords(byte[] data) throws IOException {
-        ByteBuffer buffer = ByteBuffer.wrap(data);
-        int metadata = buffer.getInt();
+        ByteBuf buffer = record.getPayloadBuf();
+        int metadata = buffer.getInt(METADATA_OFFSET);
         int version = (metadata & METADATA_VERSION_MASK);
         if (version != VERSION) {
             throw new IOException(String.format("Version mismatch while reading. Received: %d,"
                 + " Required: %d", version, VERSION));
         }
-        return buffer.getInt();
+        return buffer.getInt(COUNT_OFFSET);
     }
 
     public static Writer newWriter(int initialBufferSize,
@@ -103,7 +94,6 @@
     public static Reader of(LogRecordWithDLSN record) throws IOException {
         checkArgument(record.isRecordSet(),
                 "record is not a recordset");
-        byte[] data = record.getPayload();
         DLSN dlsn = record.getDlsn();
         int startPosition = record.getPositionWithinLogSegment();
         long startSequenceId = record.getStartSequenceIdOfCurrentSegment();
@@ -115,7 +105,7 @@
                 dlsn.getSlotId(),
                 startPosition,
                 startSequenceId,
-                new ByteArrayInputStream(data));
+                record.getPayloadBuf());
     }
 
     /**
@@ -150,6 +140,11 @@
          */
         LogRecordWithDLSN nextRecord() throws IOException;
 
+        /**
+         * Release the resources hold by this record set reader.
+         */
+        void release();
+
     }
 
 }
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSetBuffer.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSetBuffer.java
index 1fac817..27d54f8 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSetBuffer.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSetBuffer.java
@@ -17,7 +17,7 @@
  */
 package org.apache.distributedlog;
 
-import java.nio.ByteBuffer;
+import io.netty.buffer.ByteBuf;
 
 /**
  * Write representation of a {@link LogRecordSet}.
@@ -44,7 +44,7 @@
      *
      * @return the buffer to transmit.
      */
-    ByteBuffer getBuffer();
+    ByteBuf getBuffer();
 
     /**
      * Complete transmit.
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordWithDLSN.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordWithDLSN.java
index 3279a5a..096fc81 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordWithDLSN.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordWithDLSN.java
@@ -18,6 +18,7 @@
 package org.apache.distributedlog;
 
 import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.ByteBuf;
 
 /**
  * Log record with {@link DLSN} and <code>SequenceId</code>.
@@ -72,6 +73,13 @@
         this.startSequenceIdOfCurrentSegment = startSequenceIdOfCurrentSegment;
     }
 
+    @VisibleForTesting
+    public LogRecordWithDLSN(DLSN dlsn, long txid, ByteBuf buffer, long startSequenceIdOfCurrentSegment) {
+        super(txid, buffer);
+        this.dlsn = dlsn;
+        this.startSequenceIdOfCurrentSegment = startSequenceIdOfCurrentSegment;
+    }
+
     long getStartSequenceIdOfCurrentSegment() {
         return startSequenceIdOfCurrentSegment;
     }
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/CompressionCodec.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/CompressionCodec.java
index 9a0e3a3..f07647e 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/CompressionCodec.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/CompressionCodec.java
@@ -17,7 +17,7 @@
  */
 package org.apache.distributedlog.io;
 
-import org.apache.bookkeeper.stats.OpStatsLogger;
+import io.netty.buffer.ByteBuf;
 
 /**
  * Common interface for compression/decompression operations using different compression codecs.
@@ -27,54 +27,54 @@
      * Enum specifying the currently supported compression types.
      */
     enum Type {
-        NONE, LZ4, UNKNOWN
+
+        UNKNOWN(-0x1),
+        NONE(0x0),
+        LZ4(0x1);
+
+        private int code;
+
+        Type(int code) {
+            this.code = code;
+        }
+
+        public int code() {
+            return this.code;
+        }
+
+        public static Type of(int code) {
+            switch (code) {
+                case 0x0:
+                    return NONE;
+                case 0x1:
+                    return LZ4;
+                default:
+                    return UNKNOWN;
+            }
+        }
+
     }
 
     /**
-     * Return the compressed data as a byte array.
-     * @param data
+     * Return the compressed data as a byte buffer.
+     *
+     * @param uncompressed
      *          The data to be compressed
-     * @param offset
-     *          The offset in the bytes of data to compress
-     * @param length
-     *          The number of bytes of data to compress
-     * @param compressionStat
-     *          The stat to use for timing the compression operation
+     * @param headerLen
+     *          Account the header len for compressed buffer.
      * @return
      *          The compressed data
      *          The returned byte array is sized to the length of the compressed data
      */
-    byte[] compress(byte[] data, int offset, int length, OpStatsLogger compressionStat);
+    ByteBuf compress(ByteBuf uncompressed, int headerLen);
 
     /**
      * Return the decompressed data as a byte array.
-     * @param data
-     *          The data to be decompressed
-     * @param offset
-     *          The offset in the bytes of data to decompress
-     * @param length
-     *          The number of bytes of data to decompress
-     * @param decompressionStat
-     *          The stat to use for timing the decompression operation
-     * @return
-     *          The decompressed data
-     */
-    byte[] decompress(byte[] data, int offset, int length, OpStatsLogger decompressionStat);
-
-    /**
-     * Return the decompressed data as a byte array.
-     * @param data
+     *
+     * @param compressed
      *          The data to the decompressed
-     * @param offset
-     *          The offset in the bytes of data to decompress
-     * @param length
-     *          The number of bytes of data to decompress
-     * @param decompressedSize
-     *          The exact size of the decompressed data
-     * @param decompressionStat
-     *          The stat to use for timing the decompression operation
      * @return
      *          The decompressed data
      */
-    byte[] decompress(byte[] data, int offset, int length, int decompressedSize, OpStatsLogger decompressionStat);
+    ByteBuf decompress(ByteBuf compressed, int decompressedSize);
 }
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/CompressionUtils.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/CompressionUtils.java
index 7bac92a..ecbc09d 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/CompressionUtils.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/CompressionUtils.java
@@ -25,9 +25,6 @@
     public static final String LZ4 = "lz4";
     public static final String NONE = "none";
 
-    private static final CompressionCodec IDENTITY_CODEC = new IdentityCompressionCodec();
-    private static final CompressionCodec LZ4_CODEC = new LZ4CompressionCodec();
-
     /**
      * Get a cached compression codec instance for the specified type.
      * @param type compression codec type
@@ -35,10 +32,10 @@
      */
     public static CompressionCodec getCompressionCodec(CompressionCodec.Type type) {
         if (type == CompressionCodec.Type.LZ4) {
-            return LZ4_CODEC;
+            return LZ4CompressionCodec.of();
         }
         // No Compression
-        return IDENTITY_CODEC;
+        return IdentityCompressionCodec.of();
     }
 
     /**
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/IdentityCompressionCodec.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/IdentityCompressionCodec.java
index 5034033..d082ff8 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/IdentityCompressionCodec.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/IdentityCompressionCodec.java
@@ -20,31 +20,40 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import java.util.Arrays;
-import org.apache.bookkeeper.stats.OpStatsLogger;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 
 /**
  * An identity compression codec implementation for {@link CompressionCodec}.
  */
 public class IdentityCompressionCodec implements CompressionCodec {
+
+    public static IdentityCompressionCodec of() {
+        return INSTANCE;
+    }
+
+    private static final IdentityCompressionCodec INSTANCE = new IdentityCompressionCodec();
+
     @Override
-    public byte[] compress(byte[] data, int offset, int length, OpStatsLogger compressionStat) {
-        checkNotNull(data);
-        checkArgument(length >= 0);
-        return Arrays.copyOfRange(data, offset, offset + length);
+    public ByteBuf compress(ByteBuf uncompressed, int headerLen) {
+        checkNotNull(uncompressed);
+        checkArgument(uncompressed.readableBytes() >= 0);
+        if (headerLen == 0) {
+            return uncompressed.retain();
+        } else {
+            CompositeByteBuf composited = PooledByteBufAllocator.DEFAULT.compositeBuffer(2);
+            composited.addComponent(PooledByteBufAllocator.DEFAULT.buffer(headerLen, headerLen));
+            composited.addComponent(uncompressed.retain());
+            return composited;
+        }
     }
 
     @Override
-    public byte[] decompress(byte[] data, int offset, int length, OpStatsLogger decompressionStat) {
-        checkNotNull(data);
-        return Arrays.copyOfRange(data, offset, offset + length);
-    }
-
-    @Override
-    // Decompressed size is the same as the length of the data because this is an
-    // Identity compressor
-    public byte[] decompress(byte[] data, int offset, int length,
-                             int decompressedSize, OpStatsLogger decompressionStat) {
-        return decompress(data, offset, length, decompressionStat);
+    public ByteBuf decompress(ByteBuf compressed, int decompressedSize) {
+        checkNotNull(compressed);
+        checkArgument(compressed.readableBytes() >= 0);
+        checkArgument(decompressedSize >= 0);
+        return compressed.retain();
     }
 }
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/LZ4CompressionCodec.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/LZ4CompressionCodec.java
index b164fef..0f6b30a 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/LZ4CompressionCodec.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/LZ4CompressionCodec.java
@@ -20,14 +20,12 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.google.common.base.Stopwatch;
-import java.util.concurrent.TimeUnit;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.nio.ByteBuffer;
 import net.jpountz.lz4.LZ4Compressor;
-import net.jpountz.lz4.LZ4Exception;
 import net.jpountz.lz4.LZ4Factory;
 import net.jpountz.lz4.LZ4FastDecompressor;
-import net.jpountz.lz4.LZ4SafeDecompressor;
-import org.apache.bookkeeper.stats.OpStatsLogger;
 
 /**
  * An {@code lz4} based {@link CompressionCodec} implementation.
@@ -36,66 +34,55 @@
  */
 public class LZ4CompressionCodec implements CompressionCodec {
 
-    // Used for compression
-    private final LZ4Compressor compressor;
-    // Used to decompress when the size of the output is known
-    private final LZ4FastDecompressor fastDecompressor;
-    // Used to decompress when the size of the output is not known
-    private final LZ4SafeDecompressor safeDecompressor;
-
-    public LZ4CompressionCodec() {
-        this.compressor = LZ4Factory.fastestInstance().fastCompressor();
-        this.fastDecompressor = LZ4Factory.fastestInstance().fastDecompressor();
-        this.safeDecompressor = LZ4Factory.fastestInstance().safeDecompressor();
+    public static LZ4CompressionCodec of() {
+        return INSTANCE;
     }
 
-    @Override
-    public byte[] compress(byte[] data, int offset, int length, OpStatsLogger compressionStat) {
-        checkNotNull(data);
-        checkArgument(offset >= 0 && offset < data.length);
-        checkArgument(length >= 0);
-        checkNotNull(compressionStat);
+    private static final LZ4CompressionCodec INSTANCE = new LZ4CompressionCodec();
 
-        Stopwatch watch = Stopwatch.createStarted();
-        byte[] compressed = compressor.compress(data, offset, length);
-        compressionStat.registerSuccessfulEvent(watch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
+    private static final LZ4Factory factory = LZ4Factory.fastestJavaInstance();
+    // Used for compression
+    private static final LZ4Compressor compressor = factory.fastCompressor();
+    // Used to decompress when the size of the output is known
+    private static final LZ4FastDecompressor decompressor = factory.fastDecompressor();
+
+    @Override
+    public ByteBuf compress(ByteBuf uncompressed, int headerLen) {
+        checkNotNull(uncompressed);
+        checkArgument(uncompressed.readableBytes() > 0);
+
+        int uncompressedLen = uncompressed.readableBytes();
+        int maxLen = compressor.maxCompressedLength(uncompressedLen);
+
+        // get the source bytebuffer
+        ByteBuffer uncompressedNio = uncompressed.nioBuffer(uncompressed.readerIndex(), uncompressedLen);
+        ByteBuf compressed = PooledByteBufAllocator.DEFAULT.buffer(
+                maxLen + headerLen, maxLen + headerLen);
+        ByteBuffer compressedNio = compressed.nioBuffer(headerLen, maxLen);
+
+        int compressedLen = compressor.compress(
+                uncompressedNio, uncompressedNio.position(), uncompressedLen,
+                compressedNio, compressedNio.position(), maxLen);
+        compressed.writerIndex(compressedLen + headerLen);
+
         return compressed;
     }
 
     @Override
-    public byte[] decompress(byte[] data, int offset, int length, OpStatsLogger decompressionStat) {
-        checkNotNull(data);
-        checkArgument(offset >= 0 && offset < data.length);
-        checkArgument(length >= 0);
-        checkNotNull(decompressionStat);
-
-        Stopwatch watch = Stopwatch.createStarted();
-        // Assume that we have a compression ratio of 1/3.
-        int outLength = length * 3;
-        while (true) {
-            try {
-                byte[] decompressed = safeDecompressor.decompress(data, offset, length, outLength);
-                decompressionStat.registerSuccessfulEvent(watch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
-                return decompressed;
-            } catch (LZ4Exception e) {
-                outLength *= 2;
-            }
-        }
-    }
-
-    @Override
     // length parameter is ignored here because of the way the fastDecompressor works.
-    public byte[] decompress(byte[] data, int offset, int length, int decompressedSize,
-                             OpStatsLogger decompressionStat) {
-        checkNotNull(data);
-        checkArgument(offset >= 0 && offset < data.length);
-        checkArgument(length >= 0);
+    public ByteBuf decompress(ByteBuf compressed, int decompressedSize) {
+        checkNotNull(compressed);
+        checkArgument(compressed.readableBytes() >= 0);
         checkArgument(decompressedSize >= 0);
-        checkNotNull(decompressionStat);
 
-        Stopwatch watch = Stopwatch.createStarted();
-        byte[] decompressed = fastDecompressor.decompress(data, offset, decompressedSize);
-        decompressionStat.registerSuccessfulEvent(watch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
-        return decompressed;
+        ByteBuf uncompressed = PooledByteBufAllocator.DEFAULT.buffer(decompressedSize, decompressedSize);
+        ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, decompressedSize);
+        ByteBuffer compressedNio = compressed.nioBuffer(compressed.readerIndex(), compressed.readableBytes());
+
+        decompressor.decompress(
+                compressedNio, compressedNio.position(),
+                uncompressedNio, uncompressedNio.position(), uncompressedNio.remaining());
+        uncompressed.writerIndex(decompressedSize);
+        return uncompressed;
     }
 }
diff --git a/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java b/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java
index 1c5db24..a75f071 100644
--- a/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java
+++ b/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java
@@ -25,9 +25,11 @@
 import static org.junit.Assert.fail;
 
 import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.LogRecordSet.Reader;
 import org.apache.distributedlog.LogRecordSet.Writer;
 import org.apache.distributedlog.exceptions.LogRecordTooLongException;
@@ -38,6 +40,7 @@
 /**
  * Test Case for {@link LogRecordSet}.
  */
+@Slf4j
 public class TestLogRecordSet {
 
     @Test(timeout = 60000)
@@ -46,21 +49,19 @@
         assertEquals("zero user bytes", HEADER_LEN, writer.getNumBytes());
         assertEquals("zero records", 0, writer.getNumRecords());
 
-        ByteBuffer buffer = writer.getBuffer();
-        assertEquals("zero user bytes", HEADER_LEN, buffer.remaining());
-
-        byte[] data = new byte[buffer.remaining()];
-        buffer.get(data);
+        ByteBuf buffer = writer.getBuffer();
+        assertEquals("zero user bytes", HEADER_LEN, buffer.readableBytes());
 
         LogRecordWithDLSN record = new LogRecordWithDLSN(
                 new DLSN(1L, 0L, 0L),
                 1L,
-                data,
+                buffer,
                 1L);
         record.setRecordSet();
         Reader reader = LogRecordSet.of(record);
         assertNull("Empty record set should return null",
                 reader.nextRecord());
+        reader.release();
     }
 
     @Test(timeout = 60000)
@@ -78,21 +79,19 @@
         }
         assertEquals("zero user bytes", HEADER_LEN, writer.getNumBytes());
         assertEquals("zero records", 0, writer.getNumRecords());
-        ByteBuffer buffer = writer.getBuffer();
-        assertEquals("zero user bytes", HEADER_LEN, buffer.remaining());
-
-        byte[] data = new byte[buffer.remaining()];
-        buffer.get(data);
+        ByteBuf buffer = writer.getBuffer();
+        assertEquals("zero user bytes", HEADER_LEN, buffer.readableBytes());
 
         LogRecordWithDLSN record = new LogRecordWithDLSN(
                 new DLSN(1L, 0L, 0L),
                 1L,
-                data,
+                buffer,
                 1L);
         record.setRecordSet();
         Reader reader = LogRecordSet.of(record);
         assertNull("Empty record set should return null",
                 reader.nextRecord());
+        reader.release();
     }
 
     @Test(timeout = 20000)
@@ -137,7 +136,7 @@
             assertEquals((i + 6) + " records", (i + 6), writer.getNumRecords());
         }
 
-        ByteBuffer buffer = writer.getBuffer();
+        ByteBuf buffer = writer.getBuffer();
         assertEquals("10 records", 10, writer.getNumRecords());
 
         // Test transmit complete
@@ -147,14 +146,10 @@
             assertEquals(new DLSN(1L, 1L, 10L + i), writeResults.get(i));
         }
 
-        // Test reading from buffer
-        byte[] data = new byte[buffer.remaining()];
-        buffer.get(data);
-
         LogRecordWithDLSN record = new LogRecordWithDLSN(
                 new DLSN(1L, 1L, 10L),
                 99L,
-                data,
+                buffer,
                 999L);
         record.setPositionWithinLogSegment(888);
         record.setRecordSet();
@@ -172,6 +167,7 @@
             readRecord = reader.nextRecord();
         }
         assertEquals(10, numReads);
+        reader.release();
     }
 
 }
diff --git a/distributedlog-protocol/src/test/java/org/apache/distributedlog/io/TestCompressionCodec.java b/distributedlog-protocol/src/test/java/org/apache/distributedlog/io/TestCompressionCodec.java
new file mode 100644
index 0000000..23bb8c6
--- /dev/null
+++ b/distributedlog-protocol/src/test/java/org/apache/distributedlog/io/TestCompressionCodec.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.io;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import org.apache.distributedlog.LogRecord;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Test Case for {@link CompressionCodec}.
+ */
+public class TestCompressionCodec {
+
+    @Test(timeout = 10000)
+    public void testUnknownCompressionCodec() throws Exception {
+        assertEquals(
+                CompressionCodec.Type.UNKNOWN,
+                CompressionUtils.stringToType("unknown"));
+    }
+
+    @Test(timeout = 10000)
+    public void testIdentityCompressionCodec() throws Exception {
+        testCompressionCodec(CompressionUtils.getCompressionCodec(CompressionCodec.Type.NONE));
+    }
+
+    @Test(timeout = 10000)
+    public void testLZ4CompressionCodec() throws Exception {
+        testCompressionCodec(CompressionUtils.getCompressionCodec(CompressionCodec.Type.LZ4));
+    }
+
+    @Test(timeout = 10000)
+    public void testIdentityCompressionCodec2() throws Exception {
+        testCompressionCodec2(CompressionUtils.getCompressionCodec(CompressionCodec.Type.NONE));
+    }
+
+    @Test(timeout = 10000)
+    public void testLZ4CompressionCodec2() throws Exception {
+        testCompressionCodec2(CompressionUtils.getCompressionCodec(CompressionCodec.Type.LZ4));
+    }
+
+    private void testCompressionCodec(CompressionCodec codec) throws Exception {
+        byte[] data = "identity-compression-codec".getBytes(UTF_8);
+        ByteBuf buf = Unpooled.wrappedBuffer(data);
+        ByteBuf compressedBuf = codec.compress(buf, 0);
+        ByteBuf decompressedBuf = codec.decompress(compressedBuf, data.length);
+        assertEquals("The length of decompressed buf should be same as the original buffer",
+                data.length, decompressedBuf.readableBytes());
+        byte[] decompressedData = new byte[data.length];
+        decompressedBuf.readBytes(decompressedData);
+        assertArrayEquals("The decompressed bytes should be same as the original bytes",
+                data, decompressedData);
+        buf.release();
+        compressedBuf.release();
+        decompressedBuf.release();
+    }
+
+    private void testCompressionCodec2(CompressionCodec codec) throws Exception {
+        ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer(32, LogRecord.MAX_LOGRECORDSET_SIZE);
+        for (int i = 0; i < 100; i++) {
+            ByteBuffer record = ByteBuffer.wrap(("record-" + i).getBytes(UTF_8));
+            buffer.writeInt(record.remaining());
+            buffer.writeBytes(record);
+        }
+        byte[] uncompressedData = new byte[buffer.readableBytes()];
+        buffer.slice().readBytes(uncompressedData);
+
+        ByteBuf compressedBuf = codec.compress(buffer, 0);
+        byte[] compressedData = new byte[compressedBuf.readableBytes()];
+        compressedBuf.slice().readBytes(compressedData);
+
+        ByteBuf decompressedBuf = codec.decompress(compressedBuf, uncompressedData.length);
+        byte[] decompressedData = new byte[decompressedBuf.readableBytes()];
+        decompressedBuf.slice().readBytes(decompressedData);
+
+        buffer.release();
+        compressedBuf.release();
+        decompressedBuf.release();
+    }
+
+}
diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java
index 0ed93d0..ce851b5 100644
--- a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java
+++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java
@@ -24,6 +24,8 @@
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.LogRecordSetBuffer;
 import org.apache.distributedlog.client.monitor.MonitorServiceClient;
@@ -367,10 +369,12 @@
                         beforeComplete(sc, response.getHeader());
                         AbstractWriteOp.this.complete(sc.getAddress(), response);
                     }
+                    onSendWriteRequestCompleted();
                 }
                 @Override
                 public void onFailure(Throwable cause) {
                     // handled by the ResponseHeader listener
+                    onSendWriteRequestCompleted();
                 }
             }).map(new AbstractFunction1<WriteResponse, ResponseHeader>() {
                 @Override
@@ -381,27 +385,58 @@
         }
 
         abstract Future<WriteResponse> sendWriteRequest(ProxyClient sc);
+
+        // action triggered on {@link #sendWriteRequest} completed (either succeed or failed)
+        void onSendWriteRequestCompleted() {
+        }
     }
 
     class WriteOp extends AbstractWriteOp {
+        final ByteBuf dataBuf;
         final ByteBuffer data;
 
+        WriteOp(final String name, final ByteBuf dataBuf) {
+            super(name, clientStats.getOpStats("write"));
+            this.dataBuf = dataBuf;
+            this.data = dataBuf.nioBuffer();
+        }
+
         WriteOp(final String name, final ByteBuffer data) {
             super(name, clientStats.getOpStats("write"));
             this.data = data;
+            this.dataBuf = Unpooled.wrappedBuffer(data);
+        }
+
+        @Override
+        void complete(SocketAddress address, WriteResponse response) {
+            super.complete(address, response);
+            release();
+        }
+
+        @Override
+        void fail(SocketAddress address, Throwable t) {
+            super.fail(address, t);
+            release();
         }
 
         @Override
         Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
-            return sc.getService().writeWithContext(stream, data, ctx);
+            // retain the databuf when sending a write request
+            // release the databuf {@link #onSendWriteRequestCompleted()}
+            dataBuf.retain();
+            return sc.getService()
+                .writeWithContext(stream, data.duplicate(), ctx);
+        }
+
+        @Override
+        void onSendWriteRequestCompleted() {
+            dataBuf.release();
         }
 
         @Override
         Long computeChecksum() {
             if (null == crc32) {
-                byte[] dataBytes = new byte[data.remaining()];
-                data.duplicate().get(dataBytes);
-                crc32 = ProtocolUtils.writeOpCRC32(stream, dataBytes);
+                crc32 = ProtocolUtils.writeOpCRC32(stream, data.duplicate());
             }
             return crc32;
         }
@@ -414,6 +449,12 @@
                 }
             });
         }
+
+        void release() {
+            if (null != dataBuf) {
+                dataBuf.release();
+            }
+        }
     }
 
     class TruncateOp extends AbstractWriteOp {
@@ -456,7 +497,6 @@
 
     }
 
-
     class ReleaseOp extends AbstractWriteOp {
 
         ReleaseOp(String name) {
diff --git a/distributedlog-proxy-protocol/src/main/java/org/apache/distributedlog/protocol/util/ProtocolUtils.java b/distributedlog-proxy-protocol/src/main/java/org/apache/distributedlog/protocol/util/ProtocolUtils.java
index 1f91968..1768f5c 100644
--- a/distributedlog-proxy-protocol/src/main/java/org/apache/distributedlog/protocol/util/ProtocolUtils.java
+++ b/distributedlog-proxy-protocol/src/main/java/org/apache/distributedlog/protocol/util/ProtocolUtils.java
@@ -19,6 +19,8 @@
 
 import static com.google.common.base.Charsets.UTF_8;
 
+import io.netty.buffer.ByteBuf;
+import java.nio.ByteBuffer;
 import org.apache.distributedlog.DLSN;
 import java.util.zip.CRC32;
 import org.apache.distributedlog.exceptions.DLException;
@@ -42,11 +44,25 @@
     /**
      * Generate crc32 for WriteOp.
      */
-    public static Long writeOpCRC32(String stream, byte[] payload) {
+    public static Long writeOpCRC32(String stream, ByteBuf data) {
         CRC32 crc = requestCRC.get();
         try {
             crc.update(stream.getBytes(UTF_8));
-            crc.update(payload);
+            crc.update(data.nioBuffer());
+            return crc.getValue();
+        } finally {
+            crc.reset();
+        }
+    }
+
+    /**
+     * Generate crc32 for WriteOp.
+     */
+    public static Long writeOpCRC32(String stream, ByteBuffer data) {
+        CRC32 crc = requestCRC.get();
+        try {
+            crc.update(stream.getBytes(UTF_8));
+            crc.update(data);
             return crc.getValue();
         } finally {
             crc.reset();
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
index e3aa703..b78ba00 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
@@ -53,7 +53,7 @@
 
     private static final Logger logger = LoggerFactory.getLogger(WriteOp.class);
 
-    private final byte[] payload;
+    private final ByteBuffer payload;
     private final boolean isRecordSet;
 
     // Stats
@@ -80,8 +80,7 @@
                    Feature checksumDisabledFeature,
                    AccessControlManager accessControlManager) {
         super(stream, requestStat(statsLogger, "write"), checksum, checksumDisabledFeature);
-        payload = new byte[data.remaining()];
-        data.get(payload);
+        this.payload = data;
         this.isRecordSet = isRecordSet;
 
         final Partition partition = streamPartitionConverter.convert(stream);
@@ -121,12 +120,12 @@
 
     @Override
     public long getPayloadSize() {
-      return payload.length;
+      return payload.remaining();
     }
 
     @Override
     public Long computeChecksum() {
-        return ProtocolUtils.writeOpCRC32(stream, payload);
+        return ProtocolUtils.writeOpCRC32(stream, payload.duplicate());
     }
 
     @Override
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
index 60f814e..ef1b5eb 100644
--- a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
@@ -26,6 +26,7 @@
 import static org.junit.Assert.fail;
 
 import com.google.common.collect.Lists;
+import io.netty.buffer.Unpooled;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.TestDistributedLogBase;
@@ -569,7 +570,7 @@
     public void testWriteOpChecksumBadStream() throws Exception {
         DistributedLogServiceImpl localService = createConfiguredLocalService();
         WriteContext ctx = new WriteContext().setCrc32(
-            ProtocolUtils.writeOpCRC32("test", getTestDataBuffer().array()));
+            ProtocolUtils.writeOpCRC32("test", getTestDataBuffer()));
         Future<WriteResponse> result = localService.writeWithContext("test1", getTestDataBuffer(), ctx);
         WriteResponse resp = Await.result(result);
         assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
@@ -581,7 +582,7 @@
         DistributedLogServiceImpl localService = createConfiguredLocalService();
         ByteBuffer buffer = getTestDataBuffer();
         WriteContext ctx = new WriteContext().setCrc32(
-            ProtocolUtils.writeOpCRC32("test", buffer.array()));
+            ProtocolUtils.writeOpCRC32("test", buffer));
 
         // Overwrite 1 byte to corrupt data.
         buffer.put(1, (byte) 0xab);
@@ -658,11 +659,11 @@
         WriteOp writeOp0 = getWriteOp(
             streamName,
             disabledFeature,
-            ProtocolUtils.writeOpCRC32(streamName, "test".getBytes()));
+            ProtocolUtils.writeOpCRC32(streamName, Unpooled.wrappedBuffer("test".getBytes())));
         WriteOp writeOp1 = getWriteOp(
             streamName,
             disabledFeature,
-            ProtocolUtils.writeOpCRC32(streamName, "test".getBytes()));
+            ProtocolUtils.writeOpCRC32(streamName, Unpooled.wrappedBuffer("test".getBytes())));
 
         writeOp0.preExecute();
         disabledFeature.set(0);
diff --git a/pom.xml b/pom.xml
index bf2a57d..88bfa4a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,8 +114,9 @@
     <junit.version>4.8.1</junit.version>
     <libthrift.version>0.5.0-1</libthrift.version>
     <lombok.version>1.16.16</lombok.version>
-    <lz4.version>1.2.0</lz4.version>
+    <lz4.version>1.3.0</lz4.version>
     <mockito.version>1.9.5</mockito.version>
+    <netty.version>4.1.12.Final</netty.version>
     <scrooge.version>4.6.0</scrooge.version>
     <slf4j.version>1.6.4</slf4j.version>
     <stats-util.version>0.0.58</stats-util.version>
@@ -215,7 +216,7 @@
         <version>${maven-surefire-plugin.version}</version>
         <configuration>
           <redirectTestOutputToFile>true</redirectTestOutputToFile>
-          <argLine>-Xmx3G -Djava.net.preferIPv4Stack=true -XX:MaxDirectMemorySize=2G</argLine>
+          <argLine>-Xmx3G -Djava.net.preferIPv4Stack=true -XX:MaxDirectMemorySize=2G -Dio.netty.leakDetection.level=PARANOID</argLine>
           <forkMode>always</forkMode>
           <forkedProcessTimeoutInSeconds>1800</forkedProcessTimeoutInSeconds>
         </configuration>
@@ -226,25 +227,30 @@
         <version>${apache-rat-plugin.version}</version>
         <configuration>
           <excludes>
+            <exclude>ChangeLog</exclude>
+            <exclude>CONFIG.ini</exclude>
+            <exclude>GROUPS</exclude>
+            <exclude>OWNERS</exclude>
+            <exclude>dist/**/*</exclude>
             <exclude>docs/**/*</exclude>
+            <exclude>scripts/dev/reviewers</exclude>
             <exclude>website/**/*</exclude>
+            <exclude>**/*.md</exclude>
+            <exclude>**/apidocs/*</exclude>
+            <exclude>**/dependency-reduced-pom.xml</exclude>
+            <exclude>**/org/apache/distributedlog/thrift/*</exclude>
+            <!-- Git -->
             <exclude>.git/**/*</exclude>
             <exclude>.github/**/*</exclude>
             <exclude>.gitignore</exclude>
             <exclude>.idea/**/*</exclude>
-            <exclude>dist/**/*</exclude>
+            <!-- Intellij -->
+            <exclude>**/*.iml</exclude>
+            <exclude>**/*.iws</exclude>
+            <exclude>**/*.ipr</exclude>
+            <!-- SVN -->
             <exclude>**/.svn/**/*</exclude>
-            <exclude>ChangeLog</exclude>
-            <exclude>**/README.md</exclude>
-            <exclude>**/apidocs/*</exclude>
-            <exclude>GROUPS</exclude>
-            <exclude>OWNERS</exclude>
-            <exclude>CONFIG.ini</exclude>
-            <exclude>**/**.md</exclude>
-            <exclude>**/**.iml</exclude>
-            <exclude>scripts/dev/reviewers</exclude>
-            <exclude>**/dependency-reduced-pom.xml</exclude>
-            <exclude>**/org/apache/distributedlog/thrift/*</exclude>
+            <!-- Maven -->
             <exclude>.repository/**</exclude>
           </excludes>
         </configuration>
diff --git a/tests/jmh/conf/log4j.properties b/tests/jmh/conf/log4j.properties
new file mode 100644
index 0000000..af1cf5f
--- /dev/null
+++ b/tests/jmh/conf/log4j.properties
@@ -0,0 +1,56 @@
+#/**
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+#
+# DistributedLog Logging Configuration
+#
+
+# Default values
+dlog.root.logger=INFO, R
+dlog.log.dir=logs
+dlog.log.file=dlog.log
+
+log4j.rootLogger=${dlog.root.logger}
+log4j.logger.org.apache.zookeeper=INFO
+log4j.logger.org.apache.bookkeeper=INFO
+
+# redirect executor output to executors.log since slow op warnings can be quite verbose
+log4j.logger.org.apache.bookkeeper.util.SafeRunnable=INFO, Executors
+log4j.additivity.org.apache.bookkeeper.util.SafeRunnable=false
+
+log4j.appender.Executors=org.apache.log4j.RollingFileAppender
+log4j.appender.Executors.Threshold=INFO
+log4j.appender.Executors.File=${dlog.log.dir}/executors.log
+log4j.appender.Executors.MaxFileSize=20MB
+log4j.appender.Executors.MaxBackupIndex=5
+log4j.appender.Executors.layout=org.apache.log4j.PatternLayout
+log4j.appender.Executors.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+log4j.appender.R=org.apache.log4j.RollingFileAppender
+log4j.appender.R.Threshold=INFO
+log4j.appender.R.File=${dlog.log.dir}/${dlog.log.file}
+log4j.appender.R.MaxFileSize=20MB
+log4j.appender.R.MaxBackupIndex=50
+log4j.appender.R.layout=org.apache.log4j.PatternLayout
+log4j.appender.R.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+log4j.appender.stderr=org.apache.log4j.ConsoleAppender
+log4j.appender.stderr.Target=System.err
+log4j.appender.stderr.Threshold=INFO
+log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
+log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
diff --git a/tests/jmh/src/main/java/org/apache/distributedlog/tests/CompressionBenchmark.java b/tests/jmh/src/main/java/org/apache/distributedlog/tests/CompressionBenchmark.java
index af66212..1869e4f 100644
--- a/tests/jmh/src/main/java/org/apache/distributedlog/tests/CompressionBenchmark.java
+++ b/tests/jmh/src/main/java/org/apache/distributedlog/tests/CompressionBenchmark.java
@@ -20,6 +20,8 @@
  */
 package org.apache.distributedlog.tests;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -47,9 +49,9 @@
     @Param({ "10", "100", "1000", "10000" })
     int size;
 
-    byte[] entry;
-    byte[] compressedLz4;
-    byte[] compressedNone;
+    ByteBuf entry;
+    ByteBuf compressedLz4;
+    ByteBuf compressedNone;
     CompressionCodec codecLz4;
     CompressionCodec codecNone;
     OpStatsLogger opStatsLogger;
@@ -57,14 +59,16 @@
     @Setup
     public void prepare() {
         Random r = new Random(System.currentTimeMillis());
-        this.entry = new byte[this.size];
-        r.nextBytes(entry);
+        byte[] data = new byte[this.size];
+        r.nextBytes(data);
+        this.entry = Unpooled.buffer(size);
+        this.entry.writeBytes(data);
 
         this.codecLz4 = CompressionUtils.getCompressionCodec(Type.LZ4);
         this.codecNone = CompressionUtils.getCompressionCodec(Type.NONE);
         this.opStatsLogger = NullStatsLogger.INSTANCE.getOpStatsLogger("");
-        this.compressedLz4 = codecLz4.compress(entry, 0, entry.length, opStatsLogger);
-        this.compressedNone = codecNone.compress(entry, 0, entry.length, opStatsLogger);
+        this.compressedLz4 = codecLz4.compress(entry.slice(), 0);
+        this.compressedNone = codecNone.compress(entry.slice(), 0);
     }
 
 
@@ -79,7 +83,8 @@
     }
 
     private void testCompress(CompressionCodec codec) {
-        codec.compress(entry, 0, entry.length, opStatsLogger);
+        ByteBuf compressed = codec.compress(entry.slice(), 0);
+        compressed.release();
     }
 
     @Benchmark
@@ -92,8 +97,9 @@
         testDecompress(codecNone, compressedNone);
     }
 
-    private void testDecompress(CompressionCodec codec, byte[] compressed) {
-        codec.decompress(compressed, 0, compressed.length, opStatsLogger);
+    private void testDecompress(CompressionCodec codec, ByteBuf compressed) {
+        ByteBuf decompressed = codec.decompress(compressed.slice(), size);
+        decompressed.release();
     }
 
 }