blob: 8061566757a5cb882ece2a89150cfdfc9cbc30df [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.utils.memory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import com.google.common.annotations.VisibleForTesting;
import com.codahale.metrics.Timer;
import org.apache.cassandra.metrics.CassandraMetricsRegistry;
import org.apache.cassandra.metrics.DefaultNameFactory;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import org.apache.cassandra.utils.ExecutorUtils;
/**
* Represents an amount of memory used for a given purpose, that can be allocated to specific tasks through
* child MemtableAllocator objects.
*/
public abstract class MemtablePool
{
final MemtableCleanerThread<?> cleaner;
// the total memory used by this pool
public final SubPool onHeap;
public final SubPool offHeap;
public final Timer blockedOnAllocating;
final WaitQueue hasRoom = new WaitQueue();
MemtablePool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanThreshold, Runnable cleaner)
{
this.onHeap = getSubPool(maxOnHeapMemory, cleanThreshold);
this.offHeap = getSubPool(maxOffHeapMemory, cleanThreshold);
this.cleaner = getCleaner(cleaner);
blockedOnAllocating = CassandraMetricsRegistry.Metrics.timer(new DefaultNameFactory("MemtablePool")
.createMetricName("BlockedOnAllocation"));
if (this.cleaner != null)
this.cleaner.start();
}
SubPool getSubPool(long limit, float cleanThreshold)
{
return new SubPool(limit, cleanThreshold);
}
MemtableCleanerThread<?> getCleaner(Runnable cleaner)
{
return cleaner == null ? null : new MemtableCleanerThread<>(this, cleaner);
}
public abstract boolean needToCopyOnHeap();
@VisibleForTesting
public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
{
ExecutorUtils.shutdownNowAndWait(timeout, unit, cleaner);
}
public abstract MemtableAllocator newAllocator();
/**
* Note the difference between acquire() and allocate(); allocate() makes more resources available to all owners,
* and acquire() makes shared resources unavailable but still recorded. An Owner must always acquire resources,
* but only needs to allocate if there are none already available. This distinction is not always meaningful.
*/
public class SubPool
{
// total memory/resource permitted to allocate
public final long limit;
// ratio of used to spare (both excluding 'reclaiming') at which to trigger a clean
public final float cleanThreshold;
// total bytes allocated and reclaiming
volatile long allocated;
volatile long reclaiming;
// a cache of the calculation determining at what allocation threshold we should next clean
volatile long nextClean;
public SubPool(long limit, float cleanThreshold)
{
this.limit = limit;
this.cleanThreshold = cleanThreshold;
}
/** Methods for tracking and triggering a clean **/
boolean needsCleaning()
{
// use strictly-greater-than so we don't clean when limit is 0
return used() > nextClean && updateNextClean();
}
void maybeClean()
{
if (needsCleaning() && cleaner != null)
cleaner.trigger();
}
private boolean updateNextClean()
{
while (true)
{
long current = nextClean;
long reclaiming = this.reclaiming;
long next = reclaiming + (long) (this.limit * cleanThreshold);
if (current == next || nextCleanUpdater.compareAndSet(this, current, next))
return used() > next;
}
}
/** Methods to allocate space **/
boolean tryAllocate(long size)
{
while (true)
{
long cur;
if ((cur = allocated) + size > limit)
return false;
if (allocatedUpdater.compareAndSet(this, cur, cur + size))
return true;
}
}
/**
* apply the size adjustment to allocated, bypassing any limits or constraints. If this reduces the
* allocated total, we will signal waiters
*/
private void adjustAllocated(long size)
{
while (true)
{
long cur = allocated;
if (allocatedUpdater.compareAndSet(this, cur, cur + size))
return;
}
}
void allocated(long size)
{
assert size >= 0;
if (size == 0)
return;
adjustAllocated(size);
maybeClean();
}
void acquired(long size)
{
maybeClean();
}
void released(long size)
{
assert size >= 0;
adjustAllocated(-size);
hasRoom.signalAll();
}
void reclaiming(long size)
{
if (size == 0)
return;
reclaimingUpdater.addAndGet(this, size);
}
void reclaimed(long size)
{
if (size == 0)
return;
reclaimingUpdater.addAndGet(this, -size);
if (updateNextClean() && cleaner != null)
cleaner.trigger();
}
public long used()
{
return allocated;
}
public float reclaimingRatio()
{
float r = reclaiming / (float) limit;
if (Float.isNaN(r))
return 0;
return r;
}
public float usedRatio()
{
float r = allocated / (float) limit;
if (Float.isNaN(r))
return 0;
return r;
}
public MemtableAllocator.SubAllocator newAllocator()
{
return new MemtableAllocator.SubAllocator(this);
}
public WaitQueue hasRoom()
{
return hasRoom;
}
public Timer.Context blockedTimerContext()
{
return blockedOnAllocating.time();
}
}
private static final AtomicLongFieldUpdater<SubPool> reclaimingUpdater = AtomicLongFieldUpdater.newUpdater(SubPool.class, "reclaiming");
private static final AtomicLongFieldUpdater<SubPool> allocatedUpdater = AtomicLongFieldUpdater.newUpdater(SubPool.class, "allocated");
private static final AtomicLongFieldUpdater<SubPool> nextCleanUpdater = AtomicLongFieldUpdater.newUpdater(SubPool.class, "nextClean");
}