/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.record;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Iterator;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.AbstractIterator;
/**
* A {@link Records} implementation backed by a ByteBuffer.
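* <p>
* A minimal usage sketch (the buffer size, offset, timestamp and payloads below are illustrative only):
* <pre>{@code
* MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
* records.append(0L, System.currentTimeMillis(), "key".getBytes(), "value".getBytes());
* records.close(); // no more appends; the underlying buffer becomes readable
* Iterator<LogEntry> iter = records.iterator();
* while (iter.hasNext()) {
*     LogEntry entry = iter.next();
*     System.out.println(entry.offset() + ": " + entry.record());
* }
* }</pre>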
*/
public class MemoryRecords implements Records {
private static final int WRITE_LIMIT_FOR_READABLE_ONLY = -1;
// the compressor used only for appends
private final Compressor compressor;
// the write limit for the writable buffer, which may be smaller than the buffer capacity
private final int writeLimit;
// the capacity of the initial buffer, which is only used for de-allocation of writable records
private final int initialCapacity;
// the underlying buffer used for reads; it is null while the records are still writable
private ByteBuffer buffer;
// indicates whether this memory records instance is writable (i.e. used for appends) or read-only
private boolean writable;
// Construct a memory records instance, either writable (for appends) or read-only
private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int writeLimit) {
this.writable = writable;
this.writeLimit = writeLimit;
this.initialCapacity = buffer.capacity();
if (this.writable) {
this.buffer = null;
this.compressor = new Compressor(buffer, type);
} else {
this.buffer = buffer;
this.compressor = null;
}
}
public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type, int writeLimit) {
return new MemoryRecords(buffer, type, true, writeLimit);
}
public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type) {
// use the buffer capacity as the default write limit
return emptyRecords(buffer, type, buffer.capacity());
}
public static MemoryRecords readableRecords(ByteBuffer buffer) {
return new MemoryRecords(buffer, CompressionType.NONE, false, WRITE_LIMIT_FOR_READABLE_ONLY);
}
/**
* Append the given record and offset to the buffer
*/
public void append(long offset, Record record) {
if (!writable)
throw new IllegalStateException("Memory records is not writable");
int size = record.size();
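// write the log overhead (the entry's offset followed by its size), then the record payload itself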
compressor.putLong(offset);
compressor.putInt(size);
compressor.put(record.buffer());
compressor.recordWritten(size + Records.LOG_OVERHEAD);
record.buffer().rewind();
}
/**
* Append a new record and offset to the buffer
* @return the crc of the appended record
*/
public long append(long offset, long timestamp, byte[] key, byte[] value) {
if (!writable)
throw new IllegalStateException("Memory records is not writable");
int size = Record.recordSize(key, value);
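// write the log overhead first; putRecord then serializes the record and returns its crc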
compressor.putLong(offset);
compressor.putInt(size);
long crc = compressor.putRecord(timestamp, key, value);
compressor.recordWritten(size + Records.LOG_OVERHEAD);
return crc;
}
/**
* Check if we have room for a new record containing the given key/value pair
*
* Note that the return value is based on an estimate of the bytes written to the compressor, which may not be
* accurate if compression is actually in use. When the estimate is too low, the following append may cause
* dynamic buffer re-allocation in the underlying byte buffer stream.
*
* There is an exceptional case: when a single message is larger than the batch size, the buffer is allocated
* with the message size, which exceeds the write limit (i.e. the batch size). In this case the check is based
* on the capacity of the initial buffer rather than the write limit, so that this single record can still be
* accepted.
*/
public boolean hasRoomFor(byte[] key, byte[] value) {
if (!this.writable)
return false;
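// for the first record, check against the initial buffer capacity so that a single record larger than the
// write limit (i.e. the batch size) can still be accepted; otherwise check the estimated bytes written so far
// plus the new entry against the write limit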
return this.compressor.numRecordsWritten() == 0 ?
this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) :
this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
}
public boolean isFull() {
return !this.writable || this.writeLimit <= this.compressor.estimatedBytesWritten();
}
/**
* Close this batch so that no more appends can be made
*/
public void close() {
if (writable) {
// close the compressor to fill in the wrapper message metadata if necessary
compressor.close();
// flip the underlying buffer to be ready for reads
buffer = compressor.buffer();
buffer.flip();
// reset the writable flag
writable = false;
}
}
/**
* The size of this record set in bytes
*/
public int sizeInBytes() {
if (writable) {
return compressor.buffer().position();
} else {
return buffer.limit();
}
}
/**
* The compression rate of this record set
*/
public double compressionRate() {
if (compressor == null)
return 1.0;
else
return compressor.compressionRate();
}
/**
* Return the capacity of the initial buffer; for writable records
* it may differ from the capacity of the current underlying buffer
*/
public int initialCapacity() {
return this.initialCapacity;
}
/**
* Get the byte buffer that backs this records instance for reading
*/
public ByteBuffer buffer() {
if (writable)
throw new IllegalStateException("The memory records must not be writable any more before getting its underlying buffer");
return buffer.duplicate();
}
@Override
public Iterator<LogEntry> iterator() {
if (writable) {
// flip on a duplicate buffer for reading
return new RecordsIterator((ByteBuffer) this.buffer.duplicate().flip(), CompressionType.NONE, false);
} else {
// no need to flip a non-writable buffer
return new RecordsIterator(this.buffer.duplicate(), CompressionType.NONE, false);
}
}
@Override
public String toString() {
Iterator<LogEntry> iter = iterator();
StringBuilder builder = new StringBuilder();
builder.append('[');
while (iter.hasNext()) {
LogEntry entry = iter.next();
builder.append('(');
builder.append("offset=");
builder.append(entry.offset());
builder.append(",");
builder.append("record=");
builder.append(entry.record());
builder.append(")");
}
builder.append(']');
return builder.toString();
}
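/**
* An iterator over the log entries in a records buffer. When {@code shallow} is false, a compressed wrapper
* entry is decompressed and its inner entries are returned one by one (deep iteration); when it is true, only
* the outer entries are returned as-is.
*/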
public static class RecordsIterator extends AbstractIterator<LogEntry> {
private final ByteBuffer buffer;
private final DataInputStream stream;
private final CompressionType type;
private final boolean shallow;
private RecordsIterator innerIter;
// Fields used only by the inner (deep) iterator
private final ArrayDeque<LogEntry> logEntries;
private final long absoluteBaseOffset;
public RecordsIterator(ByteBuffer buffer, CompressionType type, boolean shallow) {
this.type = type;
this.buffer = buffer;
this.shallow = shallow;
this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type);
this.logEntries = null;
this.absoluteBaseOffset = -1;
}
// Private constructor for inner iterator.
private RecordsIterator(LogEntry entry) {
this.type = entry.record().compressionType();
this.buffer = entry.record().value();
this.shallow = true;
this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type);
long wrapperRecordOffset = entry.offset();
// If relative offset is used, we need to decompress the entire message first to compute
// the absolute offset.
if (entry.record().magic() > Record.MAGIC_VALUE_V0) {
this.logEntries = new ArrayDeque<>();
long wrapperRecordTimestamp = entry.record().timestamp();
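// buffer all inner entries up front, re-wrapping each record with the wrapper's timestamp and timestamp type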
while (true) {
try {
LogEntry logEntry = getNextEntryFromStream();
Record recordWithTimestamp = new Record(logEntry.record().buffer(),
wrapperRecordTimestamp,
entry.record().timestampType());
logEntries.add(new LogEntry(logEntry.offset(), recordWithTimestamp));
} catch (EOFException e) {
break;
} catch (IOException e) {
throw new KafkaException(e);
}
}
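// the wrapper's offset is the absolute offset of the last inner message, whose own offset is relative,
// so their difference is the base to add to every inner (relative) offset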
this.absoluteBaseOffset = wrapperRecordOffset - logEntries.getLast().offset();
} else {
this.logEntries = null;
this.absoluteBaseOffset = -1;
}
}
/*
* Read the next record from the buffer.
*
* Note that in a compressed message set, each message's value size field holds the size of the uncompressed
* message value, so when decompressing, allocating an array of that specified size is sufficient to read the
* decompressed value data.
*/
@Override
protected LogEntry makeNext() {
if (innerDone()) {
try {
LogEntry entry = getNextEntry();
// No more records to return.
if (entry == null)
return allDone();
// Convert offset to absolute offset if needed.
if (absoluteBaseOffset >= 0) {
long absoluteOffset = absoluteBaseOffset + entry.offset();
entry = new LogEntry(absoluteOffset, entry.record());
}
// decide whether to do shallow or deep iteration depending on whether the record is compressed
CompressionType compression = entry.record().compressionType();
if (compression == CompressionType.NONE || shallow) {
return entry;
} else {
// init the inner iterator with the value payload of the message,
// which will de-compress the payload to a set of messages;
// since we assume nested compression is not allowed, the deep iterator
// would not try to further decompress underlying messages
// There will be at least one element in the inner iterator, so we don't
// need to call hasNext() here.
innerIter = new RecordsIterator(entry);
return innerIter.next();
}
} catch (EOFException e) {
return allDone();
} catch (IOException e) {
throw new KafkaException(e);
}
} else {
return innerIter.next();
}
}
private LogEntry getNextEntry() throws IOException {
if (logEntries != null)
return getNextEntryFromEntryList();
else
return getNextEntryFromStream();
}
private LogEntry getNextEntryFromEntryList() {
return logEntries.isEmpty() ? null : logEntries.remove();
}
private LogEntry getNextEntryFromStream() throws IOException {
// read the offset
long offset = stream.readLong();
// read record size
int size = stream.readInt();
if (size < 0)
throw new IllegalStateException("Record with size " + size);
// read the record; if compression is used we cannot slice the underlying buffer by size
// and hence have to do an extra copy through the decompressing stream
ByteBuffer rec;
if (type == CompressionType.NONE) {
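// for uncompressed data, slice the record directly out of the backing buffer without copying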
rec = buffer.slice();
int newPos = buffer.position() + size;
if (newPos > buffer.limit())
return null;
buffer.position(newPos);
rec.limit(size);
} else {
byte[] recordBuffer = new byte[size];
stream.readFully(recordBuffer, 0, size);
rec = ByteBuffer.wrap(recordBuffer);
}
return new LogEntry(offset, new Record(rec));
}
private boolean innerDone() {
return innerIter == null || !innerIter.hasNext();
}
}
}