/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.io.sstable.format;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.format.big.BigFormat;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.concurrent.Transactional;

/**
* This is the API all table writers must implement.
*
 * {@code SSTableWriter.create()} is the primary way to create a writer for a particular format.
* The format information is part of the Descriptor.
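 *
 * A minimal usage sketch (illustrative only; {@code descriptor}, {@code metadata},
 * {@code collector}, {@code header} and {@code tracker} are assumed to be supplied
 * by the caller):
 * <pre>{@code
 * SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt,
 *                                             metadata, collector, header, tracker);
 * }</pre>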
*/
public abstract class SSTableWriter extends SSTable implements Transactional
{
    protected long repairedAt;
    protected long maxDataAge = -1;
    protected final long keyCount;
    protected final MetadataCollector metadataCollector;
    protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
    protected final SerializationHeader header;

    protected final TransactionalProxy txnProxy = txnProxy();

    protected abstract TransactionalProxy txnProxy();

    // due to lack of multiple inheritance, we use an inner class to proxy our Transactional implementation details
    protected abstract class TransactionalProxy extends AbstractTransactional
    {
        // should be set during doPrepare()
        protected SSTableReader finalReader;
        protected boolean openResult;
    }
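
    // A subclass's proxy typically populates finalReader during doPrepare(); a rough
    // sketch (illustrative only, not the actual BigTableWriter implementation):
    //
    //     protected void doPrepare()
    //     {
    //         // ... flush any buffered data and write out metadata ...
    //         if (openResult)
    //             finalReader = openFinalEarly();
    //     }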
protected SSTableWriter(Descriptor descriptor,
long keyCount,
long repairedAt,
CFMetaData metadata,
MetadataCollector metadataCollector,
SerializationHeader header)
{
super(descriptor, components(metadata), metadata);
this.keyCount = keyCount;
this.repairedAt = repairedAt;
this.metadataCollector = metadataCollector;
        this.header = header != null ? header : SerializationHeader.makeWithoutStats(metadata); // a null header indicates streaming from a pre-3.0 sstable
this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata, descriptor.version, header);
}
public static SSTableWriter create(Descriptor descriptor,
Long keyCount,
Long repairedAt,
CFMetaData metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
LifecycleNewTracker lifecycleNewTracker)
{
Factory writerFactory = descriptor.getFormat().getWriterFactory();
return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, lifecycleNewTracker);
}
public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
{
CFMetaData metadata = Schema.instance.getCFMetaData(descriptor);
return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, lifecycleNewTracker);
}
public static SSTableWriter create(CFMetaData metadata,
Descriptor descriptor,
long keyCount,
long repairedAt,
int sstableLevel,
SerializationHeader header,
LifecycleNewTracker lifecycleNewTracker)
{
MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
return create(descriptor, keyCount, repairedAt, metadata, collector, header, lifecycleNewTracker);
}
public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
{
return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header, lifecycleNewTracker);
}
@VisibleForTesting
public static SSTableWriter create(String filename, long keyCount, long repairedAt, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
{
return create(Descriptor.fromFilename(filename), keyCount, repairedAt, 0, header, lifecycleNewTracker);
}
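
    /**
     * Computes the set of components this writer will produce: always DATA, PRIMARY_INDEX,
     * STATS, SUMMARY, TOC and a digest; FILTER when the bloom filter fp chance is below 1.0;
     * COMPRESSION_INFO for compressed tables, CRC otherwise.
     */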
private static Set<Component> components(CFMetaData metadata)
{
Set<Component> components = new HashSet<Component>(Arrays.asList(Component.DATA,
Component.PRIMARY_INDEX,
Component.STATS,
Component.SUMMARY,
Component.TOC,
Component.digestFor(BigFormat.latestVersion.uncompressedChecksumType())));
if (metadata.params.bloomFilterFpChance < 1.0)
components.add(Component.FILTER);
if (metadata.params.compression.isEnabled())
{
components.add(Component.COMPRESSION_INFO);
}
else
{
// it would feel safer to actually add this component later in maybeWriteDigest(),
// but the components are unmodifiable after construction
components.add(Component.CRC);
}
return components;
}
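
    /**
     * Marks the current write position, so that an unsuccessful append can be undone
     * via {@link #resetAndTruncate()}.
     */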
public abstract void mark();
/**
* Appends partition data to this writer.
*
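     * A sketch of typical use (illustrative; {@code writer} and {@code partition}
     * are assumed to be supplied by the caller):
     * <pre>{@code
     * RowIndexEntry entry = writer.append(partition);
     * boolean wrote = entry != null; // null means the partition was empty
     * }</pre>
     *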
* @param iterator the partition to write
     * @return the created index entry if something was written, i.e. if {@code iterator}
     *         wasn't empty; {@code null} otherwise.
*
* @throws FSWriteError if a write to the dataFile fails
*/
public abstract RowIndexEntry append(UnfilteredRowIterator iterator);
public abstract long getFilePointer();
public abstract long getOnDiskFilePointer();
public abstract void resetAndTruncate();
public SSTableWriter setRepairedAt(long repairedAt)
{
if (repairedAt > 0)
this.repairedAt = repairedAt;
return this;
}
public SSTableWriter setMaxDataAge(long maxDataAge)
{
this.maxDataAge = maxDataAge;
return this;
}
public SSTableWriter setOpenResult(boolean openResult)
{
txnProxy.openResult = openResult;
return this;
}
/**
* Open the resultant SSTableReader before it has been fully written
*/
public abstract SSTableReader openEarly();
    /**
     * Open the resultant SSTableReader once it has been fully written, but before the
     * set of sstables being written together as one atomic operation is fully ready
     */
public abstract SSTableReader openFinalEarly();
public SSTableReader finish(long repairedAt, long maxDataAge, boolean openResult)
{
if (repairedAt > 0)
this.repairedAt = repairedAt;
this.maxDataAge = maxDataAge;
return finish(openResult);
}
public SSTableReader finish(boolean openResult)
{
setOpenResult(openResult);
txnProxy.finish();
return finished();
}
    /**
     * Open the resultant SSTableReader once it has been fully written and all related
     * state is ready to be finalised, including any other sstables being written as
     * part of the same operation
     */
public SSTableReader finished()
{
return txnProxy.finalReader;
}
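
    // An illustrative commit sequence (hypothetical caller code); finish(boolean) above
    // wraps the same steps through the Transactional contract:
    //
    //     writer.setOpenResult(true);
    //     writer.prepareToCommit();                  // finalise state on disk, rename files
    //     Throwable t = writer.commit(null);         // null: no previously accumulated failure
    //     SSTableReader reader = writer.finished();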
// finalise our state on disk, including renaming
public final void prepareToCommit()
{
txnProxy.prepareToCommit();
}
public final Throwable commit(Throwable accumulate)
{
return txnProxy.commit(accumulate);
}
public final Throwable abort(Throwable accumulate)
{
return txnProxy.abort(accumulate);
}
public final void close()
{
txnProxy.close();
}
public final void abort()
{
txnProxy.abort();
}
protected Map<MetadataType, MetadataComponent> finalizeMetadata()
{
return metadataCollector.finalizeMetadata(getPartitioner().getClass().getCanonicalName(),
metadata.params.bloomFilterFpChance,
repairedAt,
header);
}
protected StatsMetadata statsMetadata()
{
return (StatsMetadata) finalizeMetadata().get(MetadataType.STATS);
}
public static void rename(Descriptor tmpdesc, Descriptor newdesc, Set<Component> components)
{
for (Component component : Sets.difference(components, Sets.newHashSet(Component.DATA, Component.SUMMARY)))
{
FileUtils.renameWithConfirm(tmpdesc.filenameFor(component), newdesc.filenameFor(component));
}
        // rename -Data last, so that the presence of -Data implies the sstable was completely renamed before any crash
FileUtils.renameWithConfirm(tmpdesc.filenameFor(Component.DATA), newdesc.filenameFor(Component.DATA));
        // rename the summary without confirmation, because it may be present for loadNewSSTables but not for closeAndOpenReader
FileUtils.renameWithOutConfirm(tmpdesc.filenameFor(Component.SUMMARY), newdesc.filenameFor(Component.SUMMARY));
}
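
    /**
     * Format-specific writer factory, obtained in {@code create(...)} above via
     * {@code descriptor.getFormat().getWriterFactory()}; each sstable format
     * (e.g. {@link BigFormat}) supplies its own implementation.
     */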
public static abstract class Factory
{
public abstract SSTableWriter open(Descriptor descriptor,
long keyCount,
long repairedAt,
CFMetaData metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
LifecycleNewTracker lifecycleNewTracker);
}
}