| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.db.rows; |
| |
| import java.io.IOException; |
| import java.io.IOError; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.cassandra.config.CFMetaData; |
| import org.apache.cassandra.db.*; |
| import org.apache.cassandra.db.filter.ColumnFilter; |
| import org.apache.cassandra.io.util.DataInputPlus; |
| import org.apache.cassandra.io.util.DataOutputPlus; |
| import org.apache.cassandra.utils.ByteBufferUtil; |
| |
| /** |
| * Serialize/Deserialize an unfiltered row iterator. |
| * |
| * The serialization is composed of a header, follows by the rows and range tombstones of the iterator serialized |
| * until we read the end of the partition (see UnfilteredSerializer for details). The header itself |
| * is: |
| * {@code |
| * <cfid><key><flags><s_header>[<partition_deletion>][<static_row>][<row_estimate>] |
| * where: |
| * <cfid> is the table cfid. |
| * <key> is the partition key. |
| * <flags> contains bit flags. Each flag is set if it's corresponding bit is set. From rightmost |
| * bit to leftmost one, the flags are: |
| * - is empty: whether the iterator is empty. If so, nothing follows the <flags> |
| * - is reversed: whether the iterator is in reversed clustering order |
| * - has partition deletion: whether or not there is a <partition_deletion> following |
| * - has static row: whether or not there is a <static_row> following |
| * - has row estimate: whether or not there is a <row_estimate> following |
| * <s_header> is the {@code SerializationHeader}. It contains in particular the columns contains in the serialized |
| * iterator as well as other information necessary to decoding the serialized rows |
| * (see {@code SerializationHeader.Serializer for details}). |
| * <partition_deletion> is the deletion time for the partition (delta-encoded) |
| * <static_row> is the static row for this partition as serialized by UnfilteredSerializer. |
| * <row_estimate> is the (potentially estimated) number of rows serialized. This is only used for |
| * the purpose of sizing on the receiving end and should not be relied upon too strongly. |
| * } |
| * |
| * Please note that the format described above is the on-wire format. On-disk, the format is basically the |
| * same, but the header is written once per sstable, not once per-partition. Further, the actual row and |
| * range tombstones are not written using this class, but rather by {@link ColumnIndex}. |
| */ |
| public class UnfilteredRowIteratorSerializer |
| { |
| protected static final Logger logger = LoggerFactory.getLogger(UnfilteredRowIteratorSerializer.class); |
| |
| private static final int IS_EMPTY = 0x01; |
| private static final int IS_REVERSED = 0x02; |
| private static final int HAS_PARTITION_DELETION = 0x04; |
| private static final int HAS_STATIC_ROW = 0x08; |
| private static final int HAS_ROW_ESTIMATE = 0x10; |
| |
| public static final UnfilteredRowIteratorSerializer serializer = new UnfilteredRowIteratorSerializer(); |
| |
| // Should only be used for the on-wire format. |
| public void serialize(UnfilteredRowIterator iterator, ColumnFilter selection, DataOutputPlus out, int version) throws IOException |
| { |
| serialize(iterator, selection, out, version, -1); |
| } |
| |
| // Should only be used for the on-wire format. |
| |
| public void serialize(UnfilteredRowIterator iterator, ColumnFilter selection, DataOutputPlus out, int version, int rowEstimate) throws IOException |
| { |
| |
| SerializationHeader header = new SerializationHeader(false, |
| iterator.metadata(), |
| iterator.columns(), |
| iterator.stats()); |
| |
| serialize(iterator, header, selection, out, version, rowEstimate); |
| } |
| |
| // Should only be used for the on-wire format. |
| public void serialize(UnfilteredRowIterator iterator, SerializationHeader header, ColumnFilter selection, DataOutputPlus out, int version, int rowEstimate) throws IOException |
| { |
| assert !header.isForSSTable(); |
| |
| ByteBufferUtil.writeWithVIntLength(iterator.partitionKey().getKey(), out); |
| |
| int flags = 0; |
| if (iterator.isReverseOrder()) |
| flags |= IS_REVERSED; |
| |
| if (iterator.isEmpty()) |
| { |
| out.writeByte((byte)(flags | IS_EMPTY)); |
| return; |
| } |
| |
| DeletionTime partitionDeletion = iterator.partitionLevelDeletion(); |
| if (!partitionDeletion.isLive()) |
| flags |= HAS_PARTITION_DELETION; |
| Row staticRow = iterator.staticRow(); |
| boolean hasStatic = staticRow != Rows.EMPTY_STATIC_ROW; |
| if (hasStatic) |
| flags |= HAS_STATIC_ROW; |
| |
| if (rowEstimate >= 0) |
| flags |= HAS_ROW_ESTIMATE; |
| |
| out.writeByte((byte)flags); |
| |
| SerializationHeader.serializer.serializeForMessaging(header, selection, out, hasStatic); |
| |
| if (!partitionDeletion.isLive()) |
| header.writeDeletionTime(partitionDeletion, out); |
| |
| if (hasStatic) |
| UnfilteredSerializer.serializer.serialize(staticRow, header, out, version); |
| |
| if (rowEstimate >= 0) |
| out.writeUnsignedVInt(rowEstimate); |
| |
| while (iterator.hasNext()) |
| UnfilteredSerializer.serializer.serialize(iterator.next(), header, out, version); |
| UnfilteredSerializer.serializer.writeEndOfPartition(out); |
| } |
| |
| // Please note that this consume the iterator, and as such should not be called unless we have a simple way to |
| // recreate an iterator for both serialize and serializedSize, which is mostly only PartitionUpdate/ArrayBackedCachedPartition. |
| public long serializedSize(UnfilteredRowIterator iterator, ColumnFilter selection, int version, int rowEstimate) |
| { |
| SerializationHeader header = new SerializationHeader(false, |
| iterator.metadata(), |
| iterator.columns(), |
| iterator.stats()); |
| |
| assert rowEstimate >= 0; |
| |
| long size = ByteBufferUtil.serializedSizeWithVIntLength(iterator.partitionKey().getKey()) |
| + 1; // flags |
| |
| if (iterator.isEmpty()) |
| return size; |
| |
| DeletionTime partitionDeletion = iterator.partitionLevelDeletion(); |
| Row staticRow = iterator.staticRow(); |
| boolean hasStatic = staticRow != Rows.EMPTY_STATIC_ROW; |
| |
| size += SerializationHeader.serializer.serializedSizeForMessaging(header, selection, hasStatic); |
| |
| if (!partitionDeletion.isLive()) |
| size += header.deletionTimeSerializedSize(partitionDeletion); |
| |
| if (hasStatic) |
| size += UnfilteredSerializer.serializer.serializedSize(staticRow, header, version); |
| |
| if (rowEstimate >= 0) |
| size += TypeSizes.sizeofUnsignedVInt(rowEstimate); |
| |
| while (iterator.hasNext()) |
| size += UnfilteredSerializer.serializer.serializedSize(iterator.next(), header, version); |
| size += UnfilteredSerializer.serializer.serializedSizeEndOfPartition(); |
| |
| return size; |
| } |
| |
| public Header deserializeHeader(CFMetaData metadata, ColumnFilter selection, DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException |
| { |
| DecoratedKey key = metadata.decorateKey(ByteBufferUtil.readWithVIntLength(in)); |
| int flags = in.readUnsignedByte(); |
| boolean isReversed = (flags & IS_REVERSED) != 0; |
| if ((flags & IS_EMPTY) != 0) |
| { |
| SerializationHeader sh = new SerializationHeader(false, metadata, PartitionColumns.NONE, EncodingStats.NO_STATS); |
| return new Header(sh, key, isReversed, true, null, null, 0); |
| } |
| |
| boolean hasPartitionDeletion = (flags & HAS_PARTITION_DELETION) != 0; |
| boolean hasStatic = (flags & HAS_STATIC_ROW) != 0; |
| boolean hasRowEstimate = (flags & HAS_ROW_ESTIMATE) != 0; |
| |
| SerializationHeader header = SerializationHeader.serializer.deserializeForMessaging(in, metadata, selection, hasStatic); |
| |
| DeletionTime partitionDeletion = hasPartitionDeletion ? header.readDeletionTime(in) : DeletionTime.LIVE; |
| |
| Row staticRow = Rows.EMPTY_STATIC_ROW; |
| if (hasStatic) |
| staticRow = UnfilteredSerializer.serializer.deserializeStaticRow(in, header, new SerializationHelper(metadata, version, flag)); |
| |
| int rowEstimate = hasRowEstimate ? (int)in.readUnsignedVInt() : -1; |
| return new Header(header, key, isReversed, false, partitionDeletion, staticRow, rowEstimate); |
| } |
| |
| public UnfilteredRowIterator deserialize(DataInputPlus in, int version, CFMetaData metadata, SerializationHelper.Flag flag, Header header) throws IOException |
| { |
| if (header.isEmpty) |
| return EmptyIterators.unfilteredRow(metadata, header.key, header.isReversed); |
| |
| final SerializationHelper helper = new SerializationHelper(metadata, version, flag); |
| final SerializationHeader sHeader = header.sHeader; |
| return new AbstractUnfilteredRowIterator(metadata, header.key, header.partitionDeletion, sHeader.columns(), header.staticRow, header.isReversed, sHeader.stats()) |
| { |
| private final Row.Builder builder = BTreeRow.sortedBuilder(); |
| |
| protected Unfiltered computeNext() |
| { |
| try |
| { |
| Unfiltered unfiltered = UnfilteredSerializer.serializer.deserialize(in, sHeader, helper, builder); |
| return unfiltered == null ? endOfData() : unfiltered; |
| } |
| catch (IOException e) |
| { |
| throw new IOError(e); |
| } |
| } |
| }; |
| } |
| |
| public UnfilteredRowIterator deserialize(DataInputPlus in, int version, CFMetaData metadata, ColumnFilter selection, SerializationHelper.Flag flag) throws IOException |
| { |
| return deserialize(in, version, metadata, flag, deserializeHeader(metadata, selection, in, version, flag)); |
| } |
| |
| public static class Header |
| { |
| public final SerializationHeader sHeader; |
| public final DecoratedKey key; |
| public final boolean isReversed; |
| public final boolean isEmpty; |
| public final DeletionTime partitionDeletion; |
| public final Row staticRow; |
| public final int rowEstimate; // -1 if no estimate |
| |
| private Header(SerializationHeader sHeader, |
| DecoratedKey key, |
| boolean isReversed, |
| boolean isEmpty, |
| DeletionTime partitionDeletion, |
| Row staticRow, |
| int rowEstimate) |
| { |
| this.sHeader = sHeader; |
| this.key = key; |
| this.isReversed = isReversed; |
| this.isEmpty = isEmpty; |
| this.partitionDeletion = partitionDeletion; |
| this.staticRow = staticRow; |
| this.rowEstimate = rowEstimate; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return String.format("{header=%s, key=%s, isReversed=%b, isEmpty=%b, del=%s, staticRow=%s, rowEstimate=%d}", |
| sHeader, key, isReversed, isEmpty, partitionDeletion, staticRow, rowEstimate); |
| } |
| } |
| } |