blob: 56a860aa841809437c207ae5a80940b9dcdc3ccf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.metrics;
import java.io.Closeable;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.StorageService;
import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
/**
* Tracks top partitions, currently by size and by tombstone count
*
* Collects during full and preview (-vd) repair since then we read the full partition
*
* Note that since we can run sub range repair there might be windows where the top partitions are not correct -
* for example, assume we track the top 2 partitions for this node:
*
* tokens with size:
* (a, 100); (b, 40); (c, 10); (d, 100); (e, 50); (f, 10)
* - top2: a, d
* now a is deleted and we run a repair for keys [a, c]
* - top2: b, d
* and when we repair [d, f]
* - top2: d, e
*
*/
public class TopPartitionTracker implements Closeable
{
private final static String SIZES = "SIZES";
private final static String TOMBSTONES = "TOMBSTONES";
private final AtomicReference<TopHolder> topSizes = new AtomicReference<>();
private final AtomicReference<TopHolder> topTombstones = new AtomicReference<>();
private final TableMetadata metadata;
private final Future<?> scheduledSave;
private long lastTombstoneSave = 0;
private long lastSizeSave = 0;
public TopPartitionTracker(TableMetadata metadata)
{
this.metadata = metadata;
topSizes.set(new TopHolder(SystemKeyspace.getTopPartitions(metadata, SIZES),
DatabaseDescriptor.getMaxTopSizePartitionCount(),
DatabaseDescriptor.getMinTrackedPartitionSizeInBytes().toBytes()));
topTombstones.set(new TopHolder(SystemKeyspace.getTopPartitions(metadata, TOMBSTONES),
DatabaseDescriptor.getMaxTopTombstonePartitionCount(),
DatabaseDescriptor.getMinTrackedPartitionTombstoneCount()));
scheduledSave = ScheduledExecutors.optionalTasks.scheduleAtFixedRate(this::save, 60, 60, TimeUnit.MINUTES);
}
public void close()
{
scheduledSave.cancel(true);
}
@VisibleForTesting
public void save()
{
TopHolder sizes = topSizes.get();
if (!sizes.top.isEmpty() && sizes.lastUpdate > lastSizeSave)
{
SystemKeyspace.saveTopPartitions(metadata, SIZES, sizes.top, sizes.lastUpdate);
lastSizeSave = sizes.lastUpdate;
}
TopHolder tombstones = topTombstones.get();
if (!tombstones.top.isEmpty() && tombstones.lastUpdate > lastTombstoneSave)
{
SystemKeyspace.saveTopPartitions(metadata, TOMBSTONES, tombstones.top, tombstones.lastUpdate);
lastTombstoneSave = tombstones.lastUpdate;
}
}
public void merge(Collector collector)
{
while (true)
{
TopHolder cur = topSizes.get();
TopHolder newSizes = cur.merge(collector.sizes, StorageService.instance.getLocalReplicas(metadata.keyspace).ranges());
if (topSizes.compareAndSet(cur, newSizes))
break;
}
while (true)
{
TopHolder cur = topTombstones.get();
TopHolder newTombstones = cur.merge(collector.tombstones, StorageService.instance.getLocalReplicas(metadata.keyspace).ranges());
if (topTombstones.compareAndSet(cur, newTombstones))
break;
}
}
@Override
public String toString()
{
return "TopPartitionTracker:\n" +
"topSizes:\n" + topSizes.get() + '\n' +
"topTombstones:\n" + topTombstones.get() + '\n';
}
public Map<String, Long> getTopTombstonePartitionMap()
{
return topTombstones.get().toMap(metadata);
}
public Map<String, Long> getTopSizePartitionMap()
{
return topSizes.get().toMap(metadata);
}
@VisibleForTesting
public TopHolder topSizes()
{
return topSizes.get();
}
@VisibleForTesting
public TopHolder topTombstones()
{
return topTombstones.get();
}
public static class Collector
{
private final TopHolder tombstones;
private final TopHolder sizes;
public Collector(Collection<Range<Token>> ranges)
{
this.tombstones = new TopHolder(DatabaseDescriptor.getMaxTopTombstonePartitionCount(),
DatabaseDescriptor.getMinTrackedPartitionTombstoneCount(),
ranges);
this.sizes = new TopHolder(DatabaseDescriptor.getMaxTopSizePartitionCount(),
DatabaseDescriptor.getMinTrackedPartitionSizeInBytes().toBytes(),
ranges);
}
public void trackTombstoneCount(DecoratedKey key, long count)
{
tombstones.track(key, count);
}
public void trackPartitionSize(DecoratedKey key, long size)
{
sizes.track(key, size);
}
public String toString()
{
return "tombstones:\n"+tombstones+"\nsizes:\n"+sizes;
}
}
public static class TopHolder
{
public final NavigableSet<TopPartition> top;
private final int maxTopPartitionCount;
private final long minTrackedValue;
private final Collection<Range<Token>> ranges;
private long currentMinValue = Long.MAX_VALUE;
public final long lastUpdate;
private TopHolder(int maxTopPartitionCount, long minTrackedValue, Collection<Range<Token>> ranges)
{
this(maxTopPartitionCount, minTrackedValue, new TreeSet<>(), ranges, 0);
}
private TopHolder(int maxTopPartitionCount, long minTrackedValue, NavigableSet<TopPartition> top, Collection<Range<Token>> ranges, long lastUpdate)
{
this.maxTopPartitionCount = maxTopPartitionCount;
this.minTrackedValue = minTrackedValue;
this.top = top;
this.ranges = ranges;
this.lastUpdate = lastUpdate;
}
private TopHolder(StoredTopPartitions storedTopPartitions,
int maxTopPartitionCount,
long minTrackedValue)
{
this.maxTopPartitionCount = maxTopPartitionCount;
this.minTrackedValue = minTrackedValue;
top = new TreeSet<>();
this.ranges = null;
this.lastUpdate = storedTopPartitions.lastUpdated;
for (TopPartition topPartition : storedTopPartitions.topPartitions)
track(topPartition);
}
public void track(DecoratedKey key, long value)
{
if (value < minTrackedValue)
return;
if (top.size() < maxTopPartitionCount || value > currentMinValue)
track(new TopPartition(SSTable.getMinimalKey(key), value));
}
private void track(TopPartition tp)
{
top.add(tp);
while (top.size() > maxTopPartitionCount)
{
top.pollLast();
currentMinValue = top.last().value;
}
currentMinValue = Math.min(tp.value, currentMinValue);
}
/**
* we merge any pre-existing top partitions on to the ones we just collected if they are outside of the
* range collected.
*
* This means that if a large partition is deleted it will disappear from the top partitions
*
* @param holder the newly collected holder - this will get copied and any existing token outside of the collected ranges will get added to the copy
* @param ownedRanges the ranges this node owns - any existing token outside of these ranges will get dropped
*/
public TopHolder merge(TopHolder holder, Collection<Range<Token>> ownedRanges)
{
TopHolder mergedHolder = holder.cloneForMerging(currentTimeMillis());
for (TopPartition existingTop : top)
{
if (!Range.isInRanges(existingTop.key.getToken(), mergedHolder.ranges) &&
(ownedRanges.isEmpty() || Range.isInRanges(existingTop.key.getToken(), ownedRanges))) // make sure we drop any tokens that we don't own anymore
mergedHolder.track(existingTop);
}
return mergedHolder;
}
private TopHolder cloneForMerging(long lastUpdate)
{
return new TopHolder(maxTopPartitionCount, minTrackedValue, new TreeSet<>(top), ranges, lastUpdate);
}
public String toString()
{
int i = 0;
Iterator<TopPartition> it = top.iterator();
StringBuilder sb = new StringBuilder();
while (it.hasNext())
{
i++;
sb.append(i).append(':').append(it.next()).append(System.lineSeparator());
}
return sb.toString();
}
public Map<String, Long> toMap(TableMetadata metadata)
{
Map<String, Long> topPartitionsMap = new LinkedHashMap<>();
for (TopPartitionTracker.TopPartition topPartition : top)
{
String key = metadata.partitionKeyType.getString(topPartition.key.getKey());
topPartitionsMap.put(key, topPartition.value);
}
return topPartitionsMap;
}
}
private static final Comparator<TopPartition> comparator = (o1, o2) -> {
int cmp = -Long.compare(o1.value, o2.value);
if (cmp != 0) return cmp;
return o1.key.compareTo(o2.key);
};
public static class TopPartition implements Comparable<TopPartition>
{
public final DecoratedKey key;
public final long value;
public TopPartition(DecoratedKey key, long value)
{
this.key = key;
this.value = value;
}
@Override
public int compareTo(TopPartition o)
{
return comparator.compare(this, o);
}
@Override
public String toString()
{
return "TopPartition{" +
"key=" + key +
", value=" + value +
'}';
}
}
public static class TombstoneCounter extends Transformation<UnfilteredRowIterator>
{
private final TopPartitionTracker.Collector collector;
private final int nowInSec;
private long tombstoneCount = 0;
private DecoratedKey key = null;
public TombstoneCounter(TopPartitionTracker.Collector collector, int nowInSec)
{
this.collector = collector;
this.nowInSec = nowInSec;
}
@Override
public Row applyToRow(Row row)
{
if (!row.deletion().isLive())
tombstoneCount++;
if (row.hasDeletion(nowInSec))
{
for (Cell<?> c : row.cells())
if (c.isTombstone())
tombstoneCount++;
}
return row;
}
@Override
public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
{
tombstoneCount++;
return marker;
}
@Override
protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
{
reset(partition.partitionKey());
if (!partition.partitionLevelDeletion().isLive())
tombstoneCount++;
return Transformation.apply(partition, this);
}
private void reset(DecoratedKey key)
{
tombstoneCount = 0;
this.key = key;
}
@Override
public void onPartitionClose()
{
collector.trackTombstoneCount(key, tombstoneCount);
}
}
public static class StoredTopPartitions
{
public static StoredTopPartitions EMPTY = new StoredTopPartitions(Collections.emptyList(), 0);
public final List<TopPartition> topPartitions;
public final long lastUpdated;
public StoredTopPartitions(List<TopPartition> topPartitions, long lastUpdated)
{
this.topPartitions = topPartitions;
this.lastUpdated = lastUpdated;
}
}
}