ISSUE #226: ByteBuf.release() was not called before it's garbage-collected
Descriptions of the changes in this PR:
the problem is a new entry buffer was allocated when closing log segment writer. the entry buffer is never used and also never recycled. It causes an annoying logging.
the fix is to assign a dummy entry writer which basically rejects writes, when closing a log segment writer. it would prevent leaking bytebuf.
Author: Sijie Guo <sijie@apache.org>
Reviewers: Jia Zhai <None>
This closes #230 from sijie/fix_bytebuf_release_pr, closes #226
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
index 1d65d07..8584780 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
@@ -18,6 +18,7 @@
package org.apache.distributedlog;
import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.distributedlog.DistributedLogConstants.INVALID_TXID;
import static org.apache.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE;
import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
@@ -47,6 +48,7 @@
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.MathUtils;
+import org.apache.distributedlog.Entry.Writer;
import org.apache.distributedlog.common.concurrent.FutureEventListener;
import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.common.stats.OpStatsListener;
@@ -71,11 +73,8 @@
import org.apache.distributedlog.logsegment.LogSegmentEntryWriter;
import org.apache.distributedlog.logsegment.LogSegmentWriter;
import org.apache.distributedlog.util.FailpointUtils;
-
-
import org.apache.distributedlog.util.OrderedScheduler;
import org.apache.distributedlog.util.SimplePermitLimiter;
-
import org.apache.distributedlog.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -104,6 +103,54 @@
class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Sizable {
static final Logger LOG = LoggerFactory.getLogger(BKLogSegmentWriter.class);
+ final Writer REJECT_WRITES_WRITER = new Writer() {
+ @Override
+ public void writeRecord(LogRecord record, CompletableFuture<DLSN> transmitPromise)
+ throws LogRecordTooLongException, WriteException {
+ throw new WriteException(getFullyQualifiedLogSegment(), "Write record is cancelled.");
+ }
+
+ @Override
+ public boolean hasUserRecords() {
+ return false;
+ }
+
+ @Override
+ public int getNumRecords() {
+ return 0;
+ }
+
+ @Override
+ public int getNumBytes() {
+ return 0;
+ }
+
+ @Override
+ public long getMaxTxId() {
+ return INVALID_TXID;
+ }
+
+ @Override
+ public ByteBuf getBuffer() throws InvalidEnvelopedEntryException, IOException {
+ throw new IOException("GetBuffer is not supported.");
+ }
+
+ @Override
+ public DLSN finalizeTransmit(long lssn, long entryId) {
+ return new DLSN(lssn, entryId, -1L);
+ }
+
+ @Override
+ public void completeTransmit(long lssn, long entryId) {
+ // no-op
+ }
+
+ @Override
+ public void abortTransmit(Throwable reason) {
+ // no-op
+ }
+ };
+
private final String fullyQualifiedLogSegment;
private final String streamName;
private final int logSegmentMetadataVersion;
@@ -120,8 +167,8 @@
private final boolean isDurableWriteEnabled;
private DLSN lastDLSN = DLSN.InvalidDLSN;
private final long startTxId;
- private long lastTxId = DistributedLogConstants.INVALID_TXID;
- private long lastTxIdAcknowledged = DistributedLogConstants.INVALID_TXID;
+ private long lastTxId = INVALID_TXID;
+ private long lastTxIdAcknowledged = INVALID_TXID;
private long outstandingBytes = 0;
private long numFlushesSinceRestart = 0;
private long numBytes = 0;
@@ -555,7 +602,7 @@
synchronized (this) {
packetPreviousSaved = packetPrevious;
packetCurrentSaved = new BKTransmitPacket(recordSetWriter);
- recordSetWriter = newRecordSetWriter();
+ recordSetWriter = REJECT_WRITES_WRITER;
}
// Once the last packet been transmitted, apply any remaining promises asynchronously