/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.common.record;

import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
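
/**
 * Utility methods for down-converting record batches to older message format (magic) versions.
 */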
public class RecordsUtil {

    /**
     * Down convert batches to the provided message format version. The first offset parameter is only relevant in the
     * conversion from uncompressed v2 or higher to v1 or lower. The reason is that uncompressed records in v0 and v1
     * are not batched (put another way, each batch always has 1 record).
     *
     * If a client requests records in v1 format starting from the middle of an uncompressed batch in v2 format, we
     * need to drop records from the batch during the conversion. Some versions of librdkafka rely on this for
     * correctness.
     *
     * The temporaryMemoryBytes computation assumes that the batches are not loaded into the heap
     * (via classes like FileChannelRecordBatch) before this method is called. This is the case in the broker (we
     * only load records into the heap when down converting), but it is not the case for the producer. However,
     * down converting in the producer is very uncommon and the extra complexity to handle that case is not worth it.
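     *
     * A minimal usage sketch from within this package (the {@code fileRecords} source, the target magic value and
     * the fetch offset are illustrative assumptions; external callers typically go through wrappers such as
     * {@link FileRecords#downConvert}):
     * <pre>{@code
     * ConvertedRecords<MemoryRecords> converted =
     *         RecordsUtil.downConvert(fileRecords.batches(), RecordBatch.MAGIC_VALUE_V1, fetchOffset, Time.SYSTEM);
     * MemoryRecords downConverted = converted.records();
     * RecordConversionStats stats = converted.recordConversionStats();
     * }</pre>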
     */
    protected static ConvertedRecords<MemoryRecords> downConvert(Iterable<? extends RecordBatch> batches, byte toMagic,
                                                                 long firstOffset, Time time) {
        // maintain the batch along with the decompressed records to avoid the need to decompress again
        List<RecordBatchAndRecords> recordBatchAndRecordsList = new ArrayList<>();
        int totalSizeEstimate = 0;

        long startNanos = time.nanoseconds();

        for (RecordBatch batch : batches) {
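            // Control batches and zstd compression were introduced in the v2 format and cannot be
            // represented in older formats: skip control batches and reject zstd-compressed batches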
            if (toMagic < RecordBatch.MAGIC_VALUE_V2) {
                if (batch.isControlBatch())
                    continue;

                if (batch.compressionType() == CompressionType.ZSTD)
                    throw new UnsupportedCompressionTypeException("Down-conversion of zstandard-compressed batches " +
                            "is not supported");
            }
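
            // Batches already at or below the target magic can be copied through without conversion;
            // otherwise, collect the records that survive the down-conversion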
            if (batch.magic() <= toMagic) {
                totalSizeEstimate += batch.sizeInBytes();
                recordBatchAndRecordsList.add(new RecordBatchAndRecords(batch, null, null));
            } else {
                List<Record> records = new ArrayList<>();
                for (Record record : batch) {
                    // See the method javadoc for an explanation
                    if (toMagic > RecordBatch.MAGIC_VALUE_V1 || batch.isCompressed() || record.offset() >= firstOffset)
                        records.add(record);
                }
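
                // Every record may have been dropped (e.g. the fetch started in the middle of an
                // uncompressed v2 batch); in that case there is nothing to convert for this batch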
                if (records.isEmpty())
                    continue;
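
                // Use the explicit base offset when both source and target are v2; otherwise fall
                // back to the offset of the first retained record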
                final long baseOffset;
                if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 && toMagic >= RecordBatch.MAGIC_VALUE_V2)
                    baseOffset = batch.baseOffset();
                else
                    baseOffset = records.get(0).offset();

                totalSizeEstimate += AbstractRecords.estimateSizeInBytes(toMagic, baseOffset, batch.compressionType(), records);
                recordBatchAndRecordsList.add(new RecordBatchAndRecords(batch, records, baseOffset));
            }
        }
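
        // Second pass: write each batch into a single output buffer, converting where required and
        // expanding the buffer if the initial size estimate proves too small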
        ByteBuffer buffer = ByteBuffer.allocate(totalSizeEstimate);
        long temporaryMemoryBytes = 0;
        int numRecordsConverted = 0;

        for (RecordBatchAndRecords recordBatchAndRecords : recordBatchAndRecordsList) {
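            // The original batch occupies temporary memory whether or not it needs conversion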
            temporaryMemoryBytes += recordBatchAndRecords.batch.sizeInBytes();

            if (recordBatchAndRecords.batch.magic() <= toMagic) {
                buffer = Utils.ensureCapacity(buffer, buffer.position() + recordBatchAndRecords.batch.sizeInBytes());
                recordBatchAndRecords.batch.writeTo(buffer);
            } else {
                MemoryRecordsBuilder builder = convertRecordBatch(toMagic, buffer, recordBatchAndRecords);
                buffer = builder.buffer();
                temporaryMemoryBytes += builder.uncompressedBytesWritten();
                numRecordsConverted += builder.numRecords();
            }
        }

        buffer.flip();
        RecordConversionStats stats = new RecordConversionStats(temporaryMemoryBytes, numRecordsConverted,
                time.nanoseconds() - startNanos);
        return new ConvertedRecords<>(MemoryRecords.readableRecords(buffer), stats);
    }

    /**
     * Return a {@link MemoryRecordsBuilder} whose underlying buffer contains the converted record batch. The
     * builder's buffer may not be the same as the received one (e.g. it may have required expansion).
     */
    private static MemoryRecordsBuilder convertRecordBatch(byte magic, ByteBuffer buffer, RecordBatchAndRecords recordBatchAndRecords) {
        RecordBatch batch = recordBatchAndRecords.batch;
        final TimestampType timestampType = batch.timestampType();
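        // Preserve the broker-assigned timestamp when the batch uses log append time; otherwise the
        // builder carries over each record's create timestamp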
        long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? batch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;

        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, batch.compressionType(),
                timestampType, recordBatchAndRecords.baseOffset, logAppendTime);

        for (Record record : recordBatchAndRecords.records) {
            // Down-convert this record. Ignore headers when down-converting to V0 and V1 since they are not supported
            if (magic > RecordBatch.MAGIC_VALUE_V1)
                builder.append(record);
            else
                builder.appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value());
        }

        builder.close();
        return builder;
    }
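
    /**
     * A source batch paired with the records read from it ({@code null} if the batch is copied through
     * unconverted) and the base offset to use for the converted batch.
     */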
    private static class RecordBatchAndRecords {
        private final RecordBatch batch;
        private final List<Record> records;
        private final Long baseOffset;

        private RecordBatchAndRecords(RecordBatch batch, List<Record> records, Long baseOffset) {
            this.batch = batch;
            this.records = records;
            this.baseOffset = baseOffset;
        }
    }
}