| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.io.sstable.format; |
| |
| import com.google.common.collect.Sets; |
| import org.apache.cassandra.config.CFMetaData; |
| import org.apache.cassandra.config.DatabaseDescriptor; |
| import org.apache.cassandra.config.Schema; |
| import org.apache.cassandra.db.ColumnFamily; |
| import org.apache.cassandra.db.DecoratedKey; |
| import org.apache.cassandra.db.RowIndexEntry; |
| import org.apache.cassandra.db.compaction.AbstractCompactedRow; |
| import org.apache.cassandra.dht.IPartitioner; |
| import org.apache.cassandra.io.sstable.Component; |
| import org.apache.cassandra.io.sstable.Descriptor; |
| import org.apache.cassandra.io.sstable.SSTable; |
| import org.apache.cassandra.io.sstable.metadata.MetadataCollector; |
| import org.apache.cassandra.io.sstable.metadata.MetadataComponent; |
| import org.apache.cassandra.io.sstable.metadata.MetadataType; |
| import org.apache.cassandra.io.sstable.metadata.StatsMetadata; |
| import org.apache.cassandra.io.util.FileUtils; |
| import org.apache.cassandra.utils.concurrent.Transactional; |
| |
| import java.io.DataInput; |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Set; |
| |
/**
 * This is the API all table writers must implement.
 *
 * SSTableWriter.create() is the primary way to create a writer for a particular format.
 * The format information is part of the Descriptor.
 */
| public abstract class SSTableWriter extends SSTable implements Transactional |
| { |
// Value handed to the metadata collector as repairedAt when metadata is finalised. Within this
// class it is set by the constructor and only overwritten by a positive argument to
// setRepairedAt() or finish(long, long, boolean).
protected long repairedAt;
// Stays at -1 until assigned through setMaxDataAge() or finish(long, long, boolean).
protected long maxDataAge = -1;
// Key count supplied at construction.
// NOTE(review): presumably the expected number of keys to be written - confirm with callers.
protected final long keyCount;
// Collector whose finalizeMetadata() is invoked by finalizeMetadata() of this class.
protected final MetadataCollector metadataCollector;
// Obtained at construction from the sstable format of the descriptor's version.
protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
// Initialised by calling the abstract factory method below from a field initialiser, i.e. while
// this base class is still being constructed and before subclass initialisers have run.
protected final TransactionalProxy txnProxy = txnProxy();

/**
 * Creates the proxy to which this writer delegates its Transactional methods.
 * It is invoked from the {@code txnProxy} field initialiser of this class, so an
 * implementation cannot rely on subclass fields that are assigned later during construction.
 *
 * @return the proxy stored in {@code txnProxy}
 */
protected abstract TransactionalProxy txnProxy();
| |
// due to lack of multiple inheritance, we use an inner class to proxy our Transactional implementation details
// The two fields below are the state the enclosing writer reads and writes directly.
protected abstract class TransactionalProxy extends AbstractTransactional
{
    // should be set during doPrepare()
    // handed back to callers by SSTableWriter.finished()
    protected SSTableReader finalReader;
    // assigned by SSTableWriter.setOpenResult() and SSTableWriter.finish(boolean)
    // NOTE(review): presumably tells the concrete proxy whether to open finalReader - confirm
    protected boolean openResult;
}
| |
/**
 * Initialises the state shared by every sstable writer.
 *
 * @param descriptor        descriptor of the sstable being written; its version selects the index serializer
 * @param keyCount          key count stored on the writer
 * @param repairedAt        initial repairedAt value
 * @param metadata          table metadata; also decides which components are passed to the parent constructor
 * @param partitioner       partitioner forwarded to the parent constructor
 * @param metadataCollector collector used later by finalizeMetadata()
 */
protected SSTableWriter(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
{
    super(descriptor, components(metadata), metadata, partitioner);

    // plain copies of the constructor arguments
    this.metadataCollector = metadataCollector;
    this.repairedAt = repairedAt;
    this.keyCount = keyCount;

    // the serializer comes from the sstable format that the descriptor's version belongs to
    this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata);
}
| |
| public static SSTableWriter create(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector) |
| { |
| Factory writerFactory = descriptor.getFormat().getWriterFactory(); |
| return writerFactory.open(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector); |
| } |
| |
| public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt) |
| { |
| return create(descriptor, keyCount, repairedAt, 0); |
| } |
| |
| public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel) |
| { |
| CFMetaData metadata = Schema.instance.getCFMetaData(descriptor); |
| return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, DatabaseDescriptor.getPartitioner()); |
| } |
| |
| public static SSTableWriter create(CFMetaData metadata, |
| Descriptor descriptor, |
| long keyCount, |
| long repairedAt, |
| int sstableLevel, |
| IPartitioner partitioner) |
| { |
| MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel); |
| return create(descriptor, keyCount, repairedAt, metadata, partitioner, collector); |
| } |
| |
| public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel) |
| { |
| return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel); |
| } |
| |
| public static SSTableWriter create(String filename, long keyCount, long repairedAt) |
| { |
| return create(Descriptor.fromFilename(filename), keyCount, repairedAt, 0); |
| } |
| |
| private static Set<Component> components(CFMetaData metadata) |
| { |
| Set<Component> components = new HashSet<Component>(Arrays.asList(Component.DATA, |
| Component.PRIMARY_INDEX, |
| Component.STATS, |
| Component.SUMMARY, |
| Component.TOC, |
| Component.DIGEST)); |
| |
| if (metadata.getBloomFilterFpChance() < 1.0) |
| components.add(Component.FILTER); |
| |
| if (metadata.compressionParameters().sstableCompressor != null) |
| { |
| components.add(Component.COMPRESSION_INFO); |
| } |
| else |
| { |
| // it would feel safer to actually add this component later in maybeWriteDigest(), |
| // but the components are unmodifiable after construction |
| components.add(Component.CRC); |
| } |
| return components; |
| } |
| |
| |
/**
 * NOTE(review): the contract is defined by the concrete writers. Presumably this records
 * the current write position so that {@link #resetAndTruncate()} can return to it - confirm.
 */
public abstract void mark();
| |
| |
/**
 * @param row the compacted row to append
 * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row
 */
public abstract RowIndexEntry append(AbstractCompactedRow row);

/**
 * Appends the given column family under the given key.
 * NOTE(review): any ordering or emptiness requirements are defined by the implementations - confirm.
 */
public abstract void append(DecoratedKey decoratedKey, ColumnFamily cf);

/**
 * Appends a row for the given key whose content is read from the supplied input.
 * NOTE(review): the meaning of the returned long is not visible here; presumably a
 * position or size in the data file - confirm with the implementations.
 *
 * @throws IOException declared for failures while reading from {@code in}
 */
public abstract long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException;
| |
// NOTE(review): presumably the current position in the (uncompressed) data stream - confirm.
public abstract long getFilePointer();

// NOTE(review): presumably the current position in the file as stored on disk - confirm.
public abstract long getOnDiskFilePointer();

// NOTE(review): presumably discards everything written since the last mark() - confirm.
public abstract void resetAndTruncate();
| |
| public SSTableWriter setRepairedAt(long repairedAt) |
| { |
| if (repairedAt > 0) |
| this.repairedAt = repairedAt; |
| return this; |
| } |
| |
| public SSTableWriter setMaxDataAge(long maxDataAge) |
| { |
| this.maxDataAge = maxDataAge; |
| return this; |
| } |
| |
| public SSTableWriter setOpenResult(boolean openResult) |
| { |
| txnProxy.openResult = openResult; |
| return this; |
| } |
| |
/**
 * Open the resultant SSTableReader before it has been fully written
 *
 * NOTE(review): whether implementations may return null is not visible here - confirm.
 */
public abstract SSTableReader openEarly();
| |
/**
 * Open the resultant SSTableReader once it has been fully written, but before the
 * _set_ of tables that are being written together as one atomic operation are all ready
 *
 * @return the reader opened by the implementation
 */
public abstract SSTableReader openFinalEarly();
| |
| public SSTableReader finish(long repairedAt, long maxDataAge, boolean openResult) |
| { |
| if (repairedAt > 0) |
| this.repairedAt = repairedAt; |
| this.maxDataAge = maxDataAge; |
| return finish(openResult); |
| } |
| |
| public SSTableReader finish(boolean openResult) |
| { |
| txnProxy.openResult = openResult; |
| txnProxy.finish(); |
| return finished(); |
| } |
| |
| /** |
| * Open the resultant SSTableReader once it has been fully written, and all related state |
| * is ready to be finalised including other sstables being written involved in the same operation |
| */ |
| public SSTableReader finished() |
| { |
| return txnProxy.finalReader; |
| } |
| |
| // finalise our state on disk, including renaming |
| public final void prepareToCommit() |
| { |
| txnProxy.prepareToCommit(); |
| } |
| |
| public final Throwable commit(Throwable accumulate) |
| { |
| return txnProxy.commit(accumulate); |
| } |
| |
| public final Throwable abort(Throwable accumulate) |
| { |
| return txnProxy.abort(accumulate); |
| } |
| |
| public final void close() |
| { |
| txnProxy.close(); |
| } |
| |
| public final void abort() |
| { |
| txnProxy.abort(); |
| } |
| |
| protected Map<MetadataType, MetadataComponent> finalizeMetadata() |
| { |
| return metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(), |
| metadata.getBloomFilterFpChance(), repairedAt); |
| } |
| |
| protected StatsMetadata statsMetadata() |
| { |
| return (StatsMetadata) finalizeMetadata().get(MetadataType.STATS); |
| } |
| |
| public static Descriptor rename(Descriptor tmpdesc, Set<Component> components) |
| { |
| Descriptor newdesc = tmpdesc.asType(Descriptor.Type.FINAL); |
| rename(tmpdesc, newdesc, components); |
| return newdesc; |
| } |
| |
| public static void rename(Descriptor tmpdesc, Descriptor newdesc, Set<Component> components) |
| { |
| for (Component component : Sets.difference(components, Sets.newHashSet(Component.DATA, Component.SUMMARY))) |
| { |
| FileUtils.renameWithConfirm(tmpdesc.filenameFor(component), newdesc.filenameFor(component)); |
| } |
| |
| // do -Data last because -Data present should mean the sstable was completely renamed before crash |
| FileUtils.renameWithConfirm(tmpdesc.filenameFor(Component.DATA), newdesc.filenameFor(Component.DATA)); |
| |
| // rename it without confirmation because summary can be available for loadNewSSTables but not for closeAndOpenReader |
| FileUtils.renameWithOutConfirm(tmpdesc.filenameFor(Component.SUMMARY), newdesc.filenameFor(Component.SUMMARY)); |
| } |
| |
| |
/**
 * Factory for writers; the six-argument create() overload of SSTableWriter obtains one
 * from the descriptor's format and calls open() on it.
 */
public static abstract class Factory
{
    /**
     * Opens a writer for the given descriptor. The arguments are those passed to
     * SSTableWriter.create(); keyCount and repairedAt arrive as primitives.
     *
     * @return the writer created by this factory
     */
    public abstract SSTableWriter open(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector);
}
| } |