blob: b2e3d06572fee8b64c2ad464aa7be0cf37779ae8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.commitlog;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.cassandra.io.util.File;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.commitlog.CommitLogSegment.CDCState;
import org.apache.cassandra.exceptions.CDCWriteException;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.DirectorySizeCalculator;
import org.apache.cassandra.utils.NoSpamLogger;
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
{
// Logger shared with the nested CDCSizeTracker, which logs through it as well.
static final Logger logger = LoggerFactory.getLogger(CommitLogSegmentManagerCDC.class);
// Keeps the running CDC size total; consulted when permitting segments and when rejecting CDC mutations.
private final CDCSizeTracker cdcSizeTracker;
public CommitLogSegmentManagerCDC(final CommitLog commitLog, String storageDirectory)
{
super(commitLog, storageDirectory);
cdcSizeTracker = new CDCSizeTracker(this, new File(DatabaseDescriptor.getCDCLogLocation()));
}
/**
 * Starts the CDC size tracker, then starts the base segment manager.
 */
@Override
void start()
{
    // The tracker's executor is created in CDCSizeTracker.start(), so bring it up before the base manager.
    this.cdcSizeTracker.start();
    super.start();
}
/**
 * Closes a segment, subtracts its on-disk size via addSize, updates the CDC size tracker and cleans up files.
 *
 * @param segment segment being discarded
 * @param delete  whether the segment's own log file should be deleted
 */
public void discard(CommitLogSegment segment, boolean delete)
{
    segment.close();
    addSize(-segment.onDiskSize());
    cdcSizeTracker.processDiscardedSegment(segment);

    if (delete)
        segment.logFile.delete();

    // A segment that contains CDC data keeps its files in the cdc folder.
    if (segment.getCDCState() == CDCState.CONTAINS)
        return;

    // Always delete the hard-link (and index file) from the cdc folder if this segment didn't contain CDC data.
    // Note: the files may not exist if processing discard during startup; deleteCDCFiles checks for existence.
    deleteCDCFiles(segment.getCDCFile(), segment.getCDCIndexFile());
}
/**
 * Deletes hard-linked CDC commit log segments, oldest first, to free up space.
 *
 * Deletion stops as soon as at least {@code bytesToFree} bytes have been removed or the CDC file of the segment
 * currently being allocated from is reached, whichever comes first.
 *
 * @param bytesToFree the minimum space to free up, in bytes
 * @return combined length in bytes of the CDC segment files that were left in place; {@code 0} when
 *         {@code bytesToFree} is not positive or when no CDC commit log segments are found
 * @throws IllegalStateException if the CDC location is not a directory
 */
public long deleteOldLinkedCDCCommitLogSegment(long bytesToFree)
{
    if (bytesToFree <= 0)
        return 0;

    File cdcDir = new File(DatabaseDescriptor.getCDCLogLocation());
    Preconditions.checkState(cdcDir.isDirectory(), "The CDC directory does not exist.");

    File[] candidates = cdcDir.tryList(f -> CommitLogDescriptor.isValid(f.name()));
    if (candidates == null || candidates.length == 0)
    {
        logger.warn("Skip deleting due to no CDC commit log segments found.");
        return 0;
    }

    // sort by the commit log segment id
    List<File> ordered = Arrays.stream(candidates)
                               .sorted(new CommitLogSegment.CommitLogSegmentFileComparator())
                               .collect(Collectors.toList());

    // Phase 1: delete from old to new until the goal is reached or the current writing segment is hit.
    int next = 0;
    long freed = 0;
    while (next < ordered.size())
    {
        File linkedCdcFile = ordered.get(next);
        if (freed >= bytesToFree || linkedCdcFile.equals(allocatingFrom().getCDCFile()))
            break;

        freed += deleteCDCFiles(linkedCdcFile, CommitLogDescriptor.inferCdcIndexFile(linkedCdcFile));
        next++;
    }

    // Phase 2: every segment file that was not deleted counts towards the remaining size.
    long remaining = 0;
    for (; next < ordered.size(); next++)
        remaining += ordered.get(next).length();

    return remaining;
}
/**
 * Deletes a CDC hard link and its index file, skipping whichever is null or does not exist.
 *
 * @param cdcLink      hard-linked CDC segment file, may be null
 * @param cdcIndexFile index file belonging to the CDC segment, may be null
 * @return sum of the lengths, read just before deletion, of the files that existed
 */
private long deleteCDCFiles(File cdcLink, File cdcIndexFile)
{
    long freed = 0;
    // The link is handled before the index file, same as the two files are listed here.
    for (File target : new File[]{ cdcLink, cdcIndexFile })
    {
        if (target == null || !target.exists())
            continue;

        freed += target.length();
        target.delete();
    }
    return freed;
}
/**
 * Initiates the shutdown process for the management thread. Also stops the cdc on-disk size calculator executor.
 */
@Override
public void shutdown()
{
    // Stop the size calculation executor first, then delegate to the base manager's shutdown.
    cdcSizeTracker.shutdown();
    super.shutdown();
}
/**
 * Reserve space for the provided mutation in the segment currently being allocated from, moving on to a new
 * segment whenever the current one cannot fit it.
 *
 * Before every allocation attempt a FORBIDDEN segment is given the chance to become PERMITTED, and a mutation
 * tracked by CDC is rejected if the segment is still FORBIDDEN afterwards.
 *
 * @param mutation Mutation to allocate in segment manager
 * @param size total size (overhead + serialized) of mutation
 * @return the created Allocation object
 * @throws CDCWriteException if the mutation is tracked by CDC and the segment disallows CDC mutations
 */
@Override
public CommitLogSegment.Allocation allocate(Mutation mutation, int size) throws CDCWriteException
{
    CommitLogSegment current = allocatingFrom();
    CommitLogSegment.Allocation reserved;
    while (true)
    {
        permitSegmentMaybe(current);
        throwIfForbidden(mutation, current);

        reserved = current.allocate(mutation, size);
        if (reserved != null)
            break;

        // Failed to allocate, so move to a new segment with enough room if possible.
        advanceAllocatingFrom(current);
        current = allocatingFrom();
    }

    // Mark the segment that finally took the mutation as holding CDC data.
    if (mutation.trackedByCDC())
        current.setCDCState(CDCState.CONTAINS);

    return reserved;
}
// Permit a forbidden segment under the following conditions.
// - Non-blocking mode has just recently been enabled for CDC.
// - The CDC total space has dropped below the limit (e.g. CDC consumer cleans up).
// Segments that are not FORBIDDEN are left untouched.
private void permitSegmentMaybe(CommitLogSegment segment)
{
    if (segment.getCDCState() != CDCState.FORBIDDEN)
        return;

    if (DatabaseDescriptor.getCDCBlockWrites())
    {
        // In blocking mode the tracked size plus one more segment must stay strictly below the total CDC space.
        long projectedSize = cdcSizeTracker.sizeInProgress.get() + DatabaseDescriptor.getCommitLogSegmentSize();
        if (projectedSize >= DatabaseDescriptor.getCDCTotalSpace())
            return;
    }

    // Only act when this call observed FORBIDDEN as the value returned by setCDCState
    // (presumably the previous state - confirm against CommitLogSegment.setCDCState).
    if (segment.setCDCState(CDCState.PERMITTED) != CDCState.FORBIDDEN)
        return;

    FileUtils.createHardLink(segment.logFile, segment.getCDCFile());
    cdcSizeTracker.addSize(DatabaseDescriptor.getCommitLogSegmentSize());
}
/**
 * Rejects a mutation tracked by CDC when the given segment is FORBIDDEN.
 *
 * On rejection a size recalculation is requested, a rate-limited warning is logged and the same message is
 * thrown to the caller.
 *
 * @param mutation mutation about to be allocated
 * @param segment  segment the mutation would be written to
 * @throws CDCWriteException if the mutation is tracked by CDC and the segment is FORBIDDEN
 */
private void throwIfForbidden(Mutation mutation, CommitLogSegment segment) throws CDCWriteException
{
    boolean rejected = mutation.trackedByCDC() && segment.getCDCState() == CDCState.FORBIDDEN;
    if (!rejected)
        return;

    // Kick off a recalculation so that space freed by a consumer gets noticed.
    cdcSizeTracker.submitOverflowSizeRecalculation();

    String logMsg = String.format("Rejecting mutation to keyspace %s. Free up space in %s by processing CDC logs. " +
                                  "Total CDC bytes on disk is %s.",
                                  mutation.getKeyspaceName(),
                                  DatabaseDescriptor.getCDCLogLocation(),
                                  cdcSizeTracker.sizeInProgress.get());
    NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 10, TimeUnit.SECONDS, logMsg);
    throw new CDCWriteException(logMsg);
}
/**
 * Creates a new segment and lets the CDC size tracker flag whether it should accept CDC mutations; a segment
 * that ends up PERMITTED is hard-linked into the cdc folder right away.
 *
 * NOTE(review): earlier documentation stated "Synchronized on this"; the method itself carries no synchronized
 * modifier, so any such locking would have to come from the caller - confirm.
 *
 * @return the newly created segment
 */
@Override
public CommitLogSegment createSegment()
{
    final CommitLogSegment created = CommitLogSegment.createSegment(commitLog, this);
    cdcSizeTracker.processNewSegment(created);

    // After processing, the state of the segment can either be PERMITTED or FORBIDDEN
    boolean acceptsCdc = created.getCDCState() == CDCState.PERMITTED;
    if (acceptsCdc)
    {
        // Hard link file in cdc folder for realtime tracking
        FileUtils.createHardLink(created.logFile, created.getCDCFile());
    }
    return created;
}
/**
 * Delete untracked segment files after replay. In addition to the base handling, the segment's hard link in
 * the CDC location is removed when no matching CDC index file exists next to it.
 *
 * @param file segment file that is no longer in use.
 */
@Override
void handleReplayedSegment(final File file)
{
    super.handleReplayedSegment(file);

    File cdcFile = new File(DatabaseDescriptor.getCDCLogLocation(), file.name());
    String cdcIndexName = CommitLogDescriptor.fromFileName(file.name()).cdcIndexFileName();
    File cdcIndexFile = new File(DatabaseDescriptor.getCDCLogLocation(), cdcIndexName);

    // delete untracked cdc segment hard link files if their index files do not exist
    boolean untrackedLink = cdcFile.exists() && !cdcIndexFile.exists();
    if (!untrackedLink)
        return;

    logger.trace("(Unopened) CDC segment {} is no longer needed and will be deleted now", cdcFile);
    cdcFile.delete();
}
/**
 * For use after replay when replayer hard-links / adds tracking of replayed segments.
 *
 * @param size number of bytes to add to the CDC size tracker's running total
 */
public void addCDCSize(long size)
{
    this.cdcSizeTracker.addSize(size);
}
/**
* Tracks total disk usage of CDC subsystem, defined by the summation of all unflushed CommitLogSegments with CDC
* data in them and all segments archived into cdc_raw.
*
* Allows atomic increment/decrement of unflushed size, however only allows increment on flushed and requires a full
* directory walk to determine any potential deletions by CDC consumer.
*/
private static class CDCSizeTracker extends DirectorySizeCalculator
{
// Throttles size recalculations; acquired at the start of every submitted recalculation task.
// NOTE(review): the rate is one permit per CDC disk check interval if that interval is in milliseconds - confirm the unit.
private final RateLimiter rateLimiter = RateLimiter.create(1000.0 / DatabaseDescriptor.getCDCDiskCheckInterval());
// Runs the size recalculation tasks; assigned in start(), so it is null until start() has been called.
private ExecutorService cdcSizeCalculationExecutor;
// Owning manager; used to delete the oldest linked CDC segments in non-blocking mode.
private final CommitLogSegmentManagerCDC segmentManager;
// track the total size between two directory size calculations
private final AtomicLong sizeInProgress;
// Directory that is walked when the size is recalculated.
private final File path;
/**
 * Creates a tracker with a running total of zero.
 *
 * @param segmentManager manager asked to delete old CDC segments in non-blocking mode
 * @param path           directory walked when the size is recalculated
 */
CDCSizeTracker(CommitLogSegmentManagerCDC segmentManager, File path)
{
    this.segmentManager = segmentManager;
    this.path = path;
    this.sizeInProgress = new AtomicLong(0);
}
/**
 * Resets the running total to zero and creates a fresh size calculation executor.
 *
 * Needed for stop/restart during unit tests
 */
public void start()
{
    sizeInProgress.set(0);
    // Sequential executor with no queue; rejected submissions go to a DiscardPolicy handler.
    cdcSizeCalculationExecutor = executorFactory().configureSequential("CDCSizeCalculationExecutor")
                                                  .withRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy())
                                                  .withQueueLimit(0)
                                                  .withKeepAlive(1000, TimeUnit.SECONDS)
                                                  .build();
}
/**
 * Flags a segment as PERMITTED or FORBIDDEN for CDC mutations and counts the estimated size of a permitted
 * segment into the running total. In non-blocking mode the oldest linked CDC segments are deleted when the
 * running total exceeds the allowance. A size recalculation is requested at the end in every case.
 *
 * Synchronous size recalculation on each segment creation/deletion call could lead to very long delays in new
 * segment allocation, thus long delays in thread signaling to wake waiting allocation / writer threads.
 *
 * This can be reached either from the segment management thread in AbstractCommitLogSegmentManager or from the
 * size recalculation executor, so we synchronize on the segment's cdcStateLock to reduce the race overlap window
 * available for size to get off.
 *
 * Reference DirectorySizerBench for more information about performance of the directory size recalc.
 *
 * @param segment segment whose CDC state is to be decided
 */
void processNewSegment(CommitLogSegment segment)
{
    int segmentSize = defaultSegmentSize();
    long allowance = DatabaseDescriptor.getCDCTotalSpace();
    boolean blocking = DatabaseDescriptor.getCDCBlockWrites();

    // See synchronization in CommitLogSegment.setCDCState
    synchronized (segment.cdcStateLock)
    {
        segment.setCDCState(blocking && segmentSize + sizeInProgress.get() > allowance
                            ? CDCState.FORBIDDEN
                            : CDCState.PERMITTED);

        // Aggressively count in the (estimated) size of new segments.
        if (segment.getCDCState() == CDCState.PERMITTED)
            addSize(segmentSize);
    }

    // Remove the oldest cdc segment file when exceeding the CDC storage allowance
    if (!blocking)
    {
        // Read the running total exactly once. calculateSize() may overwrite it concurrently; re-reading it
        // after the comparison could yield a non-positive bytesToFree, in which case the delete call returns 0
        // and the running total would be reset to 0 by mistake.
        long currentSize = sizeInProgress.get();
        if (currentSize > allowance)
        {
            long bytesToFree = currentSize - allowance;
            long remainingSize = segmentManager.deleteOldLinkedCDCCommitLogSegment(bytesToFree);
            // Swap in the new total and derive the released amount from the value that was actually replaced.
            long releasedSize = sizeInProgress.getAndSet(remainingSize) - remainingSize;
            logger.debug("Freed up {} ({}) bytes after deleting the oldest CDC commit log segments in non-blocking mode. " +
                         "Total on-disk CDC size: {}; allowed CDC size: {}",
                         releasedSize, bytesToFree, remainingSize, allowance);
        }
    }

    // Take this opportunity to kick off a recalc to pick up any consumer file deletion.
    submitOverflowSizeRecalculation();
}
/**
 * Adjusts the running total for a segment that is being discarded and requests a size recalculation.
 * Nothing is done, apart from a debug log line, when the segment's CDC file no longer exists.
 *
 * @param segment segment being discarded
 */
void processDiscardedSegment(CommitLogSegment segment)
{
    boolean linkPresent = segment.getCDCFile().exists();
    if (!linkPresent)
    {
        logger.debug("Not processing discarded CommitLogSegment {}; this segment appears to have been deleted already.", segment);
        return;
    }

    synchronized (segment.cdcStateLock)
    {
        // Add to flushed size before decrementing unflushed, so we don't have a window of false generosity
        boolean containsCdcData = segment.getCDCState() == CDCState.CONTAINS;
        if (containsCdcData)
            addSize(segment.onDiskSize());

        // Subtract the (estimated) size of the segment from processNewSegment.
        // For the segment that CONTAINS, we update with adding the actual onDiskSize and removing the estimated size.
        // For the segment that remains in PERMITTED, the file is to be deleted and the estimate should be returned.
        boolean estimateWasCounted = segment.getCDCState() != CDCState.FORBIDDEN;
        if (estimateWasCounted)
            addSize(-defaultSegmentSize());
    }

    // Take this opportunity to kick off a recalc to pick up any consumer file deletion.
    submitOverflowSizeRecalculation();
}
/**
 * Submits a size recalculation to the executor. The task waits on the rate limiter before walking the
 * directory. A rejected submission is silently ignored.
 */
public void submitOverflowSizeRecalculation()
{
    Runnable recalculation = () -> {
        rateLimiter.acquire();
        calculateSize();
    };

    try
    {
        cdcSizeCalculationExecutor.submit(recalculation);
    }
    catch (RejectedExecutionException ignored)
    {
        // Do nothing. Means we have one in flight so this req. should be satisfied when it completes.
    }
}
/**
 * Returns the configured commit log segment size, used as the estimated size of a segment.
 *
 * CommitLogSegmentSize is only loaded from yaml. There is a setter but it is used only for testing.
 *
 * @return the value of DatabaseDescriptor.getCommitLogSegmentSize()
 */
private int defaultSegmentSize()
{
    final int configuredSegmentSize = DatabaseDescriptor.getCommitLogSegmentSize();
    return configuredSegmentSize;
}
/**
 * Recomputes the size by resetting the calculator, walking the tracked directory with this object as the
 * file visitor, and replacing the running total with the result. An I/O failure during the walk is handed to
 * CommitLog.handleCommitError.
 */
private void calculateSize()
{
    try
    {
        resetSize();
        Files.walkFileTree(path.toPath(), this);
        sizeInProgress.set(getAllocatedSize());
    }
    catch (IOException walkFailure)
    {
        CommitLog.handleCommitError("Failed CDC Size Calculation", walkFailure);
    }
}
/**
 * Shuts down the size calculation executor. Does nothing when the executor was never created or has already
 * been shut down.
 */
public void shutdown()
{
    if (cdcSizeCalculationExecutor == null || cdcSizeCalculationExecutor.isShutdown())
        return;

    cdcSizeCalculationExecutor.shutdown();
}
/**
 * Atomically adds the given amount to the running total.
 *
 * @param toAdd number of bytes to add; pass a negative value to subtract
 */
private void addSize(long toAdd)
{
    sizeInProgress.addAndGet(toAdd);
}
}
/**
 * Only use for testing / validation that size tracker is working. Not for production use.
 *
 * Waits for any previous recalculation, submits a new one, waits again, and then re-evaluates the CDC state of
 * the segment currently being allocated from if that segment is FORBIDDEN.
 *
 * @return the allocated size reported by the CDC size tracker
 */
@VisibleForTesting
public long updateCDCTotalSize()
{
    long sleepTime = DatabaseDescriptor.getCDCDiskCheckInterval() + 50L;
    // Give the update time to finish the last run if any. Therefore, avoid modifying production code only for testing purpose.
    Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
    cdcSizeTracker.submitOverflowSizeRecalculation();
    // Give the update time to run
    Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
    // then update the state of the segment it is allocating from. In production, the state is updated during "allocate"
    // Read the segment once so that the segment checked for FORBIDDEN is the same one that gets re-processed.
    CommitLogSegment current = allocatingFrom();
    if (current.getCDCState() == CDCState.FORBIDDEN)
        cdcSizeTracker.processNewSegment(current);
    return cdcSizeTracker.getAllocatedSize();
}
}