blob: 45c026f5e3c586e02d1cdfa49b39842001bcda29 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.rows;
import java.io.IOException;
import java.io.IOError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.ByteBufferUtil;
/**
* Serialize/Deserialize an unfiltered row iterator.
*
* The serialization is composed of a header, followed by the rows and range tombstones of the iterator serialized
* until we read the end of the partition (see UnfilteredSerializer for details). The header itself
* is:
* {@code
* <cfid><key><flags><s_header>[<partition_deletion>][<static_row>][<row_estimate>]
* where:
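* <cfid> is the table cfid. Note that this class neither writes nor reads it: serialize() starts with
* the <key>, and deserializeHeader() is handed the table metadata by its caller.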
* <key> is the partition key.
* <flags> contains bit flags. Each flag is set if its corresponding bit is set. From rightmost
* bit to leftmost one, the flags are:
* - is empty: whether the iterator is empty. If so, nothing follows the <flags>
* - is reversed: whether the iterator is in reversed clustering order
* - has partition deletion: whether or not there is a <partition_deletion> following
* - has static row: whether or not there is a <static_row> following
* - has row estimate: whether or not there is a <row_estimate> following
* <s_header> is the {@code SerializationHeader}. It contains in particular the columns contained in the serialized
* iterator as well as other information necessary for decoding the serialized rows
* (see {@code SerializationHeader.Serializer} for details).
* <partition_deletion> is the deletion time for the partition (delta-encoded)
* <static_row> is the static row for this partition as serialized by UnfilteredSerializer.
* <row_estimate> is the (potentially estimated) number of rows serialized. This is only used for
* the purpose of sizing on the receiving end and should not be relied upon too strongly.
* }
*
* Please note that the format described above is the on-wire format. On-disk, the format is basically the
* same, but the header is written once per sstable, not once per-partition. Further, the actual row and
* range tombstones are not written using this class, but rather by {@link ColumnIndex}.
*/
public class UnfilteredRowIteratorSerializer
{
    protected static final Logger logger = LoggerFactory.getLogger(UnfilteredRowIteratorSerializer.class);

    // One mask per bit of the flags byte, lowest bit first.
    private static final int IS_EMPTY               = 1;
    private static final int IS_REVERSED            = 1 << 1;
    private static final int HAS_PARTITION_DELETION = 1 << 2;
    private static final int HAS_STATIC_ROW         = 1 << 3;
    private static final int HAS_ROW_ESTIMATE       = 1 << 4;

    /** Shared instance; this class declares no instance state of its own. */
    public static final UnfilteredRowIteratorSerializer serializer = new UnfilteredRowIteratorSerializer();

    /**
     * Writes {@code iterator} to {@code out} without a row estimate (a row estimate of -1 is passed on).
     * Intended for the on-wire format only.
     *
     * @param iterator  the partition content to write; it is consumed by this call
     * @param selection the column selection handed to the serialization header serializer
     * @param out       the output to write to
     * @param version   the version passed through to the row serializer
     * @throws IOException if writing to {@code out} fails
     */
    public void serialize(UnfilteredRowIterator iterator, ColumnFilter selection, DataOutputPlus out, int version) throws IOException
    {
        final int noRowEstimate = -1;
        serialize(iterator, selection, out, version, noRowEstimate);
    }

    /**
     * Writes {@code iterator} to {@code out} using a serialization header built from the iterator's own
     * metadata, columns and stats. Intended for the on-wire format only.
     *
     * @param iterator    the partition content to write; it is consumed by this call
     * @param selection   the column selection handed to the serialization header serializer
     * @param out         the output to write to
     * @param version     the version passed through to the row serializer
     * @param rowEstimate the row estimate to include, or a negative value to include none
     * @throws IOException if writing to {@code out} fails
     */
    public void serialize(UnfilteredRowIterator iterator, ColumnFilter selection, DataOutputPlus out, int version, int rowEstimate) throws IOException
    {
        // NOTE(review): the leading 'false' presumably marks the header as not being for an sstable,
        // which is what the overload below asserts -- confirm against SerializationHeader.
        serialize(iterator,
                  new SerializationHeader(false, iterator.metadata(), iterator.columns(), iterator.stats()),
                  selection,
                  out,
                  version,
                  rowEstimate);
    }

    /**
     * Writes {@code iterator} to {@code out} with the provided serialization header. Intended for the
     * on-wire format only: the header must not be an sstable one (checked by assertion).
     * <p>
     * The output is, in order: the partition key, the flags byte and, unless the iterator is empty,
     * the serialization header, the optional partition deletion, the optional static row, the optional
     * row estimate, every remaining element of the iterator and finally the end-of-partition marker.
     *
     * @param iterator    the partition content to write; it is consumed by this call
     * @param header      the serialization header to write and to encode the rows with
     * @param selection   the column selection handed to the serialization header serializer
     * @param out         the output to write to
     * @param version     the version passed through to the row serializer
     * @param rowEstimate the row estimate to include, or a negative value to include none
     * @throws IOException if writing to {@code out} fails
     */
    public void serialize(UnfilteredRowIterator iterator, SerializationHeader header, ColumnFilter selection, DataOutputPlus out, int version, int rowEstimate) throws IOException
    {
        assert !header.isForSSTable();

        ByteBufferUtil.writeWithVIntLength(iterator.partitionKey().getKey(), out);

        // The reversed bit is the only one that is also emitted for an empty iterator.
        final int reversedBit = iterator.isReverseOrder() ? IS_REVERSED : 0;
        if (iterator.isEmpty())
        {
            // Nothing follows the flags byte of an empty iterator.
            out.writeByte((byte)(reversedBit | IS_EMPTY));
            return;
        }

        final DeletionTime deletion = iterator.partitionLevelDeletion();
        final boolean writeDeletion = !deletion.isLive();

        final Row statics = iterator.staticRow();
        final boolean writeStatic = statics != Rows.EMPTY_STATIC_ROW;

        final boolean writeEstimate = rowEstimate >= 0;

        final int flags = reversedBit
                        | (writeDeletion ? HAS_PARTITION_DELETION : 0)
                        | (writeStatic ? HAS_STATIC_ROW : 0)
                        | (writeEstimate ? HAS_ROW_ESTIMATE : 0);
        out.writeByte((byte)flags);

        SerializationHeader.serializer.serializeForMessaging(header, selection, out, writeStatic);

        // The optional sections are written in the same order deserializeHeader() reads them back.
        if (writeDeletion)
        {
            header.writeDeletionTime(deletion, out);
        }
        if (writeStatic)
        {
            UnfilteredSerializer.serializer.serialize(statics, header, out, version);
        }
        if (writeEstimate)
        {
            out.writeUnsignedVInt(rowEstimate);
        }

        while (iterator.hasNext())
        {
            UnfilteredSerializer.serializer.serialize(iterator.next(), header, out, version);
        }
        UnfilteredSerializer.serializer.writeEndOfPartition(out);
    }

    /**
     * Computes the size of what the on-wire {@code serialize} would write for {@code iterator}, using a
     * serialization header built from the iterator's own metadata, columns and stats.
     * <p>
     * Beware that this consumes the iterator. It should therefore only be called when there is a simple
     * way to recreate an iterator for both serialize and serializedSize, which is mostly only
     * PartitionUpdate/ArrayBackedCachedPartition.
     *
     * @param iterator    the partition content to measure; it is consumed by this call
     * @param selection   the column selection handed to the serialization header serializer
     * @param version     the version passed through to the row serializer
     * @param rowEstimate the row estimate that would be written; asserted to be non-negative
     * @return the computed size
     */
    public long serializedSize(UnfilteredRowIterator iterator, ColumnFilter selection, int version, int rowEstimate)
    {
        final SerializationHeader sizingHeader = new SerializationHeader(false,
                                                                         iterator.metadata(),
                                                                         iterator.columns(),
                                                                         iterator.stats());

        assert rowEstimate >= 0;

        // Partition key, plus one byte for the flags.
        long total = ByteBufferUtil.serializedSizeWithVIntLength(iterator.partitionKey().getKey()) + 1;

        if (!iterator.isEmpty())
        {
            final DeletionTime deletion = iterator.partitionLevelDeletion();
            final Row statics = iterator.staticRow();
            final boolean sizeStatic = statics != Rows.EMPTY_STATIC_ROW;

            total += SerializationHeader.serializer.serializedSizeForMessaging(sizingHeader, selection, sizeStatic);

            if (!deletion.isLive())
            {
                total += sizingHeader.deletionTimeSerializedSize(deletion);
            }
            if (sizeStatic)
            {
                total += UnfilteredSerializer.serializer.serializedSize(statics, sizingHeader, version);
            }
            if (rowEstimate >= 0)
            {
                total += TypeSizes.sizeofUnsignedVInt(rowEstimate);
            }

            while (iterator.hasNext())
            {
                total += UnfilteredSerializer.serializer.serializedSize(iterator.next(), sizingHeader, version);
            }
            total += UnfilteredSerializer.serializer.serializedSizeEndOfPartition();
        }

        return total;
    }

    /**
     * Reads everything that precedes the rows of a serialized iterator: the partition key, the flags and,
     * unless the empty flag is set, the serialization header, the optional partition deletion, the
     * optional static row and the optional row estimate.
     *
     * @param metadata  the table metadata, used to decorate the key and to decode the header
     * @param selection the column selection handed to the serialization header deserializer
     * @param in        the input to read from
     * @param version   the version used when decoding the static row
     * @param flag      the flag used when decoding the static row
     * @return the decoded {@link Header}; for an empty iterator its deletion and static row are
     *         {@code null} and its row estimate is 0
     * @throws IOException if reading from {@code in} fails
     */
    public Header deserializeHeader(CFMetaData metadata, ColumnFilter selection, DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
    {
        final DecoratedKey partitionKey = metadata.decorateKey(ByteBufferUtil.readWithVIntLength(in));
        final int flags = in.readUnsignedByte();
        final boolean reversed = isSet(flags, IS_REVERSED);

        if (isSet(flags, IS_EMPTY))
        {
            // Nothing was written after the flags, so use a header with no columns and no stats.
            return new Header(new SerializationHeader(false, metadata, PartitionColumns.NONE, EncodingStats.NO_STATS),
                              partitionKey,
                              reversed,
                              true,
                              null,
                              null,
                              0);
        }

        final boolean withDeletion = isSet(flags, HAS_PARTITION_DELETION);
        final boolean withStatic = isSet(flags, HAS_STATIC_ROW);
        final boolean withEstimate = isSet(flags, HAS_ROW_ESTIMATE);

        final SerializationHeader sHeader = SerializationHeader.serializer.deserializeForMessaging(in, metadata, selection, withStatic);

        DeletionTime deletion = DeletionTime.LIVE;
        if (withDeletion)
        {
            deletion = sHeader.readDeletionTime(in);
        }

        final Row statics = withStatic
                          ? UnfilteredSerializer.serializer.deserializeStaticRow(in, sHeader, new SerializationHelper(metadata, version, flag))
                          : Rows.EMPTY_STATIC_ROW;

        // -1 stands for "no estimate was serialized".
        int estimate = -1;
        if (withEstimate)
        {
            estimate = (int)in.readUnsignedVInt();
        }

        return new Header(sHeader, partitionKey, reversed, false, deletion, statics, estimate);
    }

    /**
     * Builds the iterator over the rows that follow an already decoded {@code header}. The returned
     * iterator decodes its elements from {@code in} as they are requested.
     *
     * @param in       the input positioned right after the header
     * @param version  the version used when decoding the rows
     * @param metadata the table metadata
     * @param flag     the flag used when decoding the rows
     * @param header   the header previously obtained from {@code deserializeHeader}
     * @return an empty iterator if the header is flagged empty, otherwise an iterator reading from
     *         {@code in}; the latter rethrows any {@link IOException} wrapped in an {@link IOError}
     * @throws IOException declared by the signature
     */
    public UnfilteredRowIterator deserialize(DataInputPlus in, int version, CFMetaData metadata, SerializationHelper.Flag flag, Header header) throws IOException
    {
        if (header.isEmpty)
        {
            return EmptyIterators.unfilteredRow(metadata, header.key, header.isReversed);
        }

        final SerializationHelper helper = new SerializationHelper(metadata, version, flag);
        final SerializationHeader sHeader = header.sHeader;

        return new AbstractUnfilteredRowIterator(metadata,
                                                 header.key,
                                                 header.partitionDeletion,
                                                 sHeader.columns(),
                                                 header.staticRow,
                                                 header.isReversed,
                                                 sHeader.stats())
        {
            // A single builder instance is handed to every deserialize call of this iterator.
            private final Row.Builder rowBuilder = BTreeRow.sortedBuilder();

            protected Unfiltered computeNext()
            {
                try
                {
                    Unfiltered next = UnfilteredSerializer.serializer.deserialize(in, sHeader, helper, rowBuilder);
                    // A null result means there is nothing left to read for this partition.
                    if (next == null)
                    {
                        return endOfData();
                    }
                    return next;
                }
                catch (IOException e)
                {
                    throw new IOError(e);
                }
            }
        };
    }

    /**
     * Reads the header from {@code in} and then builds the iterator over the rows that follow it.
     *
     * @param in        the input to read from
     * @param version   the version used when decoding
     * @param metadata  the table metadata
     * @param selection the column selection handed to the serialization header deserializer
     * @param flag      the flag used when decoding
     * @return the deserialized iterator
     * @throws IOException if reading the header from {@code in} fails
     */
    public UnfilteredRowIterator deserialize(DataInputPlus in, int version, CFMetaData metadata, ColumnFilter selection, SerializationHelper.Flag flag) throws IOException
    {
        final Header decoded = deserializeHeader(metadata, selection, in, version, flag);
        return deserialize(in, version, metadata, flag, decoded);
    }

    // Tells whether any bit of the mask is set in flags.
    private static boolean isSet(int flags, int mask)
    {
        return (flags & mask) != 0;
    }

    /**
     * Everything {@code deserializeHeader} reads before the rows of a serialized iterator.
     */
    public static class Header
    {
        public final SerializationHeader sHeader;
        public final DecoratedKey key;
        public final boolean isReversed;
        public final boolean isEmpty;
        // null when isEmpty is true
        public final DeletionTime partitionDeletion;
        // null when isEmpty is true
        public final Row staticRow;
        // -1 if no estimate
        public final int rowEstimate;

        private Header(SerializationHeader sHeader,
                       DecoratedKey key,
                       boolean isReversed,
                       boolean isEmpty,
                       DeletionTime partitionDeletion,
                       Row staticRow,
                       int rowEstimate)
        {
            this.sHeader = sHeader;
            this.key = key;
            this.isReversed = isReversed;
            this.isEmpty = isEmpty;
            this.partitionDeletion = partitionDeletion;
            this.staticRow = staticRow;
            this.rowEstimate = rowEstimate;
        }

        /** Renders every field of this header, for debugging purposes. */
        @Override
        public String toString()
        {
            final String template = "{header=%s, key=%s, isReversed=%b, isEmpty=%b, del=%s, staticRow=%s, rowEstimate=%d}";
            return String.format(template, sHeader, key, isReversed, isEmpty, partitionDeletion, staticRow, rowEstimate);
        }
    }
}