/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.sstable;

import java.lang.ref.WeakReference;
import java.util.*;

import com.google.common.annotations.VisibleForTesting;

import org.apache.cassandra.cache.InstrumentingCache;
import org.apache.cassandra.cache.KeyCacheKey;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.utils.NativeLibrary;
import org.apache.cassandra.utils.concurrent.Transactional;

/**
* Wraps one or more writers as output for rewriting one or more readers: every sstable_preemptive_open_interval_in_mb
* of data written, we look in the summary we're collecting for the latest writer for the penultimate key that we know
* to have been fully flushed to the index file, and then double-check that the key is fully present in the flushed data
* file. We then move the start of each reader forwards to that point, replace them in the Tracker, and attach a
* runnable for on-close (i.e. when all references expire) that drops the page cache prior to that key position.
*
* Hard-links are created for each partially written sstable so that readers opened against them continue to work past
* the rename of the temporary file, which is deleted once all readers against the hard-link have been closed.
* If for any reason the writer is rolled over, we immediately rename and fully expose the completed file in the Tracker.
*
* On abort we restore the original lower bounds of the existing readers and delete any temporary files we had in
* progress, but we leave any hard-links in place for the readers we opened to clean up when they're finished, just as
* we would had we finished successfully.
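*
* A rough usage sketch (illustrative only: cfs, createWriter, sstables and the merged iterator are assumed
* caller-side names, not part of this class):
* <pre>{@code
* try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
*      SSTableRewriter rewriter = SSTableRewriter.construct(cfs, txn, false, maxAge))
* {
*     rewriter.switchWriter(createWriter(cfs, txn)); // caller supplies each writer
*     while (merged.hasNext())
*         rewriter.append(merged.next());            // may preemptively open partial results
*     rewriter.finish();                             // renames tmp files and exposes the new readers
* }
* }</pre>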
*/
public class SSTableRewriter extends Transactional.AbstractTransactional implements Transactional
{
@VisibleForTesting
public static boolean disableEarlyOpeningForTests = false;
private final long preemptiveOpenInterval;
private final long maxAge;
private long repairedAt = -1;
private final ILifecycleTransaction transaction; // the readers we are rewriting (updated as they are replaced)
// the set of final readers we will expose on commit
private final List<SSTableReader> preparedForCommit = new ArrayList<>();
private long currentlyOpenedEarlyAt; // the position (in bytes) in the target file we last (re)opened at
private final List<SSTableWriter> writers = new ArrayList<>();
private final boolean keepOriginals; // true if we do not want to obsolete the originals
private SSTableWriter writer;
private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
// for testing (TODO: remove once we have a byteman setup)
private boolean throwEarly, throwLate;
@Deprecated
public SSTableRewriter(ILifecycleTransaction transaction, long maxAge, boolean isOffline)
{
this(transaction, maxAge, isOffline, true);
}
@Deprecated
public SSTableRewriter(ILifecycleTransaction transaction, long maxAge, boolean isOffline, boolean shouldOpenEarly)
{
this(transaction, maxAge, calculateOpenInterval(shouldOpenEarly), false);
}
@VisibleForTesting
public SSTableRewriter(ILifecycleTransaction transaction, long maxAge, long preemptiveOpenInterval, boolean keepOriginals)
{
this.transaction = transaction;
this.maxAge = maxAge;
this.keepOriginals = keepOriginals;
this.preemptiveOpenInterval = preemptiveOpenInterval;
}
@Deprecated
public static SSTableRewriter constructKeepingOriginals(ILifecycleTransaction transaction, boolean keepOriginals, long maxAge, boolean isOffline)
{
return constructKeepingOriginals(transaction, keepOriginals, maxAge);
}
public static SSTableRewriter constructKeepingOriginals(ILifecycleTransaction transaction, boolean keepOriginals, long maxAge)
{
return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(true), keepOriginals);
}
public static SSTableRewriter constructWithoutEarlyOpening(ILifecycleTransaction transaction, boolean keepOriginals, long maxAge)
{
return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(false), keepOriginals);
}
public static SSTableRewriter construct(ColumnFamilyStore cfs, ILifecycleTransaction transaction, boolean keepOriginals, long maxAge)
{
return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(cfs.supportsEarlyOpen()), keepOriginals);
}
private static long calculateOpenInterval(boolean shouldOpenEarly)
{
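// convert the configured interval from MB to bytes; a negative value disables early opening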
long interval = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20);
if (disableEarlyOpeningForTests || !shouldOpenEarly || interval < 0)
interval = Long.MAX_VALUE;
return interval;
}
public SSTableWriter currentWriter()
{
return writer;
}
public RowIndexEntry append(UnfilteredRowIterator partition)
{
// we do this before appending to ensure we can resetAndTruncate() safely if the append fails
DecoratedKey key = partition.partitionKey();
maybeReopenEarly(key);
RowIndexEntry index = writer.append(partition);
if (!transaction.isOffline() && index != null)
{
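// remember keys that are hot in an original reader's key cache so moveStarts can transfer them to the new reader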
for (SSTableReader reader : transaction.originals())
{
if (reader.getCachedPosition(key, false) != null)
{
cachedKeys.put(key, index);
break;
}
}
}
return index;
}
// attempts to append the partition; if the append fails, resets the writer position
public RowIndexEntry tryAppend(UnfilteredRowIterator partition)
{
writer.mark();
try
{
return append(partition);
}
catch (Throwable t)
{
writer.resetAndTruncate();
throw t;
}
}
private void maybeReopenEarly(DecoratedKey key)
{
if (writer.getFilePointer() - currentlyOpenedEarlyAt > preemptiveOpenInterval)
{
if (transaction.isOffline())
{
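// offline: there is no Tracker to update, so just drop the already-rewritten prefix of each original from the page cache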
for (SSTableReader reader : transaction.originals())
{
RowIndexEntry index = reader.getPosition(key, SSTableReader.Operator.GE);
NativeLibrary.trySkipCache(reader.getFilename(), 0, index == null ? 0 : index.position);
}
}
else
{
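// online: open the partial file as a reader and move the starts of the originals past the region it now covers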
SSTableReader reader = writer.setMaxDataAge(maxAge).openEarly();
if (reader != null)
{
transaction.update(reader, false);
currentlyOpenedEarlyAt = writer.getFilePointer();
moveStarts(reader, reader.last);
transaction.checkpoint();
}
}
}
}
protected Throwable doAbort(Throwable accumulate)
{
// abort the writers
for (SSTableWriter writer : writers)
accumulate = writer.abort(accumulate);
// abort the lifecycle transaction
accumulate = transaction.abort(accumulate);
return accumulate;
}
protected Throwable doCommit(Throwable accumulate)
{
for (SSTableWriter writer : writers)
accumulate = writer.commit(accumulate);
accumulate = transaction.commit(accumulate);
return accumulate;
}
/**
* Replace the readers we are rewriting with cloneWithNewStart, reclaiming any page cache that is no longer
* needed and transferring any key cache entries over to the new reader, expiring them from the old one.
*
* Note that we replace an existing sstable with a new *instance* of the same sstable: the replacement
* .equals() the old one, BUT it is a new instance. So, for example, since we releaseReference() on the old
* one, the old *instance* will have a reference count of zero, and starting a new compaction with that old
* instance would produce exceptions.
*
* @param newReader the rewritten reader that replaces the originals for this region
* @param lowerbound must be non-null; marks the exclusive lower bound of the start for each sstable
*/
private void moveStarts(SSTableReader newReader, DecoratedKey lowerbound)
{
if (transaction.isOffline())
return;
if (preemptiveOpenInterval == Long.MAX_VALUE)
return;
newReader.setupOnline();
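// hand any hot cached keys over to the new reader, and remember them so they can be invalidated on the old instances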
List<DecoratedKey> invalidateKeys = null;
if (!cachedKeys.isEmpty())
{
invalidateKeys = new ArrayList<>(cachedKeys.size());
for (Map.Entry<DecoratedKey, RowIndexEntry> cacheKey : cachedKeys.entrySet())
{
invalidateKeys.add(cacheKey.getKey());
newReader.cacheKey(cacheKey.getKey(), cacheKey.getValue());
}
}
cachedKeys.clear();
for (SSTableReader sstable : transaction.originals())
{
// we call transaction.current(..) to support multiple rewriters operating over the same source readers at once.
// note: only one such writer should be written to at any moment
final SSTableReader latest = transaction.current(sstable);
// skip any sstables that we know to already be shadowed
if (latest.first.compareTo(lowerbound) > 0)
continue;
Runnable runOnClose = invalidateKeys != null ? new InvalidateKeys(latest, invalidateKeys) : null;
if (lowerbound.compareTo(latest.last) >= 0)
{
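// the new reader covers this sstable entirely, so it can be obsoleted outright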
if (!transaction.isObsolete(latest))
{
if (runOnClose != null)
{
latest.runOnClose(runOnClose);
}
transaction.obsolete(latest);
}
continue;
}
DecoratedKey newStart = latest.firstKeyBeyond(lowerbound);
assert newStart != null;
SSTableReader replacement = latest.cloneWithNewStart(newStart, runOnClose);
transaction.update(replacement, true);
}
}
private static final class InvalidateKeys implements Runnable
{
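// hold the cache only weakly so a retained runOnClose hook cannot keep the key cache alive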
final List<KeyCacheKey> cacheKeys = new ArrayList<>();
final WeakReference<InstrumentingCache<KeyCacheKey, ?>> cacheRef;
private InvalidateKeys(SSTableReader reader, Collection<DecoratedKey> invalidate)
{
this.cacheRef = new WeakReference<>(reader.getKeyCache());
if (cacheRef.get() != null)
{
for (DecoratedKey key : invalidate)
cacheKeys.add(reader.getCacheKey(key));
}
}
public void run()
{
for (KeyCacheKey key : cacheKeys)
{
InstrumentingCache<KeyCacheKey, ?> cache = cacheRef.get();
if (cache != null)
cache.remove(key);
}
}
}
public void switchWriter(SSTableWriter newWriter)
{
if (newWriter != null)
writers.add(newWriter.setMaxDataAge(maxAge));
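// if the current writer has produced no data there is nothing to open early; abort it and swap in the new one directly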
if (writer == null || writer.getFilePointer() == 0)
{
if (writer != null)
{
writer.abort();
transaction.untrackNew(writer);
writers.remove(writer);
}
writer = newWriter;
return;
}
if (preemptiveOpenInterval != Long.MAX_VALUE)
{
// we leave it as a tmp file, but we open it and add it to the Tracker
SSTableReader reader = writer.setMaxDataAge(maxAge).openFinalEarly();
transaction.update(reader, false);
moveStarts(reader, reader.last);
transaction.checkpoint();
}
currentlyOpenedEarlyAt = 0;
writer = newWriter;
}
/**
* @param repairedAt the repair time to use for the finished sstables: -1 means keep the time we supplied when we
* created the SSTableWriter (and called rewriter.switchWriter(..)); an actual time overrides the
* repair time.
*/
public SSTableRewriter setRepairedAt(long repairedAt)
{
this.repairedAt = repairedAt;
return this;
}
/**
* Finishes the new file(s).
*
* Creates the final files and adds them to the Tracker (via replaceReader); adding them to the Tracker is what
* allows us to get rid of the tmp files.
*
* It is up to the caller to do the compacted sstables replacement
* gymnastics (i.e. call Tracker#markCompactedSSTablesReplaced(..)).
*/
public List<SSTableReader> finish()
{
super.finish();
return finished();
}
// returns, in list form, the final readers we will expose on commit; only valid once prepared or committed
public List<SSTableReader> finished()
{
assert state() == State.COMMITTED || state() == State.READY_TO_COMMIT;
return preparedForCommit;
}
protected void doPrepare()
{
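// switching to a null writer finalizes the current one (including an early-final open when early opening is enabled)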
switchWriter(null);
if (throwEarly)
throw new RuntimeException("exception thrown early in finish, for testing");
// No early open to finalize and replace
for (SSTableWriter writer : writers)
{
assert writer.getFilePointer() > 0;
writer.setRepairedAt(repairedAt).setOpenResult(true).prepareToCommit();
SSTableReader reader = writer.finished();
transaction.update(reader, false);
preparedForCommit.add(reader);
}
transaction.checkpoint();
if (throwLate)
throw new RuntimeException("exception thrown after all sstables finished, for testing");
if (!keepOriginals)
transaction.obsoleteOriginals();
transaction.prepareToCommit();
}
public void throwDuringPrepare(boolean earlyException)
{
if (earlyException)
throwEarly = true;
else
throwLate = true;
}
}