blob: 4b3aae09074df1b8ee4864c299795abaa7ed35ef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.lifecycle;
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Predicate;
import com.google.common.collect.*;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.Interval;
import static com.google.common.base.Predicates.equalTo;
import static com.google.common.base.Predicates.not;
import static com.google.common.collect.ImmutableList.copyOf;
import static com.google.common.collect.ImmutableList.of;
import static com.google.common.collect.Iterables.all;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.transform;
import static org.apache.cassandra.db.lifecycle.Helpers.emptySet;
import static org.apache.cassandra.db.lifecycle.Helpers.filterOut;
import static org.apache.cassandra.db.lifecycle.Helpers.replace;
/**
 * An immutable structure holding the current memtable, the memtables pending
 * flush, the sstables for a column family, and the sstables that are active
 * in compaction (a subset of the sstables).
 *
 * Modifications to instances are all performed via a Function produced by the static methods in this class.
 * These are composed as necessary and provided to the Tracker.apply() methods, which atomically reject or
 * accept and apply the changes to the View.
 */
public class View
{
    /**
     * ordinarily a list of size 1, but when preparing to flush will contain both the memtable we will flush
     * and the new replacement memtable, until all outstanding write operations on the old table complete.
     * The last item in the list is always the "current" memtable.
     */
    public final List<Memtable> liveMemtables;
    /**
     * contains all memtables that are no longer referenced for writing and are queued for / in the process of being
     * flushed. In chronologically ascending order.
     */
    public final List<Memtable> flushingMemtables;
    // key-set views over compactingMap / sstablesMap respectively
    final Set<SSTableReader> compacting;
    final Set<SSTableReader> sstables;
    // we use a Map here so that we can easily perform identity checks as well as equality checks.
    // When marking compacting, we now indicate if we expect the sstables to be present (by default we do),
    // and we then check that not only are they all present in the live set, but that the exact instance present is
    // the one we made our decision to compact against.
    final Map<SSTableReader, SSTableReader> sstablesMap;
    final Map<SSTableReader, SSTableReader> compactingMap;
    final SSTableIntervalTree intervalTree;

    /**
     * Package-private: new Views are produced by the Function factories below.
     * All arguments must be non-null (checked by assertions).
     */
    View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Map<SSTableReader, SSTableReader> compacting, SSTableIntervalTree intervalTree)
    {
        assert liveMemtables != null;
        assert flushingMemtables != null;
        assert sstables != null;
        assert compacting != null;
        assert intervalTree != null;

        this.liveMemtables = liveMemtables;
        this.flushingMemtables = flushingMemtables;

        this.sstablesMap = sstables;
        this.sstables = sstablesMap.keySet();
        this.compactingMap = compacting;
        this.compacting = compactingMap.keySet();
        this.intervalTree = intervalTree;
    }

    /**
     * @return the last element of {@link #liveMemtables}, i.e. the "current" memtable.
     */
    public Memtable getCurrentMemtable()
    {
        return liveMemtables.get(liveMemtables.size() - 1);
    }

    /**
     * @return the active memtable and all the memtables that are pending flush
     *         (flushing memtables first, then live memtables).
     */
    public Iterable<Memtable> getAllMemtables()
    {
        return concat(flushingMemtables, liveMemtables);
    }

    // shortcut for all live sstables, so can efficiently use it for size, etc
    public Set<SSTableReader> liveSSTables()
    {
        return sstables;
    }

    /**
     * @return the sstables in {@code sstableSet} (see {@link #select(SSTableSet)}) that satisfy {@code filter}.
     */
    public Iterable<SSTableReader> sstables(SSTableSet sstableSet, Predicate<SSTableReader> filter)
    {
        return filter(select(sstableSet), filter);
    }

    // any sstable known by this tracker in any form; we have a special method here since it's only used for testing/debug
    // (strong leak detection), and it does not follow the normal pattern
    @VisibleForTesting
    public Iterable<SSTableReader> allKnownSSTables()
    {
        return Iterables.concat(sstables, filterOut(compacting, sstables));
    }

    /**
     * Selects sstables from this view.
     * <ul>
     *   <li>LIVE: every live sstable.</li>
     *   <li>NONCOMPACTING: live sstables not currently marked compacting.</li>
     *   <li>CANONICAL: non-EARLY compacting sstables plus non-EARLY live sstables that are not compacting.</li>
     * </ul>
     *
     * @throws IllegalStateException if {@code sstableSet} is not one of the handled constants
     */
    public Iterable<SSTableReader> select(SSTableSet sstableSet)
    {
        switch (sstableSet)
        {
            case LIVE:
                return sstables;
            case NONCOMPACTING:
                return filter(sstables, (s) -> !compacting.contains(s));
            case CANONICAL:
                Set<SSTableReader> canonicalSSTables = new HashSet<>();
                for (SSTableReader sstable : compacting)
                    if (sstable.openReason != SSTableReader.OpenReason.EARLY)
                        canonicalSSTables.add(sstable);
                // reason for checking if compacting contains the sstable is that if compacting has an EARLY version
                // of a NORMAL sstable, we still have the canonical version of that sstable in sstables.
                // note that the EARLY version is equal, but not == since it is a different instance of the same sstable.
                for (SSTableReader sstable : sstables)
                    if (!compacting.contains(sstable) && sstable.openReason != SSTableReader.OpenReason.EARLY)
                        canonicalSSTables.add(sstable);

                return canonicalSSTables;
            default:
                // include the value so an unhandled (e.g. newly added) constant is easy to diagnose
                throw new IllegalStateException("Unhandled SSTableSet: " + sstableSet);
        }
    }

    /**
     * @return a lazily-filtered view of {@code candidates} excluding any sstable currently marked compacting.
     */
    public Iterable<SSTableReader> getUncompacting(Iterable<SSTableReader> candidates)
    {
        return filter(candidates, (s) -> !compacting.contains(s));
    }

    /**
     * @return true if there are no live sstables, nothing is flushing, and there is at most one live memtable
     *         which (if present) has recorded no operations.
     */
    public boolean isEmpty()
    {
        return sstables.isEmpty()
               && liveMemtables.size() <= 1
               && flushingMemtables.isEmpty()
               && (liveMemtables.isEmpty() || liveMemtables.get(0).getOperations() == 0);
    }

    @Override
    public String toString()
    {
        return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", liveMemtables.size() + flushingMemtables.size() - 1, sstables, compacting);
    }

    /**
     * Returns the sstables that have any partition between {@code left} and {@code right}, when both bounds are taken inclusively.
     * The interval formed by {@code left} and {@code right} shouldn't wrap.
     */
    public Iterable<SSTableReader> liveSSTablesInBounds(PartitionPosition left, PartitionPosition right)
    {
        // same search as the static variant, applied to this view's interval tree
        return sstablesInBounds(left, right, intervalTree);
    }

    /**
     * Returns the sstables in {@code intervalTree} that have any partition between {@code left} and {@code right},
     * when both bounds are taken inclusively. The interval formed by {@code left} and {@code right} shouldn't wrap.
     * A minimum {@code right} bound is treated as "up to the tree's maximum".
     */
    public static List<SSTableReader> sstablesInBounds(PartitionPosition left, PartitionPosition right, SSTableIntervalTree intervalTree)
    {
        assert !AbstractBounds.strictlyWrapsAround(left, right);

        if (intervalTree.isEmpty())
            return Collections.emptyList();

        PartitionPosition stopInTree = right.isMinimum() ? intervalTree.max() : right;
        return intervalTree.search(Interval.create(left, stopInTree));
    }

    /**
     * @return a function selecting {@code sstableSet} from a given view (see {@link #select(SSTableSet)}).
     */
    public static Function<View, Iterable<SSTableReader>> selectFunction(SSTableSet sstableSet)
    {
        return (view) -> view.select(sstableSet);
    }

    /**
     * @return a function selecting the sstables in {@code sstableSet} that satisfy {@code filter} from a given view.
     */
    public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet, Predicate<SSTableReader> filter)
    {
        return (view) -> view.sstables(sstableSet, filter);
    }

    /**
     * @return a function returning the live sstables of a view that may contain the given @param key,
     *         according to the view's interval tree. Only {@code SSTableSet.LIVE} is supported (asserted).
     */
    public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet, DecoratedKey key)
    {
        assert sstableSet == SSTableSet.LIVE;
        return (view) -> view.intervalTree.search(key);
    }

    /**
     * @return a function returning the live sstables of a view that may contain rows within @param rowBounds,
     *         inclusive, according to the view's interval tree.
     */
    public static Function<View, Iterable<SSTableReader>> selectLive(AbstractBounds<PartitionPosition> rowBounds)
    {
        // Note that View.sstablesInBounds always includes its bounds while rowBounds may not. This is ok however
        // because the fact we restrict the sstables returned by this function is an optimization in the first
        // place and the returned sstables will (almost) never cover *exactly* rowBounds anyway. It's also
        // *very* unlikely that a sstable is included *just* because we consider one of the bound inclusively
        // instead of exclusively, so the performance impact is negligible in practice.
        return (view) -> view.liveSSTablesInBounds(rowBounds.left, rowBounds.right);
    }

    // METHODS TO CONSTRUCT FUNCTIONS FOR MODIFYING A VIEW:

    // return a function to un/mark the provided readers compacting in a view
    static Function<View, View> updateCompacting(final Set<SSTableReader> unmark, final Iterable<SSTableReader> mark)
    {
        if (unmark.isEmpty() && Iterables.isEmpty(mark))
            return Functions.identity();
        return (view) ->
        {
            // every reader being marked must be the exact instance present in the live set
            assert all(mark, Helpers.idIn(view.sstablesMap));
            return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap,
                            replace(view.compactingMap, unmark, mark),
                            view.intervalTree);
        };
    }

    // construct a predicate to reject views that do not permit us to mark these readers compacting;
    // i.e. one of them is either already compacting, has been compacted, or has been replaced
    static Predicate<View> permitCompacting(final Iterable<SSTableReader> readers)
    {
        return (view) ->
        {
            for (SSTableReader reader : readers)
                // identity (!=) check: the live instance must be the one we decided to compact against
                if (view.compacting.contains(reader) || view.sstablesMap.get(reader) != reader || reader.isMarkedCompacted())
                    return false;
            return true;
        };
    }

    // construct a function to change the live set in a View, rebuilding the interval tree from the result
    static Function<View, View> updateLiveSet(final Set<SSTableReader> remove, final Iterable<SSTableReader> add)
    {
        if (remove.isEmpty() && Iterables.isEmpty(add))
            return Functions.identity();
        return (view) ->
        {
            Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, remove, add);
            return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compactingMap,
                            SSTableIntervalTree.build(sstableMap.keySet()));
        };
    }

    // called prior to initiating flush: add newMemtable to liveMemtables, making it the latest memtable
    static Function<View, View> switchMemtable(final Memtable newMemtable)
    {
        return (view) ->
        {
            List<Memtable> newLive = ImmutableList.<Memtable>builder().addAll(view.liveMemtables).add(newMemtable).build();
            assert newLive.size() == view.liveMemtables.size() + 1;
            return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compactingMap, view.intervalTree);
        };
    }

    // called before flush: move toFlush from liveMemtables to flushingMemtables, inserting it after every
    // flushing memtable that compares less than it (and before the rest)
    static Function<View, View> markFlushing(final Memtable toFlush)
    {
        return (view) ->
        {
            List<Memtable> live = view.liveMemtables, flushing = view.flushingMemtables;
            List<Memtable> newLive = copyOf(filter(live, not(equalTo(toFlush))));
            List<Memtable> newFlushing = copyOf(concat(filter(flushing, lessThan(toFlush)),
                                                       of(toFlush),
                                                       filter(flushing, not(lessThan(toFlush)))));
            assert newLive.size() == live.size() - 1;
            assert newFlushing.size() == flushing.size() + 1;
            return new View(newLive, newFlushing, view.sstablesMap, view.compactingMap, view.intervalTree);
        };
    }

    // called after flush: removes memtable from flushingMemtables, and inserts flushed into the live sstable set
    static Function<View, View> replaceFlushed(final Memtable memtable, final Collection<SSTableReader> flushed)
    {
        return (view) ->
        {
            List<Memtable> flushingMemtables = copyOf(filter(view.flushingMemtables, not(equalTo(memtable))));
            assert flushingMemtables.size() == view.flushingMemtables.size() - 1;

            // nothing was written: keep the existing sstables and interval tree
            if (flushed == null || flushed.isEmpty())
                return new View(view.liveMemtables, flushingMemtables, view.sstablesMap,
                                view.compactingMap, view.intervalTree);

            Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), flushed);
            return new View(view.liveMemtables, flushingMemtables, sstableMap, view.compactingMap,
                            SSTableIntervalTree.build(sstableMap.keySet()));
        };
    }

    // predicate accepting values that compare strictly less than the given bound
    private static <T extends Comparable<T>> Predicate<T> lessThan(final T lessThan)
    {
        return (t) -> t.compareTo(lessThan) < 0;
    }
}