/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.sstable.format;

import java.io.DataInput;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import com.google.common.collect.Sets;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.concurrent.Transactional;

/**
* This is the API all table writers must implement.
*
* SSTableWriter.create() is the primary way to create a writer for a particular format.
* The format information is part of the Descriptor.
*/
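// A minimal usage sketch (illustrative only; the file name, key count and repairedAt value are
// assumptions, not taken from this class): obtain a writer through one of the create() factory
// methods, append partitions one at a time, then finish() to get back the resulting reader.
//
//     SSTableWriter writer = SSTableWriter.create("/path/to/ks/cf/ks-cf-tmp-ka-1-Data.db", estimatedKeys, 0L);
//     writer.append(decoratedKey, columnFamily);   // one partition per call
//     SSTableReader reader = writer.finish(true);  // finalise on disk and open the final reader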
public abstract class SSTableWriter extends SSTable implements Transactional
{
protected long repairedAt;
protected long maxDataAge = -1;
protected final long keyCount;
protected final MetadataCollector metadataCollector;
protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
protected final TransactionalProxy txnProxy = txnProxy();
protected abstract TransactionalProxy txnProxy();
// due to lack of multiple inheritance, we use an inner class to proxy our Transactional implementation details
protected abstract class TransactionalProxy extends AbstractTransactional
{
// should be set during doPrepare()
protected SSTableReader finalReader;
protected boolean openResult;
}
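// A rough sketch of the shape a concrete format writer is expected to take (an assumption for
// illustration, not an implementation from this codebase): subclass TransactionalProxy, fill in the
// AbstractTransactional hooks, and return it from txnProxy().
//
//     protected TransactionalProxy txnProxy()
//     {
//         return new TransactionalProxy()
//         {
//             protected void doPrepare() { /* flush buffers, write metadata, set finalReader */ }
//             protected Throwable doCommit(Throwable accumulate) { /* release temporary resources */ return accumulate; }
//             protected Throwable doAbort(Throwable accumulate) { /* delete partial files */ return accumulate; }
//         };
//     }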
protected SSTableWriter(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
{
super(descriptor, components(metadata), metadata, partitioner);
this.keyCount = keyCount;
this.repairedAt = repairedAt;
this.metadataCollector = metadataCollector;
this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata);
}
public static SSTableWriter create(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
{
Factory writerFactory = descriptor.getFormat().getWriterFactory();
return writerFactory.open(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector);
}
public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt)
{
return create(descriptor, keyCount, repairedAt, 0);
}
public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel)
{
CFMetaData metadata = Schema.instance.getCFMetaData(descriptor);
return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, DatabaseDescriptor.getPartitioner());
}
public static SSTableWriter create(CFMetaData metadata,
Descriptor descriptor,
long keyCount,
long repairedAt,
int sstableLevel,
IPartitioner partitioner)
{
MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
return create(descriptor, keyCount, repairedAt, metadata, partitioner, collector);
}
public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel)
{
return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel);
}
public static SSTableWriter create(String filename, long keyCount, long repairedAt)
{
return create(Descriptor.fromFilename(filename), keyCount, repairedAt, 0);
}
private static Set<Component> components(CFMetaData metadata)
{
Set<Component> components = new HashSet<Component>(Arrays.asList(Component.DATA,
Component.PRIMARY_INDEX,
Component.STATS,
Component.SUMMARY,
Component.TOC,
Component.DIGEST));
if (metadata.getBloomFilterFpChance() < 1.0)
components.add(Component.FILTER);
if (metadata.compressionParameters().sstableCompressor != null)
{
components.add(Component.COMPRESSION_INFO);
}
else
{
// it would feel safer to actually add this component later in maybeWriteDigest(),
// but the components are unmodifiable after construction
components.add(Component.CRC);
}
return components;
}
/** Records the current write position so a later resetAndTruncate() can roll back to it. */
public abstract void mark();
/**
* Appends a row produced by compaction.
*
* @param row the compacted row to write
* @return null if the row was compacted away entirely; otherwise, the PK index entry for this row
*/
public abstract RowIndexEntry append(AbstractCompactedRow row);
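// Callers generally need to handle the null return, roughly (illustrative):
//
//     RowIndexEntry entry = writer.append(compactedRow);
//     if (entry == null)
//         ... // the row was purged entirely, nothing was written for it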
public abstract void append(DecoratedKey decoratedKey, ColumnFamily cf);
public abstract long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException;
/** @return the current position in the (uncompressed) data file */
public abstract long getFilePointer();
/** @return the current position of the data file as written to disk, i.e. after any compression */
public abstract long getOnDiskFilePointer();
/** Discards anything written since the last mark(), truncating the data and index files back to that position. */
public abstract void resetAndTruncate();
public SSTableWriter setRepairedAt(long repairedAt)
{
if (repairedAt > 0)
this.repairedAt = repairedAt;
return this;
}
public SSTableWriter setMaxDataAge(long maxDataAge)
{
this.maxDataAge = maxDataAge;
return this;
}
public SSTableWriter setOpenResult(boolean openResult)
{
txnProxy.openResult = openResult;
return this;
}
/**
* Open the resultant SSTableReader before it has been fully written
*/
public abstract SSTableReader openEarly();
/**
* Open the resultant SSTableReader once it has been fully written, but before the whole
* set of sstables being written together as one atomic operation is ready
*/
public abstract SSTableReader openFinalEarly();
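// A hedged sketch of how the early-open hooks tend to be driven by a long-running rewrite (the loop
// and helper names are assumptions for illustration only):
//
//     while (rows.hasNext())
//     {
//         writer.append(rows.next());
//         if (enoughWrittenSinceLastEarlyOpen())
//             replaceEarlyOpenedReader(writer.openEarly());  // expose partial results to readers
//     }
//     SSTableReader almostFinal = writer.openFinalEarly();   // fully written, set not yet committed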
public SSTableReader finish(long repairedAt, long maxDataAge, boolean openResult)
{
if (repairedAt > 0)
this.repairedAt = repairedAt;
this.maxDataAge = maxDataAge;
return finish(openResult);
}
public SSTableReader finish(boolean openResult)
{
txnProxy.openResult = openResult;
txnProxy.finish();
return finished();
}
/**
* Open the resultant SSTableReader once it has been fully written and all related state
* is ready to be finalised, including any other sstables being written as part of the same operation
*/
public SSTableReader finished()
{
return txnProxy.finalReader;
}
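// A hedged sketch of the two-phase flow this Transactional API supports when several writers form
// one atomic operation (the writers/readers collections are illustrative):
//
//     for (SSTableWriter w : writers)
//         w.setOpenResult(true).prepareToCommit();  // finalise on disk, rename, open final readers
//     Throwable failure = null;
//     for (SSTableWriter w : writers)
//         failure = w.commit(failure);              // commit each, accumulating any failure
//     for (SSTableWriter w : writers)
//         readers.add(w.finished());                // readers produced during prepareToCommit()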
// finalise our state on disk, including renaming
public final void prepareToCommit()
{
txnProxy.prepareToCommit();
}
public final Throwable commit(Throwable accumulate)
{
return txnProxy.commit(accumulate);
}
public final Throwable abort(Throwable accumulate)
{
return txnProxy.abort(accumulate);
}
public final void close()
{
txnProxy.close();
}
public final void abort()
{
txnProxy.abort();
}
protected Map<MetadataType, MetadataComponent> finalizeMetadata()
{
return metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
metadata.getBloomFilterFpChance(), repairedAt);
}
protected StatsMetadata statsMetadata()
{
return (StatsMetadata) finalizeMetadata().get(MetadataType.STATS);
}
public static Descriptor rename(Descriptor tmpdesc, Set<Component> components)
{
Descriptor newdesc = tmpdesc.asType(Descriptor.Type.FINAL);
rename(tmpdesc, newdesc, components);
return newdesc;
}
public static void rename(Descriptor tmpdesc, Descriptor newdesc, Set<Component> components)
{
for (Component component : Sets.difference(components, Sets.newHashSet(Component.DATA, Component.SUMMARY)))
{
FileUtils.renameWithConfirm(tmpdesc.filenameFor(component), newdesc.filenameFor(component));
}
// rename -Data last, so that after a crash the presence of a final -Data file implies the sstable was completely renamed
FileUtils.renameWithConfirm(tmpdesc.filenameFor(Component.DATA), newdesc.filenameFor(Component.DATA));
// rename the summary without confirmation, since it may exist for loadNewSSTables but not for closeAndOpenReader
FileUtils.renameWithOutConfirm(tmpdesc.filenameFor(Component.SUMMARY), newdesc.filenameFor(Component.SUMMARY));
}
public static abstract class Factory
{
public abstract SSTableWriter open(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector);
}
}