blob: 5af789e5135e8cbd381c30213428ec0e48f4f9e7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.commitlog.IntervalSet;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
import org.apache.cassandra.index.transactions.UpdateTransaction;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.memory.HeapPool;
import org.apache.cassandra.utils.memory.MemtableAllocator;
import org.apache.cassandra.utils.memory.MemtablePool;
import org.apache.cassandra.utils.memory.NativePool;
import org.apache.cassandra.utils.memory.SlabPool;
public class Memtable implements Comparable<Memtable>
{
private static final Logger logger = LoggerFactory.getLogger(Memtable.class);
public static final MemtablePool MEMORY_POOL = createMemtableAllocatorPool();
private static MemtablePool createMemtableAllocatorPool()
{
long heapLimit = DatabaseDescriptor.getMemtableHeapSpaceInMb() << 20;
long offHeapLimit = DatabaseDescriptor.getMemtableOffheapSpaceInMb() << 20;
switch (DatabaseDescriptor.getMemtableAllocationType())
{
case unslabbed_heap_buffers:
return new HeapPool(heapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily());
case heap_buffers:
return new SlabPool(heapLimit, 0, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily());
case offheap_buffers:
if (!FileUtils.isCleanerAvailable)
{
throw new IllegalStateException("Could not free direct byte buffer: offheap_buffers is not a safe memtable_allocation_type without this ability, please adjust your config. This feature is only guaranteed to work on an Oracle JVM. Refusing to start.");
}
return new SlabPool(heapLimit, offHeapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily());
case offheap_objects:
return new NativePool(heapLimit, offHeapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily());
default:
throw new AssertionError();
}
}
private static final int ROW_OVERHEAD_HEAP_SIZE = estimateRowOverhead(Integer.parseInt(System.getProperty("cassandra.memtable_row_overhead_computation_step", "100000")));
private final MemtableAllocator allocator;
private final AtomicLong liveDataSize = new AtomicLong(0);
private final AtomicLong currentOperations = new AtomicLong(0);
// the write barrier for directing writes to this memtable or the next during a switch
private volatile OpOrder.Barrier writeBarrier;
// the precise upper bound of CommitLogPosition owned by this memtable
private volatile AtomicReference<CommitLogPosition> commitLogUpperBound;
// the precise lower bound of CommitLogPosition owned by this memtable; equal to its predecessor's commitLogUpperBound
private AtomicReference<CommitLogPosition> commitLogLowerBound;
// The approximate lower bound by this memtable; must be <= commitLogLowerBound once our predecessor
// has been finalised, and this is enforced in the ColumnFamilyStore.setCommitLogUpperBound
private final CommitLogPosition approximateCommitLogLowerBound = CommitLog.instance.getCurrentPosition();
public int compareTo(Memtable that)
{
return this.approximateCommitLogLowerBound.compareTo(that.approximateCommitLogLowerBound);
}
public static final class LastCommitLogPosition extends CommitLogPosition
{
public LastCommitLogPosition(CommitLogPosition copy)
{
super(copy.segmentId, copy.position);
}
}
// We index the memtable by PartitionPosition only for the purpose of being able
// to select key range using Token.KeyBound. However put() ensures that we
// actually only store DecoratedKey.
private final ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> partitions = new ConcurrentSkipListMap<>();
public final ColumnFamilyStore cfs;
private final long creationNano = System.nanoTime();
// The smallest timestamp for all partitions stored in this memtable
private long minTimestamp = Long.MAX_VALUE;
// Record the comparator of the CFS at the creation of the memtable. This
// is only used when a user update the CF comparator, to know if the
// memtable was created with the new or old comparator.
public final ClusteringComparator initialComparator;
private final ColumnsCollector columnsCollector;
private final StatsCollector statsCollector = new StatsCollector();
// only to be used by init(), to setup the very first memtable for the cfs
public Memtable(AtomicReference<CommitLogPosition> commitLogLowerBound, ColumnFamilyStore cfs)
{
this.cfs = cfs;
this.commitLogLowerBound = commitLogLowerBound;
this.allocator = MEMORY_POOL.newAllocator();
this.initialComparator = cfs.metadata.comparator;
this.cfs.scheduleFlush();
this.columnsCollector = new ColumnsCollector(cfs.metadata.partitionColumns());
}
// ONLY to be used for testing, to create a mock Memtable
@VisibleForTesting
public Memtable(CFMetaData metadata)
{
this.initialComparator = metadata.comparator;
this.cfs = null;
this.allocator = null;
this.columnsCollector = new ColumnsCollector(metadata.partitionColumns());
}
public MemtableAllocator getAllocator()
{
return allocator;
}
public long getLiveDataSize()
{
return liveDataSize.get();
}
public long getOperations()
{
return currentOperations.get();
}
@VisibleForTesting
public void setDiscarding(OpOrder.Barrier writeBarrier, AtomicReference<CommitLogPosition> commitLogUpperBound)
{
assert this.writeBarrier == null;
this.commitLogUpperBound = commitLogUpperBound;
this.writeBarrier = writeBarrier;
allocator.setDiscarding();
}
void setDiscarded()
{
allocator.setDiscarded();
}
// decide if this memtable should take the write, or if it should go to the next memtable
public boolean accepts(OpOrder.Group opGroup, CommitLogPosition commitLogPosition)
{
// if the barrier hasn't been set yet, then this memtable is still taking ALL writes
OpOrder.Barrier barrier = this.writeBarrier;
if (barrier == null)
return true;
// if the barrier has been set, but is in the past, we are definitely destined for a future memtable
if (!barrier.isAfter(opGroup))
return false;
// if we aren't durable we are directed only by the barrier
if (commitLogPosition == null)
return true;
while (true)
{
// otherwise we check if we are in the past/future wrt the CL boundary;
// if the boundary hasn't been finalised yet, we simply update it to the max of
// its current value and ours; if it HAS been finalised, we simply accept its judgement
// this permits us to coordinate a safe boundary, as the boundary choice is made
// atomically wrt our max() maintenance, so an operation cannot sneak into the past
CommitLogPosition currentLast = commitLogUpperBound.get();
if (currentLast instanceof LastCommitLogPosition)
return currentLast.compareTo(commitLogPosition) >= 0;
if (currentLast != null && currentLast.compareTo(commitLogPosition) >= 0)
return true;
if (commitLogUpperBound.compareAndSet(currentLast, commitLogPosition))
return true;
}
}
public CommitLogPosition getCommitLogLowerBound()
{
return commitLogLowerBound.get();
}
public CommitLogPosition getCommitLogUpperBound()
{
return commitLogUpperBound.get();
}
public boolean isLive()
{
return allocator.isLive();
}
public boolean isClean()
{
return partitions.isEmpty();
}
public boolean mayContainDataBefore(CommitLogPosition position)
{
return approximateCommitLogLowerBound.compareTo(position) < 0;
}
/**
* @return true if this memtable is expired. Expiration time is determined by CF's memtable_flush_period_in_ms.
*/
public boolean isExpired()
{
int period = cfs.metadata.params.memtableFlushPeriodInMs;
return period > 0 && (System.nanoTime() - creationNano >= TimeUnit.MILLISECONDS.toNanos(period));
}
/**
* Should only be called by ColumnFamilyStore.apply via Keyspace.apply, which supplies the appropriate
* OpOrdering.
*
* commitLogSegmentPosition should only be null if this is a secondary index, in which case it is *expected* to be null
*/
long put(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup)
{
AtomicBTreePartition previous = partitions.get(update.partitionKey());
long initialSize = 0;
if (previous == null)
{
final DecoratedKey cloneKey = allocator.clone(update.partitionKey(), opGroup);
AtomicBTreePartition empty = new AtomicBTreePartition(cfs.metadata, cloneKey, allocator);
// We'll add the columns later. This avoids wasting works if we get beaten in the putIfAbsent
previous = partitions.putIfAbsent(cloneKey, empty);
if (previous == null)
{
previous = empty;
// allocate the row overhead after the fact; this saves over allocating and having to free after, but
// means we can overshoot our declared limit.
int overhead = (int) (cloneKey.getToken().getHeapSize() + ROW_OVERHEAD_HEAP_SIZE);
allocator.onHeap().allocate(overhead, opGroup);
initialSize = 8;
}
}
long[] pair = previous.addAllWithSizeDelta(update, opGroup, indexer);
minTimestamp = Math.min(minTimestamp, previous.stats().minTimestamp);
liveDataSize.addAndGet(initialSize + pair[0]);
columnsCollector.update(update.columns());
statsCollector.update(update.stats());
currentOperations.addAndGet(update.operationCount());
return pair[1];
}
public int partitionCount()
{
return partitions.size();
}
public List<FlushRunnable> flushRunnables(LifecycleTransaction txn)
{
return createFlushRunnables(txn);
}
private List<FlushRunnable> createFlushRunnables(LifecycleTransaction txn)
{
DiskBoundaries diskBoundaries = cfs.getDiskBoundaries();
List<PartitionPosition> boundaries = diskBoundaries.positions;
List<Directories.DataDirectory> locations = diskBoundaries.directories;
if (boundaries == null)
return Collections.singletonList(new FlushRunnable(txn));
List<FlushRunnable> runnables = new ArrayList<>(boundaries.size());
PartitionPosition rangeStart = cfs.getPartitioner().getMinimumToken().minKeyBound();
try
{
for (int i = 0; i < boundaries.size(); i++)
{
PartitionPosition t = boundaries.get(i);
runnables.add(new FlushRunnable(rangeStart, t, locations.get(i), txn));
rangeStart = t;
}
return runnables;
}
catch (Throwable e)
{
throw Throwables.propagate(abortRunnables(runnables, e));
}
}
public Throwable abortRunnables(List<FlushRunnable> runnables, Throwable t)
{
if (runnables != null)
for (FlushRunnable runnable : runnables)
t = runnable.writer.abort(t);
return t;
}
public String toString()
{
return String.format("Memtable-%s@%s(%s serialized bytes, %s ops, %.0f%%/%.0f%% of on/off-heap limit)",
cfs.name, hashCode(), FBUtilities.prettyPrintMemory(liveDataSize.get()), currentOperations,
100 * allocator.onHeap().ownershipRatio(), 100 * allocator.offHeap().ownershipRatio());
}
public MemtableUnfilteredPartitionIterator makePartitionIterator(final ColumnFilter columnFilter, final DataRange dataRange, final boolean isForThrift)
{
AbstractBounds<PartitionPosition> keyRange = dataRange.keyRange();
boolean startIsMin = keyRange.left.isMinimum();
boolean stopIsMin = keyRange.right.isMinimum();
boolean isBound = keyRange instanceof Bounds;
boolean includeStart = isBound || keyRange instanceof IncludingExcludingBounds;
boolean includeStop = isBound || keyRange instanceof Range;
Map<PartitionPosition, AtomicBTreePartition> subMap;
if (startIsMin)
subMap = stopIsMin ? partitions : partitions.headMap(keyRange.right, includeStop);
else
subMap = stopIsMin
? partitions.tailMap(keyRange.left, includeStart)
: partitions.subMap(keyRange.left, includeStart, keyRange.right, includeStop);
int minLocalDeletionTime = Integer.MAX_VALUE;
// avoid iterating over the memtable if we purge all tombstones
if (cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones())
minLocalDeletionTime = findMinLocalDeletionTime(subMap.entrySet().iterator());
final Iterator<Map.Entry<PartitionPosition, AtomicBTreePartition>> iter = subMap.entrySet().iterator();
return new MemtableUnfilteredPartitionIterator(cfs, iter, isForThrift, minLocalDeletionTime, columnFilter, dataRange);
}
private int findMinLocalDeletionTime(Iterator<Map.Entry<PartitionPosition, AtomicBTreePartition>> iterator)
{
int minLocalDeletionTime = Integer.MAX_VALUE;
while (iterator.hasNext())
{
Map.Entry<PartitionPosition, AtomicBTreePartition> entry = iterator.next();
minLocalDeletionTime = Math.min(minLocalDeletionTime, entry.getValue().stats().minLocalDeletionTime);
}
return minLocalDeletionTime;
}
public Partition getPartition(DecoratedKey key)
{
return partitions.get(key);
}
public long getMinTimestamp()
{
return minTimestamp;
}
/**
* For testing only. Give this memtable too big a size to make it always fail flushing.
*/
@VisibleForTesting
public void makeUnflushable()
{
liveDataSize.addAndGet(1L * 1024 * 1024 * 1024 * 1024 * 1024);
}
class FlushRunnable implements Callable<SSTableMultiWriter>
{
private final long estimatedSize;
private final ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush;
private final boolean isBatchLogTable;
private final SSTableMultiWriter writer;
// keeping these to be able to log what we are actually flushing
private final PartitionPosition from;
private final PartitionPosition to;
FlushRunnable(PartitionPosition from, PartitionPosition to, Directories.DataDirectory flushLocation, LifecycleTransaction txn)
{
this(partitions.subMap(from, to), flushLocation, from, to, txn);
}
FlushRunnable(LifecycleTransaction txn)
{
this(partitions, null, null, null, txn);
}
FlushRunnable(ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush, Directories.DataDirectory flushLocation, PartitionPosition from, PartitionPosition to, LifecycleTransaction txn)
{
this.toFlush = toFlush;
this.from = from;
this.to = to;
long keySize = 0;
for (PartitionPosition key : toFlush.keySet())
{
// make sure we don't write non-sensical keys
assert key instanceof DecoratedKey;
keySize += ((DecoratedKey) key).getKey().remaining();
}
estimatedSize = (long) ((keySize // index entries
+ keySize // keys in data file
+ liveDataSize.get()) // data
* 1.2); // bloom filter and row index overhead
this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SchemaConstants.SYSTEM_KEYSPACE_NAME);
if (flushLocation == null)
writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getWriteableLocationAsFile(estimatedSize)), columnsCollector.get(), statsCollector.get());
else
writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getLocationForDisk(flushLocation)), columnsCollector.get(), statsCollector.get());
}
protected Directories getDirectories()
{
return cfs.getDirectories();
}
private void writeSortedContents()
{
logger.debug("Writing {}, flushed range = ({}, {}]", Memtable.this.toString(), from, to);
boolean trackContention = logger.isTraceEnabled();
int heavilyContendedRowCount = 0;
// (we can't clear out the map as-we-go to free up memory,
// since the memtable is being used for queries in the "pending flush" category)
for (AtomicBTreePartition partition : toFlush.values())
{
// Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
// operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
// we don't need to preserve tombstones for repair. So if both operation are in this
// memtable (which will almost always be the case if there is no ongoing failure), we can
// just skip the entry (CASSANDRA-4667).
if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
continue;
if (trackContention && partition.useLock())
heavilyContendedRowCount++;
if (!partition.isEmpty())
{
try (UnfilteredRowIterator iter = partition.unfilteredIterator())
{
writer.append(iter);
}
}
}
long bytesFlushed = writer.getFilePointer();
logger.debug("Completed flushing {} ({}) for commitlog position {}",
writer.getFilename(),
FBUtilities.prettyPrintMemory(bytesFlushed),
commitLogUpperBound);
// Update the metrics
cfs.metric.bytesFlushed.inc(bytesFlushed);
if (heavilyContendedRowCount > 0)
logger.trace("High update contention in {}/{} partitions of {} ", heavilyContendedRowCount, toFlush.size(), Memtable.this);
}
public SSTableMultiWriter createFlushWriter(LifecycleTransaction txn,
String filename,
PartitionColumns columns,
EncodingStats stats)
{
MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator)
.commitLogIntervals(new IntervalSet<>(commitLogLowerBound.get(), commitLogUpperBound.get()));
return cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
toFlush.size(),
ActiveRepairService.UNREPAIRED_SSTABLE,
sstableMetadataCollector,
new SerializationHeader(true, cfs.metadata, columns, stats), txn);
}
@Override
public SSTableMultiWriter call()
{
writeSortedContents();
return writer;
}
}
private static int estimateRowOverhead(final int count)
{
// calculate row overhead
try (final OpOrder.Group group = new OpOrder().start())
{
int rowOverhead;
MemtableAllocator allocator = MEMORY_POOL.newAllocator();
ConcurrentNavigableMap<PartitionPosition, Object> partitions = new ConcurrentSkipListMap<>();
final Object val = new Object();
for (int i = 0 ; i < count ; i++)
partitions.put(allocator.clone(new BufferDecoratedKey(new LongToken(i), ByteBufferUtil.EMPTY_BYTE_BUFFER), group), val);
double avgSize = ObjectSizes.measureDeep(partitions) / (double) count;
rowOverhead = (int) ((avgSize - Math.floor(avgSize)) < 0.05 ? Math.floor(avgSize) : Math.ceil(avgSize));
rowOverhead -= ObjectSizes.measureDeep(new LongToken(0));
rowOverhead += AtomicBTreePartition.EMPTY_SIZE;
allocator.setDiscarding();
allocator.setDiscarded();
return rowOverhead;
}
}
public static class MemtableUnfilteredPartitionIterator extends AbstractUnfilteredPartitionIterator
{
private final ColumnFamilyStore cfs;
private final Iterator<Map.Entry<PartitionPosition, AtomicBTreePartition>> iter;
private final boolean isForThrift;
private final int minLocalDeletionTime;
private final ColumnFilter columnFilter;
private final DataRange dataRange;
public MemtableUnfilteredPartitionIterator(ColumnFamilyStore cfs, Iterator<Map.Entry<PartitionPosition, AtomicBTreePartition>> iter, boolean isForThrift, int minLocalDeletionTime, ColumnFilter columnFilter, DataRange dataRange)
{
this.cfs = cfs;
this.iter = iter;
this.isForThrift = isForThrift;
this.minLocalDeletionTime = minLocalDeletionTime;
this.columnFilter = columnFilter;
this.dataRange = dataRange;
}
public boolean isForThrift()
{
return isForThrift;
}
public int getMinLocalDeletionTime()
{
return minLocalDeletionTime;
}
public CFMetaData metadata()
{
return cfs.metadata;
}
public boolean hasNext()
{
return iter.hasNext();
}
public UnfilteredRowIterator next()
{
Map.Entry<PartitionPosition, AtomicBTreePartition> entry = iter.next();
// Actual stored key should be true DecoratedKey
assert entry.getKey() instanceof DecoratedKey;
DecoratedKey key = (DecoratedKey)entry.getKey();
ClusteringIndexFilter filter = dataRange.clusteringIndexFilter(key);
return filter.getUnfilteredRowIterator(columnFilter, entry.getValue());
}
}
private static class ColumnsCollector
{
private final HashMap<ColumnDefinition, AtomicBoolean> predefined = new HashMap<>();
private final ConcurrentSkipListSet<ColumnDefinition> extra = new ConcurrentSkipListSet<>();
ColumnsCollector(PartitionColumns columns)
{
for (ColumnDefinition def : columns.statics)
predefined.put(def, new AtomicBoolean());
for (ColumnDefinition def : columns.regulars)
predefined.put(def, new AtomicBoolean());
}
public void update(PartitionColumns columns)
{
for (ColumnDefinition s : columns.statics)
update(s);
for (ColumnDefinition r : columns.regulars)
update(r);
}
private void update(ColumnDefinition definition)
{
AtomicBoolean present = predefined.get(definition);
if (present != null)
{
if (!present.get())
present.set(true);
}
else
{
extra.add(definition);
}
}
public PartitionColumns get()
{
PartitionColumns.Builder builder = PartitionColumns.builder();
for (Map.Entry<ColumnDefinition, AtomicBoolean> e : predefined.entrySet())
if (e.getValue().get())
builder.add(e.getKey());
return builder.addAll(extra).build();
}
}
private static class StatsCollector
{
private final AtomicReference<EncodingStats> stats = new AtomicReference<>(EncodingStats.NO_STATS);
public void update(EncodingStats newStats)
{
while (true)
{
EncodingStats current = stats.get();
EncodingStats updated = current.mergeWith(newStats);
if (stats.compareAndSet(current, updated))
return;
}
}
public EncodingStats get()
{
return stats.get();
}
}
}