| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pinot.segment.local.segment.index.loader; |
| |
| import java.io.File; |
| import java.net.URI; |
| import java.util.List; |
| import javax.annotation.Nullable; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.pinot.segment.local.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGenerator; |
| import org.apache.pinot.segment.local.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGeneratorMode; |
| import org.apache.pinot.segment.local.segment.index.loader.defaultcolumn.DefaultColumnHandler; |
| import org.apache.pinot.segment.local.segment.index.loader.defaultcolumn.DefaultColumnHandlerFactory; |
| import org.apache.pinot.segment.local.segment.index.loader.invertedindex.InvertedIndexHandler; |
| import org.apache.pinot.segment.local.startree.StarTreeBuilderUtils; |
| import org.apache.pinot.segment.local.startree.v2.builder.MultipleTreesBuilder; |
| import org.apache.pinot.segment.local.startree.v2.builder.StarTreeV2BuilderConfig; |
| import org.apache.pinot.segment.spi.V1Constants; |
| import org.apache.pinot.segment.spi.creator.IndexCreatorProvider; |
| import org.apache.pinot.segment.spi.index.IndexingOverrides; |
| import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; |
| import org.apache.pinot.segment.spi.index.startree.StarTreeV2Metadata; |
| import org.apache.pinot.segment.spi.store.ColumnIndexType; |
| import org.apache.pinot.segment.spi.store.SegmentDirectory; |
| import org.apache.pinot.spi.data.Schema; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| |
| /** |
| * Use mmap to load the segment and perform all pre-processing steps. (This can be slow) |
| * <p>Pre-processing steps include: |
| * <ul> |
| * <li>Use {@link InvertedIndexHandler} to create inverted indices</li> |
| * <li>Use {@link DefaultColumnHandler} to update auto-generated default columns</li> |
| * <li>Use {@link ColumnMinMaxValueGenerator} to add min/max value to column metadata</li> |
| * </ul> |
| */ |
| public class SegmentPreProcessor implements AutoCloseable { |
| private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreProcessor.class); |
| |
| private final URI _indexDirURI; |
| private final IndexLoadingConfig _indexLoadingConfig; |
| private final Schema _schema; |
| private final SegmentDirectory _segmentDirectory; |
| private SegmentMetadataImpl _segmentMetadata; |
| |
| public SegmentPreProcessor(SegmentDirectory segmentDirectory, IndexLoadingConfig indexLoadingConfig, |
| @Nullable Schema schema) { |
| _segmentDirectory = segmentDirectory; |
| _indexDirURI = segmentDirectory.getIndexDir(); |
| _indexLoadingConfig = indexLoadingConfig; |
| _schema = schema; |
| _segmentMetadata = segmentDirectory.getSegmentMetadata(); |
| } |
| |
| @Override |
| public void close() |
| throws Exception { |
| _segmentDirectory.close(); |
| } |
| |
| public void process() |
| throws Exception { |
| if (_segmentMetadata.getTotalDocs() == 0) { |
| LOGGER.info("Skip preprocessing empty segment: {}", _segmentMetadata.getName()); |
| return; |
| } |
| |
| // Segment processing has to be done with a local directory. |
| File indexDir = new File(_indexDirURI); |
| |
| // This fixes the issue of temporary files not getting deleted after creating new inverted indexes. |
| removeInvertedIndexTempFiles(indexDir); |
| |
| try (SegmentDirectory.Writer segmentWriter = _segmentDirectory.createWriter()) { |
| // Update default columns according to the schema. |
| if (_schema != null) { |
| DefaultColumnHandler defaultColumnHandler = DefaultColumnHandlerFactory |
| .getDefaultColumnHandler(indexDir, _segmentMetadata, _indexLoadingConfig, _schema, segmentWriter); |
| defaultColumnHandler.updateDefaultColumns(); |
| _segmentMetadata = new SegmentMetadataImpl(indexDir); |
| _segmentDirectory.reloadMetadata(); |
| } else { |
| LOGGER.warn("Skip creating default columns for segment: {} without schema", _segmentMetadata.getName()); |
| } |
| |
| // Update single-column indices, like inverted index, json index etc. |
| IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider(); |
| for (ColumnIndexType type : ColumnIndexType.values()) { |
| IndexHandler handler = IndexHandlerFactory.getIndexHandler(type, _segmentMetadata, _indexLoadingConfig); |
| handler.updateIndices(segmentWriter, indexCreatorProvider); |
| } |
| |
| // Create/modify/remove star-trees if required. |
| processStarTrees(indexDir); |
| |
| // Add min/max value to column metadata according to the prune mode. |
| // For star-tree index, because it can only increase the range, so min/max value can still be used in pruner. |
| ColumnMinMaxValueGeneratorMode columnMinMaxValueGeneratorMode = |
| _indexLoadingConfig.getColumnMinMaxValueGeneratorMode(); |
| if (columnMinMaxValueGeneratorMode != ColumnMinMaxValueGeneratorMode.NONE) { |
| ColumnMinMaxValueGenerator columnMinMaxValueGenerator = |
| new ColumnMinMaxValueGenerator(_segmentMetadata, segmentWriter, columnMinMaxValueGeneratorMode); |
| columnMinMaxValueGenerator.addColumnMinMaxValue(); |
| // NOTE: This step may modify the segment metadata. When adding new steps after this, un-comment the next line. |
| // _segmentMetadata = new SegmentMetadataImpl(indexDir); |
| } |
| |
| segmentWriter.save(); |
| } |
| } |
| |
| /** |
| * This method checks if there is any discrepancy between the segment and current table config and schema. |
| * If so, it returns true indicating the segment needs to be reprocessed. Right now, the default columns, |
| * all types of indices and column min/max values are checked against what's set in table config and schema. |
| */ |
| public boolean needProcess() |
| throws Exception { |
| if (_segmentMetadata.getTotalDocs() == 0) { |
| return false; |
| } |
| try (SegmentDirectory.Reader segmentReader = _segmentDirectory.createReader()) { |
| // Check if there is need to update default columns according to the schema. |
| if (_schema != null) { |
| DefaultColumnHandler defaultColumnHandler = DefaultColumnHandlerFactory |
| .getDefaultColumnHandler(null, _segmentMetadata, _indexLoadingConfig, _schema, null); |
| if (defaultColumnHandler.needUpdateDefaultColumns()) { |
| LOGGER.info("Found default columns need updates"); |
| return true; |
| } |
| } |
| // Check if there is need to update single-column indices, like inverted index, json index etc. |
| for (ColumnIndexType type : ColumnIndexType.values()) { |
| if (IndexHandlerFactory.getIndexHandler(type, _segmentMetadata, _indexLoadingConfig) |
| .needUpdateIndices(segmentReader)) { |
| LOGGER.info("Found index type: {} needs updates", type); |
| return true; |
| } |
| } |
| // Check if there is need to create/modify/remove star-trees. |
| if (needProcessStarTrees()) { |
| LOGGER.info("Found startree index needs updates"); |
| return true; |
| } |
| // Check if there is need to update column min max value. |
| if (needUpdateColumnMinMaxValue()) { |
| LOGGER.info("Found min max values need updates"); |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| private boolean needUpdateColumnMinMaxValue() { |
| ColumnMinMaxValueGeneratorMode columnMinMaxValueGeneratorMode = |
| _indexLoadingConfig.getColumnMinMaxValueGeneratorMode(); |
| if (columnMinMaxValueGeneratorMode == ColumnMinMaxValueGeneratorMode.NONE) { |
| return false; |
| } |
| ColumnMinMaxValueGenerator columnMinMaxValueGenerator = |
| new ColumnMinMaxValueGenerator(_segmentMetadata, null, columnMinMaxValueGeneratorMode); |
| return columnMinMaxValueGenerator.needAddColumnMinMaxValue(); |
| } |
| |
| private boolean needProcessStarTrees() { |
| // Check if there is need to create/modify/remove star-trees. |
| if (!_indexLoadingConfig.isEnableDynamicStarTreeCreation()) { |
| return false; |
| } |
| List<StarTreeV2BuilderConfig> starTreeBuilderConfigs = StarTreeBuilderUtils |
| .generateBuilderConfigs(_indexLoadingConfig.getStarTreeIndexConfigs(), |
| _indexLoadingConfig.isEnableDefaultStarTree(), _segmentMetadata); |
| List<StarTreeV2Metadata> starTreeMetadataList = _segmentMetadata.getStarTreeV2MetadataList(); |
| // There are existing star-trees, but if they match the builder configs exactly, |
| // then there is no need to generate the star-trees |
| if (starTreeMetadataList != null && !StarTreeBuilderUtils |
| .shouldRemoveExistingStarTrees(starTreeBuilderConfigs, starTreeMetadataList)) { |
| return false; |
| } |
| return !starTreeBuilderConfigs.isEmpty(); |
| } |
| |
| private void processStarTrees(File indexDir) |
| throws Exception { |
| // Create/modify/remove star-trees if required |
| if (_indexLoadingConfig.isEnableDynamicStarTreeCreation()) { |
| List<StarTreeV2BuilderConfig> starTreeBuilderConfigs = StarTreeBuilderUtils |
| .generateBuilderConfigs(_indexLoadingConfig.getStarTreeIndexConfigs(), |
| _indexLoadingConfig.isEnableDefaultStarTree(), _segmentMetadata); |
| boolean shouldGenerateStarTree = !starTreeBuilderConfigs.isEmpty(); |
| List<StarTreeV2Metadata> starTreeMetadataList = _segmentMetadata.getStarTreeV2MetadataList(); |
| if (starTreeMetadataList != null) { |
| // There are existing star-trees |
| if (StarTreeBuilderUtils.shouldRemoveExistingStarTrees(starTreeBuilderConfigs, starTreeMetadataList)) { |
| // Remove the existing star-trees |
| LOGGER.info("Removing star-trees from segment: {}", _segmentMetadata.getName()); |
| StarTreeBuilderUtils.removeStarTrees(indexDir); |
| _segmentMetadata = new SegmentMetadataImpl(indexDir); |
| } else { |
| // Existing star-trees match the builder configs, no need to generate the star-trees |
| shouldGenerateStarTree = false; |
| } |
| } |
| // Generate the star-trees if needed |
| if (shouldGenerateStarTree) { |
| // NOTE: Always use OFF_HEAP mode on server side. |
| try (MultipleTreesBuilder builder = new MultipleTreesBuilder(starTreeBuilderConfigs, indexDir, |
| MultipleTreesBuilder.BuildMode.OFF_HEAP)) { |
| builder.build(); |
| } |
| _segmentMetadata = new SegmentMetadataImpl(indexDir); |
| } |
| } |
| } |
| |
| /** |
| * Remove all the existing inverted index temp files before loading segments, by looking |
| * for all files in the directory and remove the ones with '.bitmap.inv.tmp' extension. |
| */ |
| private void removeInvertedIndexTempFiles(File indexDir) { |
| File[] directoryListing = indexDir.listFiles(); |
| if (directoryListing == null) { |
| return; |
| } |
| String tempFileExtension = V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION + ".tmp"; |
| for (File child : directoryListing) { |
| if (child.getName().endsWith(tempFileExtension)) { |
| FileUtils.deleteQuietly(child); |
| } |
| } |
| } |
| } |