blob: ae1251646ec72f829174320626d848b825784b11 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*     http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.cassandra.db.memtable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ImmediateExecutor;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.Promise;
import org.apache.cassandra.utils.memory.HeapPool;
import org.apache.cassandra.utils.memory.MemtableAllocator;
import org.apache.cassandra.utils.memory.MemtableCleaner;
import org.apache.cassandra.utils.memory.MemtablePool;
import org.apache.cassandra.utils.memory.NativePool;
import org.apache.cassandra.utils.memory.SlabPool;
* A memtable whose memory is tracked, and possibly allocated, by a MemtableAllocator obtained from a MemtablePool.
* Provides methods for tracking memory usage and for triggering flushes when the relevant limits are reached.
public abstract class AbstractAllocatorMemtable extends AbstractMemtableWithCommitlog
private static final Logger logger = LoggerFactory.getLogger(AbstractAllocatorMemtable.class);
// Pool shared by all instances (static); every memtable draws its own allocator from it in the constructor.
// Must stay declared after `logger`: static initializers run in textual order and pool creation may log.
public static final MemtablePool MEMORY_POOL = AbstractAllocatorMemtable.createMemtableAllocatorPool();
// Owner of this memtable; used to look up the current memtable for periodic flush checks and to request flushes.
protected final Owner owner;
// Allocator obtained from MEMORY_POOL for this memtable; on-/off-heap accounting goes through it.
protected final MemtableAllocator allocator;
// Record the comparator of the CFS at the creation of the memtable. This
// is only used when a user updates the CF comparator, to know if the
// memtable was created with the new or old comparator.
protected final ClusteringComparator initialComparator;
// Creation timestamp from Clock.Global.nanoTime(); flushIfPeriodExpired() measures this memtable's age from it.
private final long creationNano = Clock.Global.nanoTime();
private static MemtablePool createMemtableAllocatorPool()
Config.MemtableAllocationType allocationType = DatabaseDescriptor.getMemtableAllocationType();
long heapLimit = DatabaseDescriptor.getMemtableHeapSpaceInMiB() << 20;
long offHeapLimit = DatabaseDescriptor.getMemtableOffheapSpaceInMiB() << 20;
float memtableCleanupThreshold = DatabaseDescriptor.getMemtableCleanupThreshold();
MemtableCleaner cleaner = AbstractAllocatorMemtable::flushLargestMemtable;
return createMemtableAllocatorPoolInternal(allocationType, heapLimit, offHeapLimit, memtableCleanupThreshold, cleaner);
public static MemtablePool createMemtableAllocatorPoolInternal(Config.MemtableAllocationType allocationType,
long heapLimit, long offHeapLimit,
float memtableCleanupThreshold, MemtableCleaner cleaner)
switch (allocationType)
case unslabbed_heap_buffers_logged:
return new HeapPool.Logged(heapLimit, memtableCleanupThreshold, cleaner);
case unslabbed_heap_buffers:
logger.debug("Memtables allocating with on-heap buffers");
return new HeapPool(heapLimit, memtableCleanupThreshold, cleaner);
case heap_buffers:
logger.debug("Memtables allocating with on-heap slabs");
return new SlabPool(heapLimit, 0, memtableCleanupThreshold, cleaner);
case offheap_buffers:
logger.debug("Memtables allocating with off-heap buffers");
return new SlabPool(heapLimit, offHeapLimit, memtableCleanupThreshold, cleaner);
case offheap_objects:
logger.debug("Memtables allocating with off-heap objects");
return new NativePool(heapLimit, offHeapLimit, memtableCleanupThreshold, cleaner);
throw new AssertionError();
/**
 * Creates a memtable whose memory is allocated from the shared {@code MEMORY_POOL}, and arms its
 * periodic flush check.
 *
 * @param commitLogLowerBound commit log lower bound, passed to the superclass
 * @param metadataRef         table metadata; its string form names the allocator
 * @param owner               owner of this memtable, used for flush scheduling and flush requests
 */
public AbstractAllocatorMemtable(AtomicReference<CommitLogPosition> commitLogLowerBound, TableMetadataRef metadataRef, Owner owner)
{
    super(metadataRef, commitLogLowerBound);
    this.allocator = MEMORY_POOL.newAllocator(metadataRef.toString());
    this.initialComparator = metadata.get().comparator;
    this.owner = owner;
    // flushIfPeriodExpired() relies on each new memtable scheduling its own expiry check ("we'll be
    // rescheduled by the constructor of the Memtable"). This must run last: scheduleFlush() reads `owner`.
    scheduleFlush();
}
public MemtableAllocator getAllocator()
return allocator;
// Decides whether this memtable must be switched out for the given flush reason.
// Reading the returns below:
//  - the first (comparator changed, or a different memtable factory requested) presumably serves a schema change;
//  - the second ("by default we don't use the local ranges") presumably serves a change of local/owned ranges;
//  - otherwise the answer is true.
// Both comparator and factory are compared by reference (!=), not via equals().
// performSnapshot() below assumes this returns true for SNAPSHOT unless a subclass overrides both.
// NOTE(review): confirm which FlushReason constants select each return statement.
public boolean shouldSwitch(ColumnFamilyStore.FlushReason reason)
switch (reason)
return initialComparator != metadata().comparator // If the CF comparator has changed, because our partitions reference the old one
|| metadata().params.memtable.factory() != factory(); // If a different type of memtable is requested
return false; // by default we don't use the local ranges, thus this has no effect
return true;
public void metadataUpdated()
// We decided not to swap out this memtable, but if the flush period has changed we must schedule it for the
// new expiration time.
public void localRangesUpdated()
// nothing to be done by default
public void performSnapshot(String snapshotName)
throw new AssertionError("performSnapshot must be implemented if shouldSwitch(SNAPSHOT) can return false.");
// Returns the memtable Factory of this implementation. shouldSwitch() compares it by reference with
// metadata().params.memtable.factory() to detect that a different memtable type has been requested.
protected abstract Factory factory();
public void switchOut(OpOrder.Barrier writeBarrier, AtomicReference<CommitLogPosition> commitLogUpperBound)
super.switchOut(writeBarrier, commitLogUpperBound);
// Discard hook for this memtable.
// NOTE(review): as written this method has no statements. It neither calls the superclass nor touches
// `allocator`. Confirm the allocator's memory is released or marked discarded somewhere.
public void discard()
// Human-readable summary: "Memtable-<..>@<..>(<..> serialized bytes, <..> ops, <..>)".
// NOTE(review): the format string has five %s placeholders. Make sure all five arguments are supplied,
// presumably including the `usage` computed below.
public String toString()
MemoryUsage usage = Memtable.getMemoryUsage(this);
return String.format("Memtable-%s@%s(%s serialized bytes, %s ops, %s)",
public void addMemoryUsageTo(MemoryUsage stats)
stats.ownershipRatioOnHeap += getAllocator().onHeap().ownershipRatio();
stats.ownershipRatioOffHeap += getAllocator().offHeap().ownershipRatio();
stats.ownsOnHeap += getAllocator().onHeap().owns();
stats.ownsOffHeap += getAllocator().offHeap().owns();
public void markExtraOnHeapUsed(long additionalSpace, OpOrder.Group opGroup)
getAllocator().onHeap().allocate(additionalSpace, opGroup);
public void markExtraOffHeapUsed(long additionalSpace, OpOrder.Group opGroup)
getAllocator().offHeap().allocate(additionalSpace, opGroup);
void scheduleFlush()
int period = metadata().params.memtableFlushPeriodInMs;
if (period > 0)
scheduleFlush(owner, period);
private static void scheduleFlush(Owner owner, int period)
logger.trace("scheduling flush in {} ms", period);
WrappedRunnable runnable = new WrappedRunnable()
protected void runMayThrow()
Memtable current = owner.getCurrentMemtable();
if (current instanceof AbstractAllocatorMemtable)
((AbstractAllocatorMemtable) current).flushIfPeriodExpired();
ScheduledExecutors.scheduledTasks.scheduleSelfRecurring(runnable, period, TimeUnit.MILLISECONDS);
// Called from the task scheduled by scheduleFlush(Owner, int). Acts only if a flush period is configured
// (period > 0) and at least that long has elapsed since this memtable was created (creationNano).
private void flushIfPeriodExpired()
int period = metadata().params.memtableFlushPeriodInMs;
// The period is in ms; convert it to ns to compare with the Clock.Global.nanoTime()-based age.
if (period > 0 && (Clock.Global.nanoTime() - creationNano >= TimeUnit.MILLISECONDS.toNanos(period)))
if (isClean())
// if we're still clean, instead of swapping just reschedule a flush for later
scheduleFlush(owner, period);
// NOTE(review): as written, nothing happens when the period has expired and the memtable is NOT clean.
// The comment below implies this memtable is replaced here (its successor then schedules its own check).
// Confirm a flush is actually requested in this case, e.g. via owner.signalFlushRequired(...) as
// flushLargestMemtable() does.
// we'll be rescheduled by the constructor of the Memtable.
* Finds the largest memtable, as a percentage of *either* on- or off-heap memory limits, and immediately
* queues it for flushing. If the memtable selected is flushed before this completes, no work is done.
// Cleaner installed on MEMORY_POOL by createMemtableAllocatorPool(). Returns returnFuture (created below).
public static Future<Boolean> flushLargestMemtable()
// Best candidate so far: its weight (max of on-/off-heap ownership ratio), the memtable and its usage.
float largestRatio = 0f;
AbstractAllocatorMemtable largestMemtable = null;
Memtable.MemoryUsage largestUsage = null;
// Sums of the ownership ratios of every candidate examined; only used in the log message.
float liveOnHeap = 0, liveOffHeap = 0;
// we take a reference to the current main memtable for the CF prior to snapping its ownership ratios
// to ensure we have some ordering guarantee for performing the switchMemtableIf(), i.e. we will only
// swap if the memtables we are measuring here haven't already been swapped by the time we try to swap them
for (Memtable currentMemtable : ColumnFamilyStore.activeMemtables())
// Only allocator-backed memtables are candidates; the cast below assumes this.
if (!(currentMemtable instanceof AbstractAllocatorMemtable))
AbstractAllocatorMemtable current = (AbstractAllocatorMemtable) currentMemtable;
// find the total ownership ratio for the memtable and all SecondaryIndexes owned by this CF,
// both on- and off-heap, and select the largest of the two ratios to weight this CF
MemoryUsage usage = Memtable.newMemoryUsage();
// Index memtables of the same owner are included when they are allocator-backed.
for (Memtable indexMemtable : current.owner.getIndexMemtables())
if (indexMemtable instanceof AbstractAllocatorMemtable)
// NOTE(review): as written, no statement adds into `usage` (no addMemoryUsageTo call for `current` or
// the index memtables) before its ratios are read here. Confirm it is populated first.
float ratio = Math.max(usage.ownershipRatioOnHeap, usage.ownershipRatioOffHeap);
if (ratio > largestRatio)
largestMemtable = current;
largestUsage = usage;
largestRatio = ratio;
liveOnHeap += usage.ownershipRatioOnHeap;
liveOffHeap += usage.ownershipRatioOffHeap;
Promise<Boolean> returnFuture = new AsyncPromise<>();
if (largestMemtable != null)
// Pool-wide used and reclaiming (flushing) ratios, reported in the log message that follows.
float usedOnHeap = MEMORY_POOL.onHeap.usedRatio();
float usedOffHeap = MEMORY_POOL.offHeap.usedRatio();
float flushingOnHeap = MEMORY_POOL.onHeap.reclaimingRatio();
float flushingOffHeap = MEMORY_POOL.offHeap.reclaimingRatio();"Flushing largest {} to free up room. Used total: {}, live: {}, flushing: {}, this: {}",
largestMemtable.owner, ratio(usedOnHeap, usedOffHeap), ratio(liveOnHeap, liveOffHeap),
ratio(flushingOnHeap, flushingOffHeap), ratio(largestUsage.ownershipRatioOnHeap, largestUsage.ownershipRatioOffHeap));
// Ask the owner to flush the selected memtable, citing MEMTABLE_LIMIT as the reason.
Future<CommitLogPosition> flushFuture = largestMemtable.owner.signalFlushRequired(largestMemtable, ColumnFamilyStore.FlushReason.MEMTABLE_LIMIT);
// NOTE(review): this listener is the only visible place where returnFuture could be completed. Confirm that
// both the success path and the catch (Throwable) path complete it, and that it is also completed when no
// memtable is found; otherwise callers waiting on the returned future never finish.
flushFuture.addListener(() -> {
catch (Throwable t)
}, ImmediateExecutor.INSTANCE);
// No allocator-backed memtable qualified for flushing.
logger.debug("Flushing of largest memtable, not done, no memtable found");
return returnFuture;
private static String ratio(float onHeap, float offHeap)
return String.format("%.2f/%.2f", onHeap, offHeap);