blob: 8f3b7e45f557f25eac7997ba08abbacedc00ff9c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.commitlog;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
// NOTE(review): this view has lost the Javadoc delimiters and the class's opening brace, so the bare
// "*" lines below are not valid Java as shown; restore them from the full source.
* Performs eager-creation of commit log segments in a background thread. All the
* public methods are thread safe.
public abstract class AbstractCommitLogSegmentManager
static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogSegmentManager.class);
// Queue of work to be done by the manager thread, also used to wake the thread to perform segment allocation.
// Drained by the run loop built in start() (poll()/take()); discardSegment() enqueues discard tasks here.
private final BlockingQueue<Runnable> segmentManagementTasks = new LinkedBlockingQueue<>();
/** Segments that are ready to be used. Head of the queue is the one we allocate writes to */
// advanceAllocatingFrom() takes the next segment from here with poll().
private final ConcurrentLinkedQueue<CommitLogSegment> availableSegments = new ConcurrentLinkedQueue<>();
/** Active segments, containing unflushed data */
// recycleSegment() removes segments from here; forceRecycleAll() and the run loop iterate it.
private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>();
/** The segment we are currently allocating commit log records to */
// Replaced inside synchronized (this) in advanceAllocatingFrom(); reset to null by stopUnsafe().
protected volatile CommitLogSegment allocatingFrom = null;
// advanceAllocatingFrom() registers on this while no segment is available; per its comments the allocator
// signals it once new segments have been published.
private final WaitQueue hasAvailableSegments = new WaitQueue();
// Storage directory handed to the constructor (presumably where segment files are kept -- confirm).
final String storageDirectory;
* Tracks commitlog size, in multiples of the segment size. We need to do this so we can "promise" size
* adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic
* can see the effect of recycling segments immediately (even though they're really happening asynchronously
* on the manager thread, which will take a ms or two).
// NOTE(review): the text above says "multiples of the segment size", but unusedCapacity() subtracts this
// value from a byte total and onDiskSize() documents it as bytes -- confirm the unit.
private final AtomicLong size = new AtomicLong();
* New segment creation is initially disabled because we'll typically get some "free" segments
* recycled after log replay.
// Read by the run loop in start(); set by enableReserveSegmentCreation(), cleared by stopUnsafe().
volatile boolean createReserveSegments = false;
// Created in start().
private Thread managerThread;
// Loop condition of the manager thread's run loop; start() sets it to true.
protected volatile boolean run = true;
protected final CommitLog commitLog;
// Static, so it is shared by every instance of this class. Sized from DatabaseDescriptor's commitlog settings;
// returned by getBufferPool() and consulted by the run loop's atSegmentLimit() check.
private static final SimpleCachedBufferPool bufferPool =
new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(), DatabaseDescriptor.getCommitLogSegmentSize());
/**
 * Builds a segment manager bound to the given commit log. Only the two fields are assigned here; the
 * background allocator thread is created later, by start().
 *
 * @param commitLog        the commit log this manager serves
 * @param storageDirectory directory string retained for use by subclasses
 */
AbstractCommitLogSegmentManager(final CommitLog commitLog, String storageDirectory)
{
    this.storageDirectory = storageDirectory;
    this.commitLog = commitLog;
}
// Starts the segment-management machinery: builds the manager thread's run loop, sets run = true and
// creates a Thread named "COMMIT-LOG-ALLOCATOR" around it.
// NOTE(review): braces, the `try` around the loop body and several statements are missing from this view;
// the notes below flag gaps that change behavior as shown. Confirm against the full source.
void start()
// The run loop for the manager thread
Runnable runnable = new WrappedRunnable()
public void runMayThrow() throws Exception
while (run)
// Non-blocking check for queued work; null means nothing is pending.
Runnable task = segmentManagementTasks.poll();
if (task == null)
// if we have no more work to do, check if we should create a new segment
if (!atSegmentLimit() &&
availableSegments.isEmpty() &&
(activeSegments.isEmpty() || createReserveSegments))
logger.trace("No segments in reserve; creating a fresh one");
// NOTE(review): nothing visible after this trace creates or publishes a segment (e.g. createSegment()
// followed by a hasAvailableSegments signal); without it, threads waiting in advanceAllocatingFrom()
// may never be woken.
// TODO : some error handling in case we fail to create a new segment
// flush old Cfs if we're full
// unusedCapacity() is negative once the tracked size exceeds the configured total.
long unused = unusedCapacity();
if (unused < 0)
List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
long spaceToReclaim = 0;
// NOTE(review): as shown, this loop adds space only for the segment that IS allocatingFrom and never
// adds anything to segmentsToRecycle, so any flushDataFrom() call below receives an empty list and
// returns immediately. Statements (plausibly a break and segmentsToRecycle.add(segment)) appear missing.
for (CommitLogSegment segment : activeSegments)
if (segment == allocatingFrom)
spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize();
if (spaceToReclaim + unused >= 0)
flushDataFrom(segmentsToRecycle, false);
// Since we're operating on a "null" allocation task, block here for the next task on the
// queue rather than looping, grabbing another null, and repeating the above work.
task = segmentManagementTasks.take();
// NOTE(review): no task.run() is visible; presumably the dequeued task is executed here -- confirm.
catch (InterruptedException e)
// NOTE(review): the matching `try` is not visible, and the AssertionError is thrown without `e` as its cause.
throw new AssertionError();
catch (Throwable t)
// NOTE(review): as shown, the sleep below is the statement guarded by this `if`, so it runs only when
// handleCommitError returns false; a statement under the `if` (plausibly a loop exit) may be missing.
if (!CommitLog.handleCommitError("Failed managing commit log segments", t))
// sleep some arbitrary period to avoid spamming CL
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
// True when CommitLogSegment.usesBufferPool(commitLog) holds and the shared pool reports atLimit();
// the loop above then skips eager segment creation.
private boolean atSegmentLimit()
return CommitLogSegment.usesBufferPool(commitLog) && bufferPool.atLimit();
run = true;
managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR");
// NOTE(review): no managerThread.start() is visible, so as shown the thread is created but never started.
* Shut down the CLSM. Used both during testing and during regular shutdown, so needs to stop everything.
public abstract void shutdown();
* Allocate a segment within this CLSM. Should either succeed or throw.
public abstract Allocation allocate(Mutation mutation, int size);
* The recovery and replay process replays mutations into memtables and flushes them to disk. Individual CLSM
* decide what to do with those segments on disk after they've been replayed.
abstract void handleReplayedSegment(final File file);
* Hook to allow segment managers to track state surrounding creation of new segments. Onl perform as task submit
* to segment manager so it's performed on segment management thread.
abstract CommitLogSegment createSegment();
* Indicates that a segment file has been flushed and is no longer needed. Only perform as task submit to segment
* manager so it's performend on segment management thread, or perform while segment management thread is shutdown
* during testing resets.
* @param segment segment to be discarded
* @param delete whether or not the segment is safe to be deleted.
abstract void discard(CommitLogSegment segment, boolean delete);
* Grab the current CommitLogSegment we're allocating from. Also serves as a utility method to block while the allocator
* is working on initial allocation of a CommitLogSegment.
CommitLogSegment allocatingFrom()
CommitLogSegment r = allocatingFrom;
if (r == null)
r = allocatingFrom;
return r;
* Fetches a new segment from the queue, signaling the management thread to create a new one if necessary, and "activates" it.
* Blocks until a new segment is allocated and the thread requesting an advanceAllocatingFrom is signalled.
* WARNING: Assumes segment management thread always succeeds in allocating a new segment or kills the JVM.
// @param old the segment the caller believes is current; may be null (see the `old != null` check below).
//            Per the comments below, the caller stops as soon as another thread has already moved
//            allocatingFrom off `old`.
// NOTE(review): braces and many statements are missing from this view. The comments describe an early
// return when another thread already advanced, adding `next` to activeSegments, archiving/closing/syncing
// `old`, waking the manager, and waiting on or cancelling `signal`. None of that appears as code here.
protected void advanceAllocatingFrom(CommitLogSegment old)
while (true)
CommitLogSegment next;
// Same monitor that stopUnsafe() holds while discarding segments.
synchronized (this)
// do this in a critical section so we can atomically remove from availableSegments and add to allocatingFrom/activeSegments
// see
// NOTE(review): the issue reference after "see" has been lost.
// NOTE(review): as shown, this `if` guards the poll() below, i.e. a segment is taken only when another
// thread has ALREADY advanced. The "if we've been beaten, just stop immediately" logic further down
// implies a missing `return` here.
if (allocatingFrom != old)
next = availableSegments.poll();
if (next != null)
allocatingFrom = next;
if (next != null)
if (old != null)
// Now we can run the user defined command just after switching to the new commit log.
// (Do this here instead of in the recycle call so we can get a head start on the archive.)
// ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it
// request that the CL be synced out-of-band, as we've finished a segment
// no more segments, so register to receive a signal when not empty
// The registration is given commitLog.metrics.waitingOnSegmentAllocation.time(), presumably so the wait is timed.
WaitQueue.Signal signal = hasAvailableSegments.register(commitLog.metrics.waitingOnSegmentAllocation.time());
// trigger the management thread; this must occur after registering
// the signal to ensure we are woken by any new segment creation
// NOTE(review): no wakeManager() call is visible here despite the comment above.
// check if the queue has already been added to before waiting on the signal, to catch modifications
// that happened prior to registering the signal; *then* check to see if we've been beaten to making the change
if (!availableSegments.isEmpty() || allocatingFrom != old)
// if we've been beaten, just stop immediately
if (allocatingFrom != old)
// otherwise try again, as there should be an available segment
// can only reach here if the queue hasn't been inserted into
// before we registered the signal, as we only remove items from the queue
// after updating allocatingFrom. Can safely block until we are signalled
// by the allocator that new segments have been published
// NOTE(review): the blocking wait on `signal` (and its cancellation on the early-exit paths) is not visible.
protected void wakeManager()
// put a NO-OP on the queue, to trigger management thread (and create a new segment if necessary)
* Switch to a new segment, regardless of how much is left in the current one.
* Flushes any dirty CFs for this segment and any older segments, and then recycles
* the segments
// @param droppedCfs ids whose dirty marks are cleared in every active segment, from CommitLogPosition.NONE
//                   up to each segment's current position.
// NOTE(review): braces, the `try`, and several statements are missing from this view: the waits the
// comments describe, any wait on `future` (it is never used below), the body of `if (segment.isUnused())`,
// and both operands of the `<=` comparison.
void forceRecycleAll(Iterable<UUID> droppedCfs)
// Snapshot of the active segments at entry; `last` is the newest of them.
List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments);
// NOTE(review): throws IndexOutOfBoundsException when activeSegments is empty.
CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1);
// wait for the commit log modifications
// make sure the writes have materialized inside of the memtables by waiting for all outstanding writes
// on the relevant keyspaces to complete
// flush and wait for all CFs that are dirty in segments up-to and including 'last'
// force == true: every dirty CF in the snapshot is flushed unconditionally.
Future<?> future = flushDataFrom(segmentsToRecycle, true);
for (CommitLogSegment segment : activeSegments)
for (UUID cfId : droppedCfs)
segment.markClean(cfId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
// now recycle segments that are unused, as we may not have triggered a discardCompletedSegments()
// if the previous active segment was the only one to recycle (since an active segment isn't
// necessarily dirty, and we only call dCS after a flush).
for (CommitLogSegment segment : activeSegments)
if (segment.isUnused())
// Sanity check on the oldest remaining active segment (condition truncated; presumably compares it with `last`).
CommitLogSegment first;
if ((first = activeSegments.peek()) != null && <=
logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs.");
catch (Throwable t)
// for now just log the error
logger.error("Failed waiting for a forced recycle of in-use commit log segments", t);
* Indicates that a segment is no longer in use and that it should be recycled.
* @param segment segment that is no longer in use
void recycleSegment(final CommitLogSegment segment)
boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName());
if (activeSegments.remove(segment))
// if archiving (command) was not successful then leave the file alone. don't delete or recycle.
discardSegment(segment, archiveSuccess);
logger.warn("segment {} not found in activeSegments queue", segment);
* Indicates that a segment file should be deleted.
* @param segment segment to be discarded
private void discardSegment(final CommitLogSegment segment, final boolean deleteFile)
logger.trace("Segment {} is no longer active and will be deleted {}", segment, deleteFile ? "now" : "by the archive script");
segmentManagementTasks.add(() -> discard(segment, deleteFile));
* Adjust the tracked on-disk size. Called by individual segments to reflect writes, allocations and discards.
* @param addedSize
void addSize(long addedSize)
* @return the space (in bytes) used by all segment files.
public long onDiskSize()
return size.get();
private long unusedCapacity()
long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024;
long currentSize = size.get();
logger.trace("Total active commitlog segment space used is {} out of {}", currentSize, total);
return total - currentSize;
* @param name the filename to check
* @return true if file is managed by this manager.
public boolean manages(String name)
for (CommitLogSegment segment : Iterables.concat(activeSegments, availableSegments))
if (segment.getName().equals(name))
return true;
return false;
* Throws a flag that enables the behavior of keeping at least one spare segment
* available at all times.
void enableReserveSegmentCreation()
createReserveSegments = true;
* Force a flush on all CFs that are still dirty in @param segments.
* @return a Future that will finish when all the flushes are complete.
// @param segments segments to scan for dirty CFs; the LAST element's current position is the flush bound
//                 used when force is false. An empty list yields an already-completed future.
// @param force    true: cfs.forceFlush() unconditionally; false: cfs.forceFlush(maxCommitLogPosition).
// CFs whose schema entry is gone are marked clean instead of flushed. Each remaining CF is flushed at most
// once, even if it is dirty in several segments.
// NOTE(review): braces are missing from this view; the `pair == null` branch spans two statements.
private Future<?> flushDataFrom(List<CommitLogSegment> segments, boolean force)
if (segments.isEmpty())
return Futures.immediateFuture(null);
final CommitLogPosition maxCommitLogPosition = segments.get(segments.size() - 1).getCurrentCommitLogPosition();
// a map of CfId -> forceFlush() to ensure we only queue one flush per cf
final Map<UUID, ListenableFuture<?>> flushes = new LinkedHashMap<>();
for (CommitLogSegment segment : segments)
for (UUID dirtyCFId : segment.getDirtyCFIDs())
Pair<String,String> pair = Schema.instance.getCF(dirtyCFId);
if (pair == null)
// even though we remove the schema entry before a final flush when dropping a CF,
// it's still possible for a writer to race and finish his append after the flush.
logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId);
segment.markClean(dirtyCFId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
else if (!flushes.containsKey(dirtyCFId))
String keyspace = pair.left;
// NOTE(review): the initializer of `cfs` has been lost from this view (presumably the store looked up
// from `keyspace` and `dirtyCFId`); `keyspace` is otherwise unused.
final ColumnFamilyStore cfs =;
// can safely call forceFlush here as we will only ever block (briefly) for other attempts to flush,
// no deadlock possibility since switchLock removal
flushes.put(dirtyCFId, force ? cfs.forceFlush() : cfs.forceFlush(maxCommitLogPosition));
return Futures.allAsList(flushes.values());
* Stops CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
* Only call this after the AbstractCommitLogService is shut down.
// @param deleteSegments whether each discarded segment's file is deleted (passed through
//                       closeAndDeleteSegmentUnsafe() to discard()).
// NOTE(review): the `try` block guarded by the InterruptedException handler is not visible (presumably it
// stops/joins the manager thread). Nothing visible clears activeSegments/availableSegments, resets `size`,
// or sets `run` to false after the segments are discarded -- confirm against the full source.
public void stopUnsafe(boolean deleteSegments)
logger.trace("CLSM closing and clearing existing commit log segments...");
createReserveSegments = false;
catch (InterruptedException e)
throw new RuntimeException(e);
// Discard every known segment under the same monitor advanceAllocatingFrom() uses, then forget the current one.
synchronized (this)
for (CommitLogSegment segment : activeSegments)
closeAndDeleteSegmentUnsafe(segment, deleteSegments);
for (CommitLogSegment segment : availableSegments)
closeAndDeleteSegmentUnsafe(segment, deleteSegments);
allocatingFrom = null;
logger.trace("CLSM done with closing and clearing existing commit log segments.");
// Used by tests only.
void awaitManagementTasksCompletion()
while (!segmentManagementTasks.isEmpty())
// The last management task is not yet complete. Wait a while for it.
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
// TODO: If this functionality is required by anything other than tests, signalling must be used to ensure
// waiting completes correctly.
* Explicitly for use only during resets in unit testing.
private void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete)
discard(segment, delete);
catch (AssertionError ignored)
// segment file does not exist
* Returns when the management thread terminates.
// @throws InterruptedException declared for the wait on the management thread.
// NOTE(review): as shown, there is no join on managerThread and both loops have lost their bodies, so
// nothing here actually waits. Presumably it joins the thread and then acts on each segment (e.g. closes
// it) -- confirm against the full source.
public void awaitTermination() throws InterruptedException
for (CommitLogSegment segment : activeSegments)
for (CommitLogSegment segment : availableSegments)
* @return a read-only collection of the active commit log segments
public Collection<CommitLogSegment> getActiveSegments()
return Collections.unmodifiableCollection(activeSegments);
* @return the current CommitLogPosition of the active segment we're allocating from
CommitLogPosition getCurrentPosition()
return allocatingFrom().getCurrentCommitLogPosition();
* Forces a disk flush on the commit log files that need it. Blocking.
// @param syncAllSegments when false, some segments are skipped based on a comparison with `current` (the
//                        segment obtained from allocatingFrom()); the condition is truncated here -- confirm.
// @throws IOException declared for failures while syncing.
// NOTE(review): the `if` condition is cut off after "&&" and the loop body (the per-segment sync call)
// is missing from this view.
public void sync(boolean syncAllSegments) throws IOException
CommitLogSegment current = allocatingFrom();
for (CommitLogSegment segment : getActiveSegments())
if (!syncAllSegments && >
* Used by compressed and encrypted segments to share a buffer pool across the CLSM.
SimpleCachedBufferPool getBufferPool()
return bufferPool;