blob: 9576ccf332b1ff50f6ecb8e96c57457b9505b19c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.utils.memory;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.WaitQueue;
public abstract class MemtableAllocator
{
// used for the trace-level messages emitted when memory is allocated or released while discarding
private static final Logger logger = LoggerFactory.getLogger(MemtableAllocator.class);
// sub-allocator handed out by onHeap(); presumably accounts for on-heap memory - confirm against MemtablePool
private final SubAllocator onHeap;
// sub-allocator handed out by offHeap(); presumably accounts for off-heap memory - confirm against MemtablePool
private final SubAllocator offHeap;
/**
 * The states a sub-allocator moves through. The only transitions accepted are
 * {@code LIVE -> DISCARDING} and {@code DISCARDING -> DISCARDED}.
 */
enum LifeCycle
{
    LIVE, DISCARDING, DISCARDED;

    /**
     * Computes the state that results from moving to {@code targetState}.
     *
     * The current state is checked with an {@code assert}, so the check is only performed when
     * assertions are enabled. Every failure carries the states involved to make it diagnosable.
     *
     * @param targetState the state to move to; must be {@code DISCARDING} or {@code DISCARDED}
     * @return {@code targetState}
     * @throws IllegalStateException if {@code targetState} is not a state that can be transitioned to
     */
    LifeCycle transition(LifeCycle targetState)
    {
        switch (targetState)
        {
            case DISCARDING:
                assert this == LifeCycle.LIVE : "Cannot transition to DISCARDING from " + this;
                return LifeCycle.DISCARDING;
            case DISCARDED:
                assert this == LifeCycle.DISCARDING : "Cannot transition to DISCARDED from " + this;
                return LifeCycle.DISCARDED;
            default:
                throw new IllegalStateException("Cannot transition from " + this + " to " + targetState);
        }
    }
}
/**
 * Creates an allocator backed by the two given sub-allocators.
 *
 * @param onHeap the sub-allocator later returned by {@link #onHeap()}
 * @param offHeap the sub-allocator later returned by {@link #offHeap()}
 */
MemtableAllocator(SubAllocator onHeap, SubAllocator offHeap)
{
    this.offHeap = offHeap;
    this.onHeap = onHeap;
}
/**
 * Returns a {@link Row.Builder} associated with the given operation group.
 * NOTE(review): presumably the builder places row data in memory accounted for by this allocator -
 * confirm against the concrete implementations.
 *
 * @param opGroup the operation group on whose behalf rows will be built
 * @return the row builder supplied by the implementation
 */
public abstract Row.Builder rowBuilder(OpOrder.Group opGroup);
/**
 * Returns a {@link DecoratedKey} produced from {@code key} on behalf of the given operation group.
 * NOTE(review): going by the name, this is expected to copy the key into allocator-managed memory -
 * confirm against the concrete implementations.
 *
 * @param key the key to clone
 * @param opGroup the operation group on whose behalf the clone is made
 * @return the key supplied by the implementation
 */
public abstract DecoratedKey clone(DecoratedKey key, OpOrder.Group opGroup);
/**
 * Returns the {@link EnsureOnHeap} strategy that goes with this allocator.
 * NOTE(review): the exact contract is defined by the implementations and by EnsureOnHeap itself.
 */
public abstract EnsureOnHeap ensureOnHeap();
/**
 * @return the sub-allocator that was supplied as {@code onHeap} at construction
 */
public SubAllocator onHeap()
{
    return this.onHeap;
}
/**
 * @return the sub-allocator that was supplied as {@code offHeap} at construction
 */
public SubAllocator offHeap()
{
    return this.offHeap;
}
/**
 * Puts both sub-allocators (on-heap first, then off-heap) into the discarding state, marking
 * this allocator as reclaiming. This permits any outstanding allocations to temporarily
 * overshoot the maximum memory limit so that flushing can begin immediately.
 */
public void setDiscarding()
{
    this.onHeap.setDiscarding();
    this.offHeap.setDiscarding();
}
/**
 * Puts both sub-allocators (on-heap first, then off-heap) into the discarded state, indicating
 * that the memory and resources owned by this allocator are no longer referenced and can be
 * reclaimed/reused.
 */
public void setDiscarded()
{
    this.onHeap.setDiscarded();
    this.offHeap.setDiscarded();
}
/**
 * @return {@code true} if at least one of the two sub-allocators is still in the
 *         {@link LifeCycle#LIVE} state; the on-heap one is checked first
 */
public boolean isLive()
{
    if (onHeap.state == LifeCycle.LIVE)
        return true;

    return offHeap.state == LifeCycle.LIVE;
}
/** Tracks the memory one allocator owns within a single {@link MemtablePool.SubPool}, together with its lifecycle state */
public static final class SubAllocator
{
// the tracker we take memory from; allocations, releases and reclaims made here are all reported to it
private final MemtablePool.SubPool parent;
// the lifecycle state of this sub-allocator; starts LIVE and is advanced by setDiscarding()/setDiscarded()
private volatile LifeCycle state;
// the amount of memory/resource owned by this object; updated atomically through ownsUpdater
private volatile long owns;
// the amount of memory we are reporting to collect; this may be inaccurate, but is close
// and is used only to ensure that once we have reclaimed we mark the tracker with the same amount;
// updated atomically through reclaimingUpdater
private volatile long reclaiming;
/**
 * Creates a sub-allocator in the {@link LifeCycle#LIVE} state.
 *
 * @param parent the pool this sub-allocator takes memory from and reports back to
 */
SubAllocator(MemtablePool.SubPool parent)
{
    this.state = LifeCycle.LIVE;
    this.parent = parent;
}
/**
 * Moves this sub-allocator to the discarding state and marks it as reclaiming; this will permit
 * any outstanding allocations to temporarily overshoot the maximum memory limit so that flushing
 * can begin immediately.
 */
void setDiscarding()
{
    LifeCycle next = state.transition(LifeCycle.DISCARDING);
    state = next;

    // everything owned at this point is now reported as being reclaimed
    updateReclaiming();
}
/**
 * Moves this sub-allocator to the discarded state, indicating that the memory and resources
 * it owns are no longer referenced and can be reclaimed/reused.
 */
void setDiscarded()
{
    LifeCycle next = state.transition(LifeCycle.DISCARDED);
    state = next;

    // hand everything we still own back to the parent
    // (the parent is expected to signal waiting allocators when this happens - confirm in MemtablePool)
    releaseAll();
}
/**
 * Resets both the owned and the reclaiming counters to zero and reports the amounts they held
 * to the parent as released and reclaimed respectively.
 *
 * Should only be called once we know we will never allocate to the object again;
 * currently no corroboration/enforcement of this is performed.
 */
void releaseAll()
{
    long previouslyOwned = ownsUpdater.getAndSet(this, 0);
    parent.released(previouslyOwned);

    long previouslyReclaiming = reclaimingUpdater.getAndSet(this, 0);
    parent.reclaimed(previouslyReclaiming);
}
/**
 * Signed variant of {@link #allocate(long, OpOrder.Group)}: a positive {@code size} is allocated,
 * while a zero or negative {@code size} releases its magnitude.
 *
 * @param size the change in owned memory; may be negative
 * @param opGroup the operation group, used only when {@code size} is positive
 */
public void adjust(long size, OpOrder.Group opGroup)
{
    if (size > 0)
    {
        allocate(size, opGroup);
        return;
    }

    released(-size);
}
/**
 * Allocates {@code size} in the tracker and marks this sub-allocator as owning it.
 *
 * Loops until the amount has been accounted for. If {@code parent.tryAllocate(size)} succeeds the
 * amount is recorded through {@link #acquired(long)}. If it fails but {@code opGroup.isBlocking()}
 * is true, the amount is recorded through {@link #allocated(long)} without waiting. Otherwise the
 * caller waits uninterruptibly on a signal registered with {@code parent.hasRoom()} and retries.
 *
 * @param size the amount to allocate; asserted to be non-negative
 * @param opGroup the operation group on whose behalf the allocation is made
 */
public void allocate(long size, OpOrder.Group opGroup)
{
assert size >= 0;
while (true)
{
// fast path: the parent granted the request, so only ownership needs recording
if (parent.tryAllocate(size))
{
acquired(size);
return;
}
// the parent refused, but a blocking group does not wait: record the amount regardless
if (opGroup.isBlocking())
{
allocated(size);
return;
}
// register with the parent's hasRoom queue before trying again; the signal is wrapped by
// opGroup.isBlockingSignal (presumably so it also fires if the group becomes blocking - confirm)
WaitQueue.Signal signal = opGroup.isBlockingSignal(parent.hasRoom().register(parent.blockedTimerContext()));
// second attempt, made after registering (presumably so room freed in between is not missed - confirm)
boolean allocated = parent.tryAllocate(size);
if (allocated || opGroup.isBlocking())
{
// we will not wait after all, so withdraw the registration
signal.cancel();
if (allocated) // if we allocated, take ownership
acquired(size);
else // otherwise we're blocking so we're permitted to overshoot our constraints, to just allocate without blocking
allocated(size);
return;
}
else
// no room and not blocking: wait for the signal, then go round the loop again
signal.awaitUninterruptibly();
}
}
/**
 * Retroactively marks {@code size} as allocated and acquired in the tracker, and as owned by us.
 * If the state is discarding, reclaiming is updated as well, since the flush operation is waiting
 * at the barrier for in-flight writes and it will flush this memory too.
 *
 * @param size the amount that was allocated
 */
private void allocated(long size)
{
    parent.allocated(size);
    ownsUpdater.addAndGet(this, size);

    if (state != LifeCycle.DISCARDING)
        return;

    if (logger.isTraceEnabled())
        logger.trace("Allocated {} bytes whilst discarding", size);

    updateReclaiming();
}
/**
 * Retroactively marks an amount as acquired in the tracker, and as owned by us. If the state is
 * discarding, reclaiming is updated as well, since the flush operation is waiting at the barrier
 * for in-flight writes and it will flush this memory too.
 *
 * @param size the amount that was acquired
 */
private void acquired(long size)
{
    parent.acquired();
    ownsUpdater.addAndGet(this, size);

    if (state != LifeCycle.DISCARDING)
        return;

    if (logger.isTraceEnabled())
        logger.trace("Allocated {} bytes whilst discarding", size);

    updateReclaiming();
}
/**
 * Gives {@code size} back, but only while the state is still live; in that case the amount is
 * released in the parent and subtracted from what we own.
 *
 * When the state is no longer live nothing is updated (the attempt is only traced), because
 * reclaiming would have to be updated too, and it could cause problems to the memtable cleaner
 * algorithm if reclaiming decreased. If the memtable is flushing, soon enough
 * {@link #releaseAll()} will be called.
 *
 * @param size the size that was released
 */
void released(long size)
{
    if (state != LifeCycle.LIVE)
    {
        if (logger.isTraceEnabled())
            logger.trace("Tried to release {} bytes whilst discarding", size);
        return;
    }

    parent.released(size);
    ownsUpdater.addAndGet(this, -size);
}
/**
 * Marks what we currently own as reclaiming, both here and in our parent.
 *
 * The first call happens when the memtable is scheduled for flushing; reclaiming is then zero and
 * everything we own becomes reclaiming. Later calls cover in-flight writes that had not completed
 * yet: whatever they allocate is marked as reclaiming too, since the memtable is waiting on the
 * barrier for these writes to complete before it can actually start flushing data.
 *
 * The parent is told only the difference between the new and the previous reclaiming value, and
 * the update is retried until the compare-and-set on reclaiming succeeds.
 */
void updateReclaiming()
{
    for (;;)
    {
        long owned = owns;
        long marked = reclaiming;
        if (reclaimingUpdater.compareAndSet(this, marked, owned))
        {
            parent.reclaiming(owned - marked);
            return;
        }
    }
}
/**
 * @return the amount currently recorded as owned by this sub-allocator
 */
public long owns()
{
    return this.owns;
}
/**
 * @return the amount currently recorded as reclaiming by this sub-allocator
 */
public long getReclaiming()
{
    return this.reclaiming;
}
/**
 * @return the amount owned divided by the parent's limit, computed in {@code float} arithmetic;
 *         {@code 0} when that division yields NaN
 */
public float ownershipRatio()
{
    float ratio = owns / (float) parent.limit;
    return Float.isNaN(ratio) ? 0 : ratio;
}
// atomic accessor for the volatile "owns" field (used for addAndGet and getAndSet)
private static final AtomicLongFieldUpdater<SubAllocator> ownsUpdater = AtomicLongFieldUpdater.newUpdater(SubAllocator.class, "owns");
// atomic accessor for the volatile "reclaiming" field (used for compareAndSet and getAndSet)
private static final AtomicLongFieldUpdater<SubAllocator> reclaimingUpdater = AtomicLongFieldUpdater.newUpdater(SubAllocator.class, "reclaiming");
}
}