blob: 4fe39a10ca96ffac05399434531a661c8188b66c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.memtable;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.utils.concurrent.OpOrder;
/**
* Memtable that uses a commit log for persistence. Provides methods of tracking the commit log positions covered by
* it and safely switching between memtables.
*/
public abstract class AbstractMemtableWithCommitlog extends AbstractMemtable
{
// The approximate lower bound by this memtable; must be <= commitLogLowerBound once our predecessor
// has been finalised, and this is enforced in the ColumnFamilyStore.setCommitLogUpperBound
private final CommitLogPosition approximateCommitLogLowerBound = CommitLog.instance.getCurrentPosition();
// the precise lower bound of CommitLogPosition owned by this memtable; equal to its predecessor's commitLogUpperBound
private final AtomicReference<CommitLogPosition> commitLogLowerBound;
// the write barrier for directing writes to this memtable or the next during a switch
private volatile OpOrder.Barrier writeBarrier;
// the precise upper bound of CommitLogPosition owned by this memtable
private volatile AtomicReference<CommitLogPosition> commitLogUpperBound;
public AbstractMemtableWithCommitlog(TableMetadataRef metadataRef, AtomicReference<CommitLogPosition> commitLogLowerBound)
{
super(metadataRef);
this.commitLogLowerBound = commitLogLowerBound;
}
public CommitLogPosition getApproximateCommitLogLowerBound()
{
return approximateCommitLogLowerBound;
}
public void switchOut(OpOrder.Barrier writeBarrier, AtomicReference<CommitLogPosition> commitLogUpperBound)
{
// This can prepare the memtable data for deletion; it will still be used while the flush is proceeding.
// A setDiscarded call will follow.
assert this.writeBarrier == null;
this.commitLogUpperBound = commitLogUpperBound;
this.writeBarrier = writeBarrier;
}
public void discard()
{
assert writeBarrier != null : "Memtable must be switched out before being discarded.";
}
// decide if this memtable should take the write, or if it should go to the next memtable
@Override
public boolean accepts(OpOrder.Group opGroup, CommitLogPosition commitLogPosition)
{
// if the barrier hasn't been set yet, then this memtable is still the newest and is taking ALL writes.
OpOrder.Barrier barrier = this.writeBarrier;
if (barrier == null)
return true;
// Note that if this races with taking the barrier the opGroup and commit log position we were given must
// necessarily be before the barrier and any LastCommitLogPosition is set, thus this function will return true
// and no update to commitLogUpperBound is necessary.
// If the barrier has been set and issued, but is in the past, we are definitely destined for a future memtable.
// Because we issue the barrier after taking LastCommitLogPosition and mutations take their position after
// taking the opGroup, this condition also ensures the given commit log position is greater than the chosen
// upper bound.
if (!barrier.isAfter(opGroup))
return false;
// We are in the segment of time between the barrier is constructed (and the memtable is switched out)
// and the barrier is issued.
// if we aren't durable we are directed only by the barrier
if (commitLogPosition == null)
return true;
while (true)
{
// If the CL boundary has been set, the mutation can be accepted depending on whether it falls before it.
// However, if it has not been set, the old sstable must still accept writes but we must also ensure that
// their positions are accounted for in the boundary (as there may be a delay between taking the log
// position for the boundary and setting it where a mutation sneaks in).
// Thus, if the boundary hasn't been finalised yet, we simply update it to the max of its current value and
// ours; this permits us to coordinate a safe boundary, as the boundary choice is made atomically wrt our
// max() maintenance, so an operation cannot sneak into the past.
CommitLogPosition currentLast = commitLogUpperBound.get();
if (currentLast instanceof LastCommitLogPosition)
return currentLast.compareTo(commitLogPosition) >= 0;
if (currentLast != null && currentLast.compareTo(commitLogPosition) >= 0)
return true;
if (commitLogUpperBound.compareAndSet(currentLast, commitLogPosition))
return true;
}
}
public CommitLogPosition getCommitLogLowerBound()
{
return commitLogLowerBound.get();
}
public LastCommitLogPosition getFinalCommitLogUpperBound()
{
assert commitLogUpperBound != null : "Commit log upper bound should be set before flushing";
assert commitLogUpperBound.get() instanceof LastCommitLogPosition : "Commit log upper bound has not been sealed yet? " + commitLogUpperBound.get();
return (LastCommitLogPosition) commitLogUpperBound.get();
}
public boolean mayContainDataBefore(CommitLogPosition position)
{
return approximateCommitLogLowerBound.compareTo(position) < 0;
}
}