/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.db.lifecycle;

import java.io.File;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.notifications.*;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.OpOrder;

import static com.google.common.base.Predicates.and;
import static com.google.common.collect.ImmutableSet.copyOf;
import static com.google.common.collect.Iterables.filter;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.apache.cassandra.db.lifecycle.Helpers.*;
import static org.apache.cassandra.db.lifecycle.View.permitCompacting;
import static org.apache.cassandra.db.lifecycle.View.updateCompacting;
import static org.apache.cassandra.db.lifecycle.View.updateLiveSet;
import static org.apache.cassandra.utils.Throwables.maybeFail;
import static org.apache.cassandra.utils.Throwables.merge;
import static org.apache.cassandra.utils.concurrent.Refs.release;
import static org.apache.cassandra.utils.concurrent.Refs.selfRefs;

/**
 * Tracker tracks the live {@link View} of the data store for a table.
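 * <p>
 * A rough usage sketch, not taken from a real caller ({@code cfs} is assumed to be the owning
 * ColumnFamilyStore):
 * <pre>{@code
 * Tracker tracker = cfs.getTracker();
 * View current = tracker.getView();                        // immutable snapshot of memtables and sstables
 * Set<SSTableReader> compacting = tracker.getCompacting(); // sstables currently marked compacting
 * }</pre>
 * Mutations of the live sstable set are performed through {@link LifecycleTransaction}s obtained via
 * {@link #tryModify(Iterable, OperationType)}.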
*/
public class Tracker
{
private static final Logger logger = LoggerFactory.getLogger(Tracker.class);
private final Collection<INotificationConsumer> subscribers = new CopyOnWriteArrayList<>();
public final ColumnFamilyStore cfstore;
final AtomicReference<View> view;
public final boolean loadsstables;
/**
* @param memtable Initial Memtable. Can be null.
     * @param loadsstables whether to load SSTables (TODO: remove, as this is only accessed from 2i)
*/
public Tracker(Memtable memtable, boolean loadsstables)
{
this.cfstore = memtable != null ? memtable.cfs : null;
this.view = new AtomicReference<>();
this.loadsstables = loadsstables;
this.reset(memtable);
}
public LifecycleTransaction tryModify(SSTableReader sstable, OperationType operationType)
{
return tryModify(singleton(sstable), operationType);
}
/**
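     * Attempts to mark the given sstables as compacting for the given operation type.
     * A minimal caller sketch ({@code candidates} is hypothetical); the null result must be handled:
     * <pre>{@code
     * try (LifecycleTransaction txn = tryModify(candidates, OperationType.COMPACTION))
     * {
     *     if (txn == null)
     *         return;   // some candidate is already being compacted by another operation
     *     // the candidates stay marked compacting until the transaction finishes or aborts
     * }
     * }</pre>
     *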
     * @return a LifecycleTransaction over the provided sstables if we are able to mark them as compacting before anyone else; null otherwise
*/
public LifecycleTransaction tryModify(Iterable<SSTableReader> sstables, OperationType operationType)
{
if (Iterables.isEmpty(sstables))
return new LifecycleTransaction(this, operationType, sstables);
if (null == apply(permitCompacting(sstables), updateCompacting(emptySet(), sstables)))
return null;
return new LifecycleTransaction(this, operationType, sstables);
}
// METHODS FOR ATOMICALLY MODIFYING THE VIEW
Pair<View, View> apply(Function<View, View> function)
{
return apply(Predicates.<View>alwaysTrue(), function);
}
Throwable apply(Function<View, View> function, Throwable accumulate)
{
try
{
apply(function);
}
catch (Throwable t)
{
accumulate = merge(accumulate, t);
}
return accumulate;
}
/**
     * Atomically tests {@code permit} against the current view and, if it yields true, applies {@code function} to it,
     * returning a pair of the pre- and post-modification views; otherwise the method aborts, returning null.
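     * <p>
     * For example, {@link #tryModify(Iterable, OperationType)} reserves sstables for compaction with:
     * <pre>{@code
     * apply(permitCompacting(sstables), updateCompacting(emptySet(), sstables));
     * }</pre>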
*/
Pair<View, View> apply(Predicate<View> permit, Function<View, View> function)
{
while (true)
{
View cur = view.get();
if (!permit.apply(cur))
return null;
View updated = function.apply(cur);
if (view.compareAndSet(cur, updated))
return Pair.create(cur, updated);
}
}
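    /**
     * Adjusts the load and live-disk-space metrics by the difference in bytes on disk between
     * {@code newSSTables} and {@code oldSSTables} (total disk space is only ever incremented here;
     * it is decremented when the sstables are actually deleted). Errors are merged into the returned
     * accumulator rather than thrown. No-op for dummy trackers.
     */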
Throwable updateSizeTracking(Iterable<SSTableReader> oldSSTables, Iterable<SSTableReader> newSSTables, Throwable accumulate)
{
if (isDummy())
return accumulate;
long add = 0;
for (SSTableReader sstable : newSSTables)
{
if (logger.isTraceEnabled())
logger.trace("adding {} to list of files tracked for {}.{}", sstable.descriptor, cfstore.keyspace.getName(), cfstore.name);
try
{
add += sstable.bytesOnDisk();
}
catch (Throwable t)
{
accumulate = merge(accumulate, t);
}
}
long subtract = 0;
for (SSTableReader sstable : oldSSTables)
{
if (logger.isTraceEnabled())
logger.trace("removing {} from list of files tracked for {}.{}", sstable.descriptor, cfstore.keyspace.getName(), cfstore.name);
try
{
subtract += sstable.bytesOnDisk();
}
catch (Throwable t)
{
accumulate = merge(accumulate, t);
}
}
StorageMetrics.load.inc(add - subtract);
cfstore.metric.liveDiskSpaceUsed.inc(add - subtract);
        // we don't subtract from total until the sstable is deleted, see LogTransaction.SSTableTidier
cfstore.metric.totalDiskSpaceUsed.inc(add);
return accumulate;
}
// SETUP / CLEANUP
public void addInitialSSTables(Iterable<SSTableReader> sstables)
{
if (!isDummy())
setupOnline(sstables);
apply(updateLiveSet(emptySet(), sstables));
maybeFail(updateSizeTracking(emptySet(), sstables, null));
// no notifications or backup necessary
}
public void addSSTables(Iterable<SSTableReader> sstables)
{
addInitialSSTables(sstables);
maybeIncrementallyBackup(sstables);
notifyAdded(sstables);
}
/** (Re)initializes the tracker, purging all references. */
@VisibleForTesting
public void reset(Memtable memtable)
{
view.set(new View(memtable != null ? singletonList(memtable) : Collections.emptyList(),
Collections.emptyList(),
Collections.emptyMap(),
Collections.emptyMap(),
SSTableIntervalTree.empty()));
}
public Throwable dropSSTablesIfInvalid(Throwable accumulate)
{
if (!isDummy() && !cfstore.isValid())
accumulate = dropSSTables(accumulate);
return accumulate;
}
public void dropSSTables()
{
maybeFail(dropSSTables(null));
}
public Throwable dropSSTables(Throwable accumulate)
{
return dropSSTables(Predicates.<SSTableReader>alwaysTrue(), OperationType.UNKNOWN, accumulate);
}
/**
     * Removes all sstables matching {@code remove} that are not currently busy compacting.
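     * <p>
     * For example, {@link #removeUnreadableSSTables(File)} uses this to drop every sstable under a failed directory:
     * <pre>{@code
     * dropSSTables(reader -> reader.descriptor.directory.equals(directory), OperationType.UNKNOWN, null);
     * }</pre>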
*/
public Throwable dropSSTables(final Predicate<SSTableReader> remove, OperationType operationType, Throwable accumulate)
{
try (LogTransaction txnLogs = new LogTransaction(operationType, this))
{
Pair<View, View> result = apply(view -> {
Set<SSTableReader> toremove = copyOf(filter(view.sstables, and(remove, notIn(view.compacting))));
return updateLiveSet(toremove, emptySet()).apply(view);
});
Set<SSTableReader> removed = Sets.difference(result.left.sstables, result.right.sstables);
assert Iterables.all(removed, remove);
// It is important that any method accepting/returning a Throwable never throws an exception, and does its best
// to complete the instructions given to it
List<LogTransaction.Obsoletion> obsoletions = new ArrayList<>();
accumulate = prepareForObsoletion(removed, txnLogs, obsoletions, accumulate);
try
{
txnLogs.finish();
if (!removed.isEmpty())
{
accumulate = markObsolete(obsoletions, accumulate);
accumulate = updateSizeTracking(removed, emptySet(), accumulate);
accumulate = release(selfRefs(removed), accumulate);
// notifySSTablesChanged -> LeveledManifest.promote doesn't like a no-op "promotion"
accumulate = notifySSTablesChanged(removed, Collections.<SSTableReader>emptySet(), txnLogs.type(), accumulate);
}
}
catch (Throwable t)
{
accumulate = abortObsoletion(obsoletions, accumulate);
accumulate = Throwables.merge(accumulate, t);
}
}
catch (Throwable t)
{
accumulate = Throwables.merge(accumulate, t);
}
return accumulate;
}
/**
* Removes every SSTable in the directory from the Tracker's view.
     * @param directory the unreadable directory, which may or may not contain SSTables.
*/
public void removeUnreadableSSTables(final File directory)
{
        maybeFail(dropSSTables(reader -> reader.descriptor.directory.equals(directory), OperationType.UNKNOWN, null));
}
// FLUSHING
/**
* get the Memtable that the ordered writeOp should be directed to
*/
public Memtable getMemtableFor(OpOrder.Group opGroup, CommitLogPosition commitLogPosition)
{
// since any new memtables appended to the list after we fetch it will be for operations started
// after us, we can safely assume that we will always find the memtable that 'accepts' us;
// if the barrier for any memtable is set whilst we are reading the list, it must accept us.
// there may be multiple memtables in the list that would 'accept' us, however we only ever choose
// the oldest such memtable, as accepts() only prevents us falling behind (i.e. ensures we don't
// assign operations to a memtable that was retired/queued before we started)
for (Memtable memtable : view.get().liveMemtables)
{
if (memtable.accepts(opGroup, commitLogPosition))
return memtable;
}
throw new AssertionError(view.get().liveMemtables.toString());
}
/**
* Switch the current memtable. This atomically appends a new memtable to the end of the list of active memtables,
* returning the previously last memtable. It leaves the previous Memtable in the list of live memtables until
     * discarding(memtable) is called. These two methods must be synchronized/paired: a call m = switchMemtable(...)
     * must eventually be followed by discarding(m), and such pairs cannot be interleaved.
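     * <p>
     * A minimal sketch of the flush sequence this participates in (error handling omitted;
     * {@code flushToSSTables} is a hypothetical stand-in for the actual flush writer):
     * <pre>{@code
     * Memtable flushing = switchMemtable(false, newMemtable);
     * markFlushing(flushing);
     * Iterable<SSTableReader> flushed = flushToSSTables(flushing);
     * replaceFlushed(flushing, flushed);
     * }</pre>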
*
* @return the previously active memtable
*/
public Memtable switchMemtable(boolean truncating, Memtable newMemtable)
{
Pair<View, View> result = apply(View.switchMemtable(newMemtable));
if (truncating)
notifyRenewed(newMemtable);
else
notifySwitched(result.left.getCurrentMemtable());
return result.left.getCurrentMemtable();
}
public void markFlushing(Memtable memtable)
{
apply(View.markFlushing(memtable));
}
public void replaceFlushed(Memtable memtable, Iterable<SSTableReader> sstables)
{
assert !isDummy();
if (Iterables.isEmpty(sstables))
{
            // sstables may be empty if we flushed the batchlog and nothing needed to be retained;
            // if so, we don't care what state the cfstore is in, we just replace the memtable and continue
apply(View.replaceFlushed(memtable, null));
return;
}
sstables.forEach(SSTableReader::setupOnline);
// back up before creating a new Snapshot (which makes the new one eligible for compaction)
maybeIncrementallyBackup(sstables);
apply(View.replaceFlushed(memtable, sstables));
        Throwable fail = updateSizeTracking(emptySet(), sstables, null);
notifyDiscarded(memtable);
        // TODO: if we're invalidated, should we notifyAdded AND removed, or just skip both?
fail = notifyAdded(sstables, fail);
if (!isDummy() && !cfstore.isValid())
dropSSTables();
maybeFail(fail);
}
// MISCELLANEOUS public utility calls
public Set<SSTableReader> getCompacting()
{
return view.get().compacting;
}
public Iterable<SSTableReader> getUncompacting()
{
return view.get().select(SSTableSet.NONCOMPACTING);
}
public Iterable<SSTableReader> getUncompacting(Iterable<SSTableReader> candidates)
{
return view.get().getUncompacting(candidates);
}
public void maybeIncrementallyBackup(final Iterable<SSTableReader> sstables)
{
if (!DatabaseDescriptor.isIncrementalBackupsEnabled())
return;
for (SSTableReader sstable : sstables)
{
File backupsDir = Directories.getBackupsDirectory(sstable.descriptor);
sstable.createLinks(FileUtils.getCanonicalPath(backupsDir));
}
}
// NOTIFICATION
Throwable notifySSTablesChanged(Collection<SSTableReader> removed, Collection<SSTableReader> added, OperationType compactionType, Throwable accumulate)
{
INotification notification = new SSTableListChangedNotification(added, removed, compactionType);
for (INotificationConsumer subscriber : subscribers)
{
try
{
subscriber.handleNotification(notification, this);
}
catch (Throwable t)
{
accumulate = merge(accumulate, t);
}
}
return accumulate;
}
Throwable notifyAdded(Iterable<SSTableReader> added, Throwable accumulate)
{
INotification notification = new SSTableAddedNotification(added);
for (INotificationConsumer subscriber : subscribers)
{
try
{
subscriber.handleNotification(notification, this);
}
catch (Throwable t)
{
accumulate = merge(accumulate, t);
}
}
return accumulate;
}
public void notifyAdded(Iterable<SSTableReader> added)
{
maybeFail(notifyAdded(added, null));
}
public void notifySSTableRepairedStatusChanged(Collection<SSTableReader> repairStatusesChanged)
{
INotification notification = new SSTableRepairStatusChanged(repairStatusesChanged);
for (INotificationConsumer subscriber : subscribers)
subscriber.handleNotification(notification, this);
}
public void notifyDeleting(SSTableReader deleting)
{
INotification notification = new SSTableDeletingNotification(deleting);
for (INotificationConsumer subscriber : subscribers)
subscriber.handleNotification(notification, this);
}
public void notifyTruncated(long truncatedAt)
{
INotification notification = new TruncationNotification(truncatedAt);
for (INotificationConsumer subscriber : subscribers)
subscriber.handleNotification(notification, this);
}
public void notifyRenewed(Memtable renewed)
{
notify(new MemtableRenewedNotification(renewed));
}
public void notifySwitched(Memtable previous)
{
notify(new MemtableSwitchedNotification(previous));
}
public void notifyDiscarded(Memtable discarded)
{
notify(new MemtableDiscardedNotification(discarded));
}
private void notify(INotification notification)
{
for (INotificationConsumer subscriber : subscribers)
subscriber.handleNotification(notification, this);
}
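    /**
     * A tracker is a "dummy" when it has no backing ColumnFamilyStore or when the daemon has not been
     * initialized (e.g. in offline tools); dummy trackers skip sstable setup and disk-size accounting.
     */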
public boolean isDummy()
{
return cfstore == null || !DatabaseDescriptor.isDaemonInitialized();
}
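    /**
     * Registers a consumer to receive every notification subsequently published by this tracker
     * (sstable additions/removals, memtable lifecycle events, truncation, deletion).
     * A minimal sketch, using a hypothetical {@code handle} helper:
     * <pre>{@code
     * tracker.subscribe((notification, sender) -> handle(notification));
     * }</pre>
     */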
public void subscribe(INotificationConsumer consumer)
{
subscribers.add(consumer);
}
public void unsubscribe(INotificationConsumer consumer)
{
subscribers.remove(consumer);
}
private static Set<SSTableReader> emptySet()
{
return Collections.emptySet();
}
public View getView()
{
return view.get();
}
@VisibleForTesting
public void removeUnsafe(Set<SSTableReader> toRemove)
{
        apply(view -> updateLiveSet(toRemove, emptySet()).apply(view));
}
}