| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.db.partitions; |
| |
| import java.io.IOException; |
| |
| import org.apache.cassandra.db.*; |
| import org.apache.cassandra.db.filter.DataLimits; |
| import org.apache.cassandra.db.rows.*; |
| import org.apache.cassandra.io.ISerializer; |
| import org.apache.cassandra.io.util.DataInputPlus; |
| import org.apache.cassandra.io.util.DataOutputPlus; |
| import org.apache.cassandra.net.MessagingService; |
| import org.apache.cassandra.schema.Schema; |
| import org.apache.cassandra.schema.TableId; |
| import org.apache.cassandra.schema.TableMetadata; |
| import org.apache.cassandra.utils.btree.BTree; |
| |
| public class CachedBTreePartition extends ImmutableBTreePartition implements CachedPartition |
| { |
| private final int createdAtInSec; |
| |
| private final int cachedLiveRows; |
| private final int rowsWithNonExpiringCells; |
| |
| private CachedBTreePartition(TableMetadata metadata, |
| DecoratedKey partitionKey, |
| Holder holder, |
| int createdAtInSec, |
| int cachedLiveRows, |
| int rowsWithNonExpiringCells) |
| { |
| super(metadata, partitionKey, holder); |
| this.createdAtInSec = createdAtInSec; |
| this.cachedLiveRows = cachedLiveRows; |
| this.rowsWithNonExpiringCells = rowsWithNonExpiringCells; |
| } |
| |
| /** |
| * Creates an {@code ArrayBackedCachedPartition} holding all the data of the provided iterator. |
| * |
| * Warning: Note that this method does not close the provided iterator and it is |
| * up to the caller to do so. |
| * |
| * @param iterator the iterator got gather in memory. |
| * @param nowInSec the time of the creation in seconds. This is the time at which {@link #cachedLiveRows} applies. |
| * @return the created partition. |
| */ |
| public static CachedBTreePartition create(UnfilteredRowIterator iterator, int nowInSec) |
| { |
| return create(iterator, 16, nowInSec); |
| } |
| |
| /** |
| * Creates an {@code ArrayBackedCachedPartition} holding all the data of the provided iterator. |
| * |
| * Warning: Note that this method does not close the provided iterator and it is |
| * up to the caller to do so. |
| * |
| * @param iterator the iterator got gather in memory. |
| * @param initialRowCapacity sizing hint (in rows) to use for the created partition. It should ideally |
| * correspond or be a good estimation of the number or rows in {@code iterator}. |
| * @param nowInSec the time of the creation in seconds. This is the time at which {@link #cachedLiveRows} applies. |
| * @return the created partition. |
| */ |
| public static CachedBTreePartition create(UnfilteredRowIterator iterator, int initialRowCapacity, int nowInSec) |
| { |
| Holder holder = ImmutableBTreePartition.build(iterator, initialRowCapacity); |
| |
| int cachedLiveRows = 0; |
| int rowsWithNonExpiringCells = 0; |
| boolean enforceStrictLiveness = iterator.metadata().enforceStrictLiveness(); |
| |
| for (Row row : BTree.<Row>iterable(holder.tree)) |
| { |
| if (row.hasLiveData(nowInSec, enforceStrictLiveness)) |
| ++cachedLiveRows; |
| |
| boolean hasNonExpiringLiveCell = false; |
| for (Cell<?> cell : row.cells()) |
| { |
| if (!cell.isTombstone() && !cell.isExpiring()) |
| { |
| hasNonExpiringLiveCell = true; |
| break; |
| } |
| } |
| |
| if (hasNonExpiringLiveCell) |
| ++rowsWithNonExpiringCells; |
| } |
| |
| return new CachedBTreePartition(iterator.metadata(), |
| iterator.partitionKey(), |
| holder, |
| nowInSec, |
| cachedLiveRows, |
| rowsWithNonExpiringCells); |
| } |
| |
| /** |
| * The number of rows that were live at the time the partition was cached. |
| * |
| * See {@link ColumnFamilyStore#isFilterFullyCoveredBy} to see why we need this. |
| * |
| * @return the number of rows in this partition that were live at the time the |
| * partition was cached (this can be different from the number of live rows now |
| * due to expiring cells). |
| */ |
| public int cachedLiveRows() |
| { |
| return cachedLiveRows; |
| } |
| |
| /** |
| * The number of rows in this cached partition that have at least one non-expiring |
| * non-deleted cell. |
| * |
| * Note that this is generally not a very meaningful number, but this is used by |
| * {@link DataLimits#hasEnoughLiveData} as an optimization. |
| * |
| * @return the number of row that have at least one non-expiring non-deleted cell. |
| */ |
| public int rowsWithNonExpiringCells() |
| { |
| return rowsWithNonExpiringCells; |
| } |
| |
| static class Serializer implements ISerializer<CachedPartition> |
| { |
| public void serialize(CachedPartition partition, DataOutputPlus out) throws IOException |
| { |
| int version = MessagingService.current_version; |
| |
| assert partition instanceof CachedBTreePartition; |
| CachedBTreePartition p = (CachedBTreePartition)partition; |
| |
| out.writeInt(p.createdAtInSec); |
| out.writeInt(p.cachedLiveRows); |
| out.writeInt(p.rowsWithNonExpiringCells); |
| partition.metadata().id.serialize(out); |
| try (UnfilteredRowIterator iter = p.unfilteredIterator()) |
| { |
| UnfilteredRowIteratorSerializer.serializer.serialize(iter, null, out, version, p.rowCount()); |
| } |
| } |
| |
| public CachedPartition deserialize(DataInputPlus in) throws IOException |
| { |
| int version = MessagingService.current_version; |
| |
| // Note that it would be slightly simpler to just do |
| // ArrayBackedCachedPiartition.create(UnfilteredRowIteratorSerializer.serializer.deserialize(...)); |
| // However deserializing the header separatly is not a lot harder and allows us to: |
| // 1) get the capacity of the partition so we can size it properly directly |
| // 2) saves the creation of a temporary iterator: rows are directly written to the partition, which |
| // is slightly faster. |
| |
| int createdAtInSec = in.readInt(); |
| int cachedLiveRows = in.readInt(); |
| int rowsWithNonExpiringCells = in.readInt(); |
| |
| |
| TableMetadata metadata = Schema.instance.getExistingTableMetadata(TableId.deserialize(in)); |
| UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(metadata, null, in, version, DeserializationHelper.Flag.LOCAL); |
| assert !header.isReversed && header.rowEstimate >= 0; |
| |
| Holder holder; |
| try (UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, DeserializationHelper.Flag.LOCAL, header)) |
| { |
| holder = ImmutableBTreePartition.build(partition, header.rowEstimate); |
| } |
| |
| return new CachedBTreePartition(metadata, |
| header.key, |
| holder, |
| createdAtInSec, |
| cachedLiveRows, |
| rowsWithNonExpiringCells); |
| |
| } |
| |
| public long serializedSize(CachedPartition partition) |
| { |
| int version = MessagingService.current_version; |
| |
| assert partition instanceof CachedBTreePartition; |
| CachedBTreePartition p = (CachedBTreePartition)partition; |
| |
| try (UnfilteredRowIterator iter = p.unfilteredIterator()) |
| { |
| return TypeSizes.sizeof(p.createdAtInSec) |
| + TypeSizes.sizeof(p.cachedLiveRows) |
| + TypeSizes.sizeof(p.rowsWithNonExpiringCells) |
| + partition.metadata().id.serializedSize() |
| + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, MessagingService.current_version, p.rowCount()); |
| } |
| } |
| } |
| } |
| |