| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.db.memtable; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.concurrent.Callable; |
| |
| import com.google.common.base.Throwables; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.cassandra.db.ColumnFamilyStore; |
| import org.apache.cassandra.db.Directories; |
| import org.apache.cassandra.db.DiskBoundaries; |
| import org.apache.cassandra.db.PartitionPosition; |
| import org.apache.cassandra.db.SerializationHeader; |
| import org.apache.cassandra.db.SystemKeyspace; |
| import org.apache.cassandra.db.commitlog.IntervalSet; |
| import org.apache.cassandra.db.lifecycle.LifecycleTransaction; |
| import org.apache.cassandra.db.partitions.Partition; |
| import org.apache.cassandra.db.rows.UnfilteredRowIterator; |
| import org.apache.cassandra.io.sstable.Descriptor; |
| import org.apache.cassandra.io.sstable.SSTableMultiWriter; |
| import org.apache.cassandra.io.sstable.format.SSTableFormat; |
| import org.apache.cassandra.io.sstable.metadata.MetadataCollector; |
| import org.apache.cassandra.metrics.TableMetrics; |
| import org.apache.cassandra.service.ActiveRepairService; |
| import org.apache.cassandra.utils.FBUtilities; |
| |
| public class Flushing |
| { |
| private static final Logger logger = LoggerFactory.getLogger(Flushing.class); |
| |
| private Flushing() // prevent instantiation |
| { |
| } |
| |
| public static List<FlushRunnable> flushRunnables(ColumnFamilyStore cfs, |
| Memtable memtable, |
| LifecycleTransaction txn) |
| { |
| DiskBoundaries diskBoundaries = cfs.getDiskBoundaries(); |
| List<PartitionPosition> boundaries = diskBoundaries.positions; |
| List<Directories.DataDirectory> locations = diskBoundaries.directories; |
| if (boundaries == null) |
| { |
| FlushRunnable runnable = flushRunnable(cfs, memtable, null, null, txn, null); |
| return Collections.singletonList(runnable); |
| } |
| |
| List<FlushRunnable> runnables = new ArrayList<>(boundaries.size()); |
| PartitionPosition rangeStart = boundaries.get(0).getPartitioner().getMinimumToken().minKeyBound(); |
| try |
| { |
| for (int i = 0; i < boundaries.size(); i++) |
| { |
| PartitionPosition t = boundaries.get(i); |
| FlushRunnable runnable = flushRunnable(cfs, memtable, rangeStart, t, txn, locations.get(i)); |
| |
| runnables.add(runnable); |
| rangeStart = t; |
| } |
| return runnables; |
| } |
| catch (Throwable e) |
| { |
| throw Throwables.propagate(abortRunnables(runnables, e)); |
| } |
| } |
| |
| @SuppressWarnings("resource") // writer owned by runnable, to be closed or aborted by its caller |
| static FlushRunnable flushRunnable(ColumnFamilyStore cfs, |
| Memtable memtable, |
| PartitionPosition from, |
| PartitionPosition to, |
| LifecycleTransaction txn, |
| Directories.DataDirectory flushLocation) |
| { |
| Memtable.FlushablePartitionSet<?> flushSet = memtable.getFlushSet(from, to); |
| SSTableFormat.Type formatType = SSTableFormat.Type.current(); |
| long estimatedSize = formatType.info.getWriterFactory().estimateSize(flushSet); |
| |
| Descriptor descriptor = flushLocation == null |
| ? cfs.newSSTableDescriptor(cfs.getDirectories().getWriteableLocationAsFile(estimatedSize), formatType) |
| : cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(flushLocation), formatType); |
| |
| SSTableMultiWriter writer = createFlushWriter(cfs, |
| flushSet, |
| txn, |
| descriptor, |
| flushSet.partitionCount()); |
| |
| return new FlushRunnable(flushSet, writer, cfs.metric, true); |
| } |
| |
| public static Throwable abortRunnables(List<FlushRunnable> runnables, Throwable t) |
| { |
| if (runnables != null) |
| for (FlushRunnable runnable : runnables) |
| t = runnable.writer.abort(t); |
| return t; |
| } |
| |
| public static class FlushRunnable implements Callable<SSTableMultiWriter> |
| { |
| private final Memtable.FlushablePartitionSet<?> toFlush; |
| |
| private final SSTableMultiWriter writer; |
| private final TableMetrics metrics; |
| private final boolean isBatchLogTable; |
| private final boolean logCompletion; |
| |
| public FlushRunnable(Memtable.FlushablePartitionSet<?> flushSet, |
| SSTableMultiWriter writer, |
| TableMetrics metrics, |
| boolean logCompletion) |
| { |
| this.toFlush = flushSet; |
| this.writer = writer; |
| this.metrics = metrics; |
| this.isBatchLogTable = toFlush.metadata() == SystemKeyspace.Batches; |
| this.logCompletion = logCompletion; |
| } |
| |
| private void writeSortedContents() |
| { |
| logger.info("Writing {}, flushed range = [{}, {})", toFlush.memtable(), toFlush.from(), toFlush.to()); |
| |
| // (we can't clear out the map as-we-go to free up memory, |
| // since the memtable is being used for queries in the "pending flush" category) |
| for (Partition partition : toFlush) |
| { |
| // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2 |
| // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local, |
| // we don't need to preserve tombstones for repair. So if both operation are in this |
| // memtable (which will almost always be the case if there is no ongoing failure), we can |
| // just skip the entry (CASSANDRA-4667). |
| if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows()) |
| continue; |
| |
| if (!partition.isEmpty()) |
| { |
| try (UnfilteredRowIterator iter = partition.unfilteredIterator()) |
| { |
| writer.append(iter); |
| } |
| } |
| } |
| |
| if (logCompletion) |
| { |
| long bytesFlushed = writer.getFilePointer(); |
| logger.info("Completed flushing {} ({}) for commitlog position {}", |
| writer.getFilename(), |
| FBUtilities.prettyPrintMemory(bytesFlushed), |
| toFlush.memtable().getFinalCommitLogUpperBound()); |
| // Update the metrics |
| metrics.bytesFlushed.inc(bytesFlushed); |
| } |
| } |
| |
| @Override |
| public SSTableMultiWriter call() |
| { |
| writeSortedContents(); |
| return writer; |
| // We don't close the writer on error as the caller aborts all runnables if one happens. |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "Flush " + toFlush.metadata().keyspace + '.' + toFlush.metadata().name; |
| } |
| } |
| |
| public static SSTableMultiWriter createFlushWriter(ColumnFamilyStore cfs, |
| Memtable.FlushablePartitionSet<?> flushSet, |
| LifecycleTransaction txn, |
| Descriptor descriptor, |
| long partitionCount) |
| { |
| MetadataCollector sstableMetadataCollector = new MetadataCollector(flushSet.metadata().comparator) |
| .commitLogIntervals(new IntervalSet<>(flushSet.commitLogLowerBound(), |
| flushSet.commitLogUpperBound())); |
| |
| return cfs.createSSTableMultiWriter(descriptor, |
| partitionCount, |
| ActiveRepairService.UNREPAIRED_SSTABLE, |
| ActiveRepairService.NO_PENDING_REPAIR, |
| false, |
| sstableMetadataCollector, |
| new SerializationHeader(true, |
| flushSet.metadata(), |
| flushSet.columns(), |
| flushSet.encodingStats()), |
| txn); |
| } |
| } |