| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.index.sasi.disk; |
| |
| import java.io.File; |
| import java.nio.ByteBuffer; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.*; |
| |
| import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; |
| import org.apache.cassandra.concurrent.NamedThreadFactory; |
| import org.apache.cassandra.config.ColumnDefinition; |
| import org.apache.cassandra.db.DecoratedKey; |
| import org.apache.cassandra.db.compaction.OperationType; |
| import org.apache.cassandra.db.rows.Row; |
| import org.apache.cassandra.db.rows.Unfiltered; |
| import org.apache.cassandra.index.sasi.analyzer.AbstractAnalyzer; |
| import org.apache.cassandra.index.sasi.conf.ColumnIndex; |
| import org.apache.cassandra.index.sasi.utils.CombinedTermIterator; |
| import org.apache.cassandra.index.sasi.utils.TypeUtil; |
| import org.apache.cassandra.db.marshal.AbstractType; |
| import org.apache.cassandra.io.FSError; |
| import org.apache.cassandra.io.sstable.Descriptor; |
| import org.apache.cassandra.io.sstable.format.SSTableFlushObserver; |
| import org.apache.cassandra.io.util.FileUtils; |
| import org.apache.cassandra.utils.FBUtilities; |
| import org.apache.cassandra.utils.Pair; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.Uninterruptibles; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
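/**
 * Writes SASI index components for each indexed column of an SSTable as the
 * SSTable itself is flushed or compacted. Terms are accumulated per column in
 * memory and spilled to on-disk segments, which {@link #complete()} merges
 * asynchronously into the final per-column index file.
 *
 * A sketch of the expected call sequence, as driven by the flush/compaction
 * writer (variable names below are illustrative only):
 *
 * <pre>
 *     PerSSTableIndexWriter observer = new PerSSTableIndexWriter(keyValidator, descriptor, source, supportedIndexes);
 *     observer.begin();
 *     observer.startPartition(key, position);    // once per partition
 *     observer.nextUnfilteredCluster(row);       // once per row within it
 *     observer.complete();                       // schedule and await index flushes
 * </pre>
 */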
| public class PerSSTableIndexWriter implements SSTableFlushObserver |
| { |
| private static final Logger logger = LoggerFactory.getLogger(PerSSTableIndexWriter.class); |
| |
| private static final ThreadPoolExecutor INDEX_FLUSHER_MEMTABLE; |
| private static final ThreadPoolExecutor INDEX_FLUSHER_GENERAL; |
| |
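    // Flush-triggered and compaction-triggered index builds run on separate pools;
    // each scales from 1 to 8 threads and lets idle core threads time out.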
| static |
| { |
| INDEX_FLUSHER_GENERAL = new JMXEnabledThreadPoolExecutor(1, 8, 60, TimeUnit.SECONDS, |
| new LinkedBlockingQueue<>(), |
| new NamedThreadFactory("SASI-General"), |
| "internal"); |
| INDEX_FLUSHER_GENERAL.allowCoreThreadTimeOut(true); |
| |
| INDEX_FLUSHER_MEMTABLE = new JMXEnabledThreadPoolExecutor(1, 8, 60, TimeUnit.SECONDS, |
| new LinkedBlockingQueue<>(), |
| new NamedThreadFactory("SASI-Memtable"), |
| "internal"); |
| INDEX_FLUSHER_MEMTABLE.allowCoreThreadTimeOut(true); |
| } |
| |
| private final int nowInSec = FBUtilities.nowInSeconds(); |
| |
| private final Descriptor descriptor; |
| private final OperationType source; |
| |
| private final AbstractType<?> keyValidator; |
| private final Map<ColumnDefinition, ColumnIndex> supportedIndexes; |
| |
| @VisibleForTesting |
| protected final Map<ColumnDefinition, Index> indexes; |
| |
| private DecoratedKey currentKey; |
| private long currentKeyPosition; |
| private boolean isComplete; |
| |
| public PerSSTableIndexWriter(AbstractType<?> keyValidator, |
| Descriptor descriptor, |
| OperationType source, |
| Map<ColumnDefinition, ColumnIndex> supportedIndexes) |
| { |
| this.keyValidator = keyValidator; |
| this.descriptor = descriptor; |
| this.source = source; |
| this.supportedIndexes = supportedIndexes; |
| this.indexes = new HashMap<>(); |
| } |
| |
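    // No per-SSTable setup is needed; per-column indexes are created lazily as terms arrive.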
| public void begin() |
| {} |
| |
| public void startPartition(DecoratedKey key, long curPosition) |
| { |
| currentKey = key; |
| currentKeyPosition = curPosition; |
| } |
| |
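    // Indexes each supported column's value from the row, creating the per-column
    // Index on first use.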
| public void nextUnfilteredCluster(Unfiltered unfiltered) |
| { |
| if (!unfiltered.isRow()) |
| return; |
| |
| Row row = (Row) unfiltered; |
| |
        supportedIndexes.forEach((column, columnIndex) -> {
            ByteBuffer value = ColumnIndex.getValueOf(column, row, nowInSec);
            if (value == null || columnIndex == null)
                return;

            Index index = indexes.computeIfAbsent(column, (ignored) -> newIndex(columnIndex));
            index.add(value.duplicate(), currentKey, currentKeyPosition);
        });
| } |
| |
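    // Schedules the final flush for every per-column index and blocks until all of
    // them finish; guarded so repeated calls are no-ops.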
| public void complete() |
| { |
| if (isComplete) |
| return; |
| |
| currentKey = null; |
| |
| try |
| { |
| CountDownLatch latch = new CountDownLatch(indexes.size()); |
| for (Index index : indexes.values()) |
| index.complete(latch); |
| |
| Uninterruptibles.awaitUninterruptibly(latch); |
| } |
| finally |
| { |
| indexes.clear(); |
| isComplete = true; |
| } |
| } |
| |
| public Index getIndex(ColumnDefinition columnDef) |
| { |
| return indexes.get(columnDef); |
| } |
| |
| public Descriptor getDescriptor() |
| { |
| return descriptor; |
| } |
| |
| @VisibleForTesting |
| protected Index newIndex(ColumnIndex columnIndex) |
| { |
| return new Index(columnIndex); |
| } |
| |
| @VisibleForTesting |
| protected class Index |
| { |
| @VisibleForTesting |
| protected final String outputFile; |
| |
| private final ColumnIndex columnIndex; |
| private final AbstractAnalyzer analyzer; |
| private final long maxMemorySize; |
| |
| @VisibleForTesting |
| protected final Set<Future<OnDiskIndex>> segments; |
| private int segmentNumber = 0; |
| |
| private OnDiskIndexBuilder currentBuilder; |
| |
| public Index(ColumnIndex columnIndex) |
| { |
| this.columnIndex = columnIndex; |
| this.outputFile = descriptor.filenameFor(columnIndex.getComponent()); |
| this.analyzer = columnIndex.getAnalyzer(); |
| this.segments = new HashSet<>(); |
| this.maxMemorySize = maxMemorySize(columnIndex); |
| this.currentBuilder = newIndexBuilder(); |
| } |
| |
| public void add(ByteBuffer term, DecoratedKey key, long keyPosition) |
| { |
| if (term.remaining() == 0) |
| return; |
| |
| boolean isAdded = false; |
| |
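            // The analyzer may split the value into multiple tokens (e.g. tokenized
            // text modes); oversized or invalid tokens are skipped individually.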
| analyzer.reset(term); |
| while (analyzer.hasNext()) |
| { |
| ByteBuffer token = analyzer.next(); |
| int size = token.remaining(); |
| |
| if (token.remaining() >= OnDiskIndexBuilder.MAX_TERM_SIZE) |
| { |
| logger.info("Rejecting value (size {}, maximum {}) for column {} (analyzed {}) at {} SSTable.", |
| FBUtilities.prettyPrintMemory(term.remaining()), |
| FBUtilities.prettyPrintMemory(OnDiskIndexBuilder.MAX_TERM_SIZE), |
| columnIndex.getColumnName(), |
| columnIndex.getMode().isAnalyzed, |
| descriptor); |
| continue; |
| } |
| |
| if (!TypeUtil.isValid(token, columnIndex.getValidator())) |
| { |
| if ((token = TypeUtil.tryUpcast(token, columnIndex.getValidator())) == null) |
| { |
| logger.info("({}) Failed to add {} to index for key: {}, value size was {}, validator is {}.", |
| outputFile, |
| columnIndex.getColumnName(), |
| keyValidator.getString(key.getKey()), |
| FBUtilities.prettyPrintMemory(size), |
| columnIndex.getValidator()); |
| continue; |
| } |
| } |
| |
| currentBuilder.add(token, key, keyPosition); |
| isAdded = true; |
| } |
| |
            if (!isAdded || currentBuilder.estimatedMemoryUse() < maxMemorySize)
                return; // none of the generated tokens were added, or the in-memory size threshold hasn't been reached yet
| |
| segments.add(getExecutor().submit(scheduleSegmentFlush(false))); |
| } |
| |
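        // Swaps in a fresh in-memory builder and returns a task that writes the old
        // builder out as a segment file, yielding null if the segment flush fails.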
| @VisibleForTesting |
| protected Callable<OnDiskIndex> scheduleSegmentFlush(final boolean isFinal) |
| { |
| final OnDiskIndexBuilder builder = currentBuilder; |
| currentBuilder = newIndexBuilder(); |
| |
| final String segmentFile = filename(isFinal); |
| |
| return () -> { |
| long start = System.nanoTime(); |
| |
| try |
| { |
| File index = new File(segmentFile); |
| return builder.finish(index) ? new OnDiskIndex(index, columnIndex.getValidator(), null) : null; |
| } |
| catch (Exception | FSError e) |
| { |
| logger.error("Failed to build index segment {}", segmentFile, e); |
| return null; |
| } |
| finally |
| { |
| if (!isFinal) |
| logger.info("Flushed index segment {}, took {} ms.", segmentFile, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); |
| } |
| }; |
| } |
| |
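        // Merges every flushed segment (plus anything still buffered in memory) into
        // the final per-column index file, then deletes the intermediate segments.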
| public void complete(final CountDownLatch latch) |
| { |
| logger.info("Scheduling index flush to {}", outputFile); |
| |
| getExecutor().submit((Runnable) () -> { |
| long start1 = System.nanoTime(); |
| |
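                // +1 leaves room for one more segment: the still-buffered builder may
                // be flushed inline below before the merge.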
| OnDiskIndex[] parts = new OnDiskIndex[segments.size() + 1]; |
| |
| try |
| { |
| // no parts present, build entire index from memory |
| if (segments.isEmpty()) |
| { |
| scheduleSegmentFlush(true).call(); |
| return; |
| } |
| |
| // parts are present but there is something still in memory, let's flush that inline |
| if (!currentBuilder.isEmpty()) |
| { |
| @SuppressWarnings("resource") |
| OnDiskIndex last = scheduleSegmentFlush(false).call(); |
| segments.add(Futures.immediateFuture(last)); |
| } |
| |
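                    // Collect the flushed segments and track the combined key range so
                    // the merged index header carries correct min/max keys.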
| int index = 0; |
| ByteBuffer combinedMin = null, combinedMax = null; |
| |
| for (Future<OnDiskIndex> f : segments) |
| { |
| OnDiskIndex part = f.get(); |
| if (part == null) |
| continue; |
| |
| parts[index++] = part; |
| combinedMin = (combinedMin == null || keyValidator.compare(combinedMin, part.minKey()) > 0) ? part.minKey() : combinedMin; |
| combinedMax = (combinedMax == null || keyValidator.compare(combinedMax, part.maxKey()) < 0) ? part.maxKey() : combinedMax; |
| } |
| |
| OnDiskIndexBuilder builder = newIndexBuilder(); |
| builder.finish(Pair.create(combinedMin, combinedMax), |
| new File(outputFile), |
| new CombinedTermIterator(parts)); |
| } |
| catch (Exception | FSError e) |
| { |
| logger.error("Failed to flush index {}.", outputFile, e); |
| FileUtils.delete(outputFile); |
| } |
| finally |
| { |
| logger.info("Index flush to {} took {} ms.", outputFile, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start1)); |
| |
| for (int segment = 0; segment < segmentNumber; segment++) |
| { |
| OnDiskIndex part = parts[segment]; |
| |
| if (part != null) |
| FileUtils.closeQuietly(part); |
| |
| FileUtils.delete(outputFile + "_" + segment); |
| } |
| |
| latch.countDown(); |
| } |
| }); |
| } |
| |
| private ExecutorService getExecutor() |
| { |
| return source == OperationType.FLUSH ? INDEX_FLUSHER_MEMTABLE : INDEX_FLUSHER_GENERAL; |
| } |
| |
| private OnDiskIndexBuilder newIndexBuilder() |
| { |
| return new OnDiskIndexBuilder(keyValidator, columnIndex.getValidator(), columnIndex.getMode().mode); |
| } |
| |
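        // Intermediate segments share the final index file name with a "_<n>" suffix;
        // the final file itself carries no suffix.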
| public String filename(boolean isFinal) |
| { |
| return outputFile + (isFinal ? "" : "_" + segmentNumber++); |
| } |
| } |
| |
| protected long maxMemorySize(ColumnIndex columnIndex) |
| { |
        // Memtable flushes get a fixed 1 GiB (1073741824 bytes) budget; compaction
        // builds use the per-index configured limit.
| return source == OperationType.FLUSH ? 1073741824L : columnIndex.getMode().maxCompactionFlushMemoryInMb; |
| } |
| |
    @Override
    public int hashCode()
    {
        return descriptor.hashCode();
    }

    @Override
    public boolean equals(Object o)
    {
        return o instanceof PerSSTableIndexWriter && descriptor.equals(((PerSSTableIndexWriter) o).descriptor);
    }
| } |