DL-45: DL should allow ByteBuf based API to avoid copying bytes array
Descriptions of the changes in this PR:
This change leverages the `ByteBuf` api introduced in bookkeeper 4.5.0. It will avoid copying bytes array between LogRecord and LogRecordSet/Entry, and avoid copying bytes from DL to BK client.
This change also bump the lz4 library to `1.3.0` to leverage the `ByteBuffer` binding.
Author: Sijie Guo <sijie@apache.org>
Author: Jia Zhai <zhaijia03@gmail.com>
Author: Jia Zhai <zhaijia@apache.org>
Reviewers: Jia Zhai <None>, Enrico Olivelli <eolivelli@gmail.com>, Leigh Stewart <None>
This closes #151 from sijie/zero_copy
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java
index b81dad4..4ac948a 100644
--- a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java
@@ -140,6 +140,7 @@
processRecord(nextRecord);
nextRecord = reader.nextRecord();
}
+ reader.release();
}
public void processRecord(final LogRecordWithDLSN record) {
diff --git a/distributedlog-common/pom.xml b/distributedlog-common/pom.xml
index e768574..2129755 100644
--- a/distributedlog-common/pom.xml
+++ b/distributedlog-common/pom.xml
@@ -69,6 +69,11 @@
<version>${commons-codec.version}</version>
</dependency>
<dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/ByteBufUtils.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/ByteBufUtils.java
new file mode 100644
index 0000000..934d8a7
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/ByteBufUtils.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog.common.util;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ * Utils to handle with {@link io.netty.buffer.ByteBuf}.
+ */
+public class ByteBufUtils {
+
+ public static byte[] getArray(ByteBuf buffer) {
+ if (buffer.hasArray() && buffer.arrayOffset() == 0 && buffer.writableBytes() == 0) {
+ return buffer.array();
+ }
+ byte[] data = new byte[buffer.readableBytes()];
+ buffer.getBytes(buffer.readerIndex(), data);
+ return data;
+ }
+
+}
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/ReferenceCounted.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/ReferenceCounted.java
new file mode 100644
index 0000000..2df0226
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/ReferenceCounted.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.util;
+
+/**
+ * An interface for indicating an object is reference counted.
+ */
+public interface ReferenceCounted {
+
+ /**
+ * Retain the reference.
+ */
+ void retain();
+
+ /**
+ * Release the reference.
+ */
+ void release();
+
+}
diff --git a/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java b/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
index dc9d09d..2ab2bca 100644
--- a/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
+++ b/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
@@ -18,9 +18,7 @@
package org.apache.bookkeeper.client;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
-import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Enumeration;
@@ -86,25 +84,25 @@
}
public void readEntriesFromAllBookies(final LedgerHandle lh, long eid,
- final GenericCallback<Set<ReadResult<InputStream>>> callback) {
+ final GenericCallback<Set<ReadResult<ByteBuf>>> callback) {
List<Integer> writeSet = lh.distributionSchedule.getWriteSet(eid);
final AtomicInteger numBookies = new AtomicInteger(writeSet.size());
- final Set<ReadResult<InputStream>> readResults = new HashSet<ReadResult<InputStream>>();
+ final Set<ReadResult<ByteBuf>> readResults = new HashSet<>();
ReadEntryCallback readEntryCallback = new ReadEntryCallback() {
@Override
public void readEntryComplete(int rc, long lid, long eid, ByteBuf buffer, Object ctx) {
BookieSocketAddress bookieAddress = (BookieSocketAddress) ctx;
- ReadResult<InputStream> rr;
+ ReadResult<ByteBuf> rr;
if (BKException.Code.OK != rc) {
- rr = new ReadResult<InputStream>(eid, rc, null, bookieAddress.getSocketAddress());
+ rr = new ReadResult<>(eid, rc, null, bookieAddress.getSocketAddress());
} else {
ByteBuf content;
try {
content = lh.macManager.verifyDigestAndReturnData(eid, buffer);
ByteBuf toRet = Unpooled.copiedBuffer(content);
- rr = new ReadResult<InputStream>(eid, BKException.Code.OK, new ByteBufInputStream(toRet), bookieAddress.getSocketAddress());
+ rr = new ReadResult<>(eid, BKException.Code.OK, toRet, bookieAddress.getSocketAddress());
} catch (BKException.BKDigestMatchException e) {
- rr = new ReadResult<InputStream>(eid, BKException.Code.DigestMatchException, null, bookieAddress.getSocketAddress());
+ rr = new ReadResult<>(eid, BKException.Code.DigestMatchException, null, bookieAddress.getSocketAddress());
} finally {
buffer.release();
}
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
index 5be9e19..d80555a 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
@@ -240,6 +240,13 @@
this.lastProcessTime = Stopwatch.createStarted();
}
+ synchronized void releaseCurrentEntry() {
+ if (null != currentEntry) {
+ currentEntry.release();
+ currentEntry = null;
+ }
+ }
+
private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() {
if (idleErrorThresholdMillis < Integer.MAX_VALUE) {
// Dont run the task more than once every seconds (for sanity)
@@ -480,6 +487,7 @@
closePromise = closeFuture = new CompletableFuture<Void>();
exception = new ReadCancelledException(readHandler.getFullyQualifiedName(), "Reader was closed");
setLastException(exception);
+ releaseCurrentEntry();
}
// Do this after we have checked that the reader was not previously closed
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
index 6f201a6..48a0daf 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
@@ -17,6 +17,13 @@
*/
package org.apache.distributedlog;
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
+import static org.apache.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
@@ -27,9 +34,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Stopwatch;
import java.util.function.Function;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
import org.apache.distributedlog.exceptions.BKTransmitException;
@@ -44,13 +48,11 @@
import org.apache.distributedlog.feature.CoreFeatureKeys;
import org.apache.distributedlog.injector.FailureInjector;
import org.apache.distributedlog.injector.RandomDelayFailureInjector;
-import org.apache.distributedlog.io.Buffer;
import org.apache.distributedlog.io.CompressionCodec;
import org.apache.distributedlog.io.CompressionUtils;
import org.apache.distributedlog.lock.DistributedLock;
import org.apache.distributedlog.logsegment.LogSegmentEntryWriter;
import org.apache.distributedlog.logsegment.LogSegmentWriter;
-import org.apache.distributedlog.common.stats.BroadCastStatsLogger;
import org.apache.distributedlog.common.stats.OpStatsListener;
import org.apache.distributedlog.util.FailpointUtils;
import org.apache.distributedlog.common.concurrent.FutureEventListener;
@@ -75,10 +77,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static com.google.common.base.Charsets.UTF_8;
-import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
-import static org.apache.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE;
-
/**
* BookKeeper Based Log Segment Writer.
*
@@ -152,7 +150,6 @@
private final OrderedScheduler scheduler;
// stats
- private final StatsLogger envelopeStatsLogger;
private final StatsLogger transmitOutstandingLogger;
private final Counter transmitDataSuccesses;
private final Counter transmitDataMisses;
@@ -220,7 +217,6 @@
}
this.writeLimiter = new WriteLimiter(streamName, streamWriteLimiter, globalWriteLimiter);
this.alertStatsLogger = alertStatsLogger;
- this.envelopeStatsLogger = BroadCastStatsLogger.masterslave(statsLogger, perLogStatsLogger);
StatsLogger flushStatsLogger = statsLogger.scope("flush");
StatsLogger pFlushStatsLogger = flushStatsLogger.scope("periodic");
@@ -279,8 +275,7 @@
streamName,
Math.max(transmissionThreshold, 1024),
envelopeBeforeTransmit(),
- compressionType,
- envelopeStatsLogger);
+ compressionType);
this.packetPrevious = null;
this.startTxId = startTxId;
this.lastTxId = startTxId;
@@ -440,8 +435,7 @@
streamName,
Math.max(transmissionThreshold, getAverageTransmitSize()),
envelopeBeforeTransmit(),
- compressionType,
- envelopeStatsLogger);
+ compressionType);
}
private boolean envelopeBeforeTransmit() {
@@ -1049,7 +1043,7 @@
}
}
- Buffer toSend;
+ ByteBuf toSend;
try {
toSend = recordSetToTransmit.getBuffer();
FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_TransmitFailGetBuffer);
@@ -1076,8 +1070,7 @@
BKTransmitPacket packet = new BKTransmitPacket(recordSetToTransmit);
packetPrevious = packet;
- entryWriter.asyncAddEntry(toSend.getData(), 0, toSend.size(),
- this, packet);
+ entryWriter.asyncAddEntry(toSend, this, packet);
if (recordSetToTransmit.hasUserRecords()) {
transmitDataSuccesses.inc();
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java
index 47301b5..fa56cfb 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java
@@ -106,6 +106,22 @@
});
}
+ synchronized void releaseCurrentEntry() {
+ if (null != currentEntry) {
+ currentEntry.release();
+ currentEntry = null;
+ }
+ }
+
+ synchronized void checkClosedOrException() throws IOException {
+ if (null != closeFuture) {
+ throw new IOException("Reader is closed");
+ }
+ if (null != readerException.get()) {
+ throw readerException.get();
+ }
+ }
+
@VisibleForTesting
ReadAheadEntryReader getReadAheadReader() {
return readAheadReader;
@@ -150,9 +166,8 @@
@Override
public synchronized LogRecordWithDLSN readNext(boolean nonBlocking)
throws IOException {
- if (null != readerException.get()) {
- throw readerException.get();
- }
+ checkClosedOrException();
+
LogRecordWithDLSN record = doReadNext(nonBlocking);
// no record is returned, check if the reader becomes idle
if (null == record && shouldCheckIdleReader) {
@@ -236,6 +251,7 @@
return closeFuture;
}
closeFuture = closePromise = new CompletableFuture<Void>();
+ releaseCurrentEntry();
}
readHandler.unregisterListener(readAheadReader);
readAheadReader.removeStateChangeNotification(this);
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java b/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java
index 30cd499..1a4583d 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java
@@ -17,19 +17,14 @@
*/
package org.apache.distributedlog;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.exceptions.LogRecordTooLongException;
import org.apache.distributedlog.exceptions.WriteException;
import org.apache.distributedlog.io.CompressionCodec;
import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-
-import javax.annotation.Nullable;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
/**
* A set of {@link LogRecord}s.
@@ -47,22 +42,18 @@
* if envelope the buffer before transmit
* @param codec
* compression codec
- * @param statsLogger
- * stats logger to receive stats
* @return writer to build a log record set.
*/
public static Writer newEntry(
String logName,
int initialBufferSize,
boolean envelopeBeforeTransmit,
- CompressionCodec.Type codec,
- StatsLogger statsLogger) {
+ CompressionCodec.Type codec) {
return new EnvelopedEntryWriter(
logName,
initialBufferSize,
envelopeBeforeTransmit,
- codec,
- statsLogger);
+ codec);
}
public static Builder newBuilder() {
@@ -78,40 +69,12 @@
private long entryId = -1;
private long startSequenceId = Long.MIN_VALUE;
private boolean envelopeEntry = true;
- // input stream
- private InputStream in = null;
- // or bytes array
- private byte[] data = null;
- private int offset = -1;
- private int length = -1;
- private Optional<Long> txidToSkipTo = Optional.absent();
- private Optional<DLSN> dlsnToSkipTo = Optional.absent();
+ private ByteBuf buffer;
private boolean deserializeRecordSet = true;
private Builder() {}
/**
- * Reset the builder.
- *
- * @return builder
- */
- public Builder reset() {
- logSegmentSequenceNumber = -1;
- entryId = -1;
- startSequenceId = Long.MIN_VALUE;
- envelopeEntry = true;
- // input stream
- in = null;
- // or bytes array
- data = null;
- offset = -1;
- length = -1;
- txidToSkipTo = Optional.absent();
- dlsnToSkipTo = Optional.absent();
- return this;
- }
-
- /**
* Set the segment info of the log segment that this record
* set belongs to.
*
@@ -152,56 +115,14 @@
}
/**
- * Set the serialized bytes data of this record set.
+ * Set the entry buffer of the serialized bytes data of this record set.
*
- * @param data
- * serialized bytes data of this record set.
- * @param offset
- * offset of the bytes data
- * @param length
- * length of the bytes data
- * @return builder
- */
- public Builder setData(byte[] data, int offset, int length) {
- this.data = data;
- this.offset = offset;
- this.length = length;
- return this;
- }
-
- /**
- * Set the input stream of the serialized bytes data of this record set.
- *
- * @param in
+ * @param buffer
* input stream
* @return builder
*/
- public Builder setInputStream(InputStream in) {
- this.in = in;
- return this;
- }
-
- /**
- * Set the record set starts from <code>dlsn</code>.
- *
- * @param dlsn
- * dlsn to skip to
- * @return builder
- */
- public Builder skipTo(@Nullable DLSN dlsn) {
- this.dlsnToSkipTo = Optional.fromNullable(dlsn);
- return this;
- }
-
- /**
- * Set the record set starts from <code>txid</code>.
- *
- * @param txid
- * txid to skip to
- * @return builder
- */
- public Builder skipTo(long txid) {
- this.txidToSkipTo = Optional.of(txid);
+ public Builder setEntry(ByteBuf buffer) {
+ this.buffer = buffer.retainedSlice();
return this;
}
@@ -217,41 +138,14 @@
return this;
}
- public Entry build() {
- Preconditions.checkNotNull(data, "Serialized data isn't provided");
- Preconditions.checkArgument(offset >= 0 && length >= 0
- && (offset + length) <= data.length,
- "Invalid offset or length of serialized data");
- return new Entry(
- logSegmentSequenceNumber,
- entryId,
- startSequenceId,
- envelopeEntry,
- deserializeRecordSet,
- data,
- offset,
- length,
- txidToSkipTo,
- dlsnToSkipTo);
- }
-
public Entry.Reader buildReader() throws IOException {
- Preconditions.checkArgument(data != null || in != null,
+ Preconditions.checkNotNull(buffer,
"Serialized data or input stream isn't provided");
- InputStream in;
- if (null != this.in) {
- in = this.in;
- } else {
- Preconditions.checkArgument(offset >= 0 && length >= 0
- && (offset + length) <= data.length,
- "Invalid offset or length of serialized data");
- in = new ByteArrayInputStream(data, offset, length);
- }
return new EnvelopedEntryReader(
logSegmentSequenceNumber,
entryId,
startSequenceId,
- in,
+ buffer,
envelopeEntry,
deserializeRecordSet,
NullStatsLogger.INSTANCE);
@@ -259,73 +153,6 @@
}
- private final long logSegmentSequenceNumber;
- private final long entryId;
- private final long startSequenceId;
- private final boolean envelopedEntry;
- private final boolean deserializeRecordSet;
- private final byte[] data;
- private final int offset;
- private final int length;
- private final Optional<Long> txidToSkipTo;
- private final Optional<DLSN> dlsnToSkipTo;
-
- private Entry(long logSegmentSequenceNumber,
- long entryId,
- long startSequenceId,
- boolean envelopedEntry,
- boolean deserializeRecordSet,
- byte[] data,
- int offset,
- int length,
- Optional<Long> txidToSkipTo,
- Optional<DLSN> dlsnToSkipTo) {
- this.logSegmentSequenceNumber = logSegmentSequenceNumber;
- this.entryId = entryId;
- this.startSequenceId = startSequenceId;
- this.envelopedEntry = envelopedEntry;
- this.deserializeRecordSet = deserializeRecordSet;
- this.data = data;
- this.offset = offset;
- this.length = length;
- this.txidToSkipTo = txidToSkipTo;
- this.dlsnToSkipTo = dlsnToSkipTo;
- }
-
- /**
- * Get raw data of this record set.
- *
- * @return raw data representation of this record set.
- */
- public byte[] getRawData() {
- return data;
- }
-
- /**
- * Create reader to iterate over this record set.
- *
- * @return reader to iterate over this record set.
- * @throws IOException if the record set is invalid record set.
- */
- public Reader reader() throws IOException {
- InputStream in = new ByteArrayInputStream(data, offset, length);
- Reader reader = new EnvelopedEntryReader(
- logSegmentSequenceNumber,
- entryId,
- startSequenceId,
- in,
- envelopedEntry,
- deserializeRecordSet,
- NullStatsLogger.INSTANCE);
- if (txidToSkipTo.isPresent()) {
- reader.skipTo(txidToSkipTo.get());
- }
- if (dlsnToSkipTo.isPresent()) {
- reader.skipTo(dlsnToSkipTo.get());
- }
- return reader;
- }
-
/**
* Writer to append {@link LogRecord}s to {@link Entry}.
*/
@@ -345,11 +172,6 @@
void writeRecord(LogRecord record, CompletableFuture<DLSN> transmitPromise)
throws LogRecordTooLongException, WriteException;
- /**
- * Reset the writer to write records.
- */
- void reset();
-
}
/**
@@ -398,6 +220,11 @@
*/
boolean skipTo(DLSN dlsn) throws IOException;
+ /**
+ * Release the resources held by the entry reader.
+ */
+ void release();
+
}
}
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EntryBuffer.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EntryBuffer.java
index c08fbeb..a881df8 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/EntryBuffer.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EntryBuffer.java
@@ -17,11 +17,10 @@
*/
package org.apache.distributedlog;
-import org.apache.distributedlog.exceptions.InvalidEnvelopedEntryException;
-import org.apache.distributedlog.io.Buffer;
-import org.apache.distributedlog.io.TransmitListener;
-
+import io.netty.buffer.ByteBuf;
import java.io.IOException;
+import org.apache.distributedlog.exceptions.InvalidEnvelopedEntryException;
+import org.apache.distributedlog.io.TransmitListener;
/**
* Write representation of a {@link Entry}.
@@ -61,10 +60,12 @@
/**
* Get the buffer to transmit.
*
+ * <p>The caller is responsible for releasing the reference of the returned bytebuf object.
+ *
* @return the buffer to transmit.
* @throws InvalidEnvelopedEntryException if the record set buffer is invalid
* @throws IOException when encountered IOException during serialization
*/
- Buffer getBuffer() throws InvalidEnvelopedEntryException, IOException;
+ ByteBuf getBuffer() throws InvalidEnvelopedEntryException, IOException;
}
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
index 09301aa..89eb6e9 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
@@ -17,23 +17,12 @@
*/
package org.apache.distributedlog;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import io.netty.buffer.ByteBuf;
import java.io.IOException;
-import java.io.InputStream;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.distributedlog.exceptions.InvalidEnvelopedEntryException;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
-
-import org.apache.distributedlog.common.annotations.DistributedLogAnnotations.Compression;
import org.apache.distributedlog.io.CompressionCodec;
+import org.apache.distributedlog.io.CompressionCodec.Type;
import org.apache.distributedlog.io.CompressionUtils;
-import org.apache.distributedlog.common.util.BitMaskUtils;
/**
* An enveloped entry written to BookKeeper.
@@ -69,212 +58,23 @@
*/
class EnvelopedEntry {
- public static final int VERSION_LENGTH = 1; // One byte long
public static final byte VERSION_ONE = 1;
+ public static final int HEADER_LENGTH =
+ Byte.BYTES // version
+ + Integer.BYTES // flags
+ + Integer.BYTES // decompressed size
+ + Integer.BYTES; // compressed size
- public static final byte LOWEST_SUPPORTED_VERSION = VERSION_ONE;
- public static final byte HIGHEST_SUPPORTED_VERSION = VERSION_ONE;
+ static final int VERSION_OFFSET = 0;
+ static final int FLAGS_OFFSET = VERSION_OFFSET + Byte.BYTES;
+ static final int DECOMPRESSED_SIZE_OFFSET = FLAGS_OFFSET + Integer.BYTES;
+ static final int COMPRESSED_SIZE_OFFSET = DECOMPRESSED_SIZE_OFFSET + Integer.BYTES;
+
public static final byte CURRENT_VERSION = VERSION_ONE;
-
- private final OpStatsLogger compressionStat;
- private final OpStatsLogger decompressionStat;
- private final Counter compressedEntryBytes;
- private final Counter decompressedEntryBytes;
- private final byte version;
-
- private Header header = new Header();
- private Payload payloadCompressed = new Payload();
- private Payload payloadDecompressed = new Payload();
-
- public EnvelopedEntry(byte version,
- StatsLogger statsLogger) throws InvalidEnvelopedEntryException {
- Preconditions.checkNotNull(statsLogger);
- if (version < LOWEST_SUPPORTED_VERSION || version > HIGHEST_SUPPORTED_VERSION) {
- throw new InvalidEnvelopedEntryException("Invalid enveloped entry version " + version + ", expected to be in [ "
- + LOWEST_SUPPORTED_VERSION + " ~ " + HIGHEST_SUPPORTED_VERSION + " ]");
- }
- this.version = version;
- this.compressionStat = statsLogger.getOpStatsLogger("compression_time");
- this.decompressionStat = statsLogger.getOpStatsLogger("decompression_time");
- this.compressedEntryBytes = statsLogger.getCounter("compressed_bytes");
- this.decompressedEntryBytes = statsLogger.getCounter("decompressed_bytes");
- }
+ public static final int COMPRESSION_CODEC_MASK = 0x3;
/**
- * @param statsLogger
- * Used for getting stats for (de)compression time
- * @param compressionType
- * The compression type to use
- * @param decompressed
- * The decompressed payload
- * NOTE: The size of the byte array passed as the decompressed payload can be larger
- * than the actual contents to be compressed.
- */
- public EnvelopedEntry(byte version,
- CompressionCodec.Type compressionType,
- byte[] decompressed,
- int length,
- StatsLogger statsLogger)
- throws InvalidEnvelopedEntryException {
- this(version, statsLogger);
- Preconditions.checkNotNull(compressionType);
- Preconditions.checkNotNull(decompressed);
- Preconditions.checkArgument(length >= 0, "Invalid bytes length " + length);
-
- this.header = new Header(compressionType, length);
- this.payloadDecompressed = new Payload(length, decompressed);
- }
-
- private boolean isReady() {
- return (header.ready && payloadDecompressed.ready);
- }
-
- @Compression
- public void writeFully(DataOutputStream out) throws IOException {
- Preconditions.checkNotNull(out);
- if (!isReady()) {
- throw new IOException("Entry not writable");
- }
- // Version
- out.writeByte(version);
- // Header
- header.write(out);
- // Compress
- CompressionCodec codec = CompressionUtils.getCompressionCodec(header.compressionType);
- byte[] compressed = codec.compress(
- payloadDecompressed.payload,
- 0,
- payloadDecompressed.length,
- compressionStat);
- this.payloadCompressed = new Payload(compressed.length, compressed);
- this.compressedEntryBytes.add(payloadCompressed.length);
- this.decompressedEntryBytes.add(payloadDecompressed.length);
- payloadCompressed.write(out);
- }
-
- @Compression
- public void readFully(DataInputStream in) throws IOException {
- Preconditions.checkNotNull(in);
- // Make sure we're reading the right versioned entry.
- byte version = in.readByte();
- if (version != this.version) {
- throw new IOException(String.format("Version mismatch while reading. Received: %d," +
- " Required: %d", version, this.version));
- }
- header.read(in);
- payloadCompressed.read(in);
- // Decompress
- CompressionCodec codec = CompressionUtils.getCompressionCodec(header.compressionType);
- byte[] decompressed = codec.decompress(
- payloadCompressed.payload,
- 0,
- payloadCompressed.length,
- header.decompressedSize,
- decompressionStat);
- this.payloadDecompressed = new Payload(decompressed.length, decompressed);
- this.compressedEntryBytes.add(payloadCompressed.length);
- this.decompressedEntryBytes.add(payloadDecompressed.length);
- }
-
- public byte[] getDecompressedPayload() throws IOException {
- if (!isReady()) {
- throw new IOException("Decompressed payload is not initialized");
- }
- return payloadDecompressed.payload;
- }
-
- public static class Header {
- public static final int COMPRESSION_CODEC_MASK = 0x3;
- public static final int COMPRESSION_CODEC_NONE = 0x0;
- public static final int COMPRESSION_CODEC_LZ4 = 0x1;
-
- private int flags = 0;
- private int decompressedSize = 0;
- private CompressionCodec.Type compressionType = CompressionCodec.Type.UNKNOWN;
-
- // Whether this struct is ready for reading/writing.
- private boolean ready = false;
-
- // Used while reading.
- public Header() {
- }
-
- public Header(CompressionCodec.Type compressionType,
- int decompressedSize) {
- this.compressionType = compressionType;
- this.decompressedSize = decompressedSize;
- this.flags = 0;
- switch (compressionType) {
- case NONE:
- this.flags = (int) BitMaskUtils.set(flags, COMPRESSION_CODEC_MASK,
- COMPRESSION_CODEC_NONE);
- break;
- case LZ4:
- this.flags = (int) BitMaskUtils.set(flags, COMPRESSION_CODEC_MASK,
- COMPRESSION_CODEC_LZ4);
- break;
- default:
- throw new RuntimeException(String.format("Unknown Compression Type: %s",
- compressionType));
- }
- // This can now be written.
- this.ready = true;
- }
-
- private void write(DataOutputStream out) throws IOException {
- out.writeInt(flags);
- out.writeInt(decompressedSize);
- }
-
- private void read(DataInputStream in) throws IOException {
- this.flags = in.readInt();
- int compressionType = (int) BitMaskUtils.get(flags, COMPRESSION_CODEC_MASK);
- if (compressionType == COMPRESSION_CODEC_NONE) {
- this.compressionType = CompressionCodec.Type.NONE;
- } else if (compressionType == COMPRESSION_CODEC_LZ4) {
- this.compressionType = CompressionCodec.Type.LZ4;
- } else {
- throw new IOException(String.format("Unsupported Compression Type: %s",
- compressionType));
- }
- this.decompressedSize = in.readInt();
- // Values can now be read.
- this.ready = true;
- }
- }
-
- public static class Payload {
- private int length = 0;
- private byte[] payload = null;
-
- // Whether this struct is ready for reading/writing.
- private boolean ready = false;
-
- // Used for reading
- Payload() {
- }
-
- Payload(int length, byte[] payload) {
- this.length = length;
- this.payload = payload;
- this.ready = true;
- }
-
- private void write(DataOutputStream out) throws IOException {
- out.writeInt(length);
- out.write(payload, 0, length);
- }
-
- private void read(DataInputStream in) throws IOException {
- this.length = in.readInt();
- this.payload = new byte[length];
- in.readFully(payload);
- this.ready = true;
- }
- }
-
- /**
- * Return an InputStream that reads from the provided InputStream, decompresses the data
+ * Return an {@link ByteBuf} that reads from the provided {@link ByteBuf}, decompresses the data
* and returns a new InputStream wrapping the underlying payload.
*
* Note that src is modified by this call.
@@ -283,14 +83,30 @@
* New Input stream with the underlying payload.
* @throws Exception
*/
- public static InputStream fromInputStream(InputStream src,
- StatsLogger statsLogger) throws IOException {
- src.mark(VERSION_LENGTH);
- byte version = new DataInputStream(src).readByte();
- src.reset();
- EnvelopedEntry entry = new EnvelopedEntry(version, statsLogger);
- entry.readFully(new DataInputStream(src));
- return new ByteArrayInputStream(entry.getDecompressedPayload());
+ public static ByteBuf fromEnvelopedBuf(ByteBuf src, StatsLogger statsLogger)
+ throws IOException {
+ byte version = src.readByte();
+ if (version != CURRENT_VERSION) {
+ throw new IOException(String.format("Version mismatch while reading. Received: %d,"
+ + " Required: %d", version, CURRENT_VERSION));
+ }
+ int flags = src.readInt();
+ int codecCode = flags & COMPRESSION_CODEC_MASK;
+ int originDataLen = src.readInt();
+ int actualDataLen = src.readInt();
+ ByteBuf compressedBuf = src.slice(src.readerIndex(), actualDataLen);
+ ByteBuf decompressedBuf;
+ try {
+ if (Type.NONE.code() == codecCode && originDataLen != actualDataLen) {
+ throw new IOException("Inconsistent data length found for a non-compressed entry : compressed = "
+ + originDataLen + ", actual = " + actualDataLen);
+ }
+ CompressionCodec codec = CompressionUtils.getCompressionCodec(Type.of(codecCode));
+ decompressedBuf = codec.decompress(compressedBuf, originDataLen);
+ } finally {
+ compressedBuf.release();
+ }
+ return decompressedBuf;
}
}
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java
index 1761de5..3924f4b 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java
@@ -17,45 +17,72 @@
*/
package org.apache.distributedlog;
-import org.apache.bookkeeper.stats.StatsLogger;
-
-import java.io.DataInputStream;
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.ByteBuf;
import java.io.IOException;
-import java.io.InputStream;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.bookkeeper.stats.StatsLogger;
/**
* Record reader to read records from an enveloped entry buffer.
*/
+@NotThreadSafe
class EnvelopedEntryReader implements Entry.Reader, RecordStream {
private final long logSegmentSeqNo;
private final long entryId;
+ private final ByteBuf src;
private final LogRecord.Reader reader;
// slot id
private long slotId = 0;
+ // error lag
+ private IOException lastException = null;
+ private boolean isExhausted = false;
EnvelopedEntryReader(long logSegmentSeqNo,
long entryId,
long startSequenceId,
- InputStream in,
+ ByteBuf in,
boolean envelopedEntry,
boolean deserializeRecordSet,
StatsLogger statsLogger)
throws IOException {
this.logSegmentSeqNo = logSegmentSeqNo;
this.entryId = entryId;
- InputStream src = in;
if (envelopedEntry) {
- src = EnvelopedEntry.fromInputStream(in, statsLogger);
+ this.src = EnvelopedEntry.fromEnvelopedBuf(in, statsLogger);
+ } else {
+ this.src = in;
}
this.reader = new LogRecord.Reader(
this,
- new DataInputStream(src),
+ src,
startSequenceId,
deserializeRecordSet);
}
+ @VisibleForTesting
+ boolean isExhausted() {
+ return isExhausted;
+ }
+
+ @VisibleForTesting
+ ByteBuf getSrcBuf() {
+ return src;
+ }
+
+ private void checkLastException() throws IOException {
+ if (null != lastException) {
+ throw lastException;
+ }
+ }
+
+ private void releaseBuffer() {
+ isExhausted = true;
+ this.src.release();
+ }
+
@Override
public long getLSSN() {
return logSegmentSeqNo;
@@ -68,16 +95,44 @@
@Override
public LogRecordWithDLSN nextRecord() throws IOException {
- return reader.readOp();
+ checkLastException();
+
+ if (isExhausted) {
+ return null;
+ }
+
+ LogRecordWithDLSN record;
+ try {
+ record = reader.readOp();
+ } catch (IOException ioe) {
+ lastException = ioe;
+ releaseBuffer();
+ throw ioe;
+ }
+ if (null == record) {
+ releaseBuffer();
+ }
+ return record;
+ }
+
+ public void release() {
+ if (isExhausted) {
+ return;
+ }
+ releaseBuffer();;
}
@Override
public boolean skipTo(long txId) throws IOException {
+ checkLastException();
+
return reader.skipTo(txId, true);
}
@Override
public boolean skipTo(DLSN dlsn) throws IOException {
+ checkLastException();
+
return reader.skipTo(dlsn);
}
@@ -99,4 +154,5 @@
public String getName() {
return "EnvelopedReader";
}
+
}
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
index 18645d4..cc6e941 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
@@ -17,25 +17,33 @@
*/
package org.apache.distributedlog;
+import static org.apache.distributedlog.EnvelopedEntry.COMPRESSED_SIZE_OFFSET;
+import static org.apache.distributedlog.EnvelopedEntry.COMPRESSION_CODEC_MASK;
+import static org.apache.distributedlog.EnvelopedEntry.CURRENT_VERSION;
+import static org.apache.distributedlog.EnvelopedEntry.DECOMPRESSED_SIZE_OFFSET;
+import static org.apache.distributedlog.EnvelopedEntry.FLAGS_OFFSET;
+import static org.apache.distributedlog.EnvelopedEntry.HEADER_LENGTH;
+import static org.apache.distributedlog.EnvelopedEntry.VERSION_OFFSET;
+import static org.apache.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE;
+import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.ReferenceCountUtil;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.Entry.Writer;
import org.apache.distributedlog.exceptions.InvalidEnvelopedEntryException;
import org.apache.distributedlog.exceptions.LogRecordTooLongException;
-import org.apache.distributedlog.exceptions.WriteCancelledException;
import org.apache.distributedlog.exceptions.WriteException;
-import org.apache.distributedlog.io.Buffer;
import org.apache.distributedlog.io.CompressionCodec;
-import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.distributedlog.io.CompressionCodec.Type;
+import org.apache.distributedlog.io.CompressionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
-
/**
* {@link org.apache.distributedlog.io.Buffer} based log record set writer.
*/
@@ -56,12 +64,13 @@
}
private final String logName;
- private final Buffer buffer;
+ private final ByteBuf buffer;
+ private ByteBuf finalizedBuffer = null;
private final LogRecord.Writer writer;
private final List<WriteRequest> writeRequests;
private final boolean envelopeBeforeTransmit;
private final CompressionCodec.Type codec;
- private final StatsLogger statsLogger;
+ private final int flags;
private int count = 0;
private boolean hasUserData = false;
private long maxTxId = Long.MIN_VALUE;
@@ -69,22 +78,19 @@
EnvelopedEntryWriter(String logName,
int initialBufferSize,
boolean envelopeBeforeTransmit,
- CompressionCodec.Type codec,
- StatsLogger statsLogger) {
+ CompressionCodec.Type codec) {
this.logName = logName;
- this.buffer = new Buffer(initialBufferSize * 6 / 5);
- this.writer = new LogRecord.Writer(new DataOutputStream(buffer));
+ this.buffer = PooledByteBufAllocator.DEFAULT.buffer(
+ Math.min(Math.max(initialBufferSize * 6 / 5, HEADER_LENGTH), MAX_LOGRECORDSET_SIZE),
+ MAX_LOGRECORDSET_SIZE);
+ this.writer = new LogRecord.Writer(buffer);
this.writeRequests = new LinkedList<WriteRequest>();
this.envelopeBeforeTransmit = envelopeBeforeTransmit;
this.codec = codec;
- this.statsLogger = statsLogger;
- }
-
- @Override
- public synchronized void reset() {
- cancelPromises(new WriteCancelledException(logName, "Record Set is reset"));
- count = 0;
- this.buffer.reset();
+ this.flags = codec.code() & COMPRESSION_CODEC_MASK;
+ if (envelopeBeforeTransmit) {
+ this.buffer.writerIndex(HEADER_LENGTH);
+ }
}
@Override
@@ -146,7 +152,7 @@
@Override
public int getNumBytes() {
- return buffer.size();
+ return buffer.readableBytes();
}
@Override
@@ -155,24 +161,45 @@
}
@Override
- public synchronized Buffer getBuffer() throws InvalidEnvelopedEntryException, IOException {
- if (!envelopeBeforeTransmit) {
- return buffer;
+ public synchronized ByteBuf getBuffer() throws InvalidEnvelopedEntryException, IOException {
+ if (null == finalizedBuffer) {
+ finalizedBuffer = finalizeBuffer();
}
- // We can't escape this allocation because things need to be read from one byte array
- // and then written to another. This is the destination.
- Buffer toSend = new Buffer(buffer.size());
- byte[] decompressed = buffer.getData();
- int length = buffer.size();
- EnvelopedEntry entry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION,
- codec,
- decompressed,
- length,
- statsLogger);
- // This will cause an allocation of a byte[] for compression. This can be avoided
- // but we can do that later only if needed.
- entry.writeFully(new DataOutputStream(toSend));
- return toSend;
+ return finalizedBuffer.slice();
+ }
+
+ private ByteBuf finalizeBuffer() {
+ if (!envelopeBeforeTransmit) {
+ return buffer.retain();
+ }
+
+ int dataOffset = HEADER_LENGTH;
+ int dataLen = buffer.readableBytes() - HEADER_LENGTH;
+
+ if (Type.NONE == codec) {
+ // update version
+ buffer.setByte(VERSION_OFFSET, CURRENT_VERSION);
+ // update the flags
+ buffer.setInt(FLAGS_OFFSET, flags);
+ // update data len
+ buffer.setInt(DECOMPRESSED_SIZE_OFFSET, dataLen);
+ buffer.setInt(COMPRESSED_SIZE_OFFSET, dataLen);
+ return buffer.retain();
+ }
+
+ // compression
+ CompressionCodec compressor =
+ CompressionUtils.getCompressionCodec(codec);
+ ByteBuf uncompressedBuf = buffer.slice(dataOffset, dataLen);
+ ByteBuf compressedBuf = compressor.compress(uncompressedBuf, HEADER_LENGTH);
+ // update version
+ compressedBuf.setByte(VERSION_OFFSET, CURRENT_VERSION);
+ // update the flags
+ compressedBuf.setInt(FLAGS_OFFSET, flags);
+ // update data len
+ compressedBuf.setInt(DECOMPRESSED_SIZE_OFFSET, dataLen);
+ compressedBuf.setInt(COMPRESSED_SIZE_OFFSET, compressedBuf.readableBytes() - HEADER_LENGTH);
+ return compressedBuf;
}
@Override
@@ -183,10 +210,18 @@
@Override
public void completeTransmit(long lssn, long entryId) {
satisfyPromises(lssn, entryId);
+ buffer.release();
+ synchronized (this) {
+ ReferenceCountUtil.release(finalizedBuffer);
+ }
}
@Override
public void abortTransmit(Throwable reason) {
cancelPromises(reason);
+ buffer.release();
+ synchronized (this) {
+ ReferenceCountUtil.release(finalizedBuffer);
+ }
}
}
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java b/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java
index bf3a491..d897274 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java
@@ -154,6 +154,10 @@
throw new Exception("Error starting zookeeper/bookkeeper");
}
int bookiesUp = checkBookiesUp(numBookies, zkTimeoutSec);
+ if (numBookies != bookiesUp) {
+ LOG.info("Only {} bookies are up, expected {} bookies to be there.",
+ bookiesUp, numBookies);
+ }
assert (numBookies == bookiesUp);
// Provision "/messaging/distributedlog" namespace
DLMetadata.create(new BKDLConfig(zkEnsemble, "/ledgers")).create(uri);
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
index 386a9a1..8183703 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
@@ -431,6 +431,10 @@
}
}
+ synchronized boolean isClosed() {
+ return null != closePromise;
+ }
+
@Override
public CompletableFuture<Void> asyncClose() {
final CompletableFuture<Void> closeFuture;
@@ -556,6 +560,14 @@
@Override
public void onSuccess(List<Entry.Reader> entries) {
+ if (isClosed()) {
+ // if readahead is closing, don't put the entries to the queue
+ for (Entry.Reader entry : entries) {
+ entry.release();
+ }
+ return;
+ }
+
lastEntryAddedTime.reset().start();
for (Entry.Reader entry : entries) {
entryQueue.add(entry);
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
index 0bb91ae..b1dbf40 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
@@ -17,9 +17,22 @@
*/
package org.apache.distributedlog.impl.logsegment;
+import static com.google.common.base.Charsets.UTF_8;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.Entry;
import org.apache.distributedlog.LogSegmentMetadata;
@@ -42,20 +55,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static com.google.common.base.Charsets.UTF_8;
-
/**
* BookKeeper ledger based log segment entry reader.
*/
@@ -86,7 +85,24 @@
return done;
}
+ synchronized void release() {
+ if (null != this.entry) {
+ this.entry.getEntryBuffer().release();
+ this.entry = null;
+ }
+ }
+
+ void release(LedgerEntry entry) {
+ if (null != entry) {
+ entry.getEntryBuffer().release();
+ }
+ }
+
void complete(LedgerEntry entry) {
+ // the reader is already closed
+ if (isClosed()) {
+ release(entry);
+ }
synchronized (this) {
if (done) {
return;
@@ -119,6 +135,8 @@
}
synchronized LedgerEntry getEntry() {
+ // retain reference for the caller
+ this.entry.getEntryBuffer().retain();
return this.entry;
}
@@ -513,6 +531,16 @@
}
}
+ private void releaseAllCachedEntries() {
+ synchronized (this) {
+ CacheEntry entry = readAheadEntries.poll();
+ while (null != entry) {
+ entry.release();
+ entry = readAheadEntries.poll();
+ }
+ }
+ }
+
//
// Background Read Operations
//
@@ -624,7 +652,7 @@
.setEntryId(entry.getEntryId())
.setEnvelopeEntry(envelopeEntries)
.deserializeRecordSet(deserializeRecordSet)
- .setInputStream(entry.getEntryInputStream())
+ .setEntry(entry.getEntryBuffer())
.buildReader();
}
@@ -747,22 +775,28 @@
}
if (entry.isSuccess()) {
CacheEntry removedEntry = readAheadEntries.poll();
- if (entry != removedEntry) {
- DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at reading from "
- + getSegment());
- completeExceptionally(ise, false);
- return;
- }
try {
- nextRequest.addEntry(processReadEntry(entry.getEntry()));
- } catch (IOException e) {
- completeExceptionally(e, false);
- return;
+ if (entry != removedEntry) {
+ DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at reading from "
+ + getSegment());
+ completeExceptionally(ise, false);
+ return;
+ }
+ try {
+ // the reference is retained on `entry.getEntry()`. Entry.Reader is responsible for releasing it.
+ nextRequest.addEntry(processReadEntry(entry.getEntry()));
+ } catch (IOException e) {
+ completeExceptionally(e, false);
+ return;
+ }
+ } finally {
+ removedEntry.release();
}
} else if (skipBrokenEntries && BKException.Code.DigestMatchException == entry.getRc()) {
// skip this entry and move forward
skippedBrokenEntriesCounter.inc();
- readAheadEntries.poll();
+ CacheEntry removedEntry = readAheadEntries.poll();
+ removedEntry.release();
continue;
} else {
completeExceptionally(new BKTransmitException("Encountered issue on reading entry " + entry.getEntryId()
@@ -826,6 +860,9 @@
completeExceptionally(exception, false);
}
+ // release the cached entries
+ releaseAllCachedEntries();
+
// cancel all pending reads
cancelAllPendingReads(exception);
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java
index b333e96..1f05359 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java
@@ -18,6 +18,7 @@
package org.apache.distributedlog.impl.logsegment;
import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.ByteBuf;
import org.apache.distributedlog.logsegment.LogSegmentEntryWriter;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.LedgerHandle;
@@ -49,9 +50,9 @@
}
@Override
- public void asyncAddEntry(byte[] data, int offset, int length,
+ public void asyncAddEntry(ByteBuf entry,
AsyncCallback.AddCallback callback, Object ctx) {
- lh.asyncAddEntry(data, offset, length, callback, ctx);
+ lh.asyncAddEntry(entry, callback, ctx);
}
@Override
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
index 254345e..99ae6ad 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
@@ -79,7 +79,7 @@
.setEntryId(entry.getEntryId())
.setEnvelopeEntry(envelopeEntries)
.deserializeRecordSet(deserializeRecordSet)
- .setInputStream(entry.getEntryInputStream())
+ .setEntry(entry.getEntryBuffer())
.buildReader();
}
@@ -89,11 +89,19 @@
if (BKException.Code.OK == rc) {
List<Entry.Reader> entryList = Lists.newArrayList();
while (entries.hasMoreElements()) {
+ LedgerEntry entry = entries.nextElement();
try {
- entryList.add(processReadEntry(entries.nextElement()));
+ entryList.add(processReadEntry(entry));
} catch (IOException ioe) {
+ // release the buffers
+ while (entries.hasMoreElements()) {
+ LedgerEntry le = entries.nextElement();
+ le.getEntryBuffer().release();
+ }
FutureUtils.completeExceptionally(promise, ioe);
return;
+ } finally {
+ entry.getEntryBuffer().release();
}
}
FutureUtils.complete(promise, entryList);
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryWriter.java
index 70f0da0..5ded597 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryWriter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryWriter.java
@@ -18,6 +18,7 @@
package org.apache.distributedlog.logsegment;
import com.google.common.annotations.Beta;
+import io.netty.buffer.ByteBuf;
import org.apache.distributedlog.Entry;
import org.apache.distributedlog.common.util.Sizable;
import org.apache.bookkeeper.client.AsyncCallback;
@@ -54,12 +55,8 @@
* {@link org.apache.bookkeeper.client.LedgerHandle#asyncAddEntry(
* byte[], int, int, AsyncCallback.AddCallback, Object)}
*
- * @param data
+ * @param entry
* data to add
- * @param offset
- * offset in the data
- * @param length
- * length of the data
* @param callback
* callback
* @param ctx
@@ -67,6 +64,6 @@
* @see org.apache.bookkeeper.client.LedgerHandle#asyncAddEntry(
* byte[], int, int, AsyncCallback.AddCallback, Object)
*/
- void asyncAddEntry(byte[] data, int offset, int length,
+ void asyncAddEntry(ByteBuf entry,
AsyncCallback.AddCallback callback, Object ctx);
}
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java b/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
index 88c5d0f..e50a172 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
@@ -17,13 +17,13 @@
*/
package org.apache.distributedlog.tools;
+import io.netty.buffer.ByteBuf;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
@@ -46,7 +46,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
@@ -1676,8 +1675,9 @@
.setLogSegmentInfo(segment.getLogSegmentSequenceNumber(), segment.getStartSequenceId())
.setEntryId(lastEntry.getEntryId())
.setEnvelopeEntry(LogSegmentMetadata.supportsEnvelopedEntries(segment.getVersion()))
- .setInputStream(lastEntry.getEntryInputStream())
+ .setEntry(lastEntry.getEntryBuffer())
.buildReader();
+ lastEntry.getEntryBuffer().release();
LogRecordWithDLSN record = reader.nextRecord();
LogRecordWithDLSN lastRecord = null;
while (null != record) {
@@ -2245,26 +2245,22 @@
throws Exception {
for (long eid = fromEntryId; eid <= untilEntryId; ++eid) {
final CountDownLatch doneLatch = new CountDownLatch(1);
- final AtomicReference<Set<LedgerReader.ReadResult<InputStream>>> resultHolder =
- new AtomicReference<Set<LedgerReader.ReadResult<InputStream>>>();
- ledgerReader.readEntriesFromAllBookies(lh, eid, new BookkeeperInternalCallbacks.GenericCallback<Set<LedgerReader.ReadResult<InputStream>>>() {
- @Override
- public void operationComplete(int rc, Set<LedgerReader.ReadResult<InputStream>> readResults) {
- if (BKException.Code.OK == rc) {
- resultHolder.set(readResults);
- } else {
- resultHolder.set(null);
- }
- doneLatch.countDown();
+ final AtomicReference<Set<LedgerReader.ReadResult<ByteBuf>>> resultHolder = new AtomicReference<>();
+ ledgerReader.readEntriesFromAllBookies(lh, eid, (rc, readResults) -> {
+ if (BKException.Code.OK == rc) {
+ resultHolder.set(readResults);
+ } else {
+ resultHolder.set(null);
}
+ doneLatch.countDown();
});
doneLatch.await();
- Set<LedgerReader.ReadResult<InputStream>> readResults = resultHolder.get();
+ Set<LedgerReader.ReadResult<ByteBuf>> readResults = resultHolder.get();
if (null == readResults) {
throw new IOException("Failed to read entry " + eid);
}
boolean printHeader = true;
- for (LedgerReader.ReadResult<InputStream> rr : readResults) {
+ for (LedgerReader.ReadResult<ByteBuf> rr : readResults) {
if (corruptOnly) {
if (BKException.Code.DigestMatchException == rr.getResultCode()) {
if (printHeader) {
@@ -2287,9 +2283,10 @@
Entry.Reader reader = Entry.newBuilder()
.setLogSegmentInfo(lh.getId(), 0L)
.setEntryId(eid)
- .setInputStream(rr.getValue())
+ .setEntry(rr.getValue())
.setEnvelopeEntry(LogSegmentMetadata.supportsEnvelopedEntries(metadataVersion))
.buildReader();
+ rr.getValue().release();
printEntry(reader);
} else {
System.out.println("status = " + BKException.getMessage(rr.getResultCode()));
@@ -2346,9 +2343,10 @@
Entry.Reader reader = Entry.newBuilder()
.setLogSegmentInfo(0L, 0L)
.setEntryId(entry.getEntryId())
- .setInputStream(entry.getEntryInputStream())
+ .setEntry(entry.getEntryBuffer())
.setEnvelopeEntry(LogSegmentMetadata.supportsEnvelopedEntries(metadataVersion))
.buildReader();
+ entry.getEntryBuffer().release();
printEntry(reader);
++i;
}
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java
index 5123178..781e8d0 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java
@@ -17,6 +17,7 @@
*/
package org.apache.distributedlog;
+import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
@@ -2039,10 +2040,8 @@
recordSetWriter.writeRecord(ByteBuffer.wrap(record.getPayload()), writePromise);
recordSetFutures.add(writePromise);
}
- final ByteBuffer recordSetBuffer = recordSetWriter.getBuffer();
- byte[] data = new byte[recordSetBuffer.remaining()];
- recordSetBuffer.get(data);
- LogRecord setRecord = new LogRecord(6L, data);
+ final ByteBuf recordSetBuffer = recordSetWriter.getBuffer();
+ LogRecord setRecord = new LogRecord(6L, recordSetBuffer);
setRecord.setRecordSet();
CompletableFuture<DLSN> writeRecordSetFuture = writer.write(setRecord);
writeRecordSetFuture.whenComplete(new FutureEventListener<DLSN>() {
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestEntry.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestEntry.java
index 30ef481..5ea971e 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestEntry.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestEntry.java
@@ -17,27 +17,29 @@
*/
package org.apache.distributedlog;
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.distributedlog.EnvelopedEntry.HEADER_LENGTH;
+import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import java.nio.ByteBuffer;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.Entry.Reader;
import org.apache.distributedlog.Entry.Writer;
import org.apache.distributedlog.exceptions.LogRecordTooLongException;
-import org.apache.distributedlog.io.Buffer;
import org.apache.distributedlog.io.CompressionCodec;
-import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.distributedlog.common.concurrent.FutureEventListener;
import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.util.Utils;
import org.junit.Assert;
import org.junit.Test;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
-import static org.junit.Assert.*;
-
/**
* Test Case of {@link Entry}
*/
@@ -49,20 +51,28 @@
"test-empty-record-set",
1024,
true,
- CompressionCodec.Type.NONE,
- NullStatsLogger.INSTANCE);
- assertEquals("zero bytes", 0, writer.getNumBytes());
+ CompressionCodec.Type.NONE);
+ assertEquals("zero bytes", HEADER_LENGTH, writer.getNumBytes());
assertEquals("zero records", 0, writer.getNumRecords());
- Buffer buffer = writer.getBuffer();
- Entry recordSet = Entry.newBuilder()
- .setData(buffer.getData(), 0, buffer.size())
+ ByteBuf buffer = writer.getBuffer();
+ EnvelopedEntryReader reader = (EnvelopedEntryReader) Entry.newBuilder()
+ .setEntry(buffer)
.setLogSegmentInfo(1L, 0L)
.setEntryId(0L)
- .build();
- Reader reader = recordSet.reader();
+ .buildReader();
+ int refCnt = reader.getSrcBuf().refCnt();
+ assertFalse(reader.isExhausted());
Assert.assertNull("Empty record set should return null",
reader.nextRecord());
+ assertTrue(reader.isExhausted());
+ assertEquals(refCnt - 1, reader.getSrcBuf().refCnt());
+
+ // read next record again (to test release buffer)
+ Assert.assertNull("Empty record set should return null",
+ reader.nextRecord());
+ assertEquals(refCnt - 1, reader.getSrcBuf().refCnt());
+ buffer.release();
}
@Test(timeout = 20000)
@@ -70,10 +80,9 @@
Writer writer = Entry.newEntry(
"test-write-too-long-record",
1024,
- false,
- CompressionCodec.Type.NONE,
- NullStatsLogger.INSTANCE);
- assertEquals("zero bytes", 0, writer.getNumBytes());
+ true,
+ CompressionCodec.Type.NONE);
+ assertEquals("zero bytes", HEADER_LENGTH, writer.getNumBytes());
assertEquals("zero records", 0, writer.getNumRecords());
LogRecord largeRecord = new LogRecord(1L, new byte[MAX_LOGRECORD_SIZE + 1]);
@@ -83,11 +92,12 @@
} catch (LogRecordTooLongException lrtle) {
// expected
}
- assertEquals("zero bytes", 0, writer.getNumBytes());
+ assertEquals("zero bytes", HEADER_LENGTH, writer.getNumBytes());
assertEquals("zero records", 0, writer.getNumRecords());
- Buffer buffer = writer.getBuffer();
- Assert.assertEquals("zero bytes", 0, buffer.size());
+ ByteBuf buffer = writer.getBuffer();
+ assertEquals("zero bytes", HEADER_LENGTH, buffer.readableBytes());
+ buffer.release();
}
@Test(timeout = 20000)
@@ -96,9 +106,8 @@
"test-write-records",
1024,
true,
- CompressionCodec.Type.NONE,
- NullStatsLogger.INSTANCE);
- assertEquals("zero bytes", 0, writer.getNumBytes());
+ CompressionCodec.Type.NONE);
+ assertEquals("zero bytes", HEADER_LENGTH, writer.getNumBytes());
assertEquals("zero records", 0, writer.getNumRecords());
List<CompletableFuture<DLSN>> writePromiseList = Lists.newArrayList();
@@ -132,34 +141,38 @@
assertEquals((i + 6) + " records", (i + 6), writer.getNumRecords());
}
- Buffer buffer = writer.getBuffer();
+ ByteBuf buffer = writer.getBuffer();
+ buffer.retain();
// Test transmit complete
writer.completeTransmit(1L, 1L);
List<DLSN> writeResults = Utils.ioResult(FutureUtils.collect(writePromiseList));
for (int i = 0; i < 10; i++) {
- Assert.assertEquals(new DLSN(1L, 1L, i), writeResults.get(i));
+ assertEquals(new DLSN(1L, 1L, i), writeResults.get(i));
}
// Test reading from buffer
- Entry recordSet = Entry.newBuilder()
- .setData(buffer.getData(), 0, buffer.size())
+ Reader reader = Entry.newBuilder()
+ .setEntry(buffer)
.setLogSegmentInfo(1L, 1L)
.setEntryId(0L)
- .build();
- Reader reader = recordSet.reader();
+ .setEnvelopeEntry(true)
+ .buildReader();
+ buffer.release();
LogRecordWithDLSN record = reader.nextRecord();
int numReads = 0;
long expectedTxid = 0L;
while (null != record) {
- Assert.assertEquals(expectedTxid, record.getTransactionId());
- Assert.assertEquals(expectedTxid, record.getSequenceId());
- Assert.assertEquals(new DLSN(1L, 0L, expectedTxid), record.getDlsn());
+ assertEquals(expectedTxid, record.getTransactionId());
+ assertEquals(expectedTxid, record.getSequenceId());
+ assertEquals(new DLSN(1L, 0L, expectedTxid), record.getDlsn());
++numReads;
++expectedTxid;
record = reader.nextRecord();
}
- Assert.assertEquals(10, numReads);
+ assertEquals(10, numReads);
+
+ reader.release();
}
@Test(timeout = 20000)
@@ -168,9 +181,8 @@
"test-write-recordset",
1024,
true,
- CompressionCodec.Type.NONE,
- NullStatsLogger.INSTANCE);
- assertEquals("zero bytes", 0, writer.getNumBytes());
+ CompressionCodec.Type.NONE);
+ assertEquals("zero bytes", HEADER_LENGTH, writer.getNumBytes());
assertEquals("zero records", 0, writer.getNumRecords());
List<CompletableFuture<DLSN>> writePromiseList = Lists.newArrayList();
@@ -194,10 +206,8 @@
recordSetPromiseList.add(writePromise);
assertEquals((i + 1) + " records", (i + 1), recordSetWriter.getNumRecords());
}
- final ByteBuffer recordSetBuffer = recordSetWriter.getBuffer();
- byte[] data = new byte[recordSetBuffer.remaining()];
- recordSetBuffer.get(data);
- LogRecord setRecord = new LogRecord(5L, data);
+ final ByteBuf recordSetBuffer = recordSetWriter.getBuffer();
+ LogRecord setRecord = new LogRecord(5L, recordSetBuffer);
setRecord.setPositionWithinLogSegment(5);
setRecord.setRecordSet();
CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>();
@@ -228,21 +238,22 @@
assertEquals((i + 11) + " records", (i + 11), writer.getNumRecords());
}
- Buffer buffer = writer.getBuffer();
+ ByteBuf buffer = writer.getBuffer();
+ buffer.retain();
// Test transmit complete
writer.completeTransmit(1L, 1L);
List<DLSN> writeResults = Utils.ioResult(FutureUtils.collect(writePromiseList));
for (int i = 0; i < 5; i++) {
- Assert.assertEquals(new DLSN(1L, 1L, i), writeResults.get(i));
+ assertEquals(new DLSN(1L, 1L, i), writeResults.get(i));
}
- Assert.assertEquals(new DLSN(1L, 1L, 5), writeResults.get(5));
+ assertEquals(new DLSN(1L, 1L, 5), writeResults.get(5));
for (int i = 0; i < 5; i++) {
- Assert.assertEquals(new DLSN(1L, 1L, (10 + i)), writeResults.get(6 + i));
+ assertEquals(new DLSN(1L, 1L, (10 + i)), writeResults.get(6 + i));
}
List<DLSN> recordSetWriteResults = Utils.ioResult(FutureUtils.collect(recordSetPromiseList));
for (int i = 0; i < 5; i++) {
- Assert.assertEquals(new DLSN(1L, 1L, (5 + i)), recordSetWriteResults.get(i));
+ assertEquals(new DLSN(1L, 1L, (5 + i)), recordSetWriteResults.get(i));
}
// Test reading from buffer
@@ -264,9 +275,11 @@
verifyReadResult(buffer, 1L, 1L, 1L, false,
new DLSN(1L, 1L, 12L), 0, 0, 3,
new DLSN(1L, 1L, 12L), 12L);
+
+ buffer.release();
}
- void verifyReadResult(Buffer data,
+ void verifyReadResult(ByteBuf data,
long lssn, long entryId, long startSequenceId,
boolean deserializeRecordSet,
DLSN skipTo,
@@ -275,14 +288,13 @@
int lastNumRecords,
DLSN expectedDLSN,
long expectedTxId) throws Exception {
- Entry recordSet = Entry.newBuilder()
- .setData(data.getData(), 0, data.size())
+ Reader reader = Entry.newBuilder()
+ .setEntry(data)
.setLogSegmentInfo(lssn, startSequenceId)
.setEntryId(entryId)
.deserializeRecordSet(deserializeRecordSet)
- .skipTo(skipTo)
- .build();
- Reader reader = recordSet.reader();
+ .buildReader();
+ reader.skipTo(skipTo);
LogRecordWithDLSN record;
for (int i = 0; i < firstNumRecords; i++) { // first
@@ -291,7 +303,7 @@
assertEquals(expectedDLSN, record.getDlsn());
assertEquals(expectedTxId, record.getTransactionId());
assertNotNull("record " + record + " payload is null",
- record.getPayload());
+ record.getPayloadBuf());
assertEquals("record-" + expectedTxId, new String(record.getPayload(), UTF_8));
expectedDLSN = expectedDLSN.getNextDLSN();
++expectedTxId;
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestEnvelopedEntry.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestEnvelopedEntry.java
deleted file mode 100644
index 6d78f03..0000000
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestEnvelopedEntry.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-
-import org.apache.distributedlog.io.Buffer;
-import org.apache.distributedlog.io.CompressionCodec;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestEnvelopedEntry {
-
- static final Logger LOG = LoggerFactory.getLogger(TestEnvelopedEntry.class);
-
- private String getString(boolean compressible) {
- if (compressible) {
- StringBuilder builder = new StringBuilder();
- for(int i = 0; i < 1000; i++) {
- builder.append('A');
- }
- return builder.toString();
- }
- return "DistributedLogEnvelopedEntry";
- }
-
- @Test(timeout = 20000)
- public void testEnvelope() throws Exception {
- byte[] data = getString(false).getBytes();
- EnvelopedEntry writeEntry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION,
- CompressionCodec.Type.NONE,
- data,
- data.length,
- new NullStatsLogger());
- Buffer outBuf = new Buffer(2 * data.length);
- writeEntry.writeFully(new DataOutputStream(outBuf));
- EnvelopedEntry readEntry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION,
- new NullStatsLogger());
- readEntry.readFully(new DataInputStream(new ByteArrayInputStream(outBuf.getData())));
- byte[] newData = readEntry.getDecompressedPayload();
- Assert.assertEquals("Written data should equal read data", new String(data), new String(newData));
- }
-
- @Test(timeout = 20000)
- public void testLZ4Compression() throws Exception {
- byte[] data = getString(true).getBytes();
- EnvelopedEntry writeEntry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION,
- CompressionCodec.Type.LZ4,
- data,
- data.length,
- new NullStatsLogger());
- Buffer outBuf = new Buffer(data.length);
- writeEntry.writeFully(new DataOutputStream(outBuf));
- Assert.assertTrue(data.length > outBuf.size());
- EnvelopedEntry readEntry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION,
- new NullStatsLogger());
- readEntry.readFully(new DataInputStream(new ByteArrayInputStream(outBuf.getData())));
- byte[] newData = readEntry.getDecompressedPayload();
- Assert.assertEquals("Written data should equal read data", new String(data), new String(newData));
- }
-}
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
index eda8eb2..1e8ed84 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
@@ -343,6 +343,7 @@
readAheadEntryReader.getNextReadAheadEntry(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
assertEquals(2L, entryReader.getLSSN());
assertEquals(1L, entryReader.getEntryId());
+ entryReader.release();
Utils.close(readAheadEntryReader);
// positioning on a partially truncated log segment (segment 2) before min active dlsn
@@ -355,6 +356,7 @@
readAheadEntryReader.getNextReadAheadEntry(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
assertEquals(2L, entryReader.getLSSN());
assertEquals(1L, entryReader.getEntryId());
+ entryReader.release();
Utils.close(readAheadEntryReader);
// positioning on a partially truncated log segment (segment 2) after min active dlsn
@@ -367,6 +369,7 @@
readAheadEntryReader.getNextReadAheadEntry(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
assertEquals(2L, entryReader.getLSSN());
assertEquals(2L, entryReader.getEntryId());
+ entryReader.release();
Utils.close(readAheadEntryReader);
Utils.close(writer);
diff --git a/distributedlog-protocol/pom.xml b/distributedlog-protocol/pom.xml
index e81450d..010f5b2 100644
--- a/distributedlog-protocol/pom.xml
+++ b/distributedlog-protocol/pom.xml
@@ -37,6 +37,11 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<version>${lz4.version}</version>
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java
index 1648c6d..83a950e 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java
@@ -17,23 +17,21 @@
*/
package org.apache.distributedlog;
-import static org.apache.distributedlog.LogRecordSet.COMPRESSION_CODEC_LZ4;
import static org.apache.distributedlog.LogRecordSet.METADATA_COMPRESSION_MASK;
import static org.apache.distributedlog.LogRecordSet.METADATA_VERSION_MASK;
-import static org.apache.distributedlog.LogRecordSet.NULL_OP_STATS_LOGGER;
import static org.apache.distributedlog.LogRecordSet.VERSION;
-import org.apache.distributedlog.io.CompressionCodec;
-import org.apache.distributedlog.io.CompressionUtils;
-import java.io.DataInputStream;
+import io.netty.buffer.ByteBuf;
import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
+import lombok.extern.slf4j.Slf4j;
+import org.apache.distributedlog.io.CompressionCodec;
+import org.apache.distributedlog.io.CompressionCodec.Type;
+import org.apache.distributedlog.io.CompressionUtils;
/**
* Record reader to read records from an enveloped entry buffer.
*/
+@Slf4j
class EnvelopedRecordSetReader implements LogRecordSet.Reader {
private final long logSegmentSeqNo;
@@ -41,7 +39,7 @@
private final long transactionId;
private final long startSequenceId;
private int numRecords;
- private final ByteBuffer reader;
+ private final ByteBuf reader;
// slot id
private long slotId;
@@ -53,7 +51,7 @@
long startSlotId,
int startPositionWithinLogSegment,
long startSequenceId,
- InputStream in)
+ ByteBuf src)
throws IOException {
this.logSegmentSeqNo = logSegmentSeqNo;
this.entryId = entryId;
@@ -63,7 +61,6 @@
this.startSequenceId = startSequenceId;
// read data
- DataInputStream src = new DataInputStream(in);
int metadata = src.readInt();
int version = metadata & METADATA_VERSION_MASK;
if (version != VERSION) {
@@ -72,22 +69,21 @@
}
int codecCode = metadata & METADATA_COMPRESSION_MASK;
this.numRecords = src.readInt();
- int originDataLen = src.readInt();
- int actualDataLen = src.readInt();
- byte[] compressedData = new byte[actualDataLen];
- src.readFully(compressedData);
-
- if (COMPRESSION_CODEC_LZ4 == codecCode) {
- CompressionCodec codec = CompressionUtils.getCompressionCodec(CompressionCodec.Type.LZ4);
- byte[] decompressedData = codec.decompress(compressedData, 0, actualDataLen,
- originDataLen, NULL_OP_STATS_LOGGER);
- this.reader = ByteBuffer.wrap(decompressedData);
- } else {
- if (originDataLen != actualDataLen) {
- throw new IOException("Inconsistent data length found for a non-compressed record set : original = "
- + originDataLen + ", actual = " + actualDataLen);
+ int decompressedDataLen = src.readInt();
+ int compressedDataLen = src.readInt();
+ ByteBuf compressedBuf = src.slice(src.readerIndex(), compressedDataLen);
+ try {
+ if (Type.NONE.code() == codecCode && decompressedDataLen != compressedDataLen) {
+ throw new IOException("Inconsistent data length found for a non-compressed record set : decompressed = "
+ + decompressedDataLen + ", actual = " + compressedDataLen);
}
- this.reader = ByteBuffer.wrap(compressedData);
+ CompressionCodec codec = CompressionUtils.getCompressionCodec(Type.of(codecCode));
+ this.reader = codec.decompress(compressedBuf, decompressedDataLen);
+ } finally {
+ compressedBuf.release();
+ }
+ if (numRecords == 0) {
+ this.reader.release();
}
}
@@ -97,22 +93,34 @@
return null;
}
- int recordLen = reader.getInt();
- byte[] recordData = new byte[recordLen];
- reader.get(recordData);
- DLSN dlsn = new DLSN(logSegmentSeqNo, entryId, slotId);
+ int recordLen = reader.readInt();
+ ByteBuf recordBuf = reader.slice(reader.readerIndex(), recordLen);
+ reader.readerIndex(reader.readerIndex() + recordLen);
+ DLSN dlsn = new DLSN(logSegmentSeqNo, entryId, slotId);
LogRecordWithDLSN record =
new LogRecordWithDLSN(dlsn, startSequenceId);
record.setPositionWithinLogSegment(position);
record.setTransactionId(transactionId);
- record.setPayload(recordData);
+ record.setPayloadBuf(recordBuf, true);
++slotId;
++position;
--numRecords;
+ // release the record set buffer when exhausting the reader
+ if (0 == numRecords) {
+ this.reader.release();
+ }
+
return record;
}
+ @Override
+ public void release() {
+ if (0 != numRecords) {
+ numRecords = 0;
+ reader.release();
+ }
+ }
}
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
index 2a60ff3..0bfd06f 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
@@ -17,71 +17,59 @@
*/
package org.apache.distributedlog;
+import static org.apache.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE;
import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
-import static org.apache.distributedlog.LogRecordSet.COMPRESSION_CODEC_LZ4;
-import static org.apache.distributedlog.LogRecordSet.COMPRESSION_CODEC_NONE;
+import static org.apache.distributedlog.LogRecordSet.COMPRESSED_SIZE_OFFSET;
+import static org.apache.distributedlog.LogRecordSet.COUNT_OFFSET;
+import static org.apache.distributedlog.LogRecordSet.DECOMPRESSED_SIZE_OFFSET;
import static org.apache.distributedlog.LogRecordSet.HEADER_LEN;
import static org.apache.distributedlog.LogRecordSet.METADATA_COMPRESSION_MASK;
+import static org.apache.distributedlog.LogRecordSet.METADATA_OFFSET;
import static org.apache.distributedlog.LogRecordSet.METADATA_VERSION_MASK;
-import static org.apache.distributedlog.LogRecordSet.NULL_OP_STATS_LOGGER;
import static org.apache.distributedlog.LogRecordSet.VERSION;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.ReferenceCountUtil;
import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.exceptions.LogRecordTooLongException;
import org.apache.distributedlog.exceptions.WriteException;
import org.apache.distributedlog.io.Buffer;
import org.apache.distributedlog.io.CompressionCodec;
+import org.apache.distributedlog.io.CompressionCodec.Type;
import org.apache.distributedlog.io.CompressionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* {@link Buffer} based log record set writer.
*/
+@Slf4j
class EnvelopedRecordSetWriter implements LogRecordSet.Writer {
- private static final Logger logger = LoggerFactory.getLogger(EnvelopedRecordSetWriter.class);
-
- private final Buffer buffer;
- private final DataOutputStream writer;
- private final WritableByteChannel writeChannel;
+ private final ByteBuf buffer;
private final List<CompletableFuture<DLSN>> promiseList;
private final CompressionCodec.Type codec;
+ private final int metadata;
private final int codecCode;
private int count = 0;
- private ByteBuffer recordSetBuffer = null;
+ private ByteBuf recordSetBuffer = null;
EnvelopedRecordSetWriter(int initialBufferSize,
CompressionCodec.Type codec) {
- this.buffer = new Buffer(Math.max(initialBufferSize, HEADER_LEN));
+ this.buffer = PooledByteBufAllocator.DEFAULT.buffer(
+ Math.max(initialBufferSize, HEADER_LEN),
+ MAX_LOGRECORDSET_SIZE);
this.promiseList = new LinkedList<CompletableFuture<DLSN>>();
this.codec = codec;
- switch (codec) {
- case LZ4:
- this.codecCode = COMPRESSION_CODEC_LZ4;
- break;
- default:
- this.codecCode = COMPRESSION_CODEC_NONE;
- break;
- }
- this.writer = new DataOutputStream(buffer);
- try {
- this.writer.writeInt((VERSION & METADATA_VERSION_MASK)
- | (codecCode & METADATA_COMPRESSION_MASK));
- this.writer.writeInt(0); // count
- this.writer.writeInt(0); // original len
- this.writer.writeInt(0); // actual len
- } catch (IOException e) {
- logger.warn("Failed to serialize the header to an enveloped record set", e);
- }
- this.writeChannel = Channels.newChannel(writer);
+ this.codecCode = codec.code();
+ this.metadata = (VERSION & METADATA_VERSION_MASK) | (codecCode & METADATA_COMPRESSION_MASK);
+ this.buffer.writeInt(metadata);
+ this.buffer.writeInt(0); // count
+ this.buffer.writeInt(0); // original len
+ this.buffer.writeInt(0); // actual len
}
synchronized List<CompletableFuture<DLSN>> getPromiseList() {
@@ -98,15 +86,10 @@
"Log Record of size " + logRecordSize + " written when only "
+ MAX_LOGRECORD_SIZE + " is allowed");
}
- try {
- writer.writeInt(record.remaining());
- writeChannel.write(record);
- ++count;
- promiseList.add(transmitPromise);
- } catch (IOException e) {
- logger.error("Failed to append record to record set", e);
- throw new WriteException("", "Failed to append record to record set");
- }
+ buffer.writeInt(logRecordSize);
+ buffer.writeBytes(record);
+ ++count;
+ promiseList.add(transmitPromise);
}
private synchronized void satisfyPromises(long lssn, long entryId, long startSlotId) {
@@ -127,7 +110,7 @@
@Override
public int getNumBytes() {
- return buffer.size();
+ return buffer.readableBytes();
}
@Override
@@ -136,62 +119,53 @@
}
@Override
- public synchronized ByteBuffer getBuffer() {
+ public synchronized ByteBuf getBuffer() {
if (null == recordSetBuffer) {
recordSetBuffer = createBuffer();
}
- return recordSetBuffer.duplicate();
+ return recordSetBuffer.retainedSlice();
}
- ByteBuffer createBuffer() {
- byte[] data = buffer.getData();
+ ByteBuf createBuffer() {
int dataOffset = HEADER_LEN;
- int dataLen = buffer.size() - HEADER_LEN;
+ int dataLen = buffer.readableBytes() - HEADER_LEN;
- if (COMPRESSION_CODEC_LZ4 != codecCode) {
- ByteBuffer recordSetBuffer = ByteBuffer.wrap(data, 0, buffer.size());
+ if (Type.NONE.code() == codecCode) {
// update count
- recordSetBuffer.putInt(4, count);
+ buffer.setInt(COUNT_OFFSET, count);
// update data len
- recordSetBuffer.putInt(8, dataLen);
- recordSetBuffer.putInt(12, dataLen);
- return recordSetBuffer;
+ buffer.setInt(DECOMPRESSED_SIZE_OFFSET, dataLen);
+ buffer.setInt(COMPRESSED_SIZE_OFFSET, dataLen);
+ return buffer.retain();
}
// compression
CompressionCodec compressor =
CompressionUtils.getCompressionCodec(codec);
- byte[] compressed =
- compressor.compress(data, dataOffset, dataLen, NULL_OP_STATS_LOGGER);
-
- ByteBuffer recordSetBuffer;
- if (compressed.length > dataLen) {
- byte[] newData = new byte[HEADER_LEN + compressed.length];
- System.arraycopy(data, 0, newData, 0, HEADER_LEN + dataLen);
- recordSetBuffer = ByteBuffer.wrap(newData);
- } else {
- recordSetBuffer = ByteBuffer.wrap(data);
- }
- // version
- recordSetBuffer.position(4);
+ ByteBuf uncompressedBuf = buffer.slice(dataOffset, dataLen);
+ ByteBuf compressedBuf = compressor.compress(uncompressedBuf, HEADER_LEN);
+ compressedBuf.setInt(METADATA_OFFSET, metadata);
// update count
- recordSetBuffer.putInt(count);
+ compressedBuf.setInt(COUNT_OFFSET, count);
// update data len
- recordSetBuffer.putInt(dataLen);
- recordSetBuffer.putInt(compressed.length);
- recordSetBuffer.put(compressed);
- recordSetBuffer.flip();
- return recordSetBuffer;
+ compressedBuf.setInt(DECOMPRESSED_SIZE_OFFSET, dataLen);
+ compressedBuf.setInt(COMPRESSED_SIZE_OFFSET, compressedBuf.readableBytes() - HEADER_LEN);
+
+ return compressedBuf;
}
@Override
- public void completeTransmit(long lssn, long entryId, long startSlotId) {
+ public synchronized void completeTransmit(long lssn, long entryId, long startSlotId) {
satisfyPromises(lssn, entryId, startSlotId);
+ buffer.release();
+ ReferenceCountUtil.release(recordSetBuffer);
}
@Override
- public void abortTransmit(Throwable reason) {
+ public synchronized void abortTransmit(Throwable reason) {
cancelPromises(reason);
+ buffer.release();
+ ReferenceCountUtil.release(recordSetBuffer);
}
}
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecord.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecord.java
index f2a19fc..01d9e9b 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecord.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecord.java
@@ -18,12 +18,15 @@
package org.apache.distributedlog;
import com.google.common.annotations.VisibleForTesting;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.Unpooled;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.distributedlog.common.util.ByteBufUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,6 +106,7 @@
*
* @see LogRecordWithDLSN
*/
+@NotThreadSafe
public class LogRecord {
private static final Logger LOG = LoggerFactory.getLogger(LogRecord.class);
@@ -128,7 +132,7 @@
private long metadata;
private long txid;
- private byte[] payload;
+ private ByteBuf payload;
/**
* Construct an uninitialized log record.
@@ -152,7 +156,32 @@
*/
public LogRecord(long txid, byte[] payload) {
this.txid = txid;
- this.payload = payload;
+ this.payload = Unpooled.wrappedBuffer(payload);
+ }
+
+ /**
+ * Construct a log record with <i>txid</i> and payload <i>buffer</i>.
+ *
+ * @param txid application defined transaction id.
+ * @param buffer payload buffer.
+ */
+ public LogRecord(long txid, ByteBuffer buffer) {
+ this.txid = txid;
+ this.payload = Unpooled.wrappedBuffer(buffer);
+ }
+
+ /**
+ * Used by {@link LogRecordWithDLSN} to construct a log record read by readers.
+ *
+ * @param txid transaction id
+ * @param payload playload
+ */
+ protected LogRecord(long txid, ByteBuf payload) {
+ this.txid = txid;
+ // Need to make a copy since the passed payload is using a ref-count buffer that we don't know when could
+ // release, since the record is passed to the user. Also, the passed ByteBuf is coming from network and is
+ // backed by a direct buffer which we could not expose as a byte[]
+ this.payload = Unpooled.copiedBuffer(payload);
this.metadata = 0;
}
@@ -184,16 +213,22 @@
* @return payload of this log record.
*/
public byte[] getPayload() {
- return payload;
+ return ByteBufUtils.getArray(payload);
}
- /**
- * Set payload for this log record.
- *
- * @param payload payload of this log record
- */
- void setPayload(byte[] payload) {
- this.payload = payload;
+ public ByteBuf getPayloadBuf() {
+ return payload.slice();
+ }
+
+ void setPayloadBuf(ByteBuf payload, boolean copyData) {
+ if (null != this.payload) {
+ this.payload.release();
+ }
+ if (copyData) {
+ this.payload = Unpooled.copiedBuffer(payload);
+ } else {
+ this.payload = payload;
+ }
}
/**
@@ -202,7 +237,7 @@
* @return payload as input stream
*/
public InputStream getPayLoadInputStream() {
- return new ByteArrayInputStream(payload);
+ return new ByteBufInputStream(payload.retainedSlice(), true);
}
//
@@ -340,21 +375,25 @@
// Serialization & Deserialization
//
- protected void readPayload(DataInputStream in) throws IOException {
+ protected void readPayload(ByteBuf in, boolean copyData) throws IOException {
int length = in.readInt();
if (length < 0) {
throw new EOFException("Log Record is corrupt: Negative length " + length);
}
- payload = new byte[length];
- in.readFully(payload);
+ if (copyData) {
+ setPayloadBuf(in.slice(in.readerIndex(), length), true);
+ } else {
+ setPayloadBuf(in.retainedSlice(in.readerIndex(), length), false);
+ }
+ in.skipBytes(length);
}
- private void writePayload(DataOutputStream out) throws IOException {
- out.writeInt(payload.length);
- out.write(payload);
+ private void writePayload(ByteBuf out) {
+ out.writeInt(payload.readableBytes());
+ out.writeBytes(payload, payload.readerIndex(), payload.readableBytes());
}
- private void writeToStream(DataOutputStream out) throws IOException {
+ private void writeToStream(ByteBuf out) {
out.writeLong(metadata);
out.writeLong(txid);
writePayload(out);
@@ -369,16 +408,16 @@
*/
int getPersistentSize() {
// Flags + TxId + Payload-length + payload
- return 2 * (Long.SIZE / 8) + Integer.SIZE / 8 + payload.length;
+ return 2 * (Long.SIZE / 8) + Integer.SIZE / 8 + payload.readableBytes();
}
/**
* Writer class to write log records into an output {@code stream}.
*/
public static class Writer {
- private final DataOutputStream buf;
+ private final ByteBuf buf;
- public Writer(DataOutputStream out) {
+ public Writer(ByteBuf out) {
this.buf = out;
}
@@ -386,14 +425,13 @@
* Write an operation to the output stream.
*
* @param record The operation to write
- * @throws IOException if an error occurs during writing.
*/
- public void writeOp(LogRecord record) throws IOException {
+ public void writeOp(LogRecord record) {
record.writeToStream(buf);
}
public int getPendingBytes() {
- return buf.size();
+ return buf.readableBytes();
}
}
@@ -402,7 +440,7 @@
*/
public static class Reader {
private final RecordStream recordStream;
- private final DataInputStream in;
+ private final ByteBuf in;
private final long startSequenceId;
private final boolean deserializeRecordSet;
private static final int SKIP_BUFFER_SIZE = 512;
@@ -417,13 +455,13 @@
* @param startSequenceId the start sequence id.
*/
public Reader(RecordStream recordStream,
- DataInputStream in,
+ ByteBuf in,
long startSequenceId) {
this(recordStream, in, startSequenceId, true);
}
public Reader(RecordStream recordStream,
- DataInputStream in,
+ ByteBuf in,
long startSequenceId,
boolean deserializeRecordSet) {
this.recordStream = recordStream;
@@ -460,6 +498,10 @@
}
}
+ if (in.readableBytes() <= 0) {
+ return null;
+ }
+
try {
long metadata = in.readLong();
// Reading the first 8 bytes positions the record stream on the correct log record
@@ -470,7 +512,14 @@
nextRecordInStream = new LogRecordWithDLSN(recordStream.getCurrentPosition(), startSequenceId);
nextRecordInStream.setMetadata(metadata);
nextRecordInStream.setTransactionId(in.readLong());
- nextRecordInStream.readPayload(in);
+
+ // 1) if it is simple record, copy the data
+ // 2) if it is record set and deserializeRecordSet is true, we don't need to copy data,
+ // defer data copying to deserializing record from record set.
+ // 3) if it is record set and deserializeRecordSet is false, we copy the data, so applications
+ // don't have to deal with reference count.
+ boolean copyData = !isRecordSet(metadata) || !deserializeRecordSet;
+ nextRecordInStream.readPayload(in, copyData);
if (LOG.isTraceEnabled()) {
if (nextRecordInStream.isControl()) {
LOG.trace("Reading {} Control DLSN {}",
@@ -509,8 +558,6 @@
}
private boolean skipTo(Long txId, DLSN dlsn, boolean skipControl) throws IOException {
- LOG.debug("SkipTo");
- byte[] skipBuffer = null;
boolean found = false;
while (true) {
try {
@@ -519,7 +566,7 @@
// if there is not record set, read next record
if (null == recordSetReader) {
- in.mark(INPUTSTREAM_MARK_LIMIT);
+ in.markReaderIndex();
flags = in.readLong();
currTxId = in.readLong();
} else {
@@ -539,7 +586,7 @@
LOG.trace("Found position {} beyond {}", recordStream.getCurrentPosition(), dlsn);
}
if (null == lastRecordSkipTo) {
- in.reset();
+ in.resetReaderIndex();
}
found = true;
break;
@@ -550,7 +597,7 @@
LOG.trace("Found position {} beyond {}", currTxId, txId);
}
if (null == lastRecordSkipTo) {
- in.reset();
+ in.resetReaderIndex();
}
found = true;
break;
@@ -569,7 +616,7 @@
new LogRecordWithDLSN(recordStream.getCurrentPosition(), startSequenceId);
record.setMetadata(flags);
record.setTransactionId(currTxId);
- record.readPayload(in);
+ record.readPayload(in, false);
recordSetReader = LogRecordSet.of(record);
} else {
int length = in.readInt();
@@ -580,15 +627,7 @@
break;
}
// skip single record
- if (null == skipBuffer) {
- skipBuffer = new byte[SKIP_BUFFER_SIZE];
- }
- int read = 0;
- while (read < length) {
- int bytesToRead = Math.min(length - read, SKIP_BUFFER_SIZE);
- in.readFully(skipBuffer, 0, bytesToRead);
- read += bytesToRead;
- }
+ in.skipBytes(length);
if (LOG.isTraceEnabled()) {
LOG.trace("Skipped Record with TxId {} DLSN {}",
currTxId, recordStream.getCurrentPosition());
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java
index 55b20ff..8db6019 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java
@@ -19,12 +19,10 @@
import static com.google.common.base.Preconditions.checkArgument;
-import java.io.ByteArrayInputStream;
+import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.distributedlog.exceptions.LogRecordTooLongException;
import org.apache.distributedlog.exceptions.WriteException;
import org.apache.distributedlog.io.CompressionCodec;
@@ -57,42 +55,35 @@
*/
public class LogRecordSet {
- static final OpStatsLogger NULL_OP_STATS_LOGGER =
- NullStatsLogger.INSTANCE.getOpStatsLogger("");
-
public static final int HEADER_LEN =
- 4 /* Metadata */
- + 4 /* Count */
- + 8 /* Lengths */
+ Integer.BYTES /* Metadata */
+ + Integer.BYTES /* Count */
+ + Integer.BYTES + Integer.BYTES /* Lengths: (decompressed + compressed) */
;
// Version
static final int VERSION = 0x1000;
+ static final int METADATA_OFFSET = 0;
+ static final int COUNT_OFFSET = METADATA_OFFSET + Integer.BYTES;
+ static final int DECOMPRESSED_SIZE_OFFSET = COUNT_OFFSET + Integer.BYTES;
+ static final int COMPRESSED_SIZE_OFFSET = DECOMPRESSED_SIZE_OFFSET + Integer.BYTES;
+
// Metadata
static final int METADATA_VERSION_MASK = 0xf000;
static final int METADATA_COMPRESSION_MASK = 0x3;
- // Compression Codec
- static final int COMPRESSION_CODEC_NONE = 0x0;
- static final int COMPRESSION_CODEC_LZ4 = 0X1;
-
public static int numRecords(LogRecord record) throws IOException {
checkArgument(record.isRecordSet(),
"record is not a recordset");
- byte[] data = record.getPayload();
- return numRecords(data);
- }
-
- public static int numRecords(byte[] data) throws IOException {
- ByteBuffer buffer = ByteBuffer.wrap(data);
- int metadata = buffer.getInt();
+ ByteBuf buffer = record.getPayloadBuf();
+ int metadata = buffer.getInt(METADATA_OFFSET);
int version = (metadata & METADATA_VERSION_MASK);
if (version != VERSION) {
throw new IOException(String.format("Version mismatch while reading. Received: %d,"
+ " Required: %d", version, VERSION));
}
- return buffer.getInt();
+ return buffer.getInt(COUNT_OFFSET);
}
public static Writer newWriter(int initialBufferSize,
@@ -103,7 +94,6 @@
public static Reader of(LogRecordWithDLSN record) throws IOException {
checkArgument(record.isRecordSet(),
"record is not a recordset");
- byte[] data = record.getPayload();
DLSN dlsn = record.getDlsn();
int startPosition = record.getPositionWithinLogSegment();
long startSequenceId = record.getStartSequenceIdOfCurrentSegment();
@@ -115,7 +105,7 @@
dlsn.getSlotId(),
startPosition,
startSequenceId,
- new ByteArrayInputStream(data));
+ record.getPayloadBuf());
}
/**
@@ -150,6 +140,11 @@
*/
LogRecordWithDLSN nextRecord() throws IOException;
+ /**
+ * Release the resources hold by this record set reader.
+ */
+ void release();
+
}
}
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSetBuffer.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSetBuffer.java
index 1fac817..27d54f8 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSetBuffer.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSetBuffer.java
@@ -17,7 +17,7 @@
*/
package org.apache.distributedlog;
-import java.nio.ByteBuffer;
+import io.netty.buffer.ByteBuf;
/**
* Write representation of a {@link LogRecordSet}.
@@ -44,7 +44,7 @@
*
* @return the buffer to transmit.
*/
- ByteBuffer getBuffer();
+ ByteBuf getBuffer();
/**
* Complete transmit.
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordWithDLSN.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordWithDLSN.java
index 3279a5a..096fc81 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordWithDLSN.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordWithDLSN.java
@@ -18,6 +18,7 @@
package org.apache.distributedlog;
import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.ByteBuf;
/**
* Log record with {@link DLSN} and <code>SequenceId</code>.
@@ -72,6 +73,13 @@
this.startSequenceIdOfCurrentSegment = startSequenceIdOfCurrentSegment;
}
+ @VisibleForTesting
+ public LogRecordWithDLSN(DLSN dlsn, long txid, ByteBuf buffer, long startSequenceIdOfCurrentSegment) {
+ super(txid, buffer);
+ this.dlsn = dlsn;
+ this.startSequenceIdOfCurrentSegment = startSequenceIdOfCurrentSegment;
+ }
+
long getStartSequenceIdOfCurrentSegment() {
return startSequenceIdOfCurrentSegment;
}
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/CompressionCodec.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/CompressionCodec.java
index 9a0e3a3..f07647e 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/CompressionCodec.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/CompressionCodec.java
@@ -17,7 +17,7 @@
*/
package org.apache.distributedlog.io;
-import org.apache.bookkeeper.stats.OpStatsLogger;
+import io.netty.buffer.ByteBuf;
/**
* Common interface for compression/decompression operations using different compression codecs.
@@ -27,54 +27,54 @@
* Enum specifying the currently supported compression types.
*/
enum Type {
- NONE, LZ4, UNKNOWN
+
+ UNKNOWN(-0x1),
+ NONE(0x0),
+ LZ4(0x1);
+
+ private int code;
+
+ Type(int code) {
+ this.code = code;
+ }
+
+ public int code() {
+ return this.code;
+ }
+
+ public static Type of(int code) {
+ switch (code) {
+ case 0x0:
+ return NONE;
+ case 0x1:
+ return LZ4;
+ default:
+ return UNKNOWN;
+ }
+ }
+
}
/**
- * Return the compressed data as a byte array.
- * @param data
+ * Return the compressed data as a byte buffer.
+ *
+ * @param uncompressed
* The data to be compressed
- * @param offset
- * The offset in the bytes of data to compress
- * @param length
- * The number of bytes of data to compress
- * @param compressionStat
- * The stat to use for timing the compression operation
+ * @param headerLen
+ * Account the header len for compressed buffer.
* @return
* The compressed data
* The returned byte array is sized to the length of the compressed data
*/
- byte[] compress(byte[] data, int offset, int length, OpStatsLogger compressionStat);
+ ByteBuf compress(ByteBuf uncompressed, int headerLen);
/**
* Return the decompressed data as a byte array.
- * @param data
- * The data to be decompressed
- * @param offset
- * The offset in the bytes of data to decompress
- * @param length
- * The number of bytes of data to decompress
- * @param decompressionStat
- * The stat to use for timing the decompression operation
- * @return
- * The decompressed data
- */
- byte[] decompress(byte[] data, int offset, int length, OpStatsLogger decompressionStat);
-
- /**
- * Return the decompressed data as a byte array.
- * @param data
+ *
+ * @param compressed
* The data to the decompressed
- * @param offset
- * The offset in the bytes of data to decompress
- * @param length
- * The number of bytes of data to decompress
- * @param decompressedSize
- * The exact size of the decompressed data
- * @param decompressionStat
- * The stat to use for timing the decompression operation
* @return
* The decompressed data
*/
- byte[] decompress(byte[] data, int offset, int length, int decompressedSize, OpStatsLogger decompressionStat);
+ ByteBuf decompress(ByteBuf compressed, int decompressedSize);
}
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/CompressionUtils.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/CompressionUtils.java
index 7bac92a..ecbc09d 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/CompressionUtils.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/CompressionUtils.java
@@ -25,9 +25,6 @@
public static final String LZ4 = "lz4";
public static final String NONE = "none";
- private static final CompressionCodec IDENTITY_CODEC = new IdentityCompressionCodec();
- private static final CompressionCodec LZ4_CODEC = new LZ4CompressionCodec();
-
/**
* Get a cached compression codec instance for the specified type.
* @param type compression codec type
@@ -35,10 +32,10 @@
*/
public static CompressionCodec getCompressionCodec(CompressionCodec.Type type) {
if (type == CompressionCodec.Type.LZ4) {
- return LZ4_CODEC;
+ return LZ4CompressionCodec.of();
}
// No Compression
- return IDENTITY_CODEC;
+ return IdentityCompressionCodec.of();
}
/**
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/IdentityCompressionCodec.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/IdentityCompressionCodec.java
index 5034033..d082ff8 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/IdentityCompressionCodec.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/IdentityCompressionCodec.java
@@ -20,31 +20,40 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import java.util.Arrays;
-import org.apache.bookkeeper.stats.OpStatsLogger;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
/**
* An identity compression codec implementation for {@link CompressionCodec}.
*/
public class IdentityCompressionCodec implements CompressionCodec {
+
+ public static IdentityCompressionCodec of() {
+ return INSTANCE;
+ }
+
+ private static final IdentityCompressionCodec INSTANCE = new IdentityCompressionCodec();
+
@Override
- public byte[] compress(byte[] data, int offset, int length, OpStatsLogger compressionStat) {
- checkNotNull(data);
- checkArgument(length >= 0);
- return Arrays.copyOfRange(data, offset, offset + length);
+ public ByteBuf compress(ByteBuf uncompressed, int headerLen) {
+ checkNotNull(uncompressed);
+ checkArgument(uncompressed.readableBytes() >= 0);
+ if (headerLen == 0) {
+ return uncompressed.retain();
+ } else {
+ CompositeByteBuf composited = PooledByteBufAllocator.DEFAULT.compositeBuffer(2);
+ composited.addComponent(PooledByteBufAllocator.DEFAULT.buffer(headerLen, headerLen));
+ composited.addComponent(uncompressed.retain());
+ return composited;
+ }
}
@Override
- public byte[] decompress(byte[] data, int offset, int length, OpStatsLogger decompressionStat) {
- checkNotNull(data);
- return Arrays.copyOfRange(data, offset, offset + length);
- }
-
- @Override
- // Decompressed size is the same as the length of the data because this is an
- // Identity compressor
- public byte[] decompress(byte[] data, int offset, int length,
- int decompressedSize, OpStatsLogger decompressionStat) {
- return decompress(data, offset, length, decompressionStat);
+ public ByteBuf decompress(ByteBuf compressed, int decompressedSize) {
+ checkNotNull(compressed);
+ checkArgument(compressed.readableBytes() >= 0);
+ checkArgument(decompressedSize >= 0);
+ return compressed.retain();
}
}
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/LZ4CompressionCodec.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/LZ4CompressionCodec.java
index b164fef..0f6b30a 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/LZ4CompressionCodec.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/LZ4CompressionCodec.java
@@ -20,14 +20,12 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import com.google.common.base.Stopwatch;
-import java.util.concurrent.TimeUnit;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.nio.ByteBuffer;
import net.jpountz.lz4.LZ4Compressor;
-import net.jpountz.lz4.LZ4Exception;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;
-import net.jpountz.lz4.LZ4SafeDecompressor;
-import org.apache.bookkeeper.stats.OpStatsLogger;
/**
* An {@code lz4} based {@link CompressionCodec} implementation.
@@ -36,66 +34,55 @@
*/
public class LZ4CompressionCodec implements CompressionCodec {
- // Used for compression
- private final LZ4Compressor compressor;
- // Used to decompress when the size of the output is known
- private final LZ4FastDecompressor fastDecompressor;
- // Used to decompress when the size of the output is not known
- private final LZ4SafeDecompressor safeDecompressor;
-
- public LZ4CompressionCodec() {
- this.compressor = LZ4Factory.fastestInstance().fastCompressor();
- this.fastDecompressor = LZ4Factory.fastestInstance().fastDecompressor();
- this.safeDecompressor = LZ4Factory.fastestInstance().safeDecompressor();
+ public static LZ4CompressionCodec of() {
+ return INSTANCE;
}
- @Override
- public byte[] compress(byte[] data, int offset, int length, OpStatsLogger compressionStat) {
- checkNotNull(data);
- checkArgument(offset >= 0 && offset < data.length);
- checkArgument(length >= 0);
- checkNotNull(compressionStat);
+ private static final LZ4CompressionCodec INSTANCE = new LZ4CompressionCodec();
- Stopwatch watch = Stopwatch.createStarted();
- byte[] compressed = compressor.compress(data, offset, length);
- compressionStat.registerSuccessfulEvent(watch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
+ private static final LZ4Factory factory = LZ4Factory.fastestJavaInstance();
+ // Used for compression
+ private static final LZ4Compressor compressor = factory.fastCompressor();
+ // Used to decompress when the size of the output is known
+ private static final LZ4FastDecompressor decompressor = factory.fastDecompressor();
+
+ @Override
+ public ByteBuf compress(ByteBuf uncompressed, int headerLen) {
+ checkNotNull(uncompressed);
+ checkArgument(uncompressed.readableBytes() > 0);
+
+ int uncompressedLen = uncompressed.readableBytes();
+ int maxLen = compressor.maxCompressedLength(uncompressedLen);
+
+ // get the source bytebuffer
+ ByteBuffer uncompressedNio = uncompressed.nioBuffer(uncompressed.readerIndex(), uncompressedLen);
+ ByteBuf compressed = PooledByteBufAllocator.DEFAULT.buffer(
+ maxLen + headerLen, maxLen + headerLen);
+ ByteBuffer compressedNio = compressed.nioBuffer(headerLen, maxLen);
+
+ int compressedLen = compressor.compress(
+ uncompressedNio, uncompressedNio.position(), uncompressedLen,
+ compressedNio, compressedNio.position(), maxLen);
+ compressed.writerIndex(compressedLen + headerLen);
+
return compressed;
}
@Override
- public byte[] decompress(byte[] data, int offset, int length, OpStatsLogger decompressionStat) {
- checkNotNull(data);
- checkArgument(offset >= 0 && offset < data.length);
- checkArgument(length >= 0);
- checkNotNull(decompressionStat);
-
- Stopwatch watch = Stopwatch.createStarted();
- // Assume that we have a compression ratio of 1/3.
- int outLength = length * 3;
- while (true) {
- try {
- byte[] decompressed = safeDecompressor.decompress(data, offset, length, outLength);
- decompressionStat.registerSuccessfulEvent(watch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
- return decompressed;
- } catch (LZ4Exception e) {
- outLength *= 2;
- }
- }
- }
-
- @Override
// length parameter is ignored here because of the way the fastDecompressor works.
- public byte[] decompress(byte[] data, int offset, int length, int decompressedSize,
- OpStatsLogger decompressionStat) {
- checkNotNull(data);
- checkArgument(offset >= 0 && offset < data.length);
- checkArgument(length >= 0);
+ public ByteBuf decompress(ByteBuf compressed, int decompressedSize) {
+ checkNotNull(compressed);
+ checkArgument(compressed.readableBytes() >= 0);
checkArgument(decompressedSize >= 0);
- checkNotNull(decompressionStat);
- Stopwatch watch = Stopwatch.createStarted();
- byte[] decompressed = fastDecompressor.decompress(data, offset, decompressedSize);
- decompressionStat.registerSuccessfulEvent(watch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
- return decompressed;
+ ByteBuf uncompressed = PooledByteBufAllocator.DEFAULT.buffer(decompressedSize, decompressedSize);
+ ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, decompressedSize);
+ ByteBuffer compressedNio = compressed.nioBuffer(compressed.readerIndex(), compressed.readableBytes());
+
+ decompressor.decompress(
+ compressedNio, compressedNio.position(),
+ uncompressedNio, uncompressedNio.position(), uncompressedNio.remaining());
+ uncompressed.writerIndex(decompressedSize);
+ return uncompressed;
}
}
diff --git a/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java b/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java
index 1c5db24..a75f071 100644
--- a/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java
+++ b/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java
@@ -25,9 +25,11 @@
import static org.junit.Assert.fail;
import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.LogRecordSet.Reader;
import org.apache.distributedlog.LogRecordSet.Writer;
import org.apache.distributedlog.exceptions.LogRecordTooLongException;
@@ -38,6 +40,7 @@
/**
* Test Case for {@link LogRecordSet}.
*/
+@Slf4j
public class TestLogRecordSet {
@Test(timeout = 60000)
@@ -46,21 +49,19 @@
assertEquals("zero user bytes", HEADER_LEN, writer.getNumBytes());
assertEquals("zero records", 0, writer.getNumRecords());
- ByteBuffer buffer = writer.getBuffer();
- assertEquals("zero user bytes", HEADER_LEN, buffer.remaining());
-
- byte[] data = new byte[buffer.remaining()];
- buffer.get(data);
+ ByteBuf buffer = writer.getBuffer();
+ assertEquals("zero user bytes", HEADER_LEN, buffer.readableBytes());
LogRecordWithDLSN record = new LogRecordWithDLSN(
new DLSN(1L, 0L, 0L),
1L,
- data,
+ buffer,
1L);
record.setRecordSet();
Reader reader = LogRecordSet.of(record);
assertNull("Empty record set should return null",
reader.nextRecord());
+ reader.release();
}
@Test(timeout = 60000)
@@ -78,21 +79,19 @@
}
assertEquals("zero user bytes", HEADER_LEN, writer.getNumBytes());
assertEquals("zero records", 0, writer.getNumRecords());
- ByteBuffer buffer = writer.getBuffer();
- assertEquals("zero user bytes", HEADER_LEN, buffer.remaining());
-
- byte[] data = new byte[buffer.remaining()];
- buffer.get(data);
+ ByteBuf buffer = writer.getBuffer();
+ assertEquals("zero user bytes", HEADER_LEN, buffer.readableBytes());
LogRecordWithDLSN record = new LogRecordWithDLSN(
new DLSN(1L, 0L, 0L),
1L,
- data,
+ buffer,
1L);
record.setRecordSet();
Reader reader = LogRecordSet.of(record);
assertNull("Empty record set should return null",
reader.nextRecord());
+ reader.release();
}
@Test(timeout = 20000)
@@ -137,7 +136,7 @@
assertEquals((i + 6) + " records", (i + 6), writer.getNumRecords());
}
- ByteBuffer buffer = writer.getBuffer();
+ ByteBuf buffer = writer.getBuffer();
assertEquals("10 records", 10, writer.getNumRecords());
// Test transmit complete
@@ -147,14 +146,10 @@
assertEquals(new DLSN(1L, 1L, 10L + i), writeResults.get(i));
}
- // Test reading from buffer
- byte[] data = new byte[buffer.remaining()];
- buffer.get(data);
-
LogRecordWithDLSN record = new LogRecordWithDLSN(
new DLSN(1L, 1L, 10L),
99L,
- data,
+ buffer,
999L);
record.setPositionWithinLogSegment(888);
record.setRecordSet();
@@ -172,6 +167,7 @@
readRecord = reader.nextRecord();
}
assertEquals(10, numReads);
+ reader.release();
}
}
diff --git a/distributedlog-protocol/src/test/java/org/apache/distributedlog/io/TestCompressionCodec.java b/distributedlog-protocol/src/test/java/org/apache/distributedlog/io/TestCompressionCodec.java
new file mode 100644
index 0000000..23bb8c6
--- /dev/null
+++ b/distributedlog-protocol/src/test/java/org/apache/distributedlog/io/TestCompressionCodec.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.io;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import org.apache.distributedlog.LogRecord;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Test Case for {@link CompressionCodec}.
+ */
+public class TestCompressionCodec {
+
+ @Test(timeout = 10000)
+ public void testUnknownCompressionCodec() throws Exception {
+ assertEquals(
+ CompressionCodec.Type.UNKNOWN,
+ CompressionUtils.stringToType("unknown"));
+ }
+
+ @Test(timeout = 10000)
+ public void testIdentityCompressionCodec() throws Exception {
+ testCompressionCodec(CompressionUtils.getCompressionCodec(CompressionCodec.Type.NONE));
+ }
+
+ @Test(timeout = 10000)
+ public void testLZ4CompressionCodec() throws Exception {
+ testCompressionCodec(CompressionUtils.getCompressionCodec(CompressionCodec.Type.LZ4));
+ }
+
+ @Test(timeout = 10000)
+ public void testIdentityCompressionCodec2() throws Exception {
+ testCompressionCodec2(CompressionUtils.getCompressionCodec(CompressionCodec.Type.NONE));
+ }
+
+ @Test(timeout = 10000)
+ public void testLZ4CompressionCodec2() throws Exception {
+ testCompressionCodec2(CompressionUtils.getCompressionCodec(CompressionCodec.Type.LZ4));
+ }
+
+ private void testCompressionCodec(CompressionCodec codec) throws Exception {
+ byte[] data = "identity-compression-codec".getBytes(UTF_8);
+ ByteBuf buf = Unpooled.wrappedBuffer(data);
+ ByteBuf compressedBuf = codec.compress(buf, 0);
+ ByteBuf decompressedBuf = codec.decompress(compressedBuf, data.length);
+ assertEquals("The length of decompressed buf should be same as the original buffer",
+ data.length, decompressedBuf.readableBytes());
+ byte[] decompressedData = new byte[data.length];
+ decompressedBuf.readBytes(decompressedData);
+ assertArrayEquals("The decompressed bytes should be same as the original bytes",
+ data, decompressedData);
+ buf.release();
+ compressedBuf.release();
+ decompressedBuf.release();
+ }
+
+ private void testCompressionCodec2(CompressionCodec codec) throws Exception {
+ ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer(32, LogRecord.MAX_LOGRECORDSET_SIZE);
+ for (int i = 0; i < 100; i++) {
+ ByteBuffer record = ByteBuffer.wrap(("record-" + i).getBytes(UTF_8));
+ buffer.writeInt(record.remaining());
+ buffer.writeBytes(record);
+ }
+ byte[] uncompressedData = new byte[buffer.readableBytes()];
+ buffer.slice().readBytes(uncompressedData);
+
+ ByteBuf compressedBuf = codec.compress(buffer, 0);
+ byte[] compressedData = new byte[compressedBuf.readableBytes()];
+ compressedBuf.slice().readBytes(compressedData);
+
+ ByteBuf decompressedBuf = codec.decompress(compressedBuf, uncompressedData.length);
+ byte[] decompressedData = new byte[decompressedBuf.readableBytes()];
+ decompressedBuf.slice().readBytes(decompressedData);
+
+ buffer.release();
+ compressedBuf.release();
+ decompressedBuf.release();
+ }
+
+}
diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java
index 0ed93d0..ce851b5 100644
--- a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java
+++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java
@@ -24,6 +24,8 @@
import com.google.common.base.Stopwatch;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.LogRecordSetBuffer;
import org.apache.distributedlog.client.monitor.MonitorServiceClient;
@@ -367,10 +369,12 @@
beforeComplete(sc, response.getHeader());
AbstractWriteOp.this.complete(sc.getAddress(), response);
}
+ onSendWriteRequestCompleted();
}
@Override
public void onFailure(Throwable cause) {
// handled by the ResponseHeader listener
+ onSendWriteRequestCompleted();
}
}).map(new AbstractFunction1<WriteResponse, ResponseHeader>() {
@Override
@@ -381,27 +385,58 @@
}
abstract Future<WriteResponse> sendWriteRequest(ProxyClient sc);
+
+ // action triggered on {@link #sendWriteRequest} completed (either succeed or failed)
+ void onSendWriteRequestCompleted() {
+ }
}
class WriteOp extends AbstractWriteOp {
+ final ByteBuf dataBuf;
final ByteBuffer data;
+ WriteOp(final String name, final ByteBuf dataBuf) {
+ super(name, clientStats.getOpStats("write"));
+ this.dataBuf = dataBuf;
+ this.data = dataBuf.nioBuffer();
+ }
+
WriteOp(final String name, final ByteBuffer data) {
super(name, clientStats.getOpStats("write"));
this.data = data;
+ this.dataBuf = Unpooled.wrappedBuffer(data);
+ }
+
+ @Override
+ void complete(SocketAddress address, WriteResponse response) {
+ super.complete(address, response);
+ release();
+ }
+
+ @Override
+ void fail(SocketAddress address, Throwable t) {
+ super.fail(address, t);
+ release();
}
@Override
Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
- return sc.getService().writeWithContext(stream, data, ctx);
+ // retain the databuf when sending a write request
+ // release the databuf {@link #onSendWriteRequestCompleted()}
+ dataBuf.retain();
+ return sc.getService()
+ .writeWithContext(stream, data.duplicate(), ctx);
+ }
+
+ @Override
+ void onSendWriteRequestCompleted() {
+ dataBuf.release();
}
@Override
Long computeChecksum() {
if (null == crc32) {
- byte[] dataBytes = new byte[data.remaining()];
- data.duplicate().get(dataBytes);
- crc32 = ProtocolUtils.writeOpCRC32(stream, dataBytes);
+ crc32 = ProtocolUtils.writeOpCRC32(stream, data.duplicate());
}
return crc32;
}
@@ -414,6 +449,12 @@
}
});
}
+
+ void release() {
+ if (null != dataBuf) {
+ dataBuf.release();
+ }
+ }
}
class TruncateOp extends AbstractWriteOp {
@@ -456,7 +497,6 @@
}
-
class ReleaseOp extends AbstractWriteOp {
ReleaseOp(String name) {
diff --git a/distributedlog-proxy-protocol/src/main/java/org/apache/distributedlog/protocol/util/ProtocolUtils.java b/distributedlog-proxy-protocol/src/main/java/org/apache/distributedlog/protocol/util/ProtocolUtils.java
index 1f91968..1768f5c 100644
--- a/distributedlog-proxy-protocol/src/main/java/org/apache/distributedlog/protocol/util/ProtocolUtils.java
+++ b/distributedlog-proxy-protocol/src/main/java/org/apache/distributedlog/protocol/util/ProtocolUtils.java
@@ -19,6 +19,8 @@
import static com.google.common.base.Charsets.UTF_8;
+import io.netty.buffer.ByteBuf;
+import java.nio.ByteBuffer;
import org.apache.distributedlog.DLSN;
import java.util.zip.CRC32;
import org.apache.distributedlog.exceptions.DLException;
@@ -42,11 +44,25 @@
/**
* Generate crc32 for WriteOp.
*/
- public static Long writeOpCRC32(String stream, byte[] payload) {
+ public static Long writeOpCRC32(String stream, ByteBuf data) {
CRC32 crc = requestCRC.get();
try {
crc.update(stream.getBytes(UTF_8));
- crc.update(payload);
+ crc.update(data.nioBuffer());
+ return crc.getValue();
+ } finally {
+ crc.reset();
+ }
+ }
+
+ /**
+ * Generate crc32 for WriteOp.
+ */
+ public static Long writeOpCRC32(String stream, ByteBuffer data) {
+ CRC32 crc = requestCRC.get();
+ try {
+ crc.update(stream.getBytes(UTF_8));
+ crc.update(data);
return crc.getValue();
} finally {
crc.reset();
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
index e3aa703..b78ba00 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
@@ -53,7 +53,7 @@
private static final Logger logger = LoggerFactory.getLogger(WriteOp.class);
- private final byte[] payload;
+ private final ByteBuffer payload;
private final boolean isRecordSet;
// Stats
@@ -80,8 +80,7 @@
Feature checksumDisabledFeature,
AccessControlManager accessControlManager) {
super(stream, requestStat(statsLogger, "write"), checksum, checksumDisabledFeature);
- payload = new byte[data.remaining()];
- data.get(payload);
+ this.payload = data;
this.isRecordSet = isRecordSet;
final Partition partition = streamPartitionConverter.convert(stream);
@@ -121,12 +120,12 @@
@Override
public long getPayloadSize() {
- return payload.length;
+ return payload.remaining();
}
@Override
public Long computeChecksum() {
- return ProtocolUtils.writeOpCRC32(stream, payload);
+ return ProtocolUtils.writeOpCRC32(stream, payload.duplicate());
}
@Override
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
index 60f814e..ef1b5eb 100644
--- a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
@@ -26,6 +26,7 @@
import static org.junit.Assert.fail;
import com.google.common.collect.Lists;
+import io.netty.buffer.Unpooled;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.TestDistributedLogBase;
@@ -569,7 +570,7 @@
public void testWriteOpChecksumBadStream() throws Exception {
DistributedLogServiceImpl localService = createConfiguredLocalService();
WriteContext ctx = new WriteContext().setCrc32(
- ProtocolUtils.writeOpCRC32("test", getTestDataBuffer().array()));
+ ProtocolUtils.writeOpCRC32("test", getTestDataBuffer()));
Future<WriteResponse> result = localService.writeWithContext("test1", getTestDataBuffer(), ctx);
WriteResponse resp = Await.result(result);
assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
@@ -581,7 +582,7 @@
DistributedLogServiceImpl localService = createConfiguredLocalService();
ByteBuffer buffer = getTestDataBuffer();
WriteContext ctx = new WriteContext().setCrc32(
- ProtocolUtils.writeOpCRC32("test", buffer.array()));
+ ProtocolUtils.writeOpCRC32("test", buffer));
// Overwrite 1 byte to corrupt data.
buffer.put(1, (byte) 0xab);
@@ -658,11 +659,11 @@
WriteOp writeOp0 = getWriteOp(
streamName,
disabledFeature,
- ProtocolUtils.writeOpCRC32(streamName, "test".getBytes()));
+ ProtocolUtils.writeOpCRC32(streamName, Unpooled.wrappedBuffer("test".getBytes())));
WriteOp writeOp1 = getWriteOp(
streamName,
disabledFeature,
- ProtocolUtils.writeOpCRC32(streamName, "test".getBytes()));
+ ProtocolUtils.writeOpCRC32(streamName, Unpooled.wrappedBuffer("test".getBytes())));
writeOp0.preExecute();
disabledFeature.set(0);
diff --git a/pom.xml b/pom.xml
index bf2a57d..88bfa4a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,8 +114,9 @@
<junit.version>4.8.1</junit.version>
<libthrift.version>0.5.0-1</libthrift.version>
<lombok.version>1.16.16</lombok.version>
- <lz4.version>1.2.0</lz4.version>
+ <lz4.version>1.3.0</lz4.version>
<mockito.version>1.9.5</mockito.version>
+ <netty.version>4.1.12.Final</netty.version>
<scrooge.version>4.6.0</scrooge.version>
<slf4j.version>1.6.4</slf4j.version>
<stats-util.version>0.0.58</stats-util.version>
@@ -215,7 +216,7 @@
<version>${maven-surefire-plugin.version}</version>
<configuration>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
- <argLine>-Xmx3G -Djava.net.preferIPv4Stack=true -XX:MaxDirectMemorySize=2G</argLine>
+ <argLine>-Xmx3G -Djava.net.preferIPv4Stack=true -XX:MaxDirectMemorySize=2G -Dio.netty.leakDetection.level=PARANOID</argLine>
<forkMode>always</forkMode>
<forkedProcessTimeoutInSeconds>1800</forkedProcessTimeoutInSeconds>
</configuration>
@@ -226,25 +227,30 @@
<version>${apache-rat-plugin.version}</version>
<configuration>
<excludes>
+ <exclude>ChangeLog</exclude>
+ <exclude>CONFIG.ini</exclude>
+ <exclude>GROUPS</exclude>
+ <exclude>OWNERS</exclude>
+ <exclude>dist/**/*</exclude>
<exclude>docs/**/*</exclude>
+ <exclude>scripts/dev/reviewers</exclude>
<exclude>website/**/*</exclude>
+ <exclude>**/*.md</exclude>
+ <exclude>**/apidocs/*</exclude>
+ <exclude>**/dependency-reduced-pom.xml</exclude>
+ <exclude>**/org/apache/distributedlog/thrift/*</exclude>
+ <!-- Git -->
<exclude>.git/**/*</exclude>
<exclude>.github/**/*</exclude>
<exclude>.gitignore</exclude>
<exclude>.idea/**/*</exclude>
- <exclude>dist/**/*</exclude>
+ <!-- Intellij -->
+ <exclude>**/*.iml</exclude>
+ <exclude>**/*.iws</exclude>
+ <exclude>**/*.ipr</exclude>
+ <!-- SVN -->
<exclude>**/.svn/**/*</exclude>
- <exclude>ChangeLog</exclude>
- <exclude>**/README.md</exclude>
- <exclude>**/apidocs/*</exclude>
- <exclude>GROUPS</exclude>
- <exclude>OWNERS</exclude>
- <exclude>CONFIG.ini</exclude>
- <exclude>**/**.md</exclude>
- <exclude>**/**.iml</exclude>
- <exclude>scripts/dev/reviewers</exclude>
- <exclude>**/dependency-reduced-pom.xml</exclude>
- <exclude>**/org/apache/distributedlog/thrift/*</exclude>
+ <!-- Maven -->
<exclude>.repository/**</exclude>
</excludes>
</configuration>
diff --git a/tests/jmh/conf/log4j.properties b/tests/jmh/conf/log4j.properties
new file mode 100644
index 0000000..af1cf5f
--- /dev/null
+++ b/tests/jmh/conf/log4j.properties
@@ -0,0 +1,56 @@
+#/**
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements. See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership. The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License. You may obtain a copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+#
+# DistributedLog Logging Configuration
+#
+
+# Default values
+dlog.root.logger=INFO, R
+dlog.log.dir=logs
+dlog.log.file=dlog.log
+
+log4j.rootLogger=${dlog.root.logger}
+log4j.logger.org.apache.zookeeper=INFO
+log4j.logger.org.apache.bookkeeper=INFO
+
+# redirect executor output to executors.log since slow op warnings can be quite verbose
+log4j.logger.org.apache.bookkeeper.util.SafeRunnable=INFO, Executors
+log4j.additivity.org.apache.bookkeeper.util.SafeRunnable=false
+
+log4j.appender.Executors=org.apache.log4j.RollingFileAppender
+log4j.appender.Executors.Threshold=INFO
+log4j.appender.Executors.File=${dlog.log.dir}/executors.log
+log4j.appender.Executors.MaxFileSize=20MB
+log4j.appender.Executors.MaxBackupIndex=5
+log4j.appender.Executors.layout=org.apache.log4j.PatternLayout
+log4j.appender.Executors.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+log4j.appender.R=org.apache.log4j.RollingFileAppender
+log4j.appender.R.Threshold=INFO
+log4j.appender.R.File=${dlog.log.dir}/${dlog.log.file}
+log4j.appender.R.MaxFileSize=20MB
+log4j.appender.R.MaxBackupIndex=50
+log4j.appender.R.layout=org.apache.log4j.PatternLayout
+log4j.appender.R.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+log4j.appender.stderr=org.apache.log4j.ConsoleAppender
+log4j.appender.stderr.Target=System.err
+log4j.appender.stderr.Threshold=INFO
+log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
+log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
diff --git a/tests/jmh/src/main/java/org/apache/distributedlog/tests/CompressionBenchmark.java b/tests/jmh/src/main/java/org/apache/distributedlog/tests/CompressionBenchmark.java
index af66212..1869e4f 100644
--- a/tests/jmh/src/main/java/org/apache/distributedlog/tests/CompressionBenchmark.java
+++ b/tests/jmh/src/main/java/org/apache/distributedlog/tests/CompressionBenchmark.java
@@ -20,6 +20,8 @@
*/
package org.apache.distributedlog.tests;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -47,9 +49,9 @@
@Param({ "10", "100", "1000", "10000" })
int size;
- byte[] entry;
- byte[] compressedLz4;
- byte[] compressedNone;
+ ByteBuf entry;
+ ByteBuf compressedLz4;
+ ByteBuf compressedNone;
CompressionCodec codecLz4;
CompressionCodec codecNone;
OpStatsLogger opStatsLogger;
@@ -57,14 +59,16 @@
@Setup
public void prepare() {
Random r = new Random(System.currentTimeMillis());
- this.entry = new byte[this.size];
- r.nextBytes(entry);
+ byte[] data = new byte[this.size];
+ r.nextBytes(data);
+ this.entry = Unpooled.buffer(size);
+ this.entry.writeBytes(data);
this.codecLz4 = CompressionUtils.getCompressionCodec(Type.LZ4);
this.codecNone = CompressionUtils.getCompressionCodec(Type.NONE);
this.opStatsLogger = NullStatsLogger.INSTANCE.getOpStatsLogger("");
- this.compressedLz4 = codecLz4.compress(entry, 0, entry.length, opStatsLogger);
- this.compressedNone = codecNone.compress(entry, 0, entry.length, opStatsLogger);
+ this.compressedLz4 = codecLz4.compress(entry.slice(), 0);
+ this.compressedNone = codecNone.compress(entry.slice(), 0);
}
@@ -79,7 +83,8 @@
}
private void testCompress(CompressionCodec codec) {
- codec.compress(entry, 0, entry.length, opStatsLogger);
+ ByteBuf compressed = codec.compress(entry.slice(), 0);
+ compressed.release();
}
@Benchmark
@@ -92,8 +97,9 @@
testDecompress(codecNone, compressedNone);
}
- private void testDecompress(CompressionCodec codec, byte[] compressed) {
- codec.decompress(compressed, 0, compressed.length, opStatsLogger);
+ private void testDecompress(CompressionCodec codec, ByteBuf compressed) {
+ ByteBuf decompressed = codec.decompress(compressed.slice(), size);
+ decompressed.release();
}
}