ISSUE #226: ByteBuf.release() was not called before it's garbage-collected

Descriptions of the changes in this PR:

the problem is a new entry buffer was allocated when closing log segment writer. the entry buffer is never used and also never recycled. It causes an annoying logging.

the fix is to assign a dummy entry writer which basically rejects writes, when closing a log segment writer. it would prevent leaking bytebuf.

Author: Sijie Guo <sijie@apache.org>

Reviewers: Jia Zhai <None>

This closes #230 from sijie/fix_bytebuf_release_pr, closes #226
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
index 1d65d07..8584780 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
@@ -18,6 +18,7 @@
 package org.apache.distributedlog;
 
 import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.distributedlog.DistributedLogConstants.INVALID_TXID;
 import static org.apache.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE;
 import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
 
@@ -47,6 +48,7 @@
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
+import org.apache.distributedlog.Entry.Writer;
 import org.apache.distributedlog.common.concurrent.FutureEventListener;
 import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.common.stats.OpStatsListener;
@@ -71,11 +73,8 @@
 import org.apache.distributedlog.logsegment.LogSegmentEntryWriter;
 import org.apache.distributedlog.logsegment.LogSegmentWriter;
 import org.apache.distributedlog.util.FailpointUtils;
-
-
 import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.SimplePermitLimiter;
-
 import org.apache.distributedlog.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -104,6 +103,54 @@
 class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Sizable {
     static final Logger LOG = LoggerFactory.getLogger(BKLogSegmentWriter.class);
 
+    final Writer REJECT_WRITES_WRITER = new Writer() {
+        @Override
+        public void writeRecord(LogRecord record, CompletableFuture<DLSN> transmitPromise)
+            throws LogRecordTooLongException, WriteException {
+            throw new WriteException(getFullyQualifiedLogSegment(), "Write record is cancelled.");
+        }
+
+        @Override
+        public boolean hasUserRecords() {
+            return false;
+        }
+
+        @Override
+        public int getNumRecords() {
+            return 0;
+        }
+
+        @Override
+        public int getNumBytes() {
+            return 0;
+        }
+
+        @Override
+        public long getMaxTxId() {
+            return INVALID_TXID;
+        }
+
+        @Override
+        public ByteBuf getBuffer() throws InvalidEnvelopedEntryException, IOException {
+            throw new IOException("GetBuffer is not supported.");
+        }
+
+        @Override
+        public DLSN finalizeTransmit(long lssn, long entryId) {
+            return new DLSN(lssn, entryId, -1L);
+        }
+
+        @Override
+        public void completeTransmit(long lssn, long entryId) {
+            // no-op
+        }
+
+        @Override
+        public void abortTransmit(Throwable reason) {
+            // no-op
+        }
+    };
+
     private final String fullyQualifiedLogSegment;
     private final String streamName;
     private final int logSegmentMetadataVersion;
@@ -120,8 +167,8 @@
     private final boolean isDurableWriteEnabled;
     private DLSN lastDLSN = DLSN.InvalidDLSN;
     private final long startTxId;
-    private long lastTxId = DistributedLogConstants.INVALID_TXID;
-    private long lastTxIdAcknowledged = DistributedLogConstants.INVALID_TXID;
+    private long lastTxId = INVALID_TXID;
+    private long lastTxIdAcknowledged = INVALID_TXID;
     private long outstandingBytes = 0;
     private long numFlushesSinceRestart = 0;
     private long numBytes = 0;
@@ -555,7 +602,7 @@
         synchronized (this) {
             packetPreviousSaved = packetPrevious;
             packetCurrentSaved = new BKTransmitPacket(recordSetWriter);
-            recordSetWriter = newRecordSetWriter();
+            recordSetWriter = REJECT_WRITES_WRITER;
         }
 
         // Once the last packet been transmitted, apply any remaining promises asynchronously