| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.db.compaction; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.*; |
| import java.util.concurrent.*; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.function.Predicate; |
| import java.util.stream.Collectors; |
| import javax.management.openmbean.OpenDataException; |
| import javax.management.openmbean.TabularData; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.collect.*; |
| import com.google.common.util.concurrent.*; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import io.netty.util.concurrent.FastThreadLocal; |
| import org.apache.cassandra.cache.AutoSavingCache; |
| import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; |
| import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; |
| import org.apache.cassandra.concurrent.NamedThreadFactory; |
| import org.apache.cassandra.config.CFMetaData; |
| import org.apache.cassandra.config.DatabaseDescriptor; |
| import org.apache.cassandra.config.Schema; |
| import org.apache.cassandra.db.*; |
| import org.apache.cassandra.db.compaction.CompactionInfo.Holder; |
| import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; |
| import org.apache.cassandra.db.lifecycle.LifecycleTransaction; |
| import org.apache.cassandra.db.lifecycle.SSTableIntervalTree; |
| import org.apache.cassandra.db.lifecycle.SSTableSet; |
| import org.apache.cassandra.db.lifecycle.View; |
| import org.apache.cassandra.db.lifecycle.WrappedLifecycleTransaction; |
| import org.apache.cassandra.db.rows.UnfilteredRowIterator; |
| import org.apache.cassandra.db.view.ViewBuilder; |
| import org.apache.cassandra.dht.Bounds; |
| import org.apache.cassandra.dht.Range; |
| import org.apache.cassandra.dht.Token; |
| import org.apache.cassandra.index.SecondaryIndexBuilder; |
| import org.apache.cassandra.io.sstable.Descriptor; |
| import org.apache.cassandra.io.sstable.ISSTableScanner; |
| import org.apache.cassandra.io.sstable.IndexSummaryRedistribution; |
| import org.apache.cassandra.io.sstable.SSTableRewriter; |
| import org.apache.cassandra.io.sstable.SnapshotDeletingTask; |
| import org.apache.cassandra.io.sstable.format.SSTableReader; |
| import org.apache.cassandra.io.sstable.format.SSTableWriter; |
| import org.apache.cassandra.io.sstable.metadata.MetadataCollector; |
| import org.apache.cassandra.io.util.FileUtils; |
| import org.apache.cassandra.metrics.CompactionMetrics; |
| import org.apache.cassandra.repair.Validator; |
| import org.apache.cassandra.schema.CompactionParams.TombstoneOption; |
| import org.apache.cassandra.service.ActiveRepairService; |
| import org.apache.cassandra.service.StorageService; |
| import org.apache.cassandra.utils.*; |
| import org.apache.cassandra.utils.concurrent.Refs; |
| |
| import static java.util.Collections.singleton; |
| |
| /** |
| * <p> |
| * A singleton which manages a private executor of ongoing compactions. |
| * </p> |
| * Scheduling for compaction is accomplished by swapping sstables to be compacted into |
| * a set via Tracker. New scheduling attempts will ignore currently compacting |
| * sstables. |
| */ |
| public class CompactionManager implements CompactionManagerMBean |
| { |
    public static final String MBEAN_OBJECT_NAME = "org.apache.cassandra.db:type=CompactionManager";
    private static final Logger logger = LoggerFactory.getLogger(CompactionManager.class);
    // Singleton instance; created and registered as an MBean in the static initializer below.
    public static final CompactionManager instance;

    // Sentinel gcBefore values: NO_GC disables tombstone purging, GC_ALL allows purging all tombstones.
    public static final int NO_GC = Integer.MIN_VALUE;
    public static final int GC_ALL = Integer.MAX_VALUE;

    // A thread local that tells us if the current thread is owned by the compaction manager. Used
    // by CounterContext to figure out if it should log a warning for invalid counter shards.
    public static final FastThreadLocal<Boolean> isCompactionManager = new FastThreadLocal<Boolean>()
    {
        @Override
        protected Boolean initialValue()
        {
            return false;
        }
    };

    static
    {
        instance = new CompactionManager();

        // expose compaction management operations over JMX
        MBeanWrapper.instance.registerMBean(instance, MBEAN_OBJECT_NAME);
    }

    // Executors for the three kinds of background work this manager schedules.
    private final CompactionExecutor executor = new CompactionExecutor();
    private final CompactionExecutor validationExecutor = new ValidationExecutor();
    private final CompactionExecutor cacheCleanupExecutor = new CacheCleanupExecutor();

    private final CompactionMetrics metrics = new CompactionMetrics(executor, validationExecutor);
    // Per-table count of pending/running background compaction candidates; incremented by
    // BackgroundCompactionCandidate's constructor and decremented when it finishes or is cancelled.
    @VisibleForTesting
    final Multiset<ColumnFamilyStore> compactingCF = ConcurrentHashMultiset.create();

    // used to temporarily pause non-strategy managed compactions (like index summary redistribution)
    private final AtomicInteger globalCompactionPauseCount = new AtomicInteger(0);

    // Shared throughput limiter; its rate is refreshed from config each time getRateLimiter() is called.
    private final RateLimiter compactionRateLimiter = RateLimiter.create(Double.MAX_VALUE);
| |
| /** |
| * Gets compaction rate limiter. |
| * Rate unit is bytes per sec. |
| * |
| * @return RateLimiter with rate limit set |
| */ |
| public RateLimiter getRateLimiter() |
| { |
| setRate(DatabaseDescriptor.getCompactionThroughputMbPerSec()); |
| return compactionRateLimiter; |
| } |
| |
| /** |
| * Sets the rate for the rate limiter. When compaction_throughput_mb_per_sec is 0 or node is bootstrapping, |
| * this sets the rate to Double.MAX_VALUE bytes per second. |
| * @param throughPutMbPerSec throughput to set in mb per second |
| */ |
| public void setRate(final double throughPutMbPerSec) |
| { |
| double throughput = throughPutMbPerSec * 1024.0 * 1024.0; |
| // if throughput is set to 0, throttling is disabled |
| if (throughput == 0 || StorageService.instance.isBootstrapMode()) |
| throughput = Double.MAX_VALUE; |
| if (compactionRateLimiter.getRate() != throughput) |
| compactionRateLimiter.setRate(throughput); |
| } |
| |
| /** |
| * Call this whenever a compaction might be needed on the given columnfamily. |
| * It's okay to over-call (within reason) if a call is unnecessary, it will |
| * turn into a no-op in the bucketing/candidate-scan phase. |
| */ |
| public List<Future<?>> submitBackground(final ColumnFamilyStore cfs) |
| { |
| if (cfs.isAutoCompactionDisabled()) |
| { |
| logger.trace("Autocompaction is disabled"); |
| return Collections.emptyList(); |
| } |
| |
| /** |
| * If a CF is currently being compacted, and there are no idle threads, submitBackground should be a no-op; |
| * we can wait for the current compaction to finish and re-submit when more information is available. |
| * Otherwise, we should submit at least one task to prevent starvation by busier CFs, and more if there |
| * are idle threads stil. (CASSANDRA-4310) |
| */ |
| int count = compactingCF.count(cfs); |
| if (count > 0 && executor.getActiveCount() >= executor.getMaximumPoolSize()) |
| { |
| logger.trace("Background compaction is still running for {}.{} ({} remaining). Skipping", |
| cfs.keyspace.getName(), cfs.name, count); |
| return Collections.emptyList(); |
| } |
| |
| logger.trace("Scheduling a background task check for {}.{} with {}", |
| cfs.keyspace.getName(), |
| cfs.name, |
| cfs.getCompactionStrategyManager().getName()); |
| |
| List<Future<?>> futures = new ArrayList<>(1); |
| Future<?> fut = executor.submitIfRunning(new BackgroundCompactionCandidate(cfs), "background task"); |
| if (!fut.isCancelled()) |
| futures.add(fut); |
| else |
| compactingCF.remove(cfs); |
| return futures; |
| } |
| |
| public boolean isCompacting(Iterable<ColumnFamilyStore> cfses) |
| { |
| for (ColumnFamilyStore cfs : cfses) |
| if (!cfs.getTracker().getCompacting().isEmpty()) |
| return true; |
| return false; |
| } |
| |
| /** |
| * Shutdowns both compaction and validation executors, cancels running compaction / validation, |
| * and waits for tasks to complete if tasks were not cancelable. |
| */ |
| public void forceShutdown() |
| { |
| // shutdown executors to prevent further submission |
| executor.shutdown(); |
| validationExecutor.shutdown(); |
| cacheCleanupExecutor.shutdown(); |
| |
| // interrupt compactions and validations |
| for (Holder compactionHolder : CompactionMetrics.getCompactions()) |
| { |
| compactionHolder.stop(); |
| } |
| |
| // wait for tasks to terminate |
| // compaction tasks are interrupted above, so it shuold be fairy quick |
| // until not interrupted tasks to complete. |
| for (ExecutorService exec : Arrays.asList(executor, validationExecutor, cacheCleanupExecutor)) |
| { |
| try |
| { |
| if (!exec.awaitTermination(1, TimeUnit.MINUTES)) |
| logger.warn("Failed to wait for compaction executors shutdown"); |
| } |
| catch (InterruptedException e) |
| { |
| logger.error("Interrupted while waiting for tasks to be terminated", e); |
| } |
| } |
| } |
| |
    /**
     * Stops accepting new compactions and blocks until already-submitted compactions finish
     * or the timeout elapses. Note that the awaitTermination result is ignored; callers that
     * need to know whether everything completed must check the executor state themselves.
     */
    public void finishCompactionsAndShutdown(long timeout, TimeUnit unit) throws InterruptedException
    {
        executor.shutdown();
        executor.awaitTermination(timeout, unit);
    }
| |
    // the actual sstables to compact are not determined until we run the BCT; that way, if new sstables
    // are created between task submission and execution, we execute against the most up-to-date information
    class BackgroundCompactionCandidate implements Runnable
    {
        private final ColumnFamilyStore cfs;

        BackgroundCompactionCandidate(ColumnFamilyStore cfs)
        {
            // reserve a slot in compactingCF up front; released in run()'s finally block, or by
            // submitBackground when the task is cancelled before it ever runs
            compactingCF.add(cfs);
            this.cfs = cfs;
        }

        public void run()
        {
            try
            {
                logger.trace("Checking {}.{}", cfs.keyspace.getName(), cfs.name);
                // the table may have been dropped between submission and execution
                if (!cfs.isValid())
                {
                    logger.trace("Aborting compaction for dropped CF");
                    return;
                }

                // ask the strategy for the next task only now, so we act on current sstable state
                CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
                AbstractCompactionTask task = strategy.getNextBackgroundTask(getDefaultGcBefore(cfs, FBUtilities.nowInSeconds()));
                if (task == null)
                {
                    logger.trace("No tasks available");
                    return;
                }
                task.execute(metrics);
            }
            finally
            {
                compactingCF.remove(cfs);
            }
            // re-check: more compaction work may have become possible while this task ran
            submitBackground(cfs);
        }
    }
| |
| /** |
| * Run an operation over all sstables using jobs threads |
| * |
| * @param cfs the column family store to run the operation on |
| * @param operation the operation to run |
| * @param jobs the number of threads to use - 0 means use all available. It never uses more than concurrent_compactors threads |
| * @return status of the operation |
| * @throws ExecutionException |
| * @throws InterruptedException |
| */ |
| @SuppressWarnings("resource") |
| private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation, int jobs, OperationType operationType) throws ExecutionException, InterruptedException |
| { |
| logger.info("Starting {} for {}.{}", operationType, cfs.keyspace.getName(), cfs.getTableName()); |
| List<LifecycleTransaction> transactions = new ArrayList<>(); |
| List<Future<?>> futures = new ArrayList<>(); |
| try (LifecycleTransaction compacting = cfs.markAllCompacting(operationType)) |
| { |
| if (compacting == null) |
| return AllSSTableOpStatus.UNABLE_TO_CANCEL; |
| |
| Iterable<SSTableReader> sstables = Lists.newArrayList(operation.filterSSTables(compacting)); |
| if (Iterables.isEmpty(sstables)) |
| { |
| logger.info("No sstables to {} for {}.{}", operationType.name(), cfs.keyspace.getName(), cfs.name); |
| return AllSSTableOpStatus.SUCCESSFUL; |
| } |
| |
| for (final SSTableReader sstable : sstables) |
| { |
| final LifecycleTransaction txn = compacting.split(singleton(sstable)); |
| transactions.add(txn); |
| Callable<Object> callable = new Callable<Object>() |
| { |
| @Override |
| public Object call() throws Exception |
| { |
| operation.execute(txn); |
| return this; |
| } |
| }; |
| Future<?> fut = executor.submitIfRunning(callable, "paralell sstable operation"); |
| if (!fut.isCancelled()) |
| futures.add(fut); |
| else |
| return AllSSTableOpStatus.ABORTED; |
| |
| if (jobs > 0 && futures.size() == jobs) |
| { |
| Future<?> f = FBUtilities.waitOnFirstFuture(futures); |
| futures.remove(f); |
| } |
| } |
| FBUtilities.waitOnFutures(futures); |
| assert compacting.originals().isEmpty(); |
| logger.info("Finished {} for {}.{} successfully", operationType, cfs.keyspace.getName(), cfs.getTableName()); |
| return AllSSTableOpStatus.SUCCESSFUL; |
| } |
| finally |
| { |
| // wait on any unfinished futures to make sure we don't close an ongoing transaction |
| try |
| { |
| FBUtilities.waitOnFutures(futures); |
| } |
| catch (Throwable t) |
| { |
| // these are handled/logged in CompactionExecutor#afterExecute |
| } |
| Throwable fail = Throwables.close(null, transactions); |
| if (fail != null) |
| logger.error("Failed to cleanup lifecycle transactions ({} for {}.{})", operationType, cfs.keyspace.getName(), cfs.getTableName(), fail); |
| } |
| } |
| |
| private static interface OneSSTableOperation |
| { |
| Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction); |
| void execute(LifecycleTransaction input) throws IOException; |
| } |
| |
    /**
     * Outcome of an all-sstables operation, with a stable numeric code for external callers.
     */
    public enum AllSSTableOpStatus
    {
        SUCCESSFUL(0),
        ABORTED(1),
        UNABLE_TO_CANCEL(2);

        // numeric code associated with this outcome
        public final int statusCode;

        AllSSTableOpStatus(int statusCode)
        {
            this.statusCode = statusCode;
        }
    }
| |
    /**
     * Scrubs all sstables of the given table. Delegates to the five-argument overload with
     * {@code reinsertOverflowedTTL} set to false.
     */
    public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData,
                                           int jobs)
    throws InterruptedException, ExecutionException
    {
        return performScrub(cfs, skipCorrupted, checkData, false, jobs);
    }
| |
    /**
     * Scrubs every sstable of the given table, each in its own transaction.
     *
     * @param cfs the table to scrub
     * @param skipCorrupted forwarded to the scrubber (see scrubOne)
     * @param checkData forwarded to the scrubber (see scrubOne)
     * @param reinsertOverflowedTTL forwarded to the scrubber (see scrubOne)
     * @param jobs number of parallel jobs; 0 means use all available compaction threads
     */
    public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData,
                                           final boolean reinsertOverflowedTTL, int jobs)
    throws InterruptedException, ExecutionException
    {
        return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
        {
            @Override
            public Iterable<SSTableReader> filterSSTables(LifecycleTransaction input)
            {
                // scrub everything; no filtering
                return input.originals();
            }

            @Override
            public void execute(LifecycleTransaction input) throws IOException
            {
                scrubOne(cfs, input, skipCorrupted, checkData, reinsertOverflowedTTL);
            }
        }, jobs, OperationType.SCRUB);
    }
| |
    /**
     * Verifies every sstable of the given table, one at a time (jobs = 0, i.e. all available threads).
     *
     * @param cfs the table to verify; must not be a secondary index table
     * @param extendedVerify forwarded to the verifier (see verifyOne)
     */
    public AllSSTableOpStatus performVerify(final ColumnFamilyStore cfs, final boolean extendedVerify) throws InterruptedException, ExecutionException
    {
        assert !cfs.isIndex();
        return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
        {
            @Override
            public Iterable<SSTableReader> filterSSTables(LifecycleTransaction input)
            {
                // verify everything; no filtering
                return input.originals();
            }

            @Override
            public void execute(LifecycleTransaction input) throws IOException
            {
                verifyOne(cfs, input.onlyOne(), extendedVerify);
            }
        }, 0, OperationType.VERIFY);
    }
| |
| public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, final boolean excludeCurrentVersion, int jobs) throws InterruptedException, ExecutionException |
| { |
| return parallelAllSSTableOperation(cfs, new OneSSTableOperation() |
| { |
| @Override |
| public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction) |
| { |
| List<SSTableReader> sortedSSTables = Lists.newArrayList(transaction.originals()); |
| Collections.sort(sortedSSTables, SSTableReader.sizeComparator.reversed()); |
| Iterator<SSTableReader> iter = sortedSSTables.iterator(); |
| while (iter.hasNext()) |
| { |
| SSTableReader sstable = iter.next(); |
| if (excludeCurrentVersion && sstable.descriptor.version.equals(sstable.descriptor.getFormat().getLatestVersion())) |
| { |
| transaction.cancel(sstable); |
| iter.remove(); |
| } |
| } |
| return sortedSSTables; |
| } |
| |
| @Override |
| public void execute(LifecycleTransaction txn) |
| { |
| AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE); |
| task.setUserDefined(true); |
| task.setCompactionType(OperationType.UPGRADE_SSTABLES); |
| task.execute(metrics); |
| } |
| }, jobs, OperationType.UPGRADE_SSTABLES); |
| } |
| |
    /**
     * Removes data outside this node's owned token ranges from the given table's sstables.
     * Sstables fully contained in owned ranges are skipped entirely.
     *
     * @param cfStore the table to clean up; must not be a secondary index table
     * @param jobs number of parallel jobs; 0 means use all available compaction threads
     */
    public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore, int jobs) throws InterruptedException, ExecutionException
    {
        assert !cfStore.isIndex();
        Keyspace keyspace = cfStore.keyspace;
        if (!StorageService.instance.isJoined())
        {
            logger.info("Cleanup cannot run before a node has joined the ring");
            return AllSSTableOpStatus.ABORTED;
        }
        // if local ranges is empty, it means no data should remain
        final Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());
        final boolean hasIndexes = cfStore.indexManager.hasIndexes();

        return parallelAllSSTableOperation(cfStore, new OneSSTableOperation()
        {
            @Override
            public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
            {
                List<SSTableReader> sortedSSTables = Lists.newArrayList(transaction.originals());
                Iterator<SSTableReader> sstableIter = sortedSSTables.iterator();
                int totalSSTables = 0;
                int skippedSStables = 0;
                while (sstableIter.hasNext())
                {
                    SSTableReader sstable = sstableIter.next();
                    totalSSTables++;
                    // skip (and release from the transaction) sstables with no out-of-range tokens
                    if (!needsCleanup(sstable, ranges))
                    {
                        logger.debug("Not cleaning up {} ([{}, {}]) - no tokens outside owned ranges {}",
                                     sstable, sstable.first.getToken(), sstable.last.getToken(), ranges);
                        sstableIter.remove();
                        transaction.cancel(sstable);
                        skippedSStables++;
                    }
                }
                logger.info("Skipping cleanup for {}/{} sstables for {}.{} since they are fully contained in owned ranges ({})",
                            skippedSStables, totalSSTables, cfStore.keyspace.getName(), cfStore.getTableName(), ranges);
                // process smallest sstables first
                sortedSSTables.sort(SSTableReader.sizeComparator);
                return sortedSSTables;
            }

            @Override
            public void execute(LifecycleTransaction txn) throws IOException
            {
                CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, ranges, FBUtilities.nowInSeconds());
                doCleanupOne(cfStore, txn, cleanupStrategy, ranges, hasIndexes);
            }
        }, jobs, OperationType.CLEANUP);
    }
| |
    /**
     * Compacts each sstable of the given table individually (oldest data first) with a
     * CompactionController configured for the requested tombstone-purging behaviour.
     *
     * @param cfStore the table to garbage-collect; must not be a secondary index table
     * @param tombstoneOption tombstone handling passed to the CompactionController
     * @param jobs number of parallel jobs; 0 means use all available compaction threads
     */
    public AllSSTableOpStatus performGarbageCollection(final ColumnFamilyStore cfStore, TombstoneOption tombstoneOption, int jobs) throws InterruptedException, ExecutionException
    {
        assert !cfStore.isIndex();

        return parallelAllSSTableOperation(cfStore, new OneSSTableOperation()
        {
            @Override
            public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
            {
                Iterable<SSTableReader> originals = transaction.originals();
                // if only repaired tombstones may be purged, restrict to repaired sstables
                if (cfStore.getCompactionStrategyManager().onlyPurgeRepairedTombstones())
                    originals = Iterables.filter(originals, SSTableReader::isRepaired);
                // process sstables with the oldest data first
                List<SSTableReader> sortedSSTables = Lists.newArrayList(originals);
                Collections.sort(sortedSSTables, SSTableReader.maxTimestampAscending);
                return sortedSSTables;
            }

            @Override
            public void execute(LifecycleTransaction txn) throws IOException
            {
                logger.debug("Garbage collecting {}", txn.originals());
                CompactionTask task = new CompactionTask(cfStore, txn, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()))
                {
                    @Override
                    protected CompactionController getCompactionController(Set<SSTableReader> toCompact)
                    {
                        // override to inject the requested tombstone option into the controller
                        return new CompactionController(cfStore, toCompact, gcBefore, null, tombstoneOption);
                    }
                };
                task.setUserDefined(true);
                task.setCompactionType(OperationType.GARBAGE_COLLECT);
                task.execute(metrics);
            }
        }, jobs, OperationType.GARBAGE_COLLECT);
    }
| |
    /**
     * Rewrites sstables that are not stored in the correct data directory for their token
     * range, so their data ends up on the right disk.
     *
     * @param cfs the table to relocate
     * @param jobs number of parallel jobs; 0 means use all available compaction threads
     */
    public AllSSTableOpStatus relocateSSTables(final ColumnFamilyStore cfs, int jobs) throws ExecutionException, InterruptedException
    {
        // relocation only makes sense when the partitioner can split ranges across disks
        if (!cfs.getPartitioner().splitter().isPresent())
        {
            logger.info("Partitioner does not support splitting");
            return AllSSTableOpStatus.ABORTED;
        }
        final Collection<Range<Token>> r = StorageService.instance.getLocalRanges(cfs.keyspace.getName());

        if (r.isEmpty())
        {
            logger.info("Relocate cannot run before a node has joined the ring");
            return AllSSTableOpStatus.ABORTED;
        }

        final DiskBoundaries diskBoundaries = cfs.getDiskBoundaries();

        return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
        {
            @Override
            public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
            {
                // keep only sstables on the wrong disk; release the rest from the transaction
                Set<SSTableReader> originals = Sets.newHashSet(transaction.originals());
                Set<SSTableReader> needsRelocation = originals.stream().filter(s -> !inCorrectLocation(s)).collect(Collectors.toSet());
                transaction.cancel(Sets.difference(originals, needsRelocation));

                Map<Integer, List<SSTableReader>> groupedByDisk = groupByDiskIndex(needsRelocation);

                int maxSize = 0;
                for (List<SSTableReader> diskSSTables : groupedByDisk.values())
                    maxSize = Math.max(maxSize, diskSSTables.size());

                // interleave sstables from different disks (round-robin) so parallel jobs
                // spread work across disks rather than hammering one at a time
                List<SSTableReader> mixedSSTables = new ArrayList<>();

                for (int i = 0; i < maxSize; i++)
                    for (List<SSTableReader> diskSSTables : groupedByDisk.values())
                        if (i < diskSSTables.size())
                            mixedSSTables.add(diskSSTables.get(i));

                return mixedSSTables;
            }

            // groups the given sstables by the disk index their first key maps to
            public Map<Integer, List<SSTableReader>> groupByDiskIndex(Set<SSTableReader> needsRelocation)
            {
                return needsRelocation.stream().collect(Collectors.groupingBy((s) -> diskBoundaries.getDiskIndex(s)));
            }

            // true if the sstable already lives in the data directory matching its token range
            private boolean inCorrectLocation(SSTableReader sstable)
            {
                if (!cfs.getPartitioner().splitter().isPresent())
                    return true;

                int diskIndex = diskBoundaries.getDiskIndex(sstable);
                PartitionPosition diskLast = diskBoundaries.positions.get(diskIndex);

                // the location we get from directoryIndex is based on the first key in the sstable
                // now we need to make sure the last key is less than the boundary as well:
                Directories.DataDirectory dataDirectory = cfs.getDirectories().getDataDirectoryForFile(sstable.descriptor);
                return diskBoundaries.directories.get(diskIndex).equals(dataDirectory) && sstable.last.compareTo(diskLast) <= 0;
            }

            @Override
            public void execute(LifecycleTransaction txn)
            {
                logger.debug("Relocating {}", txn.originals());
                // NO_GC: relocation must not purge any data
                AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE);
                task.setUserDefined(true);
                task.setCompactionType(OperationType.RELOCATE);
                task.execute(metrics);
            }
        }, jobs, OperationType.RELOCATE);
    }
| |
| /** |
| * Submit anti-compactions for a collection of SSTables over a set of repaired ranges and marks corresponding SSTables |
| * as repaired. |
| * |
| * @param cfs Column family for anti-compaction |
| * @param ranges Repaired ranges to be anti-compacted into separate SSTables. |
| * @param sstables {@link Refs} of SSTables within CF to anti-compact. |
| * @param repairedAt Unix timestamp of when repair was completed. |
| * @param parentRepairSession Corresponding repair session |
| * @return Futures executing anti-compaction. |
| */ |
| public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs, |
| final Collection<Range<Token>> ranges, |
| final Refs<SSTableReader> sstables, |
| final long repairedAt, |
| final UUID parentRepairSession) |
| { |
| Runnable runnable = new WrappedRunnable() |
| { |
| @Override |
| @SuppressWarnings("resource") |
| public void runMayThrow() throws Exception |
| { |
| LifecycleTransaction modifier = null; |
| while (modifier == null) |
| { |
| for (SSTableReader compactingSSTable : cfs.getTracker().getCompacting()) |
| sstables.releaseIfHolds(compactingSSTable); |
| // We don't anti-compact any SSTable that has been compacted during repair as it may have been compacted |
| // with unrepaired data. |
| Set<SSTableReader> compactedSSTables = new HashSet<>(); |
| for (SSTableReader sstable : sstables) |
| if (sstable.isMarkedCompacted()) |
| compactedSSTables.add(sstable); |
| sstables.release(compactedSSTables); |
| modifier = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); |
| } |
| performAnticompaction(cfs, ranges, sstables, modifier, repairedAt, parentRepairSession); |
| } |
| }; |
| |
| ListenableFuture<?> ret = null; |
| try |
| { |
| ret = executor.submitIfRunning(runnable, "anticompaction"); |
| return ret; |
| } |
| finally |
| { |
| if (ret == null || ret.isCancelled()) |
| sstables.release(); |
| } |
| } |
| |
| /** |
| * Make sure the {validatedForRepair} are marked for compaction before calling this. |
| * |
| * Caller must reference the validatedForRepair sstables (via ParentRepairSession.getActiveRepairedSSTableRefs(..)). |
| * |
| * NOTE: Repairs can take place on both unrepaired (incremental + full) and repaired (full) data. |
| * Although anti-compaction could work on repaired sstables as well and would result in having more accurate |
| * repairedAt values for these, we avoid anti-compacting already repaired sstables, as we currently don't |
| * make use of any actual repairedAt value and splitting up sstables just for that is not worth it. However, we will |
| * still update repairedAt if the SSTable is fully contained within the repaired ranges, as this does not require |
| * anticompaction. |
| * |
| * @param cfs |
| * @param ranges Ranges that the repair was carried out on |
| * @param validatedForRepair SSTables containing the repaired ranges. Should be referenced before passing them. |
| * @param txn Transaction across all SSTables that were repaired. |
| * @param parentRepairSession parent repair session ID |
| * @throws InterruptedException |
| * @throws IOException |
| */ |
| public void performAnticompaction(ColumnFamilyStore cfs, |
| Collection<Range<Token>> ranges, |
| Refs<SSTableReader> validatedForRepair, |
| LifecycleTransaction txn, |
| long repairedAt, |
| UUID parentRepairSession) throws InterruptedException, IOException |
| { |
| logger.info("[repair #{}] Starting anticompaction for {}.{} on {}/{} sstables", parentRepairSession, cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables()); |
| logger.trace("[repair #{}] Starting anticompaction for ranges {}", parentRepairSession, ranges); |
| Set<SSTableReader> sstables = new HashSet<>(validatedForRepair); |
| Set<SSTableReader> mutatedRepairStatuses = new HashSet<>(); // SSTables that were completely repaired only |
| Set<SSTableReader> nonAnticompacting = new HashSet<>(); |
| |
| Iterator<SSTableReader> sstableIterator = sstables.iterator(); |
| try |
| { |
| List<Range<Token>> normalizedRanges = Range.normalize(ranges); |
| |
| while (sstableIterator.hasNext()) |
| { |
| SSTableReader sstable = sstableIterator.next(); |
| List<String> anticompactRanges = new ArrayList<>(); |
| // We don't anti-compact SSTables already marked repaired. See CASSANDRA-13153 |
| // and CASSANDRA-14423. |
| if (sstable.isRepaired()) // We never anti-compact already repaired SSTables |
| nonAnticompacting.add(sstable); |
| |
| Bounds<Token> sstableBounds = new Bounds<>(sstable.first.getToken(), sstable.last.getToken()); |
| |
| boolean shouldAnticompact = false; |
| |
| for (Range<Token> r : normalizedRanges) |
| { |
| if (r.contains(sstableBounds.left) && r.contains(sstableBounds.right)) |
| { |
| logger.info("[repair #{}] SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", parentRepairSession, sstable, r); |
| sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt); |
| sstable.reloadSSTableMetadata(); |
| if (!nonAnticompacting.contains(sstable)) // don't notify if the SSTable was already repaired |
| mutatedRepairStatuses.add(sstable); |
| sstableIterator.remove(); |
| shouldAnticompact = true; |
| break; |
| } |
| else if (r.intersects(sstableBounds) && !nonAnticompacting.contains(sstable)) |
| { |
| anticompactRanges.add(r.toString()); |
| shouldAnticompact = true; |
| } |
| } |
| |
| if (!anticompactRanges.isEmpty()) |
| logger.info("[repair #{}] SSTable {} ({}) will be anticompacted on range {}", parentRepairSession, sstable, sstableBounds, String.join(", ", anticompactRanges)); |
| |
| if (!shouldAnticompact) |
| { |
| logger.info("[repair #{}] SSTable {} ({}) not subject to anticompaction of repaired ranges {}, not touching repairedAt.", parentRepairSession, sstable, sstableBounds, normalizedRanges); |
| nonAnticompacting.add(sstable); |
| sstableIterator.remove(); |
| } |
| } |
| cfs.getTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses); |
| txn.cancel(Sets.union(nonAnticompacting, mutatedRepairStatuses)); |
| validatedForRepair.release(Sets.union(nonAnticompacting, mutatedRepairStatuses)); |
| assert txn.originals().equals(sstables); |
| if (!sstables.isEmpty()) |
| doAntiCompaction(cfs, ranges, txn, repairedAt); |
| txn.finish(); |
| } |
| finally |
| { |
| validatedForRepair.release(); |
| txn.close(); |
| } |
| |
| logger.info("[repair #{}] Completed anticompaction successfully", parentRepairSession); |
| } |
| |
    /**
     * Runs a maximal (major) compaction on the given table and blocks until it completes.
     *
     * @param cfStore the table to compact
     * @param splitOutput passed through to the compaction strategy's maximal-task creation
     */
    public void performMaximal(final ColumnFamilyStore cfStore, boolean splitOutput)
    {
        FBUtilities.waitOnFutures(submitMaximal(cfStore, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()), splitOutput));
    }
| |
    /**
     * Submits the strategy's maximal (major) compaction tasks to the compaction executor.
     *
     * @param cfStore the table to compact
     * @param gcBefore tombstones older than this (seconds) may be purged
     * @param splitOutput passed through to the compaction strategy's maximal-task creation
     * @return futures for the submitted tasks (cancelled submissions are excluded)
     */
    public List<Future<?>> submitMaximal(final ColumnFamilyStore cfStore, final int gcBefore, boolean splitOutput)
    {
        // here we compute the task off the compaction executor, so having that present doesn't
        // confuse runWithCompactionsDisabled -- i.e., we don't want to deadlock ourselves, waiting
        // for ourselves to finish/acknowledge cancellation before continuing.
        final Collection<AbstractCompactionTask> tasks = cfStore.getCompactionStrategyManager().getMaximalTasks(gcBefore, splitOutput);

        if (tasks == null)
            return Collections.emptyList();

        List<Future<?>> futures = new ArrayList<>();

        // count tasks with actual input so we can warn when the major compaction is split
        int nonEmptyTasks = 0;
        for (final AbstractCompactionTask task : tasks)
        {
            if (task.transaction.originals().size() > 0)
                nonEmptyTasks++;

            Runnable runnable = new WrappedRunnable()
            {
                protected void runMayThrow()
                {
                    task.execute(metrics);
                }
            };

            Future<?> fut = executor.submitIfRunning(runnable, "maximal task");
            if (!fut.isCancelled())
                futures.add(fut);
        }
        if (nonEmptyTasks > 1)
            logger.info("Major compaction will not result in a single sstable - repaired and unrepaired data is kept separate and compaction runs per data_file_directory.");


        return futures;
    }
| |
    /**
     * Forces a user-defined compaction over all live sstables that intersect the given token
     * ranges. Task creation runs with compactions disabled to avoid racing with concurrent
     * compactions; this method blocks until the submitted tasks finish.
     *
     * @param cfStore the table to compact
     * @param ranges token ranges whose intersecting sstables should be compacted
     */
    public void forceCompactionForTokenRange(ColumnFamilyStore cfStore, Collection<Range<Token>> ranges)
    {
        Callable<Collection<AbstractCompactionTask>> taskCreator = () -> {
            Collection<SSTableReader> sstables = sstablesInBounds(cfStore, ranges);
            if (sstables == null || sstables.isEmpty())
            {
                logger.debug("No sstables found for the provided token range");
                return null;
            }
            return cfStore.getCompactionStrategyManager().getUserDefinedTasks(sstables, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()));
        };

        // compute the tasks while other compactions are paused, so the sstable set is stable
        final Collection<AbstractCompactionTask> tasks = cfStore.runWithCompactionsDisabled(taskCreator, false, false);

        if (tasks == null)
            return;

        Runnable runnable = new WrappedRunnable()
        {
            protected void runMayThrow() throws Exception
            {
                try
                {
                    for (AbstractCompactionTask task : tasks)
                        if (task != null)
                            task.execute(metrics);
                }
                finally
                {
                    // make sure every task's transaction is closed even if one execution fails
                    FBUtilities.closeAll(tasks.stream().map(task -> task.transaction).collect(Collectors.toList()));
                }
            }
        };

        FBUtilities.waitOnFuture(executor.submitIfRunning(runnable, "force compaction for token range"));
    }
| |
| private static Collection<SSTableReader> sstablesInBounds(ColumnFamilyStore cfs, Collection<Range<Token>> tokenRangeCollection) |
| { |
| final Set<SSTableReader> sstables = new HashSet<>(); |
| Iterable<SSTableReader> liveTables = cfs.getTracker().getView().select(SSTableSet.LIVE); |
| SSTableIntervalTree tree = SSTableIntervalTree.build(liveTables); |
| |
| for (Range<Token> tokenRange : tokenRangeCollection) |
| { |
| Iterable<SSTableReader> ssTableReaders = View.sstablesInBounds(tokenRange.left.minKeyBound(), tokenRange.right.maxKeyBound(), tree); |
| Iterables.addAll(sstables, ssTableReaders); |
| } |
| return sstables; |
| } |
| |
| public void forceUserDefinedCompaction(String dataFiles) |
| { |
| String[] filenames = dataFiles.split(","); |
| Multimap<ColumnFamilyStore, Descriptor> descriptors = ArrayListMultimap.create(); |
| |
| for (String filename : filenames) |
| { |
| // extract keyspace and columnfamily name from filename |
| Descriptor desc = Descriptor.fromFilename(filename.trim()); |
| if (Schema.instance.getCFMetaData(desc) == null) |
| { |
| logger.warn("Schema does not exist for file {}. Skipping.", filename); |
| continue; |
| } |
| // group by keyspace/columnfamily |
| ColumnFamilyStore cfs = Keyspace.open(desc.ksname).getColumnFamilyStore(desc.cfname); |
| descriptors.put(cfs, cfs.getDirectories().find(new File(filename.trim()).getName())); |
| } |
| |
| List<Future<?>> futures = new ArrayList<>(); |
| int nowInSec = FBUtilities.nowInSeconds(); |
| for (ColumnFamilyStore cfs : descriptors.keySet()) |
| futures.add(submitUserDefined(cfs, descriptors.get(cfs), getDefaultGcBefore(cfs, nowInSec))); |
| FBUtilities.waitOnFutures(futures); |
| } |
| |
| public void forceUserDefinedCleanup(String dataFiles) |
| { |
| String[] filenames = dataFiles.split(","); |
| HashMap<ColumnFamilyStore, Descriptor> descriptors = Maps.newHashMap(); |
| |
| for (String filename : filenames) |
| { |
| // extract keyspace and columnfamily name from filename |
| Descriptor desc = Descriptor.fromFilename(filename.trim()); |
| if (Schema.instance.getCFMetaData(desc) == null) |
| { |
| logger.warn("Schema does not exist for file {}. Skipping.", filename); |
| continue; |
| } |
| // group by keyspace/columnfamily |
| ColumnFamilyStore cfs = Keyspace.open(desc.ksname).getColumnFamilyStore(desc.cfname); |
| desc = cfs.getDirectories().find(new File(filename.trim()).getName()); |
| if (desc != null) |
| descriptors.put(cfs, desc); |
| } |
| |
| if (!StorageService.instance.isJoined()) |
| { |
| logger.error("Cleanup cannot run before a node has joined the ring"); |
| return; |
| } |
| |
| for (Map.Entry<ColumnFamilyStore,Descriptor> entry : descriptors.entrySet()) |
| { |
| ColumnFamilyStore cfs = entry.getKey(); |
| Keyspace keyspace = cfs.keyspace; |
| Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName()); |
| boolean hasIndexes = cfs.indexManager.hasIndexes(); |
| SSTableReader sstable = lookupSSTable(cfs, entry.getValue()); |
| |
| if (sstable == null) |
| { |
| logger.warn("Will not clean {}, it is not an active sstable", entry.getValue()); |
| } |
| else |
| { |
| CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, ranges, FBUtilities.nowInSeconds()); |
| try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstable, OperationType.CLEANUP)) |
| { |
| doCleanupOne(cfs, txn, cleanupStrategy, ranges, hasIndexes); |
| } |
| catch (IOException e) |
| { |
| logger.error("forceUserDefinedCleanup failed: {}", e.getLocalizedMessage()); |
| } |
| } |
| } |
| } |
| |
| |
| public Future<?> submitUserDefined(final ColumnFamilyStore cfs, final Collection<Descriptor> dataFiles, final int gcBefore) |
| { |
| Runnable runnable = new WrappedRunnable() |
| { |
| protected void runMayThrow() throws Exception |
| { |
| // look up the sstables now that we're on the compaction executor, so we don't try to re-compact |
| // something that was already being compacted earlier. |
| Collection<SSTableReader> sstables = new ArrayList<>(dataFiles.size()); |
| for (Descriptor desc : dataFiles) |
| { |
| // inefficient but not in a performance sensitive path |
| SSTableReader sstable = lookupSSTable(cfs, desc); |
| if (sstable == null) |
| { |
| logger.info("Will not compact {}: it is not an active sstable", desc); |
| } |
| else |
| { |
| sstables.add(sstable); |
| } |
| } |
| |
| if (sstables.isEmpty()) |
| { |
| logger.info("No files to compact for user defined compaction"); |
| } |
| else |
| { |
| List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstables, gcBefore); |
| try |
| { |
| for (AbstractCompactionTask task : tasks) |
| { |
| if (task != null) |
| task.execute(metrics); |
| } |
| } |
| finally |
| { |
| FBUtilities.closeAll(tasks.stream().map(task -> task.transaction).collect(Collectors.toList())); |
| } |
| } |
| } |
| }; |
| |
| return executor.submitIfRunning(runnable, "user defined task"); |
| } |
| |
| // This acquire a reference on the sstable |
| // This is not efficient, do not use in any critical path |
| private SSTableReader lookupSSTable(final ColumnFamilyStore cfs, Descriptor descriptor) |
| { |
| for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL)) |
| { |
| if (sstable.descriptor.equals(descriptor)) |
| return sstable; |
| } |
| return null; |
| } |
| |
| /** |
| * Does not mutate data, so is not scheduled. |
| */ |
| public Future<?> submitValidation(final ColumnFamilyStore cfStore, final Validator validator) |
| { |
| Callable<Object> callable = new Callable<Object>() |
| { |
| public Object call() throws IOException |
| { |
| try |
| { |
| doValidationCompaction(cfStore, validator); |
| } |
| catch (Throwable e) |
| { |
| // we need to inform the remote end of our failure, otherwise it will hang on repair forever |
| validator.fail(); |
| throw e; |
| } |
| return this; |
| } |
| }; |
| |
| return validationExecutor.submitIfRunning(callable, "validation"); |
| } |
| |
| /* Used in tests. */ |
| public void disableAutoCompaction() |
| { |
| for (String ksname : Schema.instance.getNonSystemKeyspaces()) |
| { |
| for (ColumnFamilyStore cfs : Keyspace.open(ksname).getColumnFamilyStores()) |
| cfs.disableAutoCompaction(); |
| } |
| } |
| |
| private void scrubOne(ColumnFamilyStore cfs, LifecycleTransaction modifier, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTL) throws IOException |
| { |
| CompactionInfo.Holder scrubInfo = null; |
| |
| try (Scrubber scrubber = new Scrubber(cfs, modifier, skipCorrupted, checkData, reinsertOverflowedTTL)) |
| { |
| scrubInfo = scrubber.getScrubInfo(); |
| metrics.beginCompaction(scrubInfo); |
| scrubber.scrub(); |
| } |
| finally |
| { |
| if (scrubInfo != null) |
| metrics.finishCompaction(scrubInfo); |
| } |
| } |
| |
| private void verifyOne(ColumnFamilyStore cfs, SSTableReader sstable, boolean extendedVerify) throws IOException |
| { |
| CompactionInfo.Holder verifyInfo = null; |
| |
| try (Verifier verifier = new Verifier(cfs, sstable, false)) |
| { |
| verifyInfo = verifier.getVerifyInfo(); |
| metrics.beginCompaction(verifyInfo); |
| verifier.verify(extendedVerify); |
| } |
| finally |
| { |
| if (verifyInfo != null) |
| metrics.finishCompaction(verifyInfo); |
| } |
| } |
| |
| /** |
| * Determines if a cleanup would actually remove any data in this SSTable based |
| * on a set of owned ranges. |
| */ |
| @VisibleForTesting |
| public static boolean needsCleanup(SSTableReader sstable, Collection<Range<Token>> ownedRanges) |
| { |
| if (ownedRanges.isEmpty()) |
| { |
| return true; // all data will be cleaned |
| } |
| |
| // unwrap and sort the ranges by LHS token |
| List<Range<Token>> sortedRanges = Range.normalize(ownedRanges); |
| |
| // see if there are any keys LTE the token for the start of the first range |
| // (token range ownership is exclusive on the LHS.) |
| Range<Token> firstRange = sortedRanges.get(0); |
| if (sstable.first.getToken().compareTo(firstRange.left) <= 0) |
| return true; |
| |
| // then, iterate over all owned ranges and see if the next key beyond the end of the owned |
| // range falls before the start of the next range |
| for (int i = 0; i < sortedRanges.size(); i++) |
| { |
| Range<Token> range = sortedRanges.get(i); |
| if (range.right.isMinimum()) |
| { |
| // we split a wrapping range and this is the second half. |
| // there can't be any keys beyond this (and this is the last range) |
| return false; |
| } |
| |
| DecoratedKey firstBeyondRange = sstable.firstKeyBeyond(range.right.maxKeyBound()); |
| if (firstBeyondRange == null) |
| { |
| // we ran off the end of the sstable looking for the next key; we don't need to check any more ranges |
| return false; |
| } |
| |
| if (i == (sortedRanges.size() - 1)) |
| { |
| // we're at the last range and we found a key beyond the end of the range |
| return true; |
| } |
| |
| Range<Token> nextRange = sortedRanges.get(i + 1); |
| if (firstBeyondRange.getToken().compareTo(nextRange.left) <= 0) |
| { |
| // we found a key in between the owned ranges |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| |
| /** |
| * This function goes over a file and removes the keys that the node is not responsible for |
| * and only keeps keys that this node is responsible for. |
| * |
| * @throws IOException |
| */ |
| private void doCleanupOne(final ColumnFamilyStore cfs, LifecycleTransaction txn, CleanupStrategy cleanupStrategy, Collection<Range<Token>> ranges, boolean hasIndexes) throws IOException |
| { |
| assert !cfs.isIndex(); |
| |
| SSTableReader sstable = txn.onlyOne(); |
| |
| // if ranges is empty and no index, entire sstable is discarded |
| if (!hasIndexes && !new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges)) |
| { |
| txn.obsoleteOriginals(); |
| txn.finish(); |
| logger.info("SSTable {} ([{}, {}]) does not intersect the owned ranges ({}), dropping it", sstable, sstable.first.getToken(), sstable.last.getToken(), ranges); |
| return; |
| } |
| |
| long start = System.nanoTime(); |
| |
| long totalkeysWritten = 0; |
| |
| long expectedBloomFilterSize = Math.max(cfs.metadata.params.minIndexInterval, |
| SSTableReader.getApproximateKeyCount(txn.originals())); |
| if (logger.isTraceEnabled()) |
| logger.trace("Expected bloom filter size : {}", expectedBloomFilterSize); |
| |
| logger.info("Cleaning up {}", sstable); |
| |
| File compactionFileLocation = sstable.descriptor.directory; |
| RateLimiter limiter = getRateLimiter(); |
| double compressionRatio = sstable.getCompressionRatio(); |
| if (compressionRatio == MetadataCollector.NO_COMPRESSION_RATIO) |
| compressionRatio = 1.0; |
| |
| List<SSTableReader> finished; |
| |
| int nowInSec = FBUtilities.nowInSeconds(); |
| try (SSTableRewriter writer = SSTableRewriter.construct(cfs, txn, false, sstable.maxDataAge); |
| ISSTableScanner scanner = cleanupStrategy.getScanner(sstable, null); |
| CompactionController controller = new CompactionController(cfs, txn.originals(), getDefaultGcBefore(cfs, nowInSec)); |
| Refs<SSTableReader> refs = Refs.ref(Collections.singleton(sstable)); |
| CompactionIterator ci = new CompactionIterator(OperationType.CLEANUP, Collections.singletonList(scanner), controller, nowInSec, UUIDGen.getTimeUUID(), metrics)) |
| { |
| writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable, txn)); |
| long lastBytesScanned = 0; |
| |
| |
| while (ci.hasNext()) |
| { |
| if (ci.isStopRequested()) |
| throw new CompactionInterruptedException(ci.getCompactionInfo()); |
| |
| try (UnfilteredRowIterator partition = ci.next(); |
| UnfilteredRowIterator notCleaned = cleanupStrategy.cleanup(partition)) |
| { |
| if (notCleaned == null) |
| continue; |
| |
| if (writer.append(notCleaned) != null) |
| totalkeysWritten++; |
| |
| long bytesScanned = scanner.getBytesScanned(); |
| |
| compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio); |
| |
| lastBytesScanned = bytesScanned; |
| } |
| } |
| |
| // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd |
| cfs.indexManager.flushAllIndexesBlocking(); |
| |
| finished = writer.finish(); |
| } |
| |
| if (!finished.isEmpty()) |
| { |
| String format = "Cleaned up to %s. %s to %s (~%d%% of original) for %,d keys. Time: %,dms."; |
| long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); |
| long startsize = sstable.onDiskLength(); |
| long endsize = 0; |
| for (SSTableReader newSstable : finished) |
| endsize += newSstable.onDiskLength(); |
| double ratio = (double) endsize / (double) startsize; |
| logger.info(String.format(format, finished.get(0).getFilename(), FBUtilities.prettyPrintMemory(startsize), |
| FBUtilities.prettyPrintMemory(endsize), (int) (ratio * 100), totalkeysWritten, dTime)); |
| } |
| |
| } |
| |
| static void compactionRateLimiterAcquire(RateLimiter limiter, long bytesScanned, long lastBytesScanned, double compressionRatio) |
| { |
| long lengthRead = (long) ((bytesScanned - lastBytesScanned) * compressionRatio) + 1; |
| while (lengthRead >= Integer.MAX_VALUE) |
| { |
| limiter.acquire(Integer.MAX_VALUE); |
| lengthRead -= Integer.MAX_VALUE; |
| } |
| if (lengthRead > 0) |
| { |
| limiter.acquire((int) lengthRead); |
| } |
| } |
| |
| private static abstract class CleanupStrategy |
| { |
| protected final Collection<Range<Token>> ranges; |
| protected final int nowInSec; |
| |
| protected CleanupStrategy(Collection<Range<Token>> ranges, int nowInSec) |
| { |
| this.ranges = ranges; |
| this.nowInSec = nowInSec; |
| } |
| |
| public static CleanupStrategy get(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, int nowInSec) |
| { |
| return cfs.indexManager.hasIndexes() |
| ? new Full(cfs, ranges, nowInSec) |
| : new Bounded(cfs, ranges, nowInSec); |
| } |
| |
| public abstract ISSTableScanner getScanner(SSTableReader sstable, RateLimiter limiter); |
| public abstract UnfilteredRowIterator cleanup(UnfilteredRowIterator partition); |
| |
| private static final class Bounded extends CleanupStrategy |
| { |
| public Bounded(final ColumnFamilyStore cfs, Collection<Range<Token>> ranges, int nowInSec) |
| { |
| super(ranges, nowInSec); |
| instance.cacheCleanupExecutor.submit(new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| cfs.cleanupCache(); |
| } |
| }); |
| } |
| |
| @Override |
| public ISSTableScanner getScanner(SSTableReader sstable, RateLimiter limiter) |
| { |
| return sstable.getScanner(ranges, limiter); |
| } |
| |
| @Override |
| public UnfilteredRowIterator cleanup(UnfilteredRowIterator partition) |
| { |
| return partition; |
| } |
| } |
| |
| private static final class Full extends CleanupStrategy |
| { |
| private final ColumnFamilyStore cfs; |
| |
| public Full(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, int nowInSec) |
| { |
| super(ranges, nowInSec); |
| this.cfs = cfs; |
| } |
| |
| @Override |
| public ISSTableScanner getScanner(SSTableReader sstable, RateLimiter limiter) |
| { |
| return sstable.getScanner(limiter); |
| } |
| |
| @Override |
| public UnfilteredRowIterator cleanup(UnfilteredRowIterator partition) |
| { |
| if (Range.isInRanges(partition.partitionKey().getToken(), ranges)) |
| return partition; |
| |
| cfs.invalidateCachedPartition(partition.partitionKey()); |
| |
| cfs.indexManager.deletePartition(partition, nowInSec); |
| return null; |
| } |
| } |
| } |
| |
| public static SSTableWriter createWriter(ColumnFamilyStore cfs, |
| File compactionFileLocation, |
| long expectedBloomFilterSize, |
| long repairedAt, |
| SSTableReader sstable, |
| LifecycleTransaction txn) |
| { |
| FileUtils.createDirectory(compactionFileLocation); |
| SerializationHeader header = sstable.header; |
| if (header == null) |
| header = SerializationHeader.make(sstable.metadata, Collections.singleton(sstable)); |
| |
| return SSTableWriter.create(cfs.metadata, |
| Descriptor.fromFilename(cfs.getSSTablePath(compactionFileLocation)), |
| expectedBloomFilterSize, |
| repairedAt, |
| sstable.getSSTableLevel(), |
| header, |
| cfs.indexManager.listIndexes(), |
| txn); |
| } |
| |
| public static SSTableWriter createWriterForAntiCompaction(ColumnFamilyStore cfs, |
| File compactionFileLocation, |
| int expectedBloomFilterSize, |
| long repairedAt, |
| Collection<SSTableReader> sstables, |
| ILifecycleTransaction txn) |
| { |
| FileUtils.createDirectory(compactionFileLocation); |
| int minLevel = Integer.MAX_VALUE; |
| // if all sstables have the same level, we can compact them together without creating overlap during anticompaction |
| // note that we only anticompact from unrepaired sstables, which is not leveled, but we still keep original level |
| // after first migration to be able to drop the sstables back in their original place in the repaired sstable manifest |
| for (SSTableReader sstable : sstables) |
| { |
| if (minLevel == Integer.MAX_VALUE) |
| minLevel = sstable.getSSTableLevel(); |
| |
| if (minLevel != sstable.getSSTableLevel()) |
| { |
| minLevel = 0; |
| break; |
| } |
| } |
| return SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(compactionFileLocation)), |
| (long) expectedBloomFilterSize, |
| repairedAt, |
| cfs.metadata, |
| new MetadataCollector(sstables, cfs.metadata.comparator, minLevel), |
| SerializationHeader.make(cfs.metadata, sstables), |
| cfs.indexManager.listIndexes(), |
| txn); |
| } |
| |
| |
| /** |
| * Performs a readonly "compaction" of all sstables in order to validate complete rows, |
| * but without writing the merge result |
| */ |
| @SuppressWarnings("resource") |
| private void doValidationCompaction(ColumnFamilyStore cfs, Validator validator) throws IOException |
| { |
| // this isn't meant to be race-proof, because it's not -- it won't cause bugs for a CFS to be dropped |
| // mid-validation, or to attempt to validate a droped CFS. this is just a best effort to avoid useless work, |
| // particularly in the scenario where a validation is submitted before the drop, and there are compactions |
| // started prior to the drop keeping some sstables alive. Since validationCompaction can run |
| // concurrently with other compactions, it would otherwise go ahead and scan those again. |
| if (!cfs.isValid()) |
| return; |
| |
| Refs<SSTableReader> sstables = null; |
| try |
| { |
| |
| int gcBefore; |
| int nowInSec = FBUtilities.nowInSeconds(); |
| UUID parentRepairSessionId = validator.desc.parentSessionId; |
| String snapshotName; |
| boolean isGlobalSnapshotValidation = cfs.snapshotExists(parentRepairSessionId.toString()); |
| if (isGlobalSnapshotValidation) |
| snapshotName = parentRepairSessionId.toString(); |
| else |
| snapshotName = validator.desc.sessionId.toString(); |
| boolean isSnapshotValidation = cfs.snapshotExists(snapshotName); |
| |
| if (isSnapshotValidation) |
| { |
| // If there is a snapshot created for the session then read from there. |
| // note that we populate the parent repair session when creating the snapshot, meaning the sstables in the snapshot are the ones we |
| // are supposed to validate. |
| sstables = cfs.getSnapshotSSTableReader(snapshotName); |
| |
| |
| // Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute |
| // this at a different time (that's the whole purpose of repair with snaphsot). So instead we take the creation |
| // time of the snapshot, which should give us roughtly the same time on each replica (roughtly being in that case |
| // 'as good as in the non-snapshot' case) |
| gcBefore = cfs.gcBefore((int)(cfs.getSnapshotCreationTime(snapshotName) / 1000)); |
| } |
| else |
| { |
| // flush first so everyone is validating data that is as similar as possible |
| StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name); |
| sstables = getSSTablesToValidate(cfs, validator); |
| if (sstables == null) |
| return; // this means the parent repair session was removed - the repair session failed on another node and we removed it |
| if (validator.gcBefore > 0) |
| gcBefore = validator.gcBefore; |
| else |
| gcBefore = getDefaultGcBefore(cfs, nowInSec); |
| } |
| |
| // Create Merkle trees suitable to hold estimated partitions for the given ranges. |
| // We blindly assume that a partition is evenly distributed on all sstables for now. |
| MerkleTrees tree = createMerkleTrees(sstables, validator.desc.ranges, cfs); |
| long start = System.nanoTime(); |
| try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.ranges); |
| ValidationCompactionController controller = new ValidationCompactionController(cfs, gcBefore); |
| CompactionIterator ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, metrics)) |
| { |
| // validate the CF as we iterate over it |
| validator.prepare(cfs, tree); |
| while (ci.hasNext()) |
| { |
| if (ci.isStopRequested()) |
| throw new CompactionInterruptedException(ci.getCompactionInfo()); |
| try (UnfilteredRowIterator partition = ci.next()) |
| { |
| validator.add(partition); |
| } |
| } |
| validator.complete(); |
| } |
| finally |
| { |
| if (isSnapshotValidation && !isGlobalSnapshotValidation) |
| { |
| // we can only clear the snapshot if we are not doing a global snapshot validation (we then clear it once anticompaction |
| // is done). |
| cfs.clearSnapshot(snapshotName); |
| } |
| } |
| |
| if (logger.isDebugEnabled()) |
| { |
| long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); |
| logger.debug("Validation finished in {} msec, for {}", |
| duration, |
| validator.desc); |
| } |
| } |
| finally |
| { |
| if (sstables != null) |
| sstables.release(); |
| } |
| } |
| |
    /**
     * Builds Merkle trees for the given ranges, sizing each tree's depth by the estimated
     * number of partitions the range holds across the given sstables.
     *
     * @param sstables sstables used to estimate per-range partition counts
     * @param ranges   ranges to build one Merkle tree for each
     * @param cfs      table being validated (provides the partitioner)
     * @return the populated {@code MerkleTrees}, one tree per range
     */
    private static MerkleTrees createMerkleTrees(Iterable<SSTableReader> sstables, Collection<Range<Token>> ranges, ColumnFamilyStore cfs)
    {
        MerkleTrees tree = new MerkleTrees(cfs.getPartitioner());
        long allPartitions = 0;
        Map<Range<Token>, Long> rangePartitionCounts = Maps.newHashMapWithExpectedSize(ranges.size());
        for (Range<Token> range : ranges)
        {
            long numPartitions = 0;
            for (SSTableReader sstable : sstables)
                numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(range));
            rangePartitionCounts.put(range, numPartitions);
            allPartitions += numPartitions;
        }

        for (Range<Token> range : ranges)
        {
            long numPartitions = rangePartitionCounts.get(range);
            double rangeOwningRatio = allPartitions > 0 ? (double)numPartitions / allPartitions : 0;
            // determine max tree depth proportional to range size to avoid blowing up memory with multiple trees,
            // capping at a configurable depth (default 18) to prevent large tree (CASSANDRA-11390, CASSANDRA-14096)
            int maxDepth = rangeOwningRatio > 0
                           ? (int) Math.floor(Math.max(0.0, DatabaseDescriptor.getRepairSessionMaxTreeDepth() -
                                                            Math.log(1 / rangeOwningRatio) / Math.log(2)))
                           : 0;

            // determine tree depth from number of partitions, capping at max tree depth (CASSANDRA-5263)
            int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), maxDepth) : 0;
            tree.addMerkleTree((int) Math.pow(2, depth), range);
        }
        if (logger.isDebugEnabled())
        {
            // MT serialize may take time
            logger.debug("Created {} merkle trees with merkle trees size {}, {} partitions, {} bytes", tree.ranges().size(), tree.size(), allPartitions, MerkleTrees.serializer.serializedSize(tree, 0));
        }

        return tree;
    }
| |
    /**
     * Selects and references the sstables to validate for the given repair session.
     * <p>
     * Synchronized so that concurrent validations against the same parent session see a
     * consistent repairing set.
     *
     * @return referenced sstables intersecting the validator's ranges, or null if the parent
     *         repair session no longer exists (i.e. repair failed on another node)
     * @throws RuntimeException if the selected sstables could not be referenced
     */
    private synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator)
    {
        Refs<SSTableReader> sstables;

        ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
        if (prs == null)
            return null;
        Set<SSTableReader> sstablesToValidate = new HashSet<>();
        if (prs.isGlobal)
            prs.markSSTablesRepairing(cfs.metadata.cfId, validator.desc.parentSessionId);
        // note that we always grab all existing sstables for this - if we were to just grab the ones that
        // were marked as repairing, we would miss any ranges that were compacted away and this would cause us to overstream
        try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(View.select(SSTableSet.CANONICAL, (s) -> !prs.isIncremental || !s.isRepaired())))
        {
            for (SSTableReader sstable : sstableCandidates.sstables)
            {
                // only validate sstables that can contain data in the requested ranges
                if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(validator.desc.ranges))
                {
                    sstablesToValidate.add(sstable);
                }
            }

            sstables = Refs.tryRef(sstablesToValidate);
            if (sstables == null)
            {
                logger.error("Could not reference sstables");
                throw new RuntimeException("Could not reference sstables");
            }
        }

        return sstables;
    }
| |
| /** |
| * Splits up an sstable into two new sstables. The first of the new tables will store repaired ranges, the second |
| * will store the non-repaired ranges. Once anticompation is completed, the original sstable is marked as compacted |
| * and subsequently deleted. |
| * @param cfs |
| * @param repaired a transaction over the repaired sstables to anticompacy |
| * @param ranges Repaired ranges to be placed into one of the new sstables. The repaired table will be tracked via |
| * the {@link org.apache.cassandra.io.sstable.metadata.StatsMetadata#repairedAt} field. |
| */ |
| private void doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, LifecycleTransaction repaired, long repairedAt) |
| { |
| int numAnticompact = repaired.originals().size(); |
| logger.info("Performing anticompaction on {} sstables", numAnticompact); |
| |
| //Group SSTables |
| Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategyManager().groupSSTablesForAntiCompaction(repaired.originals()); |
| // iterate over sstables to check if the repaired / unrepaired ranges intersect them. |
| int antiCompactedSSTableCount = 0; |
| for (Collection<SSTableReader> sstableGroup : groupedSSTables) |
| { |
| try (LifecycleTransaction txn = repaired.split(sstableGroup)) |
| { |
| int antiCompacted = antiCompactGroup(cfs, ranges, txn, repairedAt); |
| antiCompactedSSTableCount += antiCompacted; |
| } |
| } |
| |
| String format = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s)."; |
| logger.info(format, numAnticompact, antiCompactedSSTableCount); |
| } |
| |
| |
    /**
     * Anticompacts one group of sstables: partitions in {@code ranges} are written to a
     * "repaired" output stamped with {@code repairedAt}, everything else to an "unrepaired"
     * output; both logically replace the group's originals.
     *
     * @return the number of output sstables produced, or 0 if the group was empty or the
     *         anticompaction failed (the error is logged, not rethrown)
     */
    @VisibleForTesting
    int antiCompactGroup(ColumnFamilyStore cfs, Collection<Range<Token>> ranges,
                         LifecycleTransaction anticompactionGroup, long repairedAt)
    {
        // both output writers inherit the oldest permissible data age from the inputs
        long groupMaxDataAge = -1;

        for (Iterator<SSTableReader> i = anticompactionGroup.originals().iterator(); i.hasNext();)
        {
            SSTableReader sstable = i.next();
            if (groupMaxDataAge < sstable.maxDataAge)
                groupMaxDataAge = sstable.maxDataAge;
        }

        if (anticompactionGroup.originals().size() == 0)
        {
            logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available");
            return 0;
        }

        logger.info("Anticompacting {}", anticompactionGroup);
        Set<SSTableReader> sstableAsSet = anticompactionGroup.originals();

        File destination = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
        long repairedKeyCount = 0;
        long unrepairedKeyCount = 0;
        int nowInSec = FBUtilities.nowInSeconds();

        /**
         * HACK WARNING
         *
         * We have multiple writers operating over the same Transaction, producing different sets of sstables that all
         * logically replace the transaction's originals.  The SSTableRewriter assumes it has exclusive control over
         * the transaction state, and this will lead to temporarily inconsistent sstable/tracker state if we do not
         * take special measures to avoid it.
         *
         * Specifically, if a number of rewriter have prepareToCommit() invoked in sequence, then two problematic things happen:
         *   1. The obsoleteOriginals() call of the first rewriter immediately remove the originals from the tracker, despite
         *      their having been only partially replaced.  To avoid this, we must either avoid obsoleteOriginals() or checkpoint()
         *   2. The LifecycleTransaction may only have prepareToCommit() invoked once, and this will checkpoint() also.
         *
         * Similarly commit() would finalise partially complete on-disk state.
         *
         * To avoid these problems, we introduce a SharedTxn that proxies all calls onto the underlying transaction
         * except prepareToCommit(), checkpoint(), obsoleteOriginals(), and commit().
         * We then invoke these methods directly once each of the rewriter has updated the transaction
         * with their share of replacements.
         *
         * Note that for the same essential reason we also explicitly disable early open.
         * By noop-ing checkpoint we avoid any of the problems with early open, but by continuing to explicitly
         * disable it we also prevent any of the extra associated work from being performed.
         */
        class SharedTxn extends WrappedLifecycleTransaction
        {
            public SharedTxn(ILifecycleTransaction delegate) { super(delegate); }
            public Throwable commit(Throwable accumulate) { return accumulate; }
            public void prepareToCommit() {}
            public void checkpoint() {}
            public void obsoleteOriginals() {}
            public void close() {}
        }

        CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
        try (SharedTxn sharedTxn = new SharedTxn(anticompactionGroup);
             SSTableRewriter repairedSSTableWriter = SSTableRewriter.constructWithoutEarlyOpening(sharedTxn, false, groupMaxDataAge);
             SSTableRewriter unRepairedSSTableWriter = SSTableRewriter.constructWithoutEarlyOpening(sharedTxn, false, groupMaxDataAge);
             AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup.originals());
             CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs, nowInSec));
             CompactionIterator ci = new CompactionIterator(OperationType.ANTICOMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID(), metrics))
        {
            int expectedBloomFilterSize = Math.max(cfs.metadata.params.minIndexInterval, (int)(SSTableReader.getApproximateKeyCount(sstableAsSet)));

            repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet, sharedTxn));
            unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet, sharedTxn));
            Range.OrderedRangeContainmentChecker containmentChecker = new Range.OrderedRangeContainmentChecker(ranges);
            while (ci.hasNext())
            {
                try (UnfilteredRowIterator partition = ci.next())
                {
                    // if current range from sstable is repaired, save it into the new repaired sstable
                    if (containmentChecker.contains(partition.partitionKey().getToken()))
                    {
                        repairedSSTableWriter.append(partition);
                        repairedKeyCount++;
                    }
                    // otherwise save into the new 'non-repaired' table
                    else
                    {
                        unRepairedSSTableWriter.append(partition);
                        unrepairedKeyCount++;
                    }
                }
            }

            List<SSTableReader> anticompactedSSTables = new ArrayList<>();

            // see the HACK WARNING above: the writers' prepareToCommit()/commit() calls are
            // no-ops via SharedTxn; checkpoint/obsolete/prepare/commit happen exactly once,
            // on the underlying transaction, in this precise order
            repairedSSTableWriter.setRepairedAt(repairedAt).prepareToCommit();
            unRepairedSSTableWriter.prepareToCommit();
            anticompactionGroup.checkpoint();
            anticompactionGroup.obsoleteOriginals();
            anticompactionGroup.prepareToCommit();
            anticompactedSSTables.addAll(repairedSSTableWriter.finished());
            anticompactedSSTables.addAll(unRepairedSSTableWriter.finished());
            repairedSSTableWriter.commit();
            unRepairedSSTableWriter.commit();
            Throwables.maybeFail(anticompactionGroup.commit(null));

            logger.trace("Repaired {} keys out of {} for {}/{} in {}", repairedKeyCount,
                                                                       repairedKeyCount + unrepairedKeyCount,
                                                                       cfs.keyspace.getName(),
                                                                       cfs.getColumnFamilyName(),
                                                                       anticompactionGroup);
            return anticompactedSSTables.size();
        }
        catch (Throwable e)
        {
            JVMStabilityInspector.inspectThrowable(e);
            logger.error("Error anticompacting " + anticompactionGroup, e);
        }
        return 0;
    }
| |
| /** |
| * Is not scheduled, because it is performing disjoint work from sstable compaction. |
| */ |
| public Future<?> submitIndexBuild(final SecondaryIndexBuilder builder) |
| { |
| Runnable runnable = new Runnable() |
| { |
| public void run() |
| { |
| metrics.beginCompaction(builder); |
| try |
| { |
| builder.build(); |
| } |
| finally |
| { |
| metrics.finishCompaction(builder); |
| } |
| } |
| }; |
| |
| return executor.submitIfRunning(runnable, "index build"); |
| } |
| |
| public Future<?> submitCacheWrite(final AutoSavingCache.Writer writer) |
| { |
| Runnable runnable = new Runnable() |
| { |
| public void run() |
| { |
| if (!AutoSavingCache.flushInProgress.add(writer.cacheType())) |
| { |
| logger.trace("Cache flushing was already in progress: skipping {}", writer.getCompactionInfo()); |
| return; |
| } |
| try |
| { |
| metrics.beginCompaction(writer); |
| try |
| { |
| writer.saveCache(); |
| } |
| finally |
| { |
| metrics.finishCompaction(writer); |
| } |
| } |
| finally |
| { |
| AutoSavingCache.flushInProgress.remove(writer.cacheType()); |
| } |
| } |
| }; |
| |
| return executor.submitIfRunning(runnable, "cache write"); |
| } |
| |
/**
 * Runs an index summary redistribution synchronously on the calling thread,
 * registering it with the compaction metrics while it runs.
 *
 * @param redistribution the redistribution task to execute
 * @return the sstables produced by the redistribution
 * @throws IOException if the redistribution fails
 */
public List<SSTableReader> runIndexSummaryRedistribution(IndexSummaryRedistribution redistribution) throws IOException
{
    metrics.beginCompaction(redistribution);

    try
    {
        return redistribution.redistributeSummaries();
    }
    finally
    {
        // always unregister from the metrics, even when redistribution throws
        metrics.finishCompaction(redistribution);
    }
}
| |
| public static int getDefaultGcBefore(ColumnFamilyStore cfs, int nowInSec) |
| { |
| // 2ndary indexes have ExpiringColumns too, so we need to purge tombstones deleted before now. We do not need to |
| // add any GcGrace however since 2ndary indexes are local to a node. |
| return cfs.isIndex() ? nowInSec : cfs.gcBefore(nowInSec); |
| } |
| |
/** CompactionIterator specialization used by validation (repair) compactions. */
private static class ValidationCompactionIterator extends CompactionIterator
{
    public ValidationCompactionIterator(List<ISSTableScanner> scanners, ValidationCompactionController controller, int nowInSec, CompactionMetrics metrics)
    {
        // fixed operation type VALIDATION; a fresh time-UUID identifies this validation run
        super(OperationType.VALIDATION, scanners, controller, nowInSec, UUIDGen.getTimeUUID(), metrics);
    }
}
| |
| /* |
| * Controller for validation compaction that always purges. |
| * Note that we should not call cfs.getOverlappingSSTables on the provided |
| * sstables because those sstables are not guaranteed to be active sstables |
| * (since we can run repair on a snapshot). |
| */ |
private static class ValidationCompactionController extends CompactionController
{
    public ValidationCompactionController(ColumnFamilyStore cfs, int gcBefore)
    {
        super(cfs, gcBefore);
    }

    @Override
    public Predicate<Long> getPurgeEvaluator(DecoratedKey key)
    {
        /*
         * The main reason we always purge is that including gcable tombstones would mean that the
         * repair digest would depend on the scheduling of compaction on the different nodes. This
         * is still not perfect because gcbefore is currently dependent on the current time at which
         * the validation compaction starts, which while not too bad for normal repair is broken for
         * repair on snapshots. A better solution would be to agree on a gcbefore that all nodes would
         * use, and we'll do that with CASSANDRA-4932.
         * Note validation compaction includes all sstables, so we don't have the problem of purging
         * a tombstone that could shadow a column in another sstable, but this is doubly not a concern
         * since validation compaction is read-only.
         */
        return time -> true;
    }
}
| |
| public Future<?> submitViewBuilder(final ViewBuilder builder) |
| { |
| Runnable runnable = new Runnable() |
| { |
| public void run() |
| { |
| metrics.beginCompaction(builder); |
| try |
| { |
| builder.run(); |
| } |
| finally |
| { |
| metrics.finishCompaction(builder); |
| } |
| } |
| }; |
| if (executor.isShutdown()) |
| { |
| logger.info("Compaction executor has shut down, not submitting index build"); |
| return null; |
| } |
| |
| return executor.submit(runnable); |
| } |
/** @return the number of compactions (and compaction-like operations) currently in progress. */
public int getActiveCompactions()
{
    return CompactionMetrics.getCompactions().size();
}
| |
| static class CompactionExecutor extends JMXEnabledThreadPoolExecutor |
| { |
| protected CompactionExecutor(int minThreads, int maxThreads, String name, BlockingQueue<Runnable> queue) |
| { |
| super(minThreads, maxThreads, 60, TimeUnit.SECONDS, queue, new NamedThreadFactory(name, Thread.MIN_PRIORITY), "internal"); |
| } |
| |
| private CompactionExecutor(int threadCount, String name) |
| { |
| this(threadCount, threadCount, name, new LinkedBlockingQueue<Runnable>()); |
| } |
| |
| public CompactionExecutor() |
| { |
| this(Math.max(1, DatabaseDescriptor.getConcurrentCompactors()), "CompactionExecutor"); |
| } |
| |
| protected void beforeExecute(Thread t, Runnable r) |
| { |
| // can't set this in Thread factory, so we do it redundantly here |
| isCompactionManager.set(true); |
| super.beforeExecute(t, r); |
| } |
| |
| // modified from DebuggableThreadPoolExecutor so that CompactionInterruptedExceptions are not logged |
| @Override |
| public void afterExecute(Runnable r, Throwable t) |
| { |
| DebuggableThreadPoolExecutor.maybeResetTraceSessionWrapper(r); |
| |
| if (t == null) |
| t = DebuggableThreadPoolExecutor.extractThrowable(r); |
| |
| if (t != null) |
| { |
| if (t instanceof CompactionInterruptedException) |
| { |
| logger.info(t.getMessage()); |
| if (t.getSuppressed() != null && t.getSuppressed().length > 0) |
| logger.warn("Interruption of compaction encountered exceptions:", t); |
| else |
| logger.trace("Full interruption stack trace:", t); |
| } |
| else |
| { |
| DebuggableThreadPoolExecutor.handleOrLog(t); |
| } |
| } |
| |
| // Snapshots cannot be deleted on Windows while segments of the root element are mapped in NTFS. Compactions |
| // unmap those segments which could free up a snapshot for successful deletion. |
| SnapshotDeletingTask.rescheduleFailedTasks(); |
| } |
| |
| public ListenableFuture<?> submitIfRunning(Runnable task, String name) |
| { |
| return submitIfRunning(Executors.callable(task, null), name); |
| } |
| |
| /** |
| * Submit the task but only if the executor has not been shutdown.If the executor has |
| * been shutdown, or in case of a rejected execution exception return a cancelled future. |
| * |
| * @param task - the task to submit |
| * @param name - the task name to use in log messages |
| * |
| * @return the future that will deliver the task result, or a future that has already been |
| * cancelled if the task could not be submitted. |
| */ |
| public ListenableFuture<?> submitIfRunning(Callable<?> task, String name) |
| { |
| if (isShutdown()) |
| { |
| logger.info("Executor has been shut down, not submitting {}", name); |
| return Futures.immediateCancelledFuture(); |
| } |
| |
| try |
| { |
| ListenableFutureTask ret = ListenableFutureTask.create(task); |
| execute(ret); |
| return ret; |
| } |
| catch (RejectedExecutionException ex) |
| { |
| if (isShutdown()) |
| logger.info("Executor has shut down, could not submit {}", name); |
| else |
| logger.error("Failed to submit {}", name, ex); |
| |
| return Futures.immediateCancelledFuture(); |
| } |
| } |
| } |
| |
/**
 * Executor for validation (repair) compactions: effectively unbounded thread count with a
 * SynchronousQueue, so each submitted validation gets a thread immediately rather than queueing.
 */
private static class ValidationExecutor extends CompactionExecutor
{
    public ValidationExecutor()
    {
        super(1, Integer.MAX_VALUE, "ValidationExecutor", new SynchronousQueue<Runnable>());
    }
}
| |
/** Single-threaded executor dedicated to cache cleanup tasks. */
private static class CacheCleanupExecutor extends CompactionExecutor
{
    public CacheCleanupExecutor()
    {
        super(1, "CacheCleanupExecutor");
    }
}
| |
/** Callback interface for tracking when compaction-like operations begin and end. */
public interface CompactionExecutorStatsCollector
{
    /** Called when the given operation starts. */
    void beginCompaction(CompactionInfo.Holder ci);

    /** Called when the given operation finishes (successfully or not). */
    void finishCompaction(CompactionInfo.Holder ci);
}
| |
| public List<Map<String, String>> getCompactions() |
| { |
| List<Holder> compactionHolders = CompactionMetrics.getCompactions(); |
| List<Map<String, String>> out = new ArrayList<Map<String, String>>(compactionHolders.size()); |
| for (CompactionInfo.Holder ci : compactionHolders) |
| out.add(ci.getCompactionInfo().asMap()); |
| return out; |
| } |
| |
| public List<String> getCompactionSummary() |
| { |
| List<Holder> compactionHolders = CompactionMetrics.getCompactions(); |
| List<String> out = new ArrayList<String>(compactionHolders.size()); |
| for (CompactionInfo.Holder ci : compactionHolders) |
| out.add(ci.getCompactionInfo().toString()); |
| return out; |
| } |
| |
/**
 * @return the persisted compaction history as JMX TabularData
 * @throws RuntimeException wrapping any OpenDataException raised while building the tabular form
 */
public TabularData getCompactionHistory()
{
    try
    {
        return SystemKeyspace.getCompactionHistory();
    }
    catch (OpenDataException e)
    {
        // checked JMX exception; callers (JMX beans) expect an unchecked failure
        throw new RuntimeException(e);
    }
}
| |
/** @return total bytes compacted since startup, from the compaction metrics counter. */
public long getTotalBytesCompacted()
{
    return metrics.bytesCompacted.getCount();
}
| |
/** @return total number of compactions completed since startup, from the compaction metrics counter. */
public long getTotalCompactionsCompleted()
{
    return metrics.totalCompactionsCompleted.getCount();
}
| |
/** @return the number of pending compaction tasks, as reported by the compaction metrics gauge. */
public int getPendingTasks()
{
    return metrics.pendingTasks.getValue();
}
| |
/** @return the number of completed compaction tasks, as reported by the compaction metrics gauge. */
public long getCompletedTasks()
{
    return metrics.completedTasks.getValue();
}
| |
| public void stopCompaction(String type) |
| { |
| OperationType operation = OperationType.valueOf(type); |
| for (Holder holder : CompactionMetrics.getCompactions()) |
| { |
| if (holder.getCompactionInfo().getTaskType() == operation) |
| holder.stop(); |
| } |
| } |
| |
| public void stopCompactionById(String compactionId) |
| { |
| for (Holder holder : CompactionMetrics.getCompactions()) |
| { |
| UUID holderId = holder.getCompactionInfo().compactionId(); |
| if (holderId != null && holderId.equals(UUID.fromString(compactionId))) |
| holder.stop(); |
| } |
| } |
| |
/**
 * Resizes the compaction thread pool to the given number of threads.
 *
 * The two pool-size setters must be ordered so that core size never exceeds max size
 * (ThreadPoolExecutor rejects such configurations): grow max before core when increasing,
 * shrink core before max when decreasing.
 */
public void setConcurrentCompactors(int value)
{
    if (value > executor.getCorePoolSize())
    {
        // we are increasing the value
        executor.setMaximumPoolSize(value);
        executor.setCorePoolSize(value);
    }
    else if (value < executor.getCorePoolSize())
    {
        // we are reducing the value
        executor.setCorePoolSize(value);
        executor.setMaximumPoolSize(value);
    }
}
| |
/** @return the core pool size of the compaction executor. */
public int getCoreCompactorThreads()
{
    return executor.getCorePoolSize();
}
| |
/** Sets the core pool size of the compaction executor. */
public void setCoreCompactorThreads(int number)
{
    executor.setCorePoolSize(number);
}
| |
/** @return the maximum pool size of the compaction executor. */
public int getMaximumCompactorThreads()
{
    return executor.getMaximumPoolSize();
}
| |
/** Sets the maximum pool size of the compaction executor. */
public void setMaximumCompactorThreads(int number)
{
    executor.setMaximumPoolSize(number);
}
| |
/** @return the core pool size of the validation executor. */
public int getCoreValidationThreads()
{
    return validationExecutor.getCorePoolSize();
}
| |
/** Sets the core pool size of the validation executor. */
public void setCoreValidationThreads(int number)
{
    validationExecutor.setCorePoolSize(number);
}
| |
/** @return the maximum pool size of the validation executor. */
public int getMaximumValidatorThreads()
{
    return validationExecutor.getMaximumPoolSize();
}
| |
/** Sets the maximum pool size of the validation executor. */
public void setMaximumValidatorThreads(int number)
{
    validationExecutor.setMaximumPoolSize(number);
}
| |
| /** |
| * Try to stop all of the compactions for given ColumnFamilies. |
| * |
| * Note that this method does not wait for all compactions to finish; you'll need to loop against |
| * isCompacting if you want that behavior. |
| * |
| * @param columnFamilies The ColumnFamilies to try to stop compaction upon. |
| * @param interruptValidation true if validation operations for repair should also be interrupted |
| * |
| */ |
| public void interruptCompactionFor(Iterable<CFMetaData> columnFamilies, boolean interruptValidation) |
| { |
| assert columnFamilies != null; |
| |
| // interrupt in-progress compactions |
| for (Holder compactionHolder : CompactionMetrics.getCompactions()) |
| { |
| CompactionInfo info = compactionHolder.getCompactionInfo(); |
| if ((info.getTaskType() == OperationType.VALIDATION) && !interruptValidation) |
| continue; |
| |
| // cfmetadata is null for index summary redistributions which are 'global' - they involve all keyspaces/tables |
| if (info.getCFMetaData() == null || Iterables.contains(columnFamilies, info.getCFMetaData())) |
| compactionHolder.stop(); // signal compaction to stop |
| } |
| } |
| |
| public void interruptCompactionForCFs(Iterable<ColumnFamilyStore> cfss, boolean interruptValidation) |
| { |
| List<CFMetaData> metadata = new ArrayList<>(); |
| for (ColumnFamilyStore cfs : cfss) |
| metadata.add(cfs.metadata); |
| |
| interruptCompactionFor(metadata, interruptValidation); |
| } |
| |
| public void waitForCessation(Iterable<ColumnFamilyStore> cfss) |
| { |
| long start = System.nanoTime(); |
| long delay = TimeUnit.MINUTES.toNanos(1); |
| while (System.nanoTime() - start < delay) |
| { |
| if (CompactionManager.instance.isCompacting(cfss)) |
| Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS); |
| else |
| break; |
| } |
| } |
| |
| /** |
| * Return whether "global" compactions should be paused, used by ColumnFamilyStore#runWithCompactionsDisabled |
| * |
| * a global compaction is one that includes several/all tables, currently only IndexSummaryBuilder |
| */ |
| public boolean isGlobalCompactionPaused() |
| { |
| return globalCompactionPauseCount.get() > 0; |
| } |
| |
| public CompactionPauser pauseGlobalCompaction() |
| { |
| CompactionPauser pauser = globalCompactionPauseCount::decrementAndGet; |
| globalCompactionPauseCount.incrementAndGet(); |
| return pauser; |
| } |
| |
/** Handle returned by pauseGlobalCompaction(); close() releases the pause (narrowed to not throw). */
public interface CompactionPauser extends AutoCloseable
{
    public void close();
}
| } |