blob: a91384f762417ece19ef3531104f66a3a37521b0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.commitlog;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.*;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.commitlog.CommitLogSegment.CDCState;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.DirectorySizeCalculator;
import org.apache.cassandra.utils.NoSpamLogger;
/**
 * Commit log segment manager that additionally accounts for CDC (change data capture) disk usage.
 *
 * Every new segment is flagged as either permitting or forbidding CDC mutations, based on the space already
 * consumed by unflushed CDC-capable segments plus the contents of the CDC log location. Discarded segments that
 * contain CDC data are moved into the CDC log location instead of being deleted.
 */
public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
{
    static final Logger logger = LoggerFactory.getLogger(CommitLogSegmentManagerCDC.class);
    private final CDCSizeTracker cdcSizeTracker;

    /**
     * @param commitLog        commit log this manager serves; passed through to the superclass
     * @param storageDirectory commit log storage directory; passed through to the superclass
     */
    public CommitLogSegmentManagerCDC(final CommitLog commitLog, String storageDirectory)
    {
        super(commitLog, storageDirectory);
        cdcSizeTracker = new CDCSizeTracker(this, new File(DatabaseDescriptor.getCDCLogLocation()));
    }

    /**
     * Starts the CDC size tracker (resetting its counters and creating its executor) before starting the
     * superclass.
     */
    @Override
    void start()
    {
        cdcSizeTracker.start();
        super.start();
    }

    /**
     * Closes the segment, removes its on-disk size from the manager's accounting and updates the CDC size tracker.
     * A segment whose CDC state is CONTAINS is always moved into the CDC log location, regardless of
     * {@code delete}; any other segment is deleted only when {@code delete} is true.
     *
     * @param segment segment to discard
     * @param delete  whether a segment without CDC data should be deleted from disk
     */
    public void discard(CommitLogSegment segment, boolean delete)
    {
        segment.close();
        addSize(-segment.onDiskSize());

        cdcSizeTracker.processDiscardedSegment(segment);

        if (segment.getCDCState() == CDCState.CONTAINS)
            FileUtils.renameWithConfirm(segment.logFile.getAbsolutePath(), DatabaseDescriptor.getCDCLogLocation() + File.separator + segment.logFile.getName());
        else
        {
            if (delete)
                FileUtils.deleteWithConfirm(segment.logFile);
        }
    }

    /**
     * Initiates the shutdown process for the management thread. Also stops the cdc on-disk size calculator executor.
     */
    public void shutdown()
    {
        cdcSizeTracker.shutdown();
        super.shutdown();
    }

    /**
     * Reserve space in the current segment for the provided mutation or, if there isn't space available,
     * create a new segment. For CDC mutations, allocation is expected to throw WTE if the segment disallows CDC mutations.
     *
     * @param mutation Mutation to allocate in segment manager
     * @param size total size (overhead + serialized) of mutation
     * @return the created Allocation object
     * @throws WriteTimeoutException If segment disallows CDC mutations, we throw WTE
     */
    @Override
    public CommitLogSegment.Allocation allocate(Mutation mutation, int size) throws WriteTimeoutException
    {
        CommitLogSegment segment = allocatingFrom();
        CommitLogSegment.Allocation alloc;

        throwIfForbidden(mutation, segment);
        while ( null == (alloc = segment.allocate(mutation, size)) )
        {
            // Failed to allocate, so move to a new segment with enough room if possible.
            advanceAllocatingFrom(segment);
            segment = allocatingFrom();

            // The replacement segment may itself forbid CDC mutations, so re-check before retrying.
            throwIfForbidden(mutation, segment);
        }

        // Mark the segment that actually received the mutation as holding CDC data.
        if (mutation.trackedByCDC())
            segment.setCDCState(CDCState.CONTAINS);

        return alloc;
    }

    /**
     * Rejects a CDC-tracked mutation when the target segment is flagged FORBIDDEN. Before throwing, requests an
     * asynchronous recalculation of the CDC size and emits a rate-limited warning.
     *
     * @param mutation mutation about to be written
     * @param segment  segment the mutation would be written to
     * @throws WriteTimeoutException if the mutation is tracked by CDC and the segment forbids CDC mutations
     */
    private void throwIfForbidden(Mutation mutation, CommitLogSegment segment) throws WriteTimeoutException
    {
        if (mutation.trackedByCDC() && segment.getCDCState() == CDCState.FORBIDDEN)
        {
            cdcSizeTracker.submitOverflowSizeRecalculation();
            NoSpamLogger.log(logger,
                             NoSpamLogger.Level.WARN,
                             10,
                             TimeUnit.SECONDS,
                             "Rejecting Mutation containing CDC-enabled table. Free up space in {}.",
                             DatabaseDescriptor.getCDCLogLocation());
            throw new WriteTimeoutException(WriteType.CDC, ConsistencyLevel.LOCAL_ONE, 0, 1);
        }
    }

    /**
     * Move files to cdc_raw after replay, since recovery will flush to SSTable and these mutations won't be available
     * in the CL subsystem otherwise.
     *
     * @param file replayed segment file to move into the CDC log location
     */
    void handleReplayedSegment(final File file)
    {
        logger.trace("Moving (Unopened) segment {} to cdc_raw directory after replay", file);

        // Read the length BEFORE the rename: 'file' keeps pointing at the old path, and File.length() returns 0
        // for a path that no longer exists, which would leave the moved segment unaccounted for.
        final long segmentLength = file.length();

        FileUtils.renameWithConfirm(file.getAbsolutePath(), DatabaseDescriptor.getCDCLogLocation() + File.separator + file.getName());
        cdcSizeTracker.addFlushedSize(segmentLength);
    }

    /**
     * On segment creation, flag whether the segment should accept CDC mutations or not based on the total currently
     * allocated unflushed CDC segments and the contents of cdc_raw
     *
     * @return the newly created segment, with its CDC state already evaluated
     */
    public CommitLogSegment createSegment()
    {
        CommitLogSegment segment = CommitLogSegment.createSegment(commitLog, this);
        cdcSizeTracker.processNewSegment(segment);
        return segment;
    }

    /**
     * Tracks total disk usage of CDC subsystem, defined by the summation of all unflushed CommitLogSegments with CDC
     * data in them and all segments archived into cdc_raw.
     *
     * Allows atomic increment/decrement of unflushed size, however only allows increment on flushed and requires a full
     * directory walk to determine any potential deletions by CDC consumer.
     *
     * NOTE(review): {@code size} and {@code path} are not declared in this class; presumably they are inherited
     * from DirectorySizeCalculator -- confirm against that class.
     */
    private static class CDCSizeTracker extends DirectorySizeCalculator
    {
        // Throttles directory walks to one per CDC disk check interval (the interval is treated as milliseconds here).
        private final RateLimiter rateLimiter = RateLimiter.create(1000.0 / DatabaseDescriptor.getCDCDiskCheckInterval());
        private ExecutorService cdcSizeCalculationExecutor;
        private final CommitLogSegmentManagerCDC segmentManager;
        private volatile long unflushedCDCSize;

        // Used instead of size during walk to remove chance of over-allocation
        private volatile long sizeInProgress = 0;

        /**
         * @param segmentManager manager whose currently allocating segment is re-evaluated after each recalculation
         * @param path           directory handed to the superclass; walked when recalculating size
         */
        CDCSizeTracker(CommitLogSegmentManagerCDC segmentManager, File path)
        {
            super(path);
            this.segmentManager = segmentManager;
        }

        /**
         * Needed for stop/restart during unit tests
         */
        public void start()
        {
            size = 0;
            unflushedCDCSize = 0;
            // One worker with a direct hand-off queue: a submission made while a recalculation is running is
            // dropped by the DiscardPolicy rather than queued.
            cdcSizeCalculationExecutor = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.DiscardPolicy());
        }

        /**
         * Synchronous size recalculation on each segment creation/deletion call could lead to very long delays in new
         * segment allocation, thus long delays in thread signaling to wake waiting allocation / writer threads.
         *
         * This can be reached either from the segment management thread in AbstractCommitLogSegmentManager or from the
         * size recalculation executor, so the state update is done while holding the segment's cdcStateLock to
         * reduce the race overlap window available for size to get off.
         *
         * Reference DirectorySizerBench for more information about performance of the directory size recalc.
         *
         * @param segment segment whose CDC state is to be (re)evaluated
         */
        void processNewSegment(CommitLogSegment segment)
        {
            // See synchronization in CommitLogSegment.setCDCState
            synchronized(segment.cdcStateLock)
            {
                // Forbid CDC writes if one more full-sized segment would exceed the configured CDC space.
                segment.setCDCState(defaultSegmentSize() + totalCDCSizeOnDisk() > allowableCDCBytes()
                                    ? CDCState.FORBIDDEN
                                    : CDCState.PERMITTED);

                // Re-read the state rather than reusing the computed value, and reserve a full segment's worth
                // of space only when the segment actually ended up PERMITTED.
                if (segment.getCDCState() == CDCState.PERMITTED)
                    unflushedCDCSize += defaultSegmentSize();
            }

            // Take this opportunity to kick off a recalc to pick up any consumer file deletion.
            submitOverflowSizeRecalculation();
        }

        /**
         * Updates the tracked sizes for a segment that is being discarded.
         *
         * @param segment segment being discarded
         */
        void processDiscardedSegment(CommitLogSegment segment)
        {
            // See synchronization in CommitLogSegment.setCDCState
            synchronized(segment.cdcStateLock)
            {
                // Add to flushed size before decrementing unflushed so we don't have a window of false generosity
                if (segment.getCDCState() == CDCState.CONTAINS)
                    size += segment.onDiskSize();

                // Only non-FORBIDDEN segments had a default segment size reserved in processNewSegment.
                if (segment.getCDCState() != CDCState.FORBIDDEN)
                    unflushedCDCSize -= defaultSegmentSize();
            }

            // Take this opportunity to kick off a recalc to pick up any consumer file deletion.
            submitOverflowSizeRecalculation();
        }

        /**
         * @return the configured CDC space limit converted from megabytes to bytes
         */
        private long allowableCDCBytes()
        {
            return (long)DatabaseDescriptor.getCDCSpaceInMB() * 1024 * 1024;
        }

        /**
         * Requests an asynchronous recalculation of the CDC directory size. The request is dropped when it cannot
         * be handed to the executor.
         */
        public void submitOverflowSizeRecalculation()
        {
            try
            {
                cdcSizeCalculationExecutor.submit(() -> recalculateOverflowSize());
            }
            catch (RejectedExecutionException e)
            {
                // Do nothing. Means we have one in flight so this req. should be satisfied when it completes.
                // Note: the executor built in start() uses DiscardPolicy, which drops saturated submissions
                // silently, so this catch is only a defensive fallback.
            }
        }

        /**
         * Waits on the rate limiter, recomputes the directory size and, if the segment currently being allocated
         * from is flagged FORBIDDEN, re-evaluates its CDC state against the fresh size.
         */
        private void recalculateOverflowSize()
        {
            rateLimiter.acquire();
            calculateSize();
            CommitLogSegment allocatingFrom = segmentManager.allocatingFrom();
            if (allocatingFrom.getCDCState() == CDCState.FORBIDDEN)
                processNewSegment(allocatingFrom);
        }

        /**
         * @return the configured commit log segment size, used as the per-segment reservation
         */
        private int defaultSegmentSize()
        {
            return DatabaseDescriptor.getCommitLogSegmentSize();
        }

        /**
         * Walks the tracked directory, accumulating file sizes into {@code sizeInProgress}, and publishes the
         * result to {@code size} only after the walk completes. An I/O failure is reported through the commit
         * log's commit error handler and leaves {@code size} unchanged.
         */
        private void calculateSize()
        {
            try
            {
                // The Arrays.stream approach is considerably slower on Windows than linux
                sizeInProgress = 0;
                Files.walkFileTree(path.toPath(), this);
                size = sizeInProgress;
            }
            catch (IOException ie)
            {
                CommitLog.instance.handleCommitError("Failed CDC Size Calculation", ie);
            }
        }

        /**
         * File visitor callback used by the directory walk: adds the visited file's size to the in-progress total.
         */
        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException
        {
            sizeInProgress += attrs.size();
            return FileVisitResult.CONTINUE;
        }

        /**
         * @param toAdd number of bytes to add to the flushed size
         */
        private void addFlushedSize(long toAdd)
        {
            size += toAdd;
        }

        /**
         * @return the sum of the unflushed reservation and the flushed size
         */
        private long totalCDCSizeOnDisk()
        {
            return unflushedCDCSize + size;
        }

        /**
         * Shuts down the size recalculation executor created by {@link #start()}.
         */
        public void shutdown()
        {
            cdcSizeCalculationExecutor.shutdown();
        }
    }

    /**
     * Only use for testing / validation that size tracker is working. Not for production use.
     *
     * @return the tracker's total CDC size after a recalculation has been requested and given time to run
     */
    @VisibleForTesting
    public long updateCDCTotalSize()
    {
        cdcSizeTracker.submitOverflowSizeRecalculation();

        // Give the update time to run
        try
        {
            Thread.sleep(DatabaseDescriptor.getCDCDiskCheckInterval() + 10);
        }
        catch (InterruptedException e)
        {
            // Preserve the interrupt for callers instead of swallowing it; still report the current total.
            Thread.currentThread().interrupt();
        }

        return cdcSizeTracker.totalCDCSizeOnDisk();
    }
}