/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db.repair;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Uninterruptibles;

import org.apache.cassandra.concurrent.FutureTask;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.FutureCombiner;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.compaction.CompactionInfo;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.concurrent.Refs;

import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
/**
 * Performs an anti compaction on a set of tables and token ranges, isolating the unrepaired sstables
 * for a given token range into a pending repair group so they can't be compacted with other sstables
 * while they are being repaired.
 */
public class PendingAntiCompaction
{
    private static final Logger logger = LoggerFactory.getLogger(PendingAntiCompaction.class);

    private static final int ACQUIRE_SLEEP_MS = Integer.getInteger("cassandra.acquire_sleep_ms", 1000);
    private static final int ACQUIRE_RETRY_SECONDS = Integer.getInteger("cassandra.acquire_retry_seconds", 60);
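    /*
     * Illustrative usage sketch, not taken from this file: prsId, tables, tokenRanges, executor and the
     * isCancelled lambda are assumed to be supplied by the repair prepare-phase caller.
     *
     *     PendingAntiCompaction pac = new PendingAntiCompaction(prsId, tables, tokenRanges, executor, () -> false);
     *     Future<List<Void>> done = pac.run(); // fails if the sstables could not be exclusively acquired
     *
     * The acquisition retry knobs above are read from JVM system properties, so they can be tuned per node,
     * e.g. -Dcassandra.acquire_retry_seconds=120 -Dcassandra.acquire_sleep_ms=500 (example values only).
     */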
    public static class AcquireResult
    {
        final ColumnFamilyStore cfs;
        final Refs<SSTableReader> refs;
        final LifecycleTransaction txn;

        AcquireResult(ColumnFamilyStore cfs, Refs<SSTableReader> refs, LifecycleTransaction txn)
        {
            this.cfs = cfs;
            this.refs = refs;
            this.txn = txn;
        }

        @VisibleForTesting
        public void abort()
        {
            if (txn != null)
                txn.abort();
            if (refs != null)
                refs.release();
        }
    }
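    /**
     * Thrown when the sstables needed for this anticompaction cannot be acquired because they conflict
     * with another in-progress repair session or anticompaction (see {@link AntiCompactionPredicate}).
     */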
    static class SSTableAcquisitionException extends RuntimeException
    {
        SSTableAcquisitionException(String message)
        {
            super(message);
        }
    }
    @VisibleForTesting
    static class AntiCompactionPredicate implements Predicate<SSTableReader>
    {
        private final Collection<Range<Token>> ranges;
        private final TimeUUID prsid;

        public AntiCompactionPredicate(Collection<Range<Token>> ranges, TimeUUID prsid)
        {
            this.ranges = ranges;
            this.prsid = prsid;
        }
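        /*
         * Summary of the checks below: an sstable is accepted only if it intersects the requested ranges,
         * is unrepaired, and has no pending repair. Legacy sstables, sstables pending an unfinished repair
         * session, and sstables already being anticompacted raise an SSTableAcquisitionException instead
         * of a plain rejection.
         */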
        public boolean apply(SSTableReader sstable)
        {
            if (!sstable.intersects(ranges))
                return false;

            StatsMetadata metadata = sstable.getSSTableMetadata();

            // exclude repaired sstables
            if (metadata.repairedAt != UNREPAIRED_SSTABLE)
                return false;

            if (!sstable.descriptor.version.hasPendingRepair())
            {
                String message = String.format("Prepare phase failed because it encountered legacy sstables that don't " +
                                               "support pending repair, run upgradesstables before starting incremental " +
                                               "repairs, repair session (%s)", prsid);
                throw new SSTableAcquisitionException(message);
            }

            // exclude sstables pending repair, but record session ids for
            // non-finalized sessions for a later error message
            if (metadata.pendingRepair != NO_PENDING_REPAIR)
            {
                if (!ActiveRepairService.instance.consistent.local.isSessionFinalized(metadata.pendingRepair))
                {
                    String message = String.format("Prepare phase for incremental repair session %s has failed because it encountered " +
                                                   "intersecting sstables belonging to another incremental repair session (%s). This is " +
                                                   "caused by starting an incremental repair session before a previous one has completed. " +
                                                   "Check nodetool repair_admin for hung sessions and fix them.", prsid, metadata.pendingRepair);
                    throw new SSTableAcquisitionException(message);
                }
                return false;
            }

            Collection<CompactionInfo> cis = CompactionManager.instance.active.getCompactionsForSSTable(sstable, OperationType.ANTICOMPACTION);
            if (cis != null && !cis.isEmpty())
            {
                // todo: start tracking the parent repair session id that created the anticompaction to be able to give a better error message here:
                StringBuilder sb = new StringBuilder();
                sb.append("Prepare phase for incremental repair session ");
                sb.append(prsid);
                sb.append(" has failed because it encountered intersecting sstables belonging to another incremental repair session. ");
                sb.append("This is caused by starting multiple conflicting incremental repairs at the same time. ");
                sb.append("Conflicting anticompactions: ");
                for (CompactionInfo ci : cis)
                    sb.append(ci.getTaskId() == null ? "no compaction id" : ci.getTaskId()).append(':').append(ci.getSSTables()).append(',');
                throw new SSTableAcquisitionException(sb.toString());
            }
            return true;
        }
    }
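    /**
     * Acquires, for a single table, references to and a compaction transaction over the unrepaired sstables
     * that intersect the ranges being repaired, retrying for up to acquireRetrySeconds (sleeping
     * acquireSleepMillis between attempts) while they are still held by a conflicting anticompaction.
     */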
    public static class AcquisitionCallable implements Callable<AcquireResult>
    {
        private final ColumnFamilyStore cfs;
        private final TimeUUID sessionID;
        private final AntiCompactionPredicate predicate;
        private final int acquireRetrySeconds;
        private final int acquireSleepMillis;

        @VisibleForTesting
        public AcquisitionCallable(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, TimeUUID sessionID, int acquireRetrySeconds, int acquireSleepMillis)
        {
            this(cfs, sessionID, acquireRetrySeconds, acquireSleepMillis, new AntiCompactionPredicate(ranges, sessionID));
        }

        @VisibleForTesting
        AcquisitionCallable(ColumnFamilyStore cfs, TimeUUID sessionID, int acquireRetrySeconds, int acquireSleepMillis, AntiCompactionPredicate predicate)
        {
            this.cfs = cfs;
            this.sessionID = sessionID;
            this.predicate = predicate;
            this.acquireRetrySeconds = acquireRetrySeconds;
            this.acquireSleepMillis = acquireSleepMillis;
        }
        @SuppressWarnings("resource")
        private AcquireResult acquireTuple()
        {
            // this method runs with compactions stopped & disabled
            try
            {
                // using predicate might throw if there are conflicting ranges
                Set<SSTableReader> sstables = cfs.getLiveSSTables().stream().filter(predicate).collect(Collectors.toSet());
                if (sstables.isEmpty())
                    return new AcquireResult(cfs, null, null);

                LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
                if (txn != null)
                    return new AcquireResult(cfs, Refs.ref(sstables), txn);
                else
                    logger.error("Could not mark compacting for {} (sstables = {}, compacting = {})", sessionID, sstables, cfs.getTracker().getCompacting());
            }
            catch (SSTableAcquisitionException e)
            {
                logger.warn(e.getMessage());
                logger.debug("Got exception trying to acquire sstables", e);
            }
            return null;
        }
        public AcquireResult call()
        {
            logger.debug("acquiring sstables for pending anti compaction on session {}", sessionID);
            // Try to modify after cancelling running compactions. This will attempt to cancel in-flight compactions
            // that include the given sstables for up to acquireRetrySeconds, after which point null is returned.
            long start = currentTimeMillis();
            long delay = TimeUnit.SECONDS.toMillis(acquireRetrySeconds);
            // Note that it is `predicate` that throws SSTableAcquisitionException if it finds a conflicting sstable,
            // and we only retry when runWithCompactionsDisabled throws while evaluating the predicate, not when
            // acquireTuple does. This avoids failing the case where we have an sstable covering [0, 100] and a user
            // starts a repair on [0, 50] and then on [51, 100] before the first anticompaction has finished, but not
            // when the second repair is, for example, [25, 75] - then we fail it without retrying.
            do
            {
                try
                {
                    // Note that anticompactions are not disabled when running this. This is safe since runWithCompactionsDisabled
                    // is synchronized - acquireTuple and predicate can only be run by a single thread (for the given cfs).
                    return cfs.runWithCompactionsDisabled(this::acquireTuple, predicate, false, false, false);
                }
                catch (SSTableAcquisitionException e)
                {
                    logger.warn("Session {} failed acquiring sstables: {}, retrying every {}ms for another {}s",
                                sessionID,
                                e.getMessage(),
                                acquireSleepMillis,
                                TimeUnit.SECONDS.convert(delay + start - currentTimeMillis(), TimeUnit.MILLISECONDS));
                    Uninterruptibles.sleepUninterruptibly(acquireSleepMillis, TimeUnit.MILLISECONDS);
                    if (currentTimeMillis() - start > delay)
                        logger.warn("{} Timed out waiting to acquire sstables", sessionID, e);
                }
                catch (Throwable t)
                {
                    logger.error("Got exception disabling compactions for session {}", sessionID, t);
                    throw t;
                }
            } while (currentTimeMillis() - start < delay);
            return null;
        }
    }
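    /**
     * Applied once sstables have been acquired for every table: either submits one pending anticompaction
     * per acquired result, or - if any acquisition failed or raced with another repair - releases everything
     * and returns a failed future so the coordinator is notified.
     */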
    static class AcquisitionCallback implements Function<List<AcquireResult>, Future<List<Void>>>
    {
        private final TimeUUID parentRepairSession;
        private final RangesAtEndpoint tokenRanges;
        private final BooleanSupplier isCancelled;

        public AcquisitionCallback(TimeUUID parentRepairSession, RangesAtEndpoint tokenRanges, BooleanSupplier isCancelled)
        {
            this.parentRepairSession = parentRepairSession;
            this.tokenRanges = tokenRanges;
            this.isCancelled = isCancelled;
        }

        Future<Void> submitPendingAntiCompaction(AcquireResult result)
        {
            return CompactionManager.instance.submitPendingAntiCompaction(result.cfs, tokenRanges, result.refs, result.txn, parentRepairSession, isCancelled);
        }

        private static boolean shouldAbort(AcquireResult result)
        {
            if (result == null)
                return true;
            // sstables in the acquire result are now marked compacting and are locked to this anti compaction. If any
            // of them are marked repaired or pending repair, acquisition raced with another pending anti-compaction, or
            // possibly even a repair session, and we need to abort to prevent sstables from moving between sessions.
            return result.refs != null && Iterables.any(result.refs, sstable -> {
                StatsMetadata metadata = sstable.getSSTableMetadata();
                return metadata.pendingRepair != NO_PENDING_REPAIR || metadata.repairedAt != UNREPAIRED_SSTABLE;
            });
        }
        public Future<List<Void>> apply(List<AcquireResult> results)
        {
            if (Iterables.any(results, AcquisitionCallback::shouldAbort))
            {
                // Release all sstables, and report failure back to coordinator
                for (AcquireResult result : results)
                {
                    if (result != null)
                    {
                        logger.info("Releasing acquired sstables for {}.{}", result.cfs.metadata.keyspace, result.cfs.metadata.name);
                        result.abort();
                    }
                }
                String message = String.format("Prepare phase for incremental repair session %s was unable to " +
                                               "acquire exclusive access to the necessary sstables. " +
                                               "This is usually caused by running multiple incremental repairs on nodes that share token ranges",
                                               parentRepairSession);
                logger.warn(message);
                return ImmediateFuture.failure(new SSTableAcquisitionException(message));
            }
            else
            {
                List<Future<Void>> pendingAntiCompactions = new ArrayList<>(results.size());
                for (AcquireResult result : results)
                {
                    if (result.txn != null)
                    {
                        Future<Void> future = submitPendingAntiCompaction(result);
                        pendingAntiCompactions.add(future);
                    }
                }
                return FutureCombiner.allOf(pendingAntiCompactions);
            }
        }
    }
    private final TimeUUID prsId;
    private final Collection<ColumnFamilyStore> tables;
    private final RangesAtEndpoint tokenRanges;
    private final ExecutorService executor;
    private final int acquireRetrySeconds;
    private final int acquireSleepMillis;
    private final BooleanSupplier isCancelled;

    public PendingAntiCompaction(TimeUUID prsId,
                                 Collection<ColumnFamilyStore> tables,
                                 RangesAtEndpoint tokenRanges,
                                 ExecutorService executor,
                                 BooleanSupplier isCancelled)
    {
        this(prsId, tables, tokenRanges, ACQUIRE_RETRY_SECONDS, ACQUIRE_SLEEP_MS, executor, isCancelled);
    }

    @VisibleForTesting
    PendingAntiCompaction(TimeUUID prsId,
                          Collection<ColumnFamilyStore> tables,
                          RangesAtEndpoint tokenRanges,
                          int acquireRetrySeconds,
                          int acquireSleepMillis,
                          ExecutorService executor,
                          BooleanSupplier isCancelled)
    {
        this.prsId = prsId;
        this.tables = tables;
        this.tokenRanges = tokenRanges;
        this.executor = executor;
        this.acquireRetrySeconds = acquireRetrySeconds;
        this.acquireSleepMillis = acquireSleepMillis;
        this.isCancelled = isCancelled;
    }
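    /**
     * Flushes each table, acquires its matching sstables on the given executor, and - once all acquisition
     * tasks have completed - hands the results to an AcquisitionCallback that either submits the pending
     * anticompactions or fails the returned future.
     */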
    public Future<List<Void>> run()
    {
        List<FutureTask<AcquireResult>> tasks = new ArrayList<>(tables.size());
        for (ColumnFamilyStore cfs : tables)
        {
            cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.ANTICOMPACTION);
            FutureTask<AcquireResult> task = new FutureTask<>(getAcquisitionCallable(cfs, tokenRanges.ranges(), prsId, acquireRetrySeconds, acquireSleepMillis));
            executor.submit(task);
            tasks.add(task);
        }
        Future<List<AcquireResult>> acquisitionResults = FutureCombiner.successfulOf(tasks);
        return acquisitionResults.flatMap(getAcquisitionCallback(prsId, tokenRanges));
    }

    @VisibleForTesting
    protected AcquisitionCallable getAcquisitionCallable(ColumnFamilyStore cfs, Set<Range<Token>> ranges, TimeUUID prsId, int acquireRetrySeconds, int acquireSleepMillis)
    {
        return new AcquisitionCallable(cfs, ranges, prsId, acquireRetrySeconds, acquireSleepMillis);
    }

    @VisibleForTesting
    protected AcquisitionCallback getAcquisitionCallback(TimeUUID prsId, RangesAtEndpoint tokenRanges)
    {
        return new AcquisitionCallback(prsId, tokenRanges, isCancelled);
    }
}