blob: b1eadc56732e595dd767cf2d564e3818f0b7b5fb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.lifecycle;
import java.io.File;
import java.nio.file.Path;
import java.util.*;
import java.util.function.BiFunction;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableReader.UniqueIdentifier;
import org.apache.cassandra.utils.concurrent.Transactional;
import static com.google.common.base.Functions.compose;
import static com.google.common.base.Predicates.*;
import static com.google.common.collect.ImmutableSet.copyOf;
import static com.google.common.collect.Iterables.*;
import static java.util.Collections.singleton;
import static org.apache.cassandra.db.lifecycle.Helpers.*;
import static org.apache.cassandra.db.lifecycle.View.updateCompacting;
import static org.apache.cassandra.db.lifecycle.View.updateLiveSet;
import static org.apache.cassandra.utils.Throwables.maybeFail;
import static org.apache.cassandra.utils.concurrent.Refs.release;
import static org.apache.cassandra.utils.concurrent.Refs.selfRefs;
/**
* IMPORTANT: When this object is involved in a transactional graph, for correct behaviour its commit MUST occur before
* any others, since it may legitimately fail. This is consistent with the Transactional API, which permits one failing
* action to occur at the beginning of the commit phase, but also *requires* that the prepareToCommit() phase only take
* actions that can be rolled back.
*/
public class LifecycleTransaction extends Transactional.AbstractTransactional
{
private static final Logger logger = LoggerFactory.getLogger(LifecycleTransaction.class);
/**
* A class that represents accumulated modifications to the Tracker.
* has two instances, one containing modifications that are "staged" (i.e. invisible)
* and one containing those "logged" that have been made visible through a call to checkpoint()
*/
private static class State
{
// readers that are either brand new, update a previous new reader, or update one of the original readers
final Set<SSTableReader> update = new HashSet<>();
// disjoint from update, represents a subset of originals that is no longer needed
final Set<SSTableReader> obsolete = new HashSet<>();
void log(State staged)
{
update.removeAll(staged.obsolete);
update.removeAll(staged.update);
update.addAll(staged.update);
obsolete.addAll(staged.obsolete);
}
boolean contains(SSTableReader reader)
{
return update.contains(reader) || obsolete.contains(reader);
}
boolean isEmpty()
{
return update.isEmpty() && obsolete.isEmpty();
}
void clear()
{
update.clear();
obsolete.clear();
}
}
public final Tracker tracker;
// The transaction logs keep track of new and old sstable files
private final LogTransaction log;
// the original readers this transaction was opened over, and that it guards
// (no other transactions may operate over these readers concurrently)
private final Set<SSTableReader> originals = new HashSet<>();
// the set of readers we've marked as compacting (only updated on creation and in checkpoint())
private final Set<SSTableReader> marked = new HashSet<>();
// the identity set of readers we've ever encountered; used to ensure we don't accidentally revisit the
// same version of a reader. potentially a dangerous property if there are reference counting bugs
// as they won't be caught until the transaction's lifespan is over.
private final Set<UniqueIdentifier> identities = Collections.newSetFromMap(new IdentityHashMap<>());
// changes that have been made visible
private final State logged = new State();
// changes that are pending
private final State staged = new State();
// the tidier and their readers, to be used for marking readers obsoleted during a commit
private List<LogTransaction.Obsoletion> obsoletions;
/**
* construct a Transaction for use in an offline operation
*/
public static LifecycleTransaction offline(OperationType operationType, SSTableReader reader)
{
return offline(operationType, singleton(reader));
}
/**
* construct a Transaction for use in an offline operation
*/
public static LifecycleTransaction offline(OperationType operationType, Iterable<SSTableReader> readers)
{
// if offline, for simplicity we just use a dummy tracker
Tracker dummy = new Tracker(null, false);
dummy.addInitialSSTables(readers);
dummy.apply(updateCompacting(emptySet(), readers));
return new LifecycleTransaction(dummy, operationType, readers);
}
/**
* construct an empty Transaction with no existing readers
*/
@SuppressWarnings("resource") // log closed during postCleanup
public static LifecycleTransaction offline(OperationType operationType)
{
Tracker dummy = new Tracker(null, false);
return new LifecycleTransaction(dummy, new LogTransaction(operationType, dummy), Collections.emptyList());
}
@SuppressWarnings("resource") // log closed during postCleanup
LifecycleTransaction(Tracker tracker, OperationType operationType, Iterable<SSTableReader> readers)
{
this(tracker, new LogTransaction(operationType, tracker), readers);
}
LifecycleTransaction(Tracker tracker, LogTransaction log, Iterable<SSTableReader> readers)
{
this.tracker = tracker;
this.log = log;
for (SSTableReader reader : readers)
{
originals.add(reader);
marked.add(reader);
identities.add(reader.instanceId);
}
}
public LogTransaction log()
{
return log;
}
public OperationType opType()
{
return log.type();
}
public UUID opId()
{
return log.id();
}
public void doPrepare()
{
// note for future: in anticompaction two different operations use the same Transaction, and both prepareToCommit()
// separately: the second prepareToCommit is ignored as a "redundant" transition. since it is only a checkpoint
// (and these happen anyway) this is fine but if more logic gets inserted here than is performed in a checkpoint,
// it may break this use case, and care is needed
checkpoint();
// prepare for compaction obsolete readers as long as they were part of the original set
// since those that are not original are early readers that share the same desc with the finals
maybeFail(prepareForObsoletion(filterIn(logged.obsolete, originals), log, obsoletions = new ArrayList<>(), null));
log.prepareToCommit();
}
/**
* point of no return: commit all changes, but leave all readers marked as compacting
*/
public Throwable doCommit(Throwable accumulate)
{
assert staged.isEmpty() : "must be no actions introduced between prepareToCommit and a commit";
logger.trace("Committing update:{}, obsolete:{}", staged.update, staged.obsolete);
// accumulate must be null if we have been used correctly, so fail immediately if it is not
maybeFail(accumulate);
// transaction log commit failure means we must abort; safe commit is not possible
maybeFail(log.commit(null));
// this is now the point of no return; we cannot safely rollback, so we ignore exceptions until we're done
// we restore state by obsoleting our obsolete files, releasing our references to them, and updating our size
// and notification status for the obsolete and new files
accumulate = markObsolete(obsoletions, accumulate);
accumulate = tracker.updateSizeTracking(logged.obsolete, logged.update, accumulate);
accumulate = release(selfRefs(logged.obsolete), accumulate);
accumulate = tracker.notifySSTablesChanged(originals, logged.update, log.type(), accumulate);
return accumulate;
}
/**
* undo all of the changes made by this transaction, resetting the state to its original form
*/
public Throwable doAbort(Throwable accumulate)
{
if (logger.isTraceEnabled())
logger.trace("Aborting transaction over {}, with ({},{}) logged and ({},{}) staged", originals, logged.update, logged.obsolete, staged.update, staged.obsolete);
accumulate = abortObsoletion(obsoletions, accumulate);
if (logged.isEmpty() && staged.isEmpty())
return log.abort(accumulate);
// mark obsolete all readers that are not versions of those present in the original set
Iterable<SSTableReader> obsolete = filterOut(concatUniq(staged.update, logged.update), originals);
logger.trace("Obsoleting {}", obsolete);
accumulate = prepareForObsoletion(obsolete, log, obsoletions = new ArrayList<>(), accumulate);
// it's safe to abort even if committed, see maybeFail in doCommit() above, in this case it will just report
// a failure to abort, which is useful information to have for debug
accumulate = log.abort(accumulate);
accumulate = markObsolete(obsoletions, accumulate);
// replace all updated readers with a version restored to its original state
List<SSTableReader> restored = restoreUpdatedOriginals();
List<SSTableReader> invalid = Lists.newArrayList(Iterables.concat(logged.update, logged.obsolete));
accumulate = tracker.apply(updateLiveSet(logged.update, restored), accumulate);
accumulate = tracker.notifySSTablesChanged(invalid, restored, OperationType.COMPACTION, accumulate);
// setReplaced immediately preceding versions that have not been obsoleted
accumulate = setReplaced(logged.update, accumulate);
// we have replaced all of logged.update and never made visible staged.update,
// and the files we have logged as obsolete we clone fresh versions of, so they are no longer needed either
// any _staged_ obsoletes should either be in staged.update already, and dealt with there,
// or is still in its original form (so left as is); in either case no extra action is needed
accumulate = release(selfRefs(concat(staged.update, logged.update, logged.obsolete)), accumulate);
logged.clear();
staged.clear();
return accumulate;
}
@Override
protected Throwable doPostCleanup(Throwable accumulate)
{
log.close();
return unmarkCompacting(marked, accumulate);
}
public boolean isOffline()
{
return tracker.isDummy();
}
public void permitRedundantTransitions()
{
super.permitRedundantTransitions();
}
/**
* call when a consistent batch of changes is ready to be made atomically visible
* these will be exposed in the Tracker atomically, or an exception will be thrown; in this case
* the transaction should be rolled back
*/
public void checkpoint()
{
maybeFail(checkpoint(null));
}
private Throwable checkpoint(Throwable accumulate)
{
if (logger.isTraceEnabled())
logger.trace("Checkpointing update:{}, obsolete:{}", staged.update, staged.obsolete);
if (staged.isEmpty())
return accumulate;
Set<SSTableReader> toUpdate = toUpdate();
Set<SSTableReader> fresh = copyOf(fresh());
// check the current versions of the readers we're replacing haven't somehow been replaced by someone else
checkNotReplaced(filterIn(toUpdate, staged.update));
// ensure any new readers are in the compacting set, since we aren't done with them yet
// and don't want anyone else messing with them
// apply atomically along with updating the live set of readers
tracker.apply(compose(updateCompacting(emptySet(), fresh),
updateLiveSet(toUpdate, staged.update)));
// log the staged changes and our newly marked readers
marked.addAll(fresh);
logged.log(staged);
// setup our tracker, and mark our prior versions replaced, also releasing our references to them
// we do not replace/release obsoleted readers, since we may need to restore them on rollback
accumulate = setReplaced(filterOut(toUpdate, staged.obsolete), accumulate);
accumulate = release(selfRefs(filterOut(toUpdate, staged.obsolete)), accumulate);
staged.clear();
return accumulate;
}
/**
* update a reader: if !original, this is a reader that is being introduced by this transaction;
* otherwise it must be in the originals() set, i.e. a reader guarded by this transaction
*/
public void update(SSTableReader reader, boolean original)
{
assert !staged.update.contains(reader) : "each reader may only be updated once per checkpoint: " + reader;
assert !identities.contains(reader.instanceId) : "each reader instance may only be provided as an update once: " + reader;
// check it isn't obsolete, and that it matches the original flag
assert !(logged.obsolete.contains(reader) || staged.obsolete.contains(reader)) : "may not update a reader that has been obsoleted";
assert original == originals.contains(reader) : String.format("the 'original' indicator was incorrect (%s provided): %s", original, reader);
staged.update.add(reader);
identities.add(reader.instanceId);
if (!isOffline())
reader.setupOnline();
}
public void update(Collection<SSTableReader> readers, boolean original)
{
for(SSTableReader reader: readers)
{
update(reader, original);
}
}
/**
* mark this reader as for obsoletion : on checkpoint() the reader will be removed from the live set
*/
public void obsolete(SSTableReader reader)
{
logger.trace("Staging for obsolescence {}", reader);
// check this is: a reader guarded by the transaction, an instance we have already worked with
// and that we haven't already obsoleted it, nor do we have other changes staged for it
assert identities.contains(reader.instanceId) : "only reader instances that have previously been provided may be obsoleted: " + reader;
assert originals.contains(reader) : "only readers in the 'original' set may be obsoleted: " + reader + " vs " + originals;
assert !(logged.obsolete.contains(reader) || staged.obsolete.contains(reader)) : "may not obsolete a reader that has already been obsoleted: " + reader;
assert !staged.update.contains(reader) : "may not obsolete a reader that has a staged update (must checkpoint first): " + reader;
assert current(reader) == reader : "may only obsolete the latest version of the reader: " + reader;
staged.obsolete.add(reader);
}
/**
* obsolete every file in the original transaction
*/
public void obsoleteOriginals()
{
logger.trace("Staging for obsolescence {}", originals);
// if we're obsoleting, we should have no staged updates for the original files
assert Iterables.isEmpty(filterIn(staged.update, originals)) : staged.update;
// stage obsoletes for any currently visible versions of any original readers
Iterables.addAll(staged.obsolete, filterIn(current(), originals));
}
/**
* return the readers we're replacing in checkpoint(), i.e. the currently visible version of those in staged
*/
private Set<SSTableReader> toUpdate()
{
return copyOf(filterIn(current(), staged.obsolete, staged.update));
}
/**
* new readers that haven't appeared previously (either in the original set or the logged updates)
*/
private Iterable<SSTableReader> fresh()
{
return filterOut(staged.update, originals, logged.update);
}
/**
* returns the currently visible readers managed by this transaction
*/
public Iterable<SSTableReader> current()
{
// i.e., those that are updates that have been logged (made visible),
// and any original readers that have neither been obsoleted nor updated
return concat(logged.update, filterOut(originals, logged.update, logged.obsolete));
}
/**
* update the current replacement of any original reader back to its original start
*/
private List<SSTableReader> restoreUpdatedOriginals()
{
Iterable<SSTableReader> torestore = filterIn(originals, logged.update, logged.obsolete);
return ImmutableList.copyOf(transform(torestore, (reader) -> current(reader).cloneWithRestoredStart(reader.first)));
}
/**
* the set of readers guarded by this transaction _in their original instance/state_
* call current(SSTableReader) on any reader in this set to get the latest instance
*/
public Set<SSTableReader> originals()
{
return Collections.unmodifiableSet(originals);
}
/**
* indicates if the reader has been marked for obsoletion
*/
public boolean isObsolete(SSTableReader reader)
{
return logged.obsolete.contains(reader) || staged.obsolete.contains(reader);
}
/**
* return the current version of the provided reader, whether or not it is visible or staged;
* i.e. returns the first version present by testing staged, logged and originals in order.
*/
public SSTableReader current(SSTableReader reader)
{
Set<SSTableReader> container;
if (staged.contains(reader))
container = staged.update.contains(reader) ? staged.update : staged.obsolete;
else if (logged.contains(reader))
container = logged.update.contains(reader) ? logged.update : logged.obsolete;
else if (originals.contains(reader))
container = originals;
else throw new AssertionError();
return select(reader, container);
}
/**
* remove the reader from the set we're modifying
*/
public void cancel(SSTableReader cancel)
{
logger.trace("Cancelling {} from transaction", cancel);
assert originals.contains(cancel) : "may only cancel a reader in the 'original' set: " + cancel + " vs " + originals;
assert !(staged.contains(cancel) || logged.contains(cancel)) : "may only cancel a reader that has not been updated or obsoleted in this transaction: " + cancel;
originals.remove(cancel);
marked.remove(cancel);
identities.remove(cancel.instanceId);
maybeFail(unmarkCompacting(singleton(cancel), null));
}
/**
* remove the readers from the set we're modifying
*/
public void cancel(Iterable<SSTableReader> cancels)
{
for (SSTableReader cancel : cancels)
cancel(cancel);
}
/**
* remove the provided readers from this Transaction, and return a new Transaction to manage them
* only permitted to be called if the current Transaction has never been used
*/
public LifecycleTransaction split(Collection<SSTableReader> readers)
{
logger.trace("Splitting {} into new transaction", readers);
checkUnused();
for (SSTableReader reader : readers)
assert identities.contains(reader.instanceId) : "may only split the same reader instance the transaction was opened with: " + reader;
for (SSTableReader reader : readers)
{
identities.remove(reader.instanceId);
originals.remove(reader);
marked.remove(reader);
}
return new LifecycleTransaction(tracker, log.type(), readers);
}
/**
* check this transaction has never been used
*/
private void checkUnused()
{
assert logged.isEmpty();
assert staged.isEmpty();
assert identities.size() == originals.size();
assert originals.size() == marked.size();
}
private Throwable unmarkCompacting(Set<SSTableReader> unmark, Throwable accumulate)
{
accumulate = tracker.apply(updateCompacting(unmark, emptySet()), accumulate);
// when the CFS is invalidated, it will call unreferenceSSTables(). However, unreferenceSSTables only deals
// with sstables that aren't currently being compacted. If there are ongoing compactions that finish or are
// interrupted after the CFS is invalidated, those sstables need to be unreferenced as well, so we do that here.
accumulate = tracker.dropSSTablesIfInvalid(accumulate);
return accumulate;
}
// convenience method for callers that know only one sstable is involved in the transaction
public SSTableReader onlyOne()
{
assert originals.size() == 1;
return getFirst(originals, null);
}
public void trackNew(SSTable table)
{
log.trackNew(table);
}
public void untrackNew(SSTable table)
{
log.untrackNew(table);
}
public static boolean removeUnfinishedLeftovers(ColumnFamilyStore cfs)
{
return LogTransaction.removeUnfinishedLeftovers(cfs.getDirectories().getCFDirectories());
}
public static boolean removeUnfinishedLeftovers(CFMetaData cfMetaData)
{
return LogTransaction.removeUnfinishedLeftovers(cfMetaData);
}
/**
* Get the files in the folder specified, provided that the filter returns true.
* A filter is given each file and its type, and decides which files should be returned
* and which should be discarded. To classify files into their type, we read transaction
* log files. Should we fail to read these log files after a few times, we look at onTxnErr
* to determine what to do.
*
* @param folder - the folder to scan
* @param onTxnErr - how to handle a failure to read a txn log file
* @param filter - A function that receives each file and its type, it should return true to have the file returned
* @return - the list of files that were scanned and for which the filter returned true
*/
public static List<File> getFiles(Path folder, BiFunction<File, Directories.FileType, Boolean> filter, Directories.OnTxnErr onTxnErr)
{
return new LogAwareFileLister(folder, filter, onTxnErr).list();
}
/**
* Retry all deletions that failed the first time around (presumably b/c the sstable was still mmap'd.)
* Useful because there are times when we know GC has been invoked; also exposed as an mbean.
*/
public static void rescheduleFailedDeletions()
{
LogTransaction.rescheduleFailedDeletions();
}
/**
* Deletions run on the nonPeriodicTasks executor, (both failedDeletions or global tidiers in SSTableReader)
* so by scheduling a new empty task and waiting for it we ensure any prior deletion has completed.
*/
public static void waitForDeletions()
{
LogTransaction.waitForDeletions();
}
// a class representing the current state of the reader within this transaction, encoding the actions both logged
// and pending, and the reader instances that are visible now, and will be after the next checkpoint (with null
// indicating either obsolescence, or that the reader does not occur in the transaction; which is defined
// by the corresponding Action)
@VisibleForTesting
public static class ReaderState
{
public enum Action
{
UPDATED, OBSOLETED, NONE;
public static Action get(boolean updated, boolean obsoleted)
{
assert !(updated && obsoleted);
return updated ? UPDATED : obsoleted ? OBSOLETED : NONE;
}
}
final Action staged;
final Action logged;
final SSTableReader nextVisible;
final SSTableReader currentlyVisible;
final boolean original;
public ReaderState(Action logged, Action staged, SSTableReader currentlyVisible, SSTableReader nextVisible, boolean original)
{
this.staged = staged;
this.logged = logged;
this.currentlyVisible = currentlyVisible;
this.nextVisible = nextVisible;
this.original = original;
}
public boolean equals(Object that)
{
return that instanceof ReaderState && equals((ReaderState) that);
}
public boolean equals(ReaderState that)
{
return this.staged == that.staged && this.logged == that.logged && this.original == that.original
&& this.currentlyVisible == that.currentlyVisible && this.nextVisible == that.nextVisible;
}
public String toString()
{
return String.format("[logged=%s staged=%s original=%s]", logged, staged, original);
}
public static SSTableReader visible(SSTableReader reader, Predicate<SSTableReader> obsolete, Collection<SSTableReader> ... selectFrom)
{
return obsolete.apply(reader) ? null : selectFirst(reader, selectFrom);
}
}
@VisibleForTesting
public ReaderState state(SSTableReader reader)
{
SSTableReader currentlyVisible = ReaderState.visible(reader, in(logged.obsolete), logged.update, originals);
SSTableReader nextVisible = ReaderState.visible(reader, orIn(staged.obsolete, logged.obsolete), staged.update, logged.update, originals);
return new ReaderState(ReaderState.Action.get(logged.update.contains(reader), logged.obsolete.contains(reader)),
ReaderState.Action.get(staged.update.contains(reader), staged.obsolete.contains(reader)),
currentlyVisible, nextVisible, originals.contains(reader)
);
}
public String toString()
{
return originals.toString();
}
}