blob: 7393323dce5152584f64d7dc667fa8f3697b57bc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;
import java.io.File;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Predicate;
import com.google.common.collect.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.notifications.*;
import org.apache.cassandra.utils.Interval;
import org.apache.cassandra.utils.IntervalTree;
import org.apache.cassandra.utils.concurrent.OpOrder;
public class DataTracker
{
// Class-wide SLF4J logger.
private static final Logger logger = LoggerFactory.getLogger(DataTracker.class);
// Consumers that are handed every notification sent by the notify* methods of this tracker.
public final Collection<INotificationConsumer> subscribers = new CopyOnWriteArrayList<>();
// The column family store this tracker was created for.
public final ColumnFamilyStore cfstore;
// Holder of the current View; views are swapped atomically (set / compareAndSet) rather than mutated.
private final AtomicReference<View> view;
/**
 * Creates a tracker for the given column family store and installs its initial view.
 *
 * @param cfstore the store whose memtables and sstables are tracked
 */
public DataTracker(ColumnFamilyStore cfstore)
{
    this.cfstore = cfstore;
    view = new AtomicReference<>();
    // populate the (still empty) reference with the initial view
    init();
}
/**
 * Gets the Memtable that the ordered write operation should be directed to.
 *
 * @param opGroup the operation group of the write
 * @return the first live memtable (in list order) whose accepts(opGroup) returns true
 * @throws AssertionError if no live memtable accepts the operation; the message lists the
 *         memtables that were actually examined
 */
public Memtable getMemtableFor(OpOrder.Group opGroup)
{
    // since any new memtables appended to the list after we fetch it will be for operations started
    // after us, we can safely assume that we will always find the memtable that 'accepts' us;
    // if the barrier for any memtable is set whilst we are reading the list, it must accept us.
    // there may be multiple memtables in the list that would 'accept' us, however we only ever choose
    // the oldest such memtable, as accepts() only prevents us falling behind (i.e. ensures we don't
    // assign operations to a memtable that was retired/queued before we started)

    // read the view exactly once so that the failure message below describes the very list we
    // searched, not a newer snapshot that may have been installed in the meantime
    final List<Memtable> candidates = view.get().liveMemtables;
    for (Memtable memtable : candidates)
    {
        if (memtable.accepts(opGroup))
            return memtable;
    }
    throw new AssertionError(candidates.toString());
}
/**
 * @return the sstable set of the current view
 */
public Set<SSTableReader> getSSTables()
{
    final View current = view.get();
    return current.sstables;
}
/**
 * @return the sstables of the current view that are not in its compacting set
 */
public Set<SSTableReader> getUncompactingSSTables()
{
    final View current = view.get();
    return current.nonCompactingSStables();
}
/**
 * Filters the candidates down to those that are not compacting.
 *
 * The compacting set is taken from the view that is current when this method is called; the
 * returned iterable is a filtered view over {@code candidates} evaluated against that fixed set.
 *
 * @param candidates the sstables to filter
 * @return the candidates that are absent from the captured compacting set
 */
public Iterable<SSTableReader> getUncompactingSSTables(Iterable<SSTableReader> candidates)
{
    final Set<SSTableReader> busy = view.get().compacting;
    final Predicate<SSTableReader> notCompacting = new Predicate<SSTableReader>()
    {
        public boolean apply(SSTableReader sstable)
        {
            return !busy.contains(sstable);
        }
    };
    return Iterables.filter(candidates, notCompacting);
}
/**
 * @return the view that is current at the time of the call
 */
public View getView()
{
    final View current = view.get();
    return current;
}
/**
 * Switch the current memtable. A freshly created memtable is atomically appended to the end of the
 * list of live memtables, and the memtable that was last in that list beforehand is returned. The
 * previous memtable stays in the live list until discarding(memtable) is called. The two calls
 * must be paired: m = switchMemtable must be followed by discarding(m), and they cannot be interleaved.
 *
 * @param truncating if true, subscribers are notified that the memtable was renewed
 * @return the previously active memtable
 */
public Memtable switchMemtable(boolean truncating)
{
    final Memtable replacement = new Memtable(cfstore);
    Memtable previous;
    // retry until our updated view is installed on top of the exact view we read
    while (true)
    {
        View before = view.get();
        previous = before.getCurrentMemtable();
        View after = before.switchMemtable(replacement);
        if (view.compareAndSet(before, after))
            break;
    }
    if (truncating)
    {
        notifyRenewed(replacement);
    }
    return previous;
}
/**
 * Atomically installs a view in which the given memtable has been moved from the live list to
 * the flushing list.
 *
 * @param memtable the memtable about to be flushed
 */
public void markFlushing(Memtable memtable)
{
    // retry until our updated view is installed on top of the exact view we read
    while (true)
    {
        View before = view.get();
        View after = before.markFlushing(memtable);
        if (view.compareAndSet(before, after))
            return;
    }
}
/**
 * Removes a flushed memtable from the flushing list and, when the flush produced one, adds the
 * resulting sstable to the view.
 *
 * @param memtable the memtable that finished flushing
 * @param sstable  the sstable written by the flush; may be null if we flushed batchlog and nothing
 *                 needed to be retained
 */
public void replaceFlushed(Memtable memtable, SSTableReader sstable)
{
    final boolean produced = sstable != null;

    if (!cfstore.isValid())
    {
        // the store is no longer valid: drop the memtable, and make sure the new sstable does not
        // stay in the resulting view; no backup, size accounting or notification is performed
        while (true)
        {
            View before = view.get();
            View after = before.replaceFlushed(memtable, sstable);
            if (produced)
                after = after.replace(Arrays.asList(sstable), Collections.<SSTableReader>emptyList());
            if (view.compareAndSet(before, after))
                return;
        }
    }

    // back up before creating a new View (which makes the new one eligible for compaction)
    if (produced)
        maybeIncrementallyBackup(sstable);

    while (true)
    {
        View before = view.get();
        View after = before.replaceFlushed(memtable, sstable);
        if (view.compareAndSet(before, after))
            break;
    }

    if (!produced)
        return;
    addNewSSTablesSize(Arrays.asList(sstable));
    notifyAdded(sstable);
}
/**
 * Creates links to the sstable in its backups directory, but only when incremental backups
 * are enabled in the database configuration.
 *
 * @param sstable the sstable to back up
 */
public void maybeIncrementallyBackup(final SSTableReader sstable)
{
    if (DatabaseDescriptor.isIncrementalBackupsEnabled())
    {
        File target = Directories.getBackupsDirectory(sstable.descriptor);
        sstable.createLinks(FileUtils.getCanonicalPath(target));
    }
}
/**
 * Attempts to add the given sstables to the compacting set of the view.
 *
 * Note that we could acquire references on the marked sstables and release them in
 * unmarkCompacting, but since we will never call markObsolete on a sstable marked
 * as compacting (unless there is a serious bug), we can skip this.
 *
 * @param sstables the sstables to mark; must be non-null and non-empty
 * @return true if we are able to mark the given sstables as compacting, before anyone else;
 *         false if any of them is already compacting or is already marked compacted
 */
public boolean markCompacting(Iterable<SSTableReader> sstables)
{
    assert sstables != null && !Iterables.isEmpty(sstables);
    for (;;)
    {
        final View snapshot = view.get();
        final Set<SSTableReader> wanted = ImmutableSet.copyOf(sstables);

        // refuse if somebody else is already compacting one of them
        for (SSTableReader candidate : wanted)
        {
            if (snapshot.compacting.contains(candidate))
                return false;
        }

        // refuse if one of them has already been marked compacted
        for (SSTableReader candidate : wanted)
        {
            if (candidate.isMarkedCompacted())
                return false;
        }

        if (view.compareAndSet(snapshot, snapshot.markCompacting(wanted)))
            return true;
        // the view changed underneath us: re-validate against the new one
    }
}
/**
 * Removes files from compacting status: this is different from 'markObsolete'
 * because it should be run regardless of whether a compaction succeeded.
 *
 * @param unmark the sstables to take out of the compacting set
 */
public void unmarkCompacting(Iterable<SSTableReader> unmark)
{
    final boolean dropped = !cfstore.isValid();
    if (dropped)
    {
        // The CF has been dropped. We don't know if the original compaction succeeded or failed,
        // which makes it difficult to know if the sstable reference has already been released.
        // A "good enough" approach is to mark the sstables involved obsolete, which if compaction succeeded
        // is harmlessly redundant, and if it failed ensures that at least the sstable will get deleted on restart.
        for (SSTableReader reader : unmark)
        {
            reader.markObsolete();
        }
    }

    while (true)
    {
        View before = view.get();
        View after = before.unmarkCompacting(unmark);
        if (view.compareAndSet(before, after))
            break;
    }

    if (dropped)
    {
        // when the CFS is invalidated, it will call unreferenceSSTables(). However, unreferenceSSTables only deals
        // with sstables that aren't currently being compacted. If there are ongoing compactions that finish or are
        // interrupted after the CFS is invalidated, those sstables need to be unreferenced as well, so we do that here.
        unreferenceSSTables();
    }
}
/**
 * Removes the given sstables from the view without any replacement, then notifies subscribers
 * of the change.
 *
 * @param sstables       the sstables to remove
 * @param compactionType the operation type reported in the notification
 */
public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType)
{
    final List<SSTableReader> nothing = Collections.<SSTableReader>emptyList();
    replace(sstables, nothing);
    notifySSTablesChanged(sstables, nothing, compactionType);
}
/**
 * Note that this DOES NOT insert the replacement sstables, it only removes the old sstables and
 * notifies any listeners that they have been replaced by the provided sstables, which must have
 * been performed by an earlier replaceReaders() call. The on-disk size of every replacement is
 * then added to the store's total and live disk space metrics.
 *
 * @param sstables        the sstables to remove from the view
 * @param allReplacements the readers reported to subscribers as the replacements
 * @param compactionType  the operation type reported in the notification
 */
public void markCompactedSSTablesReplaced(Collection<SSTableReader> sstables, Collection<SSTableReader> allReplacements, OperationType compactionType)
{
    final List<SSTableReader> nothing = Collections.<SSTableReader>emptyList();
    replace(sstables, nothing);
    notifySSTablesChanged(sstables, allReplacements, compactionType);
    for (SSTableReader replacement : allReplacements)
    {
        final long onDisk = replacement.bytesOnDisk();
        cfstore.metric.totalDiskSpaceUsed.inc(onDisk);
        cfstore.metric.liveDiskSpaceUsed.inc(onDisk);
    }
}
/**
 * Adds the given sstables to the view; no notifications or backup necessary.
 *
 * @param sstables the sstables to add
 */
public void addInitialSSTables(Collection<SSTableReader> sstables)
{
    final List<SSTableReader> nothingRemoved = Collections.<SSTableReader>emptyList();
    replace(nothingRemoved, sstables);
}
/**
 * Adds the given sstables to the view, then for each of them performs the (optional) incremental
 * backup and notifies subscribers of the addition.
 *
 * @param sstables the sstables to add
 */
public void addSSTables(Collection<SSTableReader> sstables)
{
    final List<SSTableReader> nothingRemoved = Collections.<SSTableReader>emptyList();
    replace(nothingRemoved, sstables);
    for (SSTableReader added : sstables)
    {
        maybeIncrementallyBackup(added);
        notifyAdded(added);
    }
}
/**
 * Removes all sstables that are not busy compacting from the view, notifies subscribers of the
 * removal, and runs the post-replacement bookkeeping for the removed sstables while tolerating
 * ones that were already marked compacted.
 */
public void unreferenceSSTables()
{
    final Set<SSTableReader> nothing = Collections.<SSTableReader>emptySet();
    Set<SSTableReader> idle;
    while (true)
    {
        View before = view.get();
        idle = before.nonCompactingSStables();
        View after = before.replace(idle, nothing);
        if (view.compareAndSet(before, after))
            break;
    }

    // notifySSTablesChanged -> LeveledManifest.promote doesn't like a no-op "promotion"
    if (idle.isEmpty())
        return;

    notifySSTablesChanged(idle, nothing, OperationType.UNKNOWN);
    postReplace(idle, nothing, true);
}
/**
 * Removes every SSTable in the directory from the DataTracker's view.
 *
 * The new view's sstable set consists of the non-compacting sstables that live outside the
 * directory. Every sstable of the replaced view that is not retained has its reference released,
 * and subscribers are then notified.
 *
 * @param directory the unreadable directory, possibly with SSTables in it, but not necessarily.
 */
void removeUnreadableSSTables(File directory)
{
    View currentView, newView;
    Set<SSTableReader> remaining = new HashSet<>();
    do
    {
        currentView = view.get();
        // Start from scratch on every attempt: if the compareAndSet below fails, the next view may
        // no longer contain readers collected from the previous one, and carrying them over would
        // put them back into the view.
        remaining.clear();
        // computed once per attempt; each call builds a new difference view
        Set<SSTableReader> nonCompacting = currentView.nonCompactingSStables();
        for (SSTableReader r : nonCompacting)
            if (!r.descriptor.directory.equals(directory))
                remaining.add(r);
        // nothing lives in the directory: leave the view untouched
        if (remaining.size() == nonCompacting.size())
            return;
        newView = currentView.replace(currentView.sstables, remaining);
    }
    while (!view.compareAndSet(currentView, newView));
    for (SSTableReader sstable : currentView.sstables)
        if (!remaining.contains(sstable))
            sstable.releaseReference();
    // NOTE(review): the retained readers are passed in the "removed" position of the notification,
    // and compacting sstables are not carried into the new view; both are kept exactly as before,
    // please confirm they are intended.
    notifySSTablesChanged(remaining, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN);
}
/**
 * (Re)initializes the tracker, purging all references: installs a view with a single new live
 * memtable, no flushing memtables, no sstables, nothing compacting, and the empty interval tree.
 */
void init()
{
    final List<Memtable> live = ImmutableList.of(new Memtable(cfstore));
    final List<Memtable> flushing = ImmutableList.<Memtable>of();
    final Set<SSTableReader> none = Collections.<SSTableReader>emptySet();
    view.set(new View(live, flushing, none, none, SSTableIntervalTree.empty()));
}
/**
 * A special kind of replacement for SSTableReaders that were cloned with a new index summary sampling level (see
 * SSTableReader.cloneWithNewSummarySamplingLevel and CASSANDRA-5519). This does not mark the old reader
 * as compacted.
 *
 * After the view has been swapped, the new readers are set as tracked by this tracker and one
 * reference is released on each old reader.
 *
 * @param oldSSTables replaced readers
 * @param newSSTables replacement readers
 * @param notify      whether subscribers should be told about the change (only done when
 *                    oldSSTables is non-empty)
 */
public void replaceReaders(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> newSSTables, boolean notify)
{
    while (true)
    {
        View before = view.get();
        View after = before.replace(oldSSTables, newSSTables);
        if (view.compareAndSet(before, after))
            break;
    }

    if (notify && !oldSSTables.isEmpty())
    {
        notifySSTablesChanged(oldSSTables, newSSTables, OperationType.COMPACTION);
    }

    for (SSTableReader fresh : newSSTables)
    {
        fresh.setTrackedBy(this);
    }
    for (SSTableReader stale : oldSSTables)
    {
        stale.releaseReference();
    }
}
/**
 * Swaps old sstables for replacements in the view and then runs the post-replacement bookkeeping.
 *
 * If the store is no longer valid, the replacements are not added to the view: they are passed to
 * removeOldSSTablesSize and an empty replacement list is used instead.
 *
 * @param oldSSTables  the sstables to remove from the view
 * @param replacements the sstables to add to the view
 */
private void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
{
    Iterable<SSTableReader> incoming = replacements;
    if (!cfstore.isValid())
    {
        removeOldSSTablesSize(incoming, false);
        incoming = Collections.emptyList();
    }

    while (true)
    {
        View before = view.get();
        View after = before.replace(oldSSTables, incoming);
        if (view.compareAndSet(before, after))
            break;
    }

    postReplace(oldSSTables, incoming, false);
}
/**
 * Bookkeeping after a view swap: accounts for the replacements first, then for the removed sstables.
 *
 * @param oldSSTables       the sstables that were removed from the view
 * @param replacements      the sstables that were added to the view
 * @param tolerateCompacted passed through to removeOldSSTablesSize
 */
private void postReplace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements, boolean tolerateCompacted)
{
    // additions are accounted for before removals
    addNewSSTablesSize(replacements);

    removeOldSSTablesSize(oldSSTables, tolerateCompacted);
}
/**
 * For each new sstable: adds its on-disk size to the global load metric and to the store's live
 * and total disk space metrics, and sets this tracker as the one tracking the sstable.
 *
 * @param newSSTables the sstables that were added
 */
private void addNewSSTablesSize(Iterable<SSTableReader> newSSTables)
{
    for (SSTableReader reader : newSSTables)
    {
        if (logger.isDebugEnabled())
        {
            String message = String.format("adding %s to list of files tracked for %s.%s",
                                           reader.descriptor, cfstore.keyspace.getName(), cfstore.name);
            logger.debug(message);
        }
        final long bytes = reader.bytesOnDisk();
        StorageMetrics.load.inc(bytes);
        cfstore.metric.liveDiskSpaceUsed.inc(bytes);
        cfstore.metric.totalDiskSpaceUsed.inc(bytes);
        reader.setTrackedBy(this);
    }
}
/**
 * For each old sstable: subtracts its on-disk size from the global load metric and from the store's
 * live disk space metric, marks it obsolete and releases one reference on it.
 *
 * @param oldSSTables       the sstables that were removed
 * @param tolerateCompacted when false, it is asserted that this call was the first to mark each
 *                          sstable obsolete
 */
private void removeOldSSTablesSize(Iterable<SSTableReader> oldSSTables, boolean tolerateCompacted)
{
    for (SSTableReader reader : oldSSTables)
    {
        if (logger.isDebugEnabled())
        {
            String message = String.format("removing %s from list of files tracked for %s.%s",
                                           reader.descriptor, cfstore.keyspace.getName(), cfstore.name);
            logger.debug(message);
        }
        final long bytes = reader.bytesOnDisk();
        StorageMetrics.load.dec(bytes);
        cfstore.metric.liveDiskSpaceUsed.dec(bytes);

        // tolerateCompacted will be true when the CFS is no longer valid (dropped). If there were ongoing
        // compactions when it was invalidated, sstables may already be marked compacted, so we should
        // tolerate that (see CASSANDRA-5957)
        final boolean newlyObsolete = reader.markObsolete();
        assert (tolerateCompacted || newlyObsolete) : reader + " was already marked compacted";
        reader.releaseReference();
    }
}
/**
 * Subtracts the given amount from the store's total disk space metric.
 *
 * @param size the amount to subtract
 */
public void spaceReclaimed(long size)
{
    final long reclaimed = size;
    cfstore.metric.totalDiskSpaceUsed.dec(reclaimed);
}
/**
 * @return the sum of estimatedKeys() over all sstables of the current view
 */
public long estimatedKeys()
{
    long total = 0;
    for (SSTableReader reader : getSSTables())
    {
        total += reader.estimatedKeys();
    }
    return total;
}
/**
 * Computes the mean column count across all sstables of the current view, weighting each
 * sstable's mean by the count of its estimated-column-count histogram.
 *
 * @return the weighted mean truncated to an int, or 0 when the total count is not positive
 */
public int getMeanColumns()
{
    long weightedTotal = 0;
    long samples = 0;
    for (SSTableReader reader : getSSTables())
    {
        long n = reader.getEstimatedColumnCount().count();
        weightedTotal += reader.getEstimatedColumnCount().mean() * n;
        samples += n;
    }
    if (samples > 0)
    {
        return (int) (weightedTotal / samples);
    }
    return 0;
}
/**
 * Computes the ratio of droppable tombstones to estimated columns over all sstables of the
 * current view. Tombstones are counted as droppable before (now in seconds - gc grace seconds).
 *
 * @return droppable tombstones divided by estimated columns, or 0 when the column estimate is
 *         not positive
 */
public double getDroppableTombstoneRatio()
{
    double droppable = 0;
    long columns = 0;
    final int nowInSeconds = (int)(System.currentTimeMillis()/1000);
    for (SSTableReader reader : getSSTables())
    {
        droppable += reader.getDroppableTombstonesBefore(nowInSeconds - reader.metadata.getGcGraceSeconds());
        columns += reader.getEstimatedColumnCount().mean() * reader.getEstimatedColumnCount().count();
    }
    if (columns > 0)
    {
        return droppable / columns;
    }
    return 0;
}
/**
 * Sends a single SSTableListChangedNotification to every subscriber.
 *
 * @param removed        the sstables reported as removed
 * @param added          the sstables reported as added
 * @param compactionType the operation type carried by the notification
 */
public void notifySSTablesChanged(Collection<SSTableReader> removed, Collection<SSTableReader> added, OperationType compactionType)
{
    // one notification instance is shared by all subscribers
    final INotification event = new SSTableListChangedNotification(added, removed, compactionType);
    for (INotificationConsumer consumer : subscribers)
    {
        consumer.handleNotification(event, this);
    }
}
/**
 * Sends a single SSTableAddedNotification to every subscriber.
 *
 * @param added the sstable that was added
 */
public void notifyAdded(SSTableReader added)
{
    // one notification instance is shared by all subscribers
    final INotification event = new SSTableAddedNotification(added);
    for (INotificationConsumer consumer : subscribers)
    {
        consumer.handleNotification(event, this);
    }
}
/**
 * Sends a single SSTableRepairStatusChanged notification to every subscriber.
 *
 * @param repairStatusesChanged the sstables carried by the notification
 */
public void notifySSTableRepairedStatusChanged(Collection<SSTableReader> repairStatusesChanged)
{
    // one notification instance is shared by all subscribers
    final INotification event = new SSTableRepairStatusChanged(repairStatusesChanged);
    for (INotificationConsumer consumer : subscribers)
    {
        consumer.handleNotification(event, this);
    }
}
/**
 * Sends a single SSTableDeletingNotification to every subscriber.
 *
 * @param deleting the sstable carried by the notification
 */
public void notifyDeleting(SSTableReader deleting)
{
    // one notification instance is shared by all subscribers
    final INotification event = new SSTableDeletingNotification(deleting);
    for (INotificationConsumer consumer : subscribers)
    {
        consumer.handleNotification(event, this);
    }
}
/**
 * Sends a single MemtableRenewedNotification to every subscriber.
 *
 * @param renewed the memtable carried by the notification
 */
public void notifyRenewed(Memtable renewed)
{
    // one notification instance is shared by all subscribers
    final INotification event = new MemtableRenewedNotification(renewed);
    for (INotificationConsumer consumer : subscribers)
    {
        consumer.handleNotification(event, this);
    }
}
/**
 * Sends a single TruncationNotification to every subscriber.
 *
 * @param truncatedAt the value carried by the notification
 */
public void notifyTruncated(long truncatedAt)
{
    // one notification instance is shared by all subscribers
    final INotification event = new TruncationNotification(truncatedAt);
    for (INotificationConsumer consumer : subscribers)
    {
        consumer.handleNotification(event, this);
    }
}
/**
 * Registers a consumer so that it receives the notifications sent by this tracker.
 *
 * @param consumer the consumer to add to the subscriber collection
 */
public void subscribe(INotificationConsumer consumer)
{
    this.subscribers.add(consumer);
}
/**
 * Removes a consumer from the subscriber collection.
 *
 * @param consumer the consumer to remove
 */
public void unsubscribe(INotificationConsumer consumer)
{
    this.subscribers.remove(consumer);
}
/**
 * Builds an interval tree with one interval per sstable, spanning from the sstable's first
 * position to its last position and carrying the sstable itself as data.
 *
 * @param sstables the sstables to index
 * @return a new interval tree over the given sstables
 */
public static SSTableIntervalTree buildIntervalTree(Iterable<SSTableReader> sstables)
{
    final int expected = Iterables.size(sstables);
    final List<Interval<RowPosition, SSTableReader>> spans = new ArrayList<>(expected);
    for (SSTableReader reader : sstables)
    {
        Interval<RowPosition, SSTableReader> span = Interval.<RowPosition, SSTableReader>create(reader.first, reader.last, reader);
        spans.add(span);
    }
    return new SSTableIntervalTree(spans);
}
/**
 * @return the compacting set of the current view
 */
public Set<SSTableReader> getCompacting()
{
    final View current = getView();
    return current.compacting;
}
/**
 * An IntervalTree keyed by RowPosition whose intervals carry SSTableReaders. Instances are only
 * created through the private constructor; a single shared instance represents the empty tree.
 */
public static class SSTableIntervalTree extends IntervalTree<RowPosition, SSTableReader, Interval<RowPosition, SSTableReader>>
{
    /** The shared empty tree, constructed from a null interval collection. */
    private static final SSTableIntervalTree EMPTY = new SSTableIntervalTree(null);

    /**
     * @return the shared empty tree
     */
    public static SSTableIntervalTree empty()
    {
        return EMPTY;
    }

    /**
     * @param intervals the intervals to build the tree from; forwarded to the superclass together
     *                  with a null second argument
     */
    private SSTableIntervalTree(Collection<Interval<RowPosition, SSTableReader>> intervals)
    {
        super(intervals, null);
    }
}
/**
 * An immutable structure holding the current memtable, the memtables pending
 * flush, the sstables for a column family, and the sstables that are active
 * in compaction (a subset of the sstables).
 *
 * Every "mutator" below returns a new View and leaves this one untouched.
 */
public static class View
{
    /**
     * ordinarily a list of size 1, but when preparing to flush will contain both the memtable we will flush
     * and the new replacement memtable, until all outstanding write operations on the old table complete.
     * The last item in the list is always the "current" memtable.
     */
    private final List<Memtable> liveMemtables;

    /**
     * contains all memtables that are no longer referenced for writing and are queued for / in the process of being
     * flushed. In chronologically ascending order.
     */
    private final List<Memtable> flushingMemtables;

    /** the sstables currently marked as compacting */
    public final Set<SSTableReader> compacting;

    /** all sstables of this view */
    public final Set<SSTableReader> sstables;

    /** interval tree handed in alongside {@link #sstables}; the methods below always build it from that set */
    public final SSTableIntervalTree intervalTree;

    /**
     * Creates a view from the given components; all arguments are asserted to be non-null and are
     * stored as given (no copies are made).
     */
    View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Set<SSTableReader> sstables, Set<SSTableReader> compacting, SSTableIntervalTree intervalTree)
    {
        assert liveMemtables != null;
        assert flushingMemtables != null;
        assert sstables != null;
        assert compacting != null;
        assert intervalTree != null;

        this.liveMemtables = liveMemtables;
        this.flushingMemtables = flushingMemtables;
        this.sstables = sstables;
        this.compacting = compacting;
        this.intervalTree = intervalTree;
    }

    /**
     * @return the first flushing memtable if there is one, otherwise the first live memtable
     */
    public Memtable getOldestMemtable()
    {
        if (!flushingMemtables.isEmpty())
            return flushingMemtables.get(0);
        return liveMemtables.get(0);
    }

    /**
     * @return the last memtable of the live list
     */
    public Memtable getCurrentMemtable()
    {
        return liveMemtables.get(liveMemtables.size() - 1);
    }

    /**
     * @return the flushing memtables, preceded by the first live memtable whenever the live list
     *         does not have exactly one element
     */
    public Iterable<Memtable> getMemtablesPendingFlush()
    {
        if (liveMemtables.size() == 1)
            return flushingMemtables;
        return Iterables.concat(liveMemtables.subList(0, 1), flushingMemtables);
    }

    /**
     * @return the active memtable and all the memtables that are pending flush.
     */
    public Iterable<Memtable> getAllMemtables()
    {
        return Iterables.concat(flushingMemtables, liveMemtables);
    }

    /**
     * @return a set view of the sstables that are not in the compacting set
     */
    public Sets.SetView<SSTableReader> nonCompactingSStables()
    {
        return Sets.difference(ImmutableSet.copyOf(sstables), compacting);
    }

    /**
     * @return a new view whose live list is this view's live list with newMemtable appended
     */
    View switchMemtable(Memtable newMemtable)
    {
        List<Memtable> newLiveMemtables = ImmutableList.<Memtable>builder().addAll(liveMemtables).add(newMemtable).build();
        return new View(newLiveMemtables, flushingMemtables, sstables, compacting, intervalTree);
    }

    /**
     * @return a new view in which toFlushMemtable has been removed from the live list and inserted
     *         into the flushing list, ordered by creation time
     */
    View markFlushing(Memtable toFlushMemtable)
    {
        List<Memtable> live = liveMemtables, flushing = flushingMemtables;

        // since we can have multiple flushes queued, we may occasionally race and start a flush out of order,
        // so must locate it in the list to remove, rather than just removing from the beginning
        int i = live.indexOf(toFlushMemtable);
        // the memtable must be live; without this check a miss (-1) slips past the next assertion
        // and only surfaces as an index error from subList
        assert i >= 0 : toFlushMemtable + " is not a live memtable";
        // ... and it must not be the last (current) memtable
        assert i < live.size() - 1;
        List<Memtable> newLive = ImmutableList.<Memtable>builder()
                                              .addAll(live.subList(0, i))
                                              .addAll(live.subList(i + 1, live.size()))
                                              .build();

        // similarly, if we out-of-order markFlushing once, we may afterwards need to insert a memtable into the
        // flushing list in a position other than the end, though this will be rare
        i = flushing.size();
        while (i > 0 && flushing.get(i - 1).creationTime() > toFlushMemtable.creationTime())
            i--;
        List<Memtable> newFlushing = ImmutableList.<Memtable>builder()
                                                  .addAll(flushing.subList(0, i))
                                                  .add(toFlushMemtable)
                                                  .addAll(flushing.subList(i, flushing.size()))
                                                  .build();

        return new View(newLive, newFlushing, sstables, compacting, intervalTree);
    }

    /**
     * @param flushedMemtable the memtable to remove from the flushing list
     * @param newSSTable      the sstable to add, or null to keep the sstable set as it is
     * @return a new view without flushedMemtable in the flushing list and, if given, with newSSTable added
     */
    View replaceFlushed(Memtable flushedMemtable, SSTableReader newSSTable)
    {
        int index = flushingMemtables.indexOf(flushedMemtable);
        // a miss (-1) would otherwise only surface as an index error from subList
        assert index >= 0 : flushedMemtable + " is not a flushing memtable";
        List<Memtable> newQueuedMemtables = ImmutableList.<Memtable>builder()
                                                         .addAll(flushingMemtables.subList(0, index))
                                                         .addAll(flushingMemtables.subList(index + 1, flushingMemtables.size()))
                                                         .build();
        Set<SSTableReader> newSSTables = newSSTable == null
                                         ? sstables
                                         : newSSTables(newSSTable);
        SSTableIntervalTree newTree = buildIntervalTree(newSSTables);
        return new View(liveMemtables, newQueuedMemtables, newSSTables, compacting, newTree);
    }

    /**
     * @return a new view whose sstable set has oldSSTables removed and replacements added, with an
     *         interval tree rebuilt from that set
     */
    View replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
    {
        Set<SSTableReader> newSSTables = newSSTables(oldSSTables, replacements);
        SSTableIntervalTree newTree = buildIntervalTree(newSSTables);
        return new View(liveMemtables, flushingMemtables, newSSTables, compacting, newTree);
    }

    /**
     * @return a new view whose compacting set additionally contains tomark
     */
    View markCompacting(Collection<SSTableReader> tomark)
    {
        Set<SSTableReader> compactingNew = ImmutableSet.<SSTableReader>builder().addAll(compacting).addAll(tomark).build();
        return new View(liveMemtables, flushingMemtables, sstables, compactingNew, intervalTree);
    }

    /**
     * @return a new view whose compacting set no longer contains tounmark
     */
    View unmarkCompacting(Iterable<SSTableReader> tounmark)
    {
        Set<SSTableReader> compactingNew = ImmutableSet.copyOf(Sets.difference(compacting, ImmutableSet.copyOf(tounmark)));
        return new View(liveMemtables, flushingMemtables, sstables, compactingNew, intervalTree);
    }

    /**
     * @return the sstable set of this view with newSSTable added
     */
    private Set<SSTableReader> newSSTables(SSTableReader newSSTable)
    {
        assert newSSTable != null;
        // not performance-sensitive, don't obsess over doing a selection merge here
        return newSSTables(Collections.<SSTableReader>emptyList(), Collections.singletonList(newSSTable));
    }

    /**
     * Computes the sstable set that results from removing oldSSTables and adding replacements.
     * The expected size (current - old + replacements) is asserted both before and after building.
     *
     * @return an immutable copy of the resulting set
     */
    private Set<SSTableReader> newSSTables(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
    {
        ImmutableSet<SSTableReader> oldSet = ImmutableSet.copyOf(oldSSTables);
        int newSSTablesSize = sstables.size() - oldSSTables.size() + Iterables.size(replacements);
        assert newSSTablesSize >= Iterables.size(replacements) : String.format("Incoherent new size %d replacing %s by %s in %s", newSSTablesSize, oldSSTables, replacements, this);
        Set<SSTableReader> newSSTables = new HashSet<>(newSSTablesSize);

        for (SSTableReader sstable : sstables)
            if (!oldSet.contains(sstable))
                newSSTables.add(sstable);

        Iterables.addAll(newSSTables, replacements);
        assert newSSTables.size() == newSSTablesSize : String.format("Expecting new size of %d, got %d while replacing %s by %s in %s", newSSTablesSize, newSSTables.size(), oldSSTables, replacements, this);
        return ImmutableSet.copyOf(newSSTables);
    }

    @Override
    public String toString()
    {
        return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", liveMemtables.size() + flushingMemtables.size() - 1, sstables, compacting);
    }

    /**
     * Looks up the sstables whose interval intersects the given bounds.
     *
     * @param rowBounds the bounds to search; a right bound that is the partitioner's minimum is
     *                  replaced by the maximum of the interval tree
     * @return the matching sstables; an empty list when this view has no sstables
     */
    public List<SSTableReader> sstablesInBounds(AbstractBounds<RowPosition> rowBounds)
    {
        // With no sstables there is nothing to find, and the tree has no meaningful maximum to
        // substitute for a minimum right bound below.
        // NOTE(review): presumably intervalTree.max() fails or yields null on an empty tree - confirm.
        if (sstables.isEmpty())
            return Collections.<SSTableReader>emptyList();

        RowPosition stopInTree = rowBounds.right.isMinimum(liveMemtables.get(0).cfs.partitioner) ? intervalTree.max() : rowBounds.right;
        return intervalTree.search(Interval.<RowPosition, SSTableReader>create(rowBounds.left, stopInTree));
    }
}
}