/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.memtable;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.DataRange;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.Slices;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
import org.apache.cassandra.db.partitions.AtomicBTreePartition;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.IncludingExcludingBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.index.transactions.UpdateTransaction;
import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.memory.Cloner;
import org.apache.cassandra.utils.memory.MemtableAllocator;
import org.github.jamm.Unmetered;
/**
* A proof-of-concept sharded memtable implementation. This implementation splits the partition skip-list into several
* independent skip-lists, each covering a roughly equal part of the token space served by this node. This reduces
* contention on the skip-list from concurrent writes and can lead to improved write throughput.
*
* The implementation takes two parameters:
* - shards: the number of shards to split into.
* - serialize_writes: if false, each shard may serve multiple writes in parallel; if true, writes to each shard are
* synchronized.
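*
* The parameters are handed to {@link #factory(Map)} as a mutable options map. A minimal sketch of obtaining a
* factory programmatically with these options (they are normally specified through the table schema and
* configuration instead):
* <pre>{@code
* Map<String, String> options = new HashMap<>();
* options.put(ShardedSkipListMemtable.SHARDS_OPTION, "4");      // "shards"
* options.put(ShardedSkipListMemtable.LOCKING_OPTION, "true");  // "serialize_writes"
* Memtable.Factory factory = ShardedSkipListMemtable.factory(options); // locking variant, 4 shards
* }</pre>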
*
* Also see Memtable_API.md.
*/
public class ShardedSkipListMemtable extends AbstractAllocatorMemtable
{
private static final Logger logger = LoggerFactory.getLogger(ShardedSkipListMemtable.class);
public static final String SHARDS_OPTION = "shards";
public static final String LOCKING_OPTION = "serialize_writes";
// The boundaries for the keyspace as they were calculated when the memtable was created.
// The boundaries will be NONE for system keyspaces or if StorageService is not yet initialized.
// Because the boundaries are fixed for the lifetime of the memtable, we are guaranteed to always pick the same
// shard for a given key, even if we race with StorageService initialization or with topology changes.
@Unmetered
final ShardBoundaries boundaries;
/**
* Per-shard memtable regions. Each write is routed to the shard that owns the partition key's token. The data
* structures used are concurrent-read safe, thus reads can be carried out from any thread.
*/
final MemtableShard[] shards;
@VisibleForTesting
public static final String SHARD_COUNT_PROPERTY = "cassandra.memtable.shard.count";
// default shard count, used when a specific number of shards is not specified in the parameters
private static final int SHARD_COUNT = Integer.getInteger(SHARD_COUNT_PROPERTY, FBUtilities.getAvailableProcessors());
private final Factory factory;
// only to be used by init(), to set up the very first memtable for the cfs
ShardedSkipListMemtable(AtomicReference<CommitLogPosition> commitLogLowerBound,
TableMetadataRef metadataRef,
Owner owner,
Integer shardCountOption,
Factory factory)
{
super(commitLogLowerBound, metadataRef, owner);
int shardCount = shardCountOption != null ? shardCountOption : SHARD_COUNT;
this.boundaries = owner.localRangeSplits(shardCount);
this.shards = generatePartitionShards(boundaries.shardCount(), allocator, metadataRef);
this.factory = factory;
}
private static MemtableShard[] generatePartitionShards(int splits,
MemtableAllocator allocator,
TableMetadataRef metadata)
{
MemtableShard[] partitionMapContainer = new MemtableShard[splits];
for (int i = 0; i < splits; i++)
partitionMapContainer[i] = new MemtableShard(metadata, allocator);
return partitionMapContainer;
}
public boolean isClean()
{
for (MemtableShard shard : shards)
if (!shard.isEmpty())
return false;
return true;
}
@Override
protected Memtable.Factory factory()
{
return factory;
}
/**
* Should only be called by ColumnFamilyStore.apply via Keyspace.apply, which supplies the appropriate
* OpOrder.Group.
*/
public long put(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup)
{
DecoratedKey key = update.partitionKey();
MemtableShard shard = shards[boundaries.getShardForKey(key)];
return shard.put(key, update, indexer, opGroup);
}
/**
* Technically we should gather the per-shard sizes atomically with respect to each other, as the values read in the
* following calls may not be mutually consistent, but for metrics purposes this is good enough.
*/
@Override
public long getLiveDataSize()
{
long total = 0L;
for (MemtableShard shard : shards)
total += shard.liveDataSize();
return total;
}
@Override
public long operationCount()
{
long total = 0L;
for (MemtableShard shard : shards)
total += shard.currentOperations();
return total;
}
@Override
public long partitionCount()
{
long total = 0L;
for (MemtableShard shard : shards)
total += shard.size();
return total;
}
/**
* Returns the minimum timestamp if one is available, otherwise NO_MIN_TIMESTAMP.
*
* EncodingStats uses a synthetic epoch timestamp in 2015. We don't want to leak that (CASSANDRA-18118), so we
* return NO_MIN_TIMESTAMP instead.
*
* @return the minimum timestamp, or NO_MIN_TIMESTAMP if none is available
*/
@Override
public long getMinTimestamp()
{
long min = Long.MAX_VALUE;
for (MemtableShard shard : shards)
min = Long.min(min, shard.minTimestamp());
return min != EncodingStats.NO_STATS.minTimestamp ? min : NO_MIN_TIMESTAMP;
}
@Override
public int getMinLocalDeletionTime()
{
int min = Integer.MAX_VALUE;
for (MemtableShard shard : shards)
min = Integer.min(min, shard.minLocalDeletionTime());
return min;
}
@Override
RegularAndStaticColumns columns()
{
for (MemtableShard shard : shards)
columnsCollector.update(shard.columnsCollector);
return columnsCollector.get();
}
@Override
EncodingStats encodingStats()
{
for (MemtableShard shard : shards)
statsCollector.update(shard.statsCollector.get());
return statsCollector.get();
}
@Override
public MemtableUnfilteredPartitionIterator partitionIterator(final ColumnFilter columnFilter,
final DataRange dataRange,
SSTableReadsListener readsListener)
{
AbstractBounds<PartitionPosition> keyRange = dataRange.keyRange();
PartitionPosition left = keyRange.left;
PartitionPosition right = keyRange.right;
boolean isBound = keyRange instanceof Bounds;
boolean includeStart = isBound || keyRange instanceof IncludingExcludingBounds;
boolean includeStop = isBound || keyRange instanceof Range;
Iterator<AtomicBTreePartition> iterator = getPartitionIterator(left, includeStart, right, includeStop);
// readsListener is ignored as it only accepts sstable signals
return new MemtableUnfilteredPartitionIterator(metadata(), iterator, columnFilter, dataRange);
}
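// Builds the iterator over partitions in the requested key range. If the range falls entirely within one shard,
// this is a single sub-map view of that shard; otherwise the first shard is bounded only on the left, the last
// shard only on the right, the shards in between contribute all of their partitions, and the per-shard iterators
// are concatenated in token order.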
private Iterator<AtomicBTreePartition> getPartitionIterator(PartitionPosition left, boolean includeStart, PartitionPosition right, boolean includeStop)
{
int leftShard = left != null && !left.isMinimum() ? boundaries.getShardForKey(left) : 0;
int rightShard = right != null && !right.isMinimum() ? boundaries.getShardForKey(right) : boundaries.shardCount() - 1;
Iterator<AtomicBTreePartition> iterator;
if (leftShard == rightShard)
iterator = shards[leftShard].getPartitionsSubMap(left, includeStart, right, includeStop).values().iterator();
else
{
Iterator<AtomicBTreePartition>[] iters = new Iterator[rightShard - leftShard + 1];
int i = leftShard;
iters[0] = shards[leftShard].getPartitionsSubMap(left, includeStart, null, true).values().iterator();
for (++i; i < rightShard; ++i)
iters[i - leftShard] = shards[i].partitions.values().iterator();
iters[i - leftShard] = shards[i].getPartitionsSubMap(null, true, right, includeStop).values().iterator();
iterator = Iterators.concat(iters);
}
return iterator;
}
private Partition getPartition(DecoratedKey key)
{
int shardIndex = boundaries.getShardForKey(key);
return shards[shardIndex].partitions.get(key);
}
@Override
public UnfilteredRowIterator rowIterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, SSTableReadsListener listener)
{
Partition p = getPartition(key);
if (p == null)
return null;
else
return p.unfilteredIterator(selectedColumns, slices, reversed);
}
@Override
public UnfilteredRowIterator rowIterator(DecoratedKey key)
{
Partition p = getPartition(key);
return p != null ? p.unfilteredIterator() : null;
}
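// The flush set is built from two passes over the same key range: a first pass counts the partitions and sums
// their serialized partition key sizes, and a second, independent iterator over the same range supplies the
// actual partitions to the flush.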
public FlushablePartitionSet<AtomicBTreePartition> getFlushSet(PartitionPosition from, PartitionPosition to)
{
long keySize = 0;
int keyCount = 0;
for (Iterator<AtomicBTreePartition> it = getPartitionIterator(from, true, to, false); it.hasNext();)
{
AtomicBTreePartition en = it.next();
keySize += en.partitionKey().getKey().remaining();
keyCount++;
}
long partitionKeySize = keySize;
int partitionCount = keyCount;
Iterator<AtomicBTreePartition> toFlush = getPartitionIterator(from, true, to, false);
return new AbstractFlushablePartitionSet<AtomicBTreePartition>()
{
public Memtable memtable()
{
return ShardedSkipListMemtable.this;
}
public PartitionPosition from()
{
return from;
}
public PartitionPosition to()
{
return to;
}
public long partitionCount()
{
return partitionCount;
}
public Iterator<AtomicBTreePartition> iterator()
{
return toFlush;
}
public long partitionKeysSize()
{
return partitionKeySize;
}
};
}
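/**
* A single shard of the memtable: a concurrent skip-list of partitions covering one contiguous slice of the locally
* owned token space, together with the per-shard statistics (live data size, operation count, minimum timestamp and
* local deletion time, columns and encoding stats) that the enclosing memtable aggregates on demand.
*/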
static class MemtableShard
{
// The following fields use Atomic* types so that when we collect results from all shards, the thread reading the
// values is guaranteed to see the most recent updates.
// The smallest timestamp for all partitions stored in this shard
private final AtomicLong minTimestamp = new AtomicLong(Long.MAX_VALUE);
private final AtomicInteger minLocalDeletionTime = new AtomicInteger(Integer.MAX_VALUE);
private final AtomicLong liveDataSize = new AtomicLong(0);
private final AtomicLong currentOperations = new AtomicLong(0);
// We index the memtable by PartitionPosition only for the purpose of being able
// to select a key range using Token.KeyBound. However, put() ensures that we
// only ever store DecoratedKeys.
private final ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> partitions = new ConcurrentSkipListMap<>();
private final ColumnsCollector columnsCollector;
private final StatsCollector statsCollector;
@Unmetered // total pool size should not be included in memtable's deep size
private final MemtableAllocator allocator;
private final TableMetadataRef metadata;
@VisibleForTesting
MemtableShard(TableMetadataRef metadata, MemtableAllocator allocator)
{
this.columnsCollector = new ColumnsCollector(metadata.get().regularAndStaticColumns());
this.statsCollector = new StatsCollector();
this.allocator = allocator;
this.metadata = metadata;
}
public long put(DecoratedKey key, PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup)
{
Cloner cloner = allocator.cloner(opGroup);
AtomicBTreePartition previous = partitions.get(key);
long initialSize = 0;
if (previous == null)
{
final DecoratedKey cloneKey = cloner.clone(key);
AtomicBTreePartition empty = new AtomicBTreePartition(metadata, cloneKey, allocator);
// We'll add the columns later. This avoids wasting work if we lose the race in the putIfAbsent below.
previous = partitions.putIfAbsent(cloneKey, empty);
if (previous == null)
{
previous = empty;
// allocate the row overhead after the fact; this saves over-allocating and having to free afterwards, but
// means we can overshoot our declared limit.
int overhead = (int) (cloneKey.getToken().getHeapSize() + SkipListMemtable.ROW_OVERHEAD_HEAP_SIZE);
allocator.onHeap().allocate(overhead, opGroup);
initialSize = 8;
}
}
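// At this point 'previous' is the partition to update: either the empty one we just inserted, or the one a
// concurrent writer inserted first (in which case our freshly created empty partition is simply discarded).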
long[] pair = previous.addAllWithSizeDelta(update, cloner, opGroup, indexer);
updateMin(minTimestamp, update.stats().minTimestamp);
updateMin(minLocalDeletionTime, update.stats().minLocalDeletionTime);
liveDataSize.addAndGet(initialSize + pair[0]);
columnsCollector.update(update.columns());
statsCollector.update(update.stats());
currentOperations.addAndGet(update.operationCount());
return pair[1];
}
private Map<PartitionPosition, AtomicBTreePartition> getPartitionsSubMap(PartitionPosition left,
boolean includeLeft,
PartitionPosition right,
boolean includeRight)
{
if (left != null && left.isMinimum())
left = null;
if (right != null && right.isMinimum())
right = null;
try
{
if (left == null)
return right == null ? partitions : partitions.headMap(right, includeRight);
else
return right == null
? partitions.tailMap(left, includeLeft)
: partitions.subMap(left, includeLeft, right, includeRight);
}
catch (IllegalArgumentException e)
{
logger.error("Invalid range requested {} - {}", left, right);
throw e;
}
}
public boolean isEmpty()
{
return partitions.isEmpty();
}
public int size()
{
return partitions.size();
}
long minTimestamp()
{
return minTimestamp.get();
}
long liveDataSize()
{
return liveDataSize.get();
}
long currentOperations()
{
return currentOperations.get();
}
public int minLocalDeletionTime()
{
return minLocalDeletionTime.get();
}
}
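/**
* Partition iterator handed out by partitionIterator(). It walks the pre-computed (possibly cross-shard) iterator
* of AtomicBTreePartitions and applies the query's clustering and column filters to each partition it returns.
*/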
public static class MemtableUnfilteredPartitionIterator extends AbstractUnfilteredPartitionIterator implements UnfilteredPartitionIterator
{
private final TableMetadata metadata;
private final Iterator<AtomicBTreePartition> iter;
private final ColumnFilter columnFilter;
private final DataRange dataRange;
public MemtableUnfilteredPartitionIterator(TableMetadata metadata, Iterator<AtomicBTreePartition> iterator, ColumnFilter columnFilter, DataRange dataRange)
{
this.metadata = metadata;
this.iter = iterator;
this.columnFilter = columnFilter;
this.dataRange = dataRange;
}
public TableMetadata metadata()
{
return metadata;
}
public boolean hasNext()
{
return iter.hasNext();
}
public UnfilteredRowIterator next()
{
AtomicBTreePartition entry = iter.next();
DecoratedKey key = entry.partitionKey();
ClusteringIndexFilter filter = dataRange.clusteringIndexFilter(key);
return filter.getUnfilteredRowIterator(columnFilter, entry);
}
}
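/**
* Variant created when the "serialize_writes" option is set to true: writes to the same shard are serialized by
* synchronizing on the shard object, while writes to different shards still proceed in parallel.
*/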
static class Locking extends ShardedSkipListMemtable
{
Locking(AtomicReference<CommitLogPosition> commitLogLowerBound, TableMetadataRef metadataRef, Owner owner, Integer shardCountOption, Factory factory)
{
super(commitLogLowerBound, metadataRef, owner, shardCountOption, factory);
}
/**
* Should only be called by ColumnFamilyStore.apply via Keyspace.apply, which supplies the appropriate
* OpOrder.Group.
*/
public long put(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup)
{
DecoratedKey key = update.partitionKey();
MemtableShard shard = shards[boundaries.getShardForKey(key)];
synchronized (shard)
{
return shard.put(key, update, indexer, opGroup);
}
}
}
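/**
* Builds a {@link Factory} from the memtable options, consuming (i.e. removing from the supplied map) the two
* options this implementation understands: {@code shards} and {@code serialize_writes}. Callers are expected to
* pass a mutable copy of the options, presumably so that any leftover entries can be flagged as unrecognized.
*/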
public static Factory factory(Map<String, String> optionsCopy)
{
String shardsString = optionsCopy.remove(SHARDS_OPTION);
Integer shardCount = shardsString != null ? Integer.parseInt(shardsString) : null;
boolean isLocking = Boolean.parseBoolean(optionsCopy.remove(LOCKING_OPTION));
return new Factory(shardCount, isLocking);
}
static class Factory implements Memtable.Factory
{
final Integer shardCount;
final boolean isLocking;
Factory(Integer shardCount, boolean isLocking)
{
this.shardCount = shardCount;
this.isLocking = isLocking;
}
public Memtable create(AtomicReference<CommitLogPosition> commitLogLowerBound,
TableMetadataRef metadataRef,
Owner owner)
{
return isLocking
? new Locking(commitLogLowerBound, metadataRef, owner, shardCount, this)
: new ShardedSkipListMemtable(commitLogLowerBound, metadataRef, owner, shardCount, this);
}
public boolean equals(Object o)
{
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
Factory factory = (Factory) o;
return Objects.equals(shardCount, factory.shardCount) && isLocking == factory.isLocking;
}
public int hashCode()
{
return Objects.hash(shardCount, isLocking);
}
}
}