| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.io.sstable.metadata; |
| |
| import java.nio.ByteBuffer; |
| import java.util.Collections; |
| import java.util.EnumMap; |
| import java.util.Map; |
| import java.util.UUID; |
| |
| import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; |
| import com.clearspring.analytics.stream.cardinality.ICardinality; |
| import org.apache.cassandra.db.Clustering; |
| import org.apache.cassandra.db.ClusteringBound; |
| import org.apache.cassandra.db.ClusteringBoundOrBoundary; |
| import org.apache.cassandra.db.ClusteringComparator; |
| import org.apache.cassandra.db.ClusteringPrefix; |
| import org.apache.cassandra.db.DeletionTime; |
| import org.apache.cassandra.db.LivenessInfo; |
| import org.apache.cassandra.db.SerializationHeader; |
| import org.apache.cassandra.db.Slice; |
| import org.apache.cassandra.db.commitlog.CommitLogPosition; |
| import org.apache.cassandra.db.commitlog.IntervalSet; |
| import org.apache.cassandra.db.partitions.PartitionStatisticsCollector; |
| import org.apache.cassandra.db.rows.Cell; |
| import org.apache.cassandra.db.rows.Unfiltered; |
| import org.apache.cassandra.io.sstable.SSTable; |
| import org.apache.cassandra.io.sstable.format.SSTableReader; |
| import org.apache.cassandra.service.ActiveRepairService; |
| import org.apache.cassandra.service.StorageService; |
| import org.apache.cassandra.utils.ByteBufferUtil; |
| import org.apache.cassandra.utils.EstimatedHistogram; |
| import org.apache.cassandra.utils.FBUtilities; |
| import org.apache.cassandra.utils.MurmurHash; |
| import org.apache.cassandra.utils.TimeUUID; |
| import org.apache.cassandra.utils.streamhist.StreamingTombstoneHistogramBuilder; |
| import org.apache.cassandra.utils.streamhist.TombstoneHistogram; |
| |
| public class MetadataCollector implements PartitionStatisticsCollector |
| { |
| public static final double NO_COMPRESSION_RATIO = -1.0; |
| |
| private long currentPartitionCells = 0; |
| |
| static EstimatedHistogram defaultCellPerPartitionCountHistogram() |
| { |
| // EH of 118 can track a max value of 4139110981, i.e., > 4B cells |
| return new EstimatedHistogram(118); |
| } |
| |
| static EstimatedHistogram defaultPartitionSizeHistogram() |
| { |
| // EH of 155 can track a max value of 3520571548412 i.e. 3.5TB |
| return new EstimatedHistogram(155); |
| |
| } |
| |
| static TombstoneHistogram defaultTombstoneDropTimeHistogram() |
| { |
| return TombstoneHistogram.createDefault(); |
| } |
| |
| public static StatsMetadata defaultStatsMetadata() |
| { |
| return new StatsMetadata(defaultPartitionSizeHistogram(), |
| defaultCellPerPartitionCountHistogram(), |
| IntervalSet.empty(), |
| Long.MIN_VALUE, |
| Long.MAX_VALUE, |
| Integer.MAX_VALUE, |
| Integer.MAX_VALUE, |
| 0, |
| Integer.MAX_VALUE, |
| NO_COMPRESSION_RATIO, |
| defaultTombstoneDropTimeHistogram(), |
| 0, |
| Collections.emptyList(), |
| Slice.ALL, |
| true, |
| ActiveRepairService.UNREPAIRED_SSTABLE, |
| -1, |
| -1, |
| null, |
| null, |
| false, |
| true, |
| ByteBufferUtil.EMPTY_BYTE_BUFFER, |
| ByteBufferUtil.EMPTY_BYTE_BUFFER); |
| } |
| |
| protected EstimatedHistogram estimatedPartitionSize = defaultPartitionSizeHistogram(); |
| // TODO: cound the number of row per partition (either with the number of cells, or instead) |
| protected EstimatedHistogram estimatedCellPerPartitionCount = defaultCellPerPartitionCountHistogram(); |
| protected IntervalSet<CommitLogPosition> commitLogIntervals = IntervalSet.empty(); |
| protected final MinMaxLongTracker timestampTracker = new MinMaxLongTracker(); |
| protected final MinMaxIntTracker localDeletionTimeTracker = new MinMaxIntTracker(Cell.NO_DELETION_TIME, Cell.NO_DELETION_TIME); |
| protected final MinMaxIntTracker ttlTracker = new MinMaxIntTracker(Cell.NO_TTL, Cell.NO_TTL); |
| protected double compressionRatio = NO_COMPRESSION_RATIO; |
| protected StreamingTombstoneHistogramBuilder estimatedTombstoneDropTime = new StreamingTombstoneHistogramBuilder(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE, SSTable.TOMBSTONE_HISTOGRAM_SPOOL_SIZE, SSTable.TOMBSTONE_HISTOGRAM_TTL_ROUND_SECONDS); |
| protected int sstableLevel; |
| |
| /** |
| * The smallest clustering prefix for any {@link Unfiltered} in the sstable. |
| * |
| * <p>This is always either a Clustering, or a start bound (since for any end range tombstone bound, there should |
| * be a corresponding start bound that is smaller). |
| */ |
| private ClusteringPrefix<?> minClustering = ClusteringBound.MAX_START; |
| /** |
| * The largest clustering prefix for any {@link Unfiltered} in the sstable. |
| * |
| * <p>This is always either a Clustering, or an end bound (since for any start range tombstone bound, there should |
| * be a corresponding end bound that is bigger). |
| */ |
| private ClusteringPrefix<?> maxClustering = ClusteringBound.MIN_END; |
| private boolean clusteringInitialized = false; |
| |
| protected boolean hasLegacyCounterShards = false; |
| private boolean hasPartitionLevelDeletions = false; |
| protected long totalColumnsSet; |
| protected long totalRows; |
| public int totalTombstones; |
| |
| /** |
| * Default cardinality estimation method is to use HyperLogLog++. |
| * Parameter here(p=13, sp=25) should give reasonable estimation |
| * while lowering bytes required to hold information. |
| * See CASSANDRA-5906 for detail. |
| */ |
| protected ICardinality cardinality = new HyperLogLogPlus(13, 25); |
| private final ClusteringComparator comparator; |
| private final int nowInSec = FBUtilities.nowInSeconds(); |
| |
| private final UUID originatingHostId; |
| |
| public MetadataCollector(ClusteringComparator comparator) |
| { |
| this(comparator, StorageService.instance.getLocalHostUUID()); |
| } |
| |
| public MetadataCollector(ClusteringComparator comparator, UUID originatingHostId) |
| { |
| this.comparator = comparator; |
| this.originatingHostId = originatingHostId; |
| } |
| |
| public MetadataCollector(Iterable<SSTableReader> sstables, ClusteringComparator comparator, int level) |
| { |
| this(comparator); |
| |
| IntervalSet.Builder<CommitLogPosition> intervals = new IntervalSet.Builder<>(); |
| if (originatingHostId != null) |
| { |
| for (SSTableReader sstable : sstables) |
| { |
| if (originatingHostId.equals(sstable.getSSTableMetadata().originatingHostId)) |
| intervals.addAll(sstable.getSSTableMetadata().commitLogIntervals); |
| } |
| } |
| commitLogIntervals(intervals.build()); |
| sstableLevel(level); |
| } |
| |
| public MetadataCollector addKey(ByteBuffer key) |
| { |
| long hashed = MurmurHash.hash2_64(key, key.position(), key.remaining(), 0); |
| cardinality.offerHashed(hashed); |
| totalTombstones = 0; |
| return this; |
| } |
| |
| public MetadataCollector addPartitionSizeInBytes(long partitionSize) |
| { |
| estimatedPartitionSize.add(partitionSize); |
| return this; |
| } |
| |
| public MetadataCollector addCellPerPartitionCount(long cellCount) |
| { |
| estimatedCellPerPartitionCount.add(cellCount); |
| return this; |
| } |
| |
| public MetadataCollector addCellPerPartitionCount() |
| { |
| estimatedCellPerPartitionCount.add(currentPartitionCells); |
| currentPartitionCells = 0; |
| return this; |
| } |
| |
| /** |
| * Ratio is compressed/uncompressed and it is |
| * if you have 1.x then compression isn't helping |
| */ |
| public MetadataCollector addCompressionRatio(long compressed, long uncompressed) |
| { |
| compressionRatio = (double) compressed/uncompressed; |
| return this; |
| } |
| |
| public void update(LivenessInfo newInfo) |
| { |
| if (newInfo.isEmpty()) |
| return; |
| |
| updateTimestamp(newInfo.timestamp()); |
| updateTTL(newInfo.ttl()); |
| updateLocalDeletionTime(newInfo.localExpirationTime()); |
| if (!newInfo.isLive(nowInSec)) |
| updateTombstoneCount(); |
| } |
| |
| public void update(Cell<?> cell) |
| { |
| ++currentPartitionCells; |
| updateTimestamp(cell.timestamp()); |
| updateTTL(cell.ttl()); |
| updateLocalDeletionTime(cell.localDeletionTime()); |
| if (!cell.isLive(nowInSec)) |
| updateTombstoneCount(); |
| } |
| |
| public void updatePartitionDeletion(DeletionTime dt) |
| { |
| if (!dt.isLive()) |
| hasPartitionLevelDeletions = true; |
| update(dt); |
| } |
| |
| public void update(DeletionTime dt) |
| { |
| if (!dt.isLive()) |
| { |
| updateTimestamp(dt.markedForDeleteAt()); |
| updateLocalDeletionTime(dt.localDeletionTime()); |
| updateTombstoneCount(); |
| } |
| } |
| |
| public void updateColumnSetPerRow(long columnSetInRow) |
| { |
| totalColumnsSet += columnSetInRow; |
| ++totalRows; |
| } |
| |
| private void updateTimestamp(long newTimestamp) |
| { |
| timestampTracker.update(newTimestamp); |
| } |
| |
| private void updateLocalDeletionTime(int newLocalDeletionTime) |
| { |
| localDeletionTimeTracker.update(newLocalDeletionTime); |
| if (newLocalDeletionTime != Cell.NO_DELETION_TIME) |
| estimatedTombstoneDropTime.update(newLocalDeletionTime); |
| } |
| |
| private void updateTombstoneCount() |
| { |
| ++totalTombstones; |
| } |
| |
| private void updateTTL(int newTTL) |
| { |
| ttlTracker.update(newTTL); |
| } |
| |
| public MetadataCollector commitLogIntervals(IntervalSet<CommitLogPosition> commitLogIntervals) |
| { |
| this.commitLogIntervals = commitLogIntervals; |
| return this; |
| } |
| |
| public MetadataCollector sstableLevel(int sstableLevel) |
| { |
| this.sstableLevel = sstableLevel; |
| return this; |
| } |
| |
| public void updateClusteringValues(Clustering<?> clustering) |
| { |
| if (clustering == Clustering.STATIC_CLUSTERING) |
| return; |
| |
| // In case of monotonically growing stream of clusterings, we will usually require only one comparison |
| // because if we detected X is greater than the current MAX, then it cannot be lower than the current MIN |
| // at the same time. The only case when we need to update MIN when the current MAX was detected to be updated |
| // is the case when MIN was not yet initialized and still point the ClusteringBound.MAX_START |
| if (comparator.compare(clustering, maxClustering) > 0) |
| { |
| maxClustering = clustering; |
| if (minClustering == ClusteringBound.MAX_START) |
| minClustering = clustering; |
| } |
| else if (comparator.compare(clustering, minClustering) < 0) |
| { |
| minClustering = clustering; |
| } |
| } |
| |
| public void updateClusteringValuesByBoundOrBoundary(ClusteringBoundOrBoundary<?> clusteringBoundOrBoundary) |
| { |
| // In a SSTable, every opening marker will be closed, so the start of a range tombstone marker will never be |
| // the maxClustering (the corresponding close might though) and there is no point in doing the comparison |
| // (and vice-versa for the close). By the same reasoning, a boundary will never be either the min or max |
| // clustering, and we can save on comparisons. |
| if (clusteringBoundOrBoundary.isBoundary()) |
| return; |
| |
| // see the comment in updateClusteringValues(Clustering) |
| if (comparator.compare(clusteringBoundOrBoundary, maxClustering) > 0) |
| { |
| if (clusteringBoundOrBoundary.kind().isEnd()) |
| maxClustering = clusteringBoundOrBoundary; |
| |
| // note that since we excluded boundaries above, there is no way that the provided clustering prefix is |
| // a start and en end at the same time |
| else if (minClustering == ClusteringBound.MAX_START) |
| minClustering = clusteringBoundOrBoundary; |
| } |
| else if (comparator.compare(clusteringBoundOrBoundary, minClustering) < 0) |
| { |
| if (clusteringBoundOrBoundary.kind().isStart()) |
| minClustering = clusteringBoundOrBoundary; |
| else if (maxClustering == ClusteringBound.MIN_END) |
| maxClustering = clusteringBoundOrBoundary; |
| } |
| } |
| |
| public void updateHasLegacyCounterShards(boolean hasLegacyCounterShards) |
| { |
| this.hasLegacyCounterShards = this.hasLegacyCounterShards || hasLegacyCounterShards; |
| } |
| |
| public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt, TimeUUID pendingRepair, boolean isTransient, SerializationHeader header, ByteBuffer firstKey, ByteBuffer lastKey) |
| { |
| assert minClustering.kind() == ClusteringPrefix.Kind.CLUSTERING || minClustering.kind().isStart(); |
| assert maxClustering.kind() == ClusteringPrefix.Kind.CLUSTERING || maxClustering.kind().isEnd(); |
| |
| Map<MetadataType, MetadataComponent> components = new EnumMap<>(MetadataType.class); |
| components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance)); |
| components.put(MetadataType.STATS, new StatsMetadata(estimatedPartitionSize, |
| estimatedCellPerPartitionCount, |
| commitLogIntervals, |
| timestampTracker.min(), |
| timestampTracker.max(), |
| localDeletionTimeTracker.min(), |
| localDeletionTimeTracker.max(), |
| ttlTracker.min(), |
| ttlTracker.max(), |
| compressionRatio, |
| estimatedTombstoneDropTime.build(), |
| sstableLevel, |
| comparator.subtypes(), |
| Slice.make(minClustering.retainable().asStartBound(), maxClustering.retainable().asEndBound()), |
| hasLegacyCounterShards, |
| repairedAt, |
| totalColumnsSet, |
| totalRows, |
| originatingHostId, |
| pendingRepair, |
| isTransient, |
| hasPartitionLevelDeletions, |
| firstKey, |
| lastKey)); |
| components.put(MetadataType.COMPACTION, new CompactionMetadata(cardinality)); |
| components.put(MetadataType.HEADER, header.toComponent()); |
| return components; |
| } |
| |
| /** |
| * Release large memory objects while keeping metrics intact |
| */ |
| public void release() |
| { |
| estimatedTombstoneDropTime.releaseBuffers(); |
| } |
| |
| public static class MinMaxLongTracker |
| { |
| private final long defaultMin; |
| private final long defaultMax; |
| |
| private boolean isSet = false; |
| private long min; |
| private long max; |
| |
| public MinMaxLongTracker() |
| { |
| this(Long.MIN_VALUE, Long.MAX_VALUE); |
| } |
| |
| public MinMaxLongTracker(long defaultMin, long defaultMax) |
| { |
| this.defaultMin = defaultMin; |
| this.defaultMax = defaultMax; |
| } |
| |
| public void update(long value) |
| { |
| if (!isSet) |
| { |
| min = max = value; |
| isSet = true; |
| } |
| else |
| { |
| if (value < min) |
| min = value; |
| if (value > max) |
| max = value; |
| } |
| } |
| |
| public long min() |
| { |
| return isSet ? min : defaultMin; |
| } |
| |
| public long max() |
| { |
| return isSet ? max : defaultMax; |
| } |
| } |
| |
| public static class MinMaxIntTracker |
| { |
| private final int defaultMin; |
| private final int defaultMax; |
| |
| private boolean isSet = false; |
| private int min; |
| private int max; |
| |
| public MinMaxIntTracker() |
| { |
| this(Integer.MIN_VALUE, Integer.MAX_VALUE); |
| } |
| |
| public MinMaxIntTracker(int defaultMin, int defaultMax) |
| { |
| this.defaultMin = defaultMin; |
| this.defaultMax = defaultMax; |
| } |
| |
| public void update(int value) |
| { |
| if (!isSet) |
| { |
| min = max = value; |
| isSet = true; |
| } |
| else |
| { |
| if (value < min) |
| min = value; |
| if (value > max) |
| max = value; |
| } |
| } |
| |
| public int min() |
| { |
| return isSet ? min : defaultMin; |
| } |
| |
| public int max() |
| { |
| return isSet ? max : defaultMax; |
| } |
| } |
| } |