/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.segment.index.loader;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.attribute.FileTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentCreationDriverFactory;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.index.converter.SegmentV1V2ToV3FormatConverter;
import org.apache.pinot.segment.local.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGeneratorMode;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.apache.pinot.spi.config.table.BloomFilterConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
public class SegmentPreProcessorTest {
private static final File INDEX_DIR = new File(SegmentPreProcessorTest.class.toString());
private static final String AVRO_DATA = "data/test_data-mv.avro";
private static final String SCHEMA = "data/testDataMVSchema.json";
// For inverted index creation tests.
private static final String COLUMN1_NAME = "column1";
private static final String COLUMN7_NAME = "column7";
private static final String COLUMN13_NAME = "column13";
private static final String NO_SUCH_COLUMN_NAME = "noSuchColumn";
private static final String NEW_COLUMN_INVERTED_INDEX = "newStringMVDimension";
// For text index creation tests.
private static final String EXISTING_STRING_COL_RAW = "column4";
private static final String EXISTING_STRING_COL_DICT = "column5";
private static final String NEWLY_ADDED_STRING_COL_RAW = "newTextColRaw";
private static final String NEWLY_ADDED_STRING_COL_DICT = "newTextColDict";
private static final String NEWLY_ADDED_STRING_MV_COL_RAW = "newTextMVColRaw";
private static final String NEWLY_ADDED_STRING_MV_COL_DICT = "newTextMVColDict";
// For FST index creation tests.
private static final String NEWLY_ADDED_FST_COL_DICT = "newFSTColDict";
// For default value update tests.
private static final String NEW_COLUMNS_SCHEMA1 = "data/newColumnsSchema1.json";
private static final String NEW_COLUMNS_SCHEMA2 = "data/newColumnsSchema2.json";
private static final String NEW_COLUMNS_SCHEMA3 = "data/newColumnsSchema3.json";
private static final String NEW_COLUMNS_SCHEMA_WITH_FST = "data/newColumnsSchemaWithFST.json";
private static final String NEW_COLUMNS_SCHEMA_WITH_TEXT = "data/newColumnsSchemaWithText.json";
private static final String NEW_COLUMNS_SCHEMA_WITH_H3_JSON = "data/newColumnsSchemaWithH3Json.json";
private static final String NEW_INT_METRIC_COLUMN_NAME = "newIntMetric";
private static final String NEW_LONG_METRIC_COLUMN_NAME = "newLongMetric";
private static final String NEW_FLOAT_METRIC_COLUMN_NAME = "newFloatMetric";
private static final String NEW_DOUBLE_METRIC_COLUMN_NAME = "newDoubleMetric";
private static final String NEW_BOOLEAN_SV_DIMENSION_COLUMN_NAME = "newBooleanSVDimension";
private static final String NEW_INT_SV_DIMENSION_COLUMN_NAME = "newIntSVDimension";
private static final String NEW_STRING_MV_DIMENSION_COLUMN_NAME = "newStringMVDimension";
private static final String NEW_HLL_BYTE_METRIC_COLUMN_NAME = "newHLLByteMetric";
private static final String NEW_TDIGEST_BYTE_METRIC_COLUMN_NAME = "newTDigestByteMetric";
private File _indexDir;
private PinotConfiguration _configuration;
private IndexLoadingConfig _indexLoadingConfig;
private File _avroFile;
private Schema _schema;
private TableConfig _tableConfig;
private Schema _newColumnsSchema1;
private Schema _newColumnsSchema2;
private Schema _newColumnsSchema3;
private Schema _newColumnsSchemaWithFST;
private Schema _newColumnsSchemaWithText;
private Schema _newColumnsSchemaWithH3Json;
@BeforeMethod
public void setUp()
throws Exception {
FileUtils.deleteQuietly(INDEX_DIR);
Map<String, Object> props = new HashMap<>();
props.put(IndexLoadingConfig.READ_MODE_KEY, ReadMode.mmap.toString());
_configuration = new PinotConfiguration(props);
// We specify two columns without an inverted index ('column1', 'column13'), one non-existing column
// ('noSuchColumn') and one column with an existing inverted index ('column7').
_indexLoadingConfig = new IndexLoadingConfig();
_indexLoadingConfig.setInvertedIndexColumns(
new HashSet<>(Arrays.asList(COLUMN1_NAME, COLUMN7_NAME, COLUMN13_NAME, NO_SUCH_COLUMN_NAME)));
// The segment generation code in SegmentColumnarIndexCreator will throw an
// exception if the start and end times in the time column are not within the
// acceptable range. For this test, we would first need to fix the input Avro
// data so that the time column values fall in the allowed range. Until then,
// the check is explicitly disabled.
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setRowTimeValueCheck(false);
ingestionConfig.setSegmentTimeValueCheck(false);
_tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch")
.setIngestionConfig(ingestionConfig).build();
_indexLoadingConfig.setTableConfig(_tableConfig);
ClassLoader classLoader = getClass().getClassLoader();
URL resourceUrl = classLoader.getResource(AVRO_DATA);
assertNotNull(resourceUrl);
_avroFile = new File(resourceUrl.getFile());
// For the newColumnsSchema files, we add four metric columns of different data types (one with a user-defined
// default null value) and three dimension columns of different data types (one with a user-defined default null
// value and one multi-value column).
resourceUrl = classLoader.getResource(SCHEMA);
assertNotNull(resourceUrl);
_schema = Schema.fromFile(new File(resourceUrl.getFile()));
resourceUrl = classLoader.getResource(NEW_COLUMNS_SCHEMA1);
assertNotNull(resourceUrl);
_newColumnsSchema1 = Schema.fromFile(new File(resourceUrl.getFile()));
resourceUrl = classLoader.getResource(NEW_COLUMNS_SCHEMA2);
assertNotNull(resourceUrl);
_newColumnsSchema2 = Schema.fromFile(new File(resourceUrl.getFile()));
resourceUrl = classLoader.getResource(NEW_COLUMNS_SCHEMA3);
assertNotNull(resourceUrl);
_newColumnsSchema3 = Schema.fromFile(new File(resourceUrl.getFile()));
resourceUrl = classLoader.getResource(NEW_COLUMNS_SCHEMA_WITH_FST);
assertNotNull(resourceUrl);
_newColumnsSchemaWithFST = Schema.fromFile(new File(resourceUrl.getFile()));
resourceUrl = classLoader.getResource(NEW_COLUMNS_SCHEMA_WITH_TEXT);
assertNotNull(resourceUrl);
_newColumnsSchemaWithText = Schema.fromFile(new File(resourceUrl.getFile()));
resourceUrl = classLoader.getResource(NEW_COLUMNS_SCHEMA_WITH_H3_JSON);
assertNotNull(resourceUrl);
_newColumnsSchemaWithH3Json = Schema.fromFile(new File(resourceUrl.getFile()));
}
@AfterMethod
public void tearDown()
throws Exception {
FileUtils.deleteQuietly(INDEX_DIR);
}
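/**
 * Builds a v1 segment from the test Avro data, with an inverted index on 'column7' and a raw
 * (no-dictionary) forward index on 'column4'.
 */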
private void constructV1Segment()
throws Exception {
FileUtils.deleteQuietly(INDEX_DIR);
// Create inverted index for 'column7' when constructing the segment.
SegmentGeneratorConfig segmentGeneratorConfig =
SegmentTestUtils.getSegmentGeneratorConfigWithSchema(_avroFile, INDEX_DIR, "testTable", _tableConfig, _schema);
segmentGeneratorConfig.setInvertedIndexCreationColumns(Collections.singletonList(COLUMN7_NAME));
segmentGeneratorConfig.setRawIndexCreationColumns(Collections.singletonList(EXISTING_STRING_COL_RAW));
SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
driver.init(segmentGeneratorConfig);
driver.build();
_indexDir = new File(INDEX_DIR, driver.getSegmentName());
}
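/**
 * Builds a v1 segment and converts it in place to the v3 format.
 */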
private void constructV3Segment()
throws Exception {
constructV1Segment();
new SegmentV1V2ToV3FormatConverter().convert(_indexDir);
}
/**
* Test to check for default column handling and text index creation during
* segment load after a new raw column is added to the schema with text index
* creation enabled.
* This will exercise both code paths in SegmentPreProcessor (segment load):
* (1) Default column handler to add forward index and dictionary
* (2) Text index handler to add text index
*/
@Test
public void testEnableTextIndexOnNewColumnRaw()
throws Exception {
Set<String> textIndexColumns = new HashSet<>();
textIndexColumns.add(NEWLY_ADDED_STRING_COL_RAW);
textIndexColumns.add(NEWLY_ADDED_STRING_MV_COL_RAW);
_indexLoadingConfig.setTextIndexColumns(textIndexColumns);
_indexLoadingConfig.getNoDictionaryColumns().add(NEWLY_ADDED_STRING_COL_RAW);
_indexLoadingConfig.getNoDictionaryColumns().add(NEWLY_ADDED_STRING_MV_COL_RAW);
// Create a segment in V3, add a new raw column with text index enabled
constructV3Segment();
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_STRING_COL_RAW);
// Should be null since the column does not yet exist in the segment.
assertNull(columnMetadata);
checkTextIndexCreation(NEWLY_ADDED_STRING_COL_RAW, 1, 1, _newColumnsSchemaWithText, true, true, true, 4);
checkTextIndexCreation(NEWLY_ADDED_STRING_MV_COL_RAW, 1, 1, _newColumnsSchemaWithText, true, true, false, 4, false,
1);
// Create a segment in V1, add a new raw column with text index enabled
constructV1Segment();
segmentMetadata = new SegmentMetadataImpl(_indexDir);
columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_STRING_COL_RAW);
// Should be null since the column does not yet exist in the segment.
assertNull(columnMetadata);
checkTextIndexCreation(NEWLY_ADDED_STRING_COL_RAW, 1, 1, _newColumnsSchemaWithText, true, true, true, 4);
}
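/**
 * Test that enabling an FST index on an existing raw (no-dictionary) column is rejected: the
 * pre-processor should throw UnsupportedOperationException for both v3 and v1 segments, since the
 * FST index is only supported on dictionary-encoded columns.
 */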
@Test
public void testEnableFSTIndexOnExistingColumnRaw()
throws Exception {
Set<String> fstColumns = new HashSet<>();
fstColumns.add(EXISTING_STRING_COL_RAW);
_indexLoadingConfig.setFSTIndexColumns(fstColumns);
_indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_STRING_COL_RAW);
constructV3Segment();
SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor v3Processor =
new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, _newColumnsSchemaWithFST);
expectThrows(UnsupportedOperationException.class, () -> v3Processor.process());
constructV1Segment();
segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader().load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor v1Processor =
new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, _newColumnsSchemaWithFST);
expectThrows(UnsupportedOperationException.class, () -> v1Processor.process());
}
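/**
 * Test FST index creation during segment load after a new dictionary-encoded column with an FST
 * index is added to the schema, for both v3 and v1 segments.
 */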
@Test
public void testEnableFSTIndexOnNewColumnDictEncoded()
throws Exception {
Set<String> fstColumns = new HashSet<>();
fstColumns.add(NEWLY_ADDED_FST_COL_DICT);
_indexLoadingConfig.setFSTIndexColumns(fstColumns);
constructV3Segment();
checkFSTIndexCreation(NEWLY_ADDED_FST_COL_DICT, 1, 1, _newColumnsSchemaWithFST, true, true, 4);
constructV1Segment();
checkFSTIndexCreation(NEWLY_ADDED_FST_COL_DICT, 1, 1, _newColumnsSchemaWithFST, true, true, 4);
}
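/**
 * Test FST index creation during segment load after an FST index is enabled on an existing
 * dictionary-encoded column, for both v3 and v1 segments.
 */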
@Test
public void testEnableFSTIndexOnExistingColumnDictEncoded()
throws Exception {
Set<String> fstColumns = new HashSet<>();
fstColumns.add(EXISTING_STRING_COL_DICT);
_indexLoadingConfig.setFSTIndexColumns(fstColumns);
constructV3Segment();
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_DICT);
assertNotNull(columnMetadata);
checkFSTIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _newColumnsSchemaWithFST, false, false, 26);
constructV1Segment();
segmentMetadata = new SegmentMetadataImpl(_indexDir);
columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_DICT);
assertNotNull(columnMetadata);
checkFSTIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _newColumnsSchemaWithFST, false, false, 26);
}
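/**
 * Test the forward index handler during segment load: rewriting the raw forward index is a no-op
 * for v1 segments, while for v3 segments the forward index is rewritten whenever the configured
 * chunk compression type changes, including in combination with text and FST index creation.
 */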
@Test
public void testForwardIndexHandler()
throws Exception {
Map<String, ChunkCompressionType> compressionConfigs = new HashMap<>();
ChunkCompressionType newCompressionType = ChunkCompressionType.ZSTANDARD;
compressionConfigs.put(EXISTING_STRING_COL_RAW, newCompressionType);
_indexLoadingConfig.setCompressionConfigs(compressionConfigs);
_indexLoadingConfig.setNoDictionaryColumns(new HashSet<>(Collections.singletonList(EXISTING_STRING_COL_RAW)));
// Test 1: Rewriting the forward index is a no-op for v1 segments. The default LZ4 compression type is retained.
constructV1Segment();
checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, ChunkCompressionType.LZ4);
// Convert the segment to V3.
new SegmentV1V2ToV3FormatConverter().convert(_indexDir);
// Test 2: Now the forward index will be rewritten with the ZSTANDARD compression type.
checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, newCompressionType);
// Test 3: Change the compression on an existing raw index column and add a text index on the same column. Check
// correctness.
newCompressionType = ChunkCompressionType.SNAPPY;
compressionConfigs.put(EXISTING_STRING_COL_RAW, newCompressionType);
Set<String> textIndexColumns = new HashSet<>();
textIndexColumns.add(EXISTING_STRING_COL_RAW);
_indexLoadingConfig.setTextIndexColumns(textIndexColumns);
constructV3Segment();
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_RAW);
assertNotNull(columnMetadata);
checkTextIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0);
validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true,
0, newCompressionType);
// Test 4: Change the compression on the raw index column, and change another index on another column. Check
// correctness.
newCompressionType = ChunkCompressionType.ZSTANDARD;
compressionConfigs.put(EXISTING_STRING_COL_RAW, newCompressionType);
Set<String> fstColumns = new HashSet<>();
fstColumns.add(EXISTING_STRING_COL_DICT);
_indexLoadingConfig.setFSTIndexColumns(fstColumns);
constructV3Segment();
segmentMetadata = new SegmentMetadataImpl(_indexDir);
columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_DICT);
assertNotNull(columnMetadata);
// Check FST index
checkFSTIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _newColumnsSchemaWithFST, false, false, 26);
// Check forward index.
validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true,
0, newCompressionType);
}
/**
* Test to check for default column handling and text index creation during
* segment load after a new dictionary encoded column is added to the schema
* with text index creation enabled.
* This will exercise both code paths in SegmentPreProcessor (segment load):
* (1) Default column handler to add forward index and dictionary
* (2) Text index handler to add text index
*/
@Test
public void testEnableTextIndexOnNewColumnDictEncoded()
throws Exception {
Set<String> textIndexColumns = new HashSet<>();
textIndexColumns.add(NEWLY_ADDED_STRING_COL_DICT);
textIndexColumns.add(NEWLY_ADDED_STRING_MV_COL_DICT);
_indexLoadingConfig.setTextIndexColumns(textIndexColumns);
// Create a segment in V3, add a new dict encoded column with text index enabled
constructV3Segment();
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_STRING_COL_RAW);
// Should be null since the column does not yet exist in the segment.
assertNull(columnMetadata);
checkTextIndexCreation(NEWLY_ADDED_STRING_COL_DICT, 1, 1, _newColumnsSchemaWithText, true, true, true, 4);
checkTextIndexCreation(NEWLY_ADDED_STRING_MV_COL_DICT, 1, 1, _newColumnsSchemaWithText, true, true, false, 4, false,
1);
// Create a segment in V1, add a new dict encoded column with text index enabled
constructV1Segment();
segmentMetadata = new SegmentMetadataImpl(_indexDir);
columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_STRING_COL_RAW);
// Should be null since the column does not yet exist in the segment.
assertNull(columnMetadata);
checkTextIndexCreation(NEWLY_ADDED_STRING_COL_DICT, 1, 1, _newColumnsSchemaWithText, true, true, true, 4);
}
/**
* Test to check text index creation during segment load after text index
* creation is enabled on an existing raw column.
* This will exercise the SegmentPreProcessor code path during segment load.
* @throws Exception
*/
@Test
public void testEnableTextIndexOnExistingRawColumn()
throws Exception {
Set<String> textIndexColumns = new HashSet<>();
textIndexColumns.add(EXISTING_STRING_COL_RAW);
_indexLoadingConfig.setTextIndexColumns(textIndexColumns);
// Create a segment in V3, enable text index on existing column
constructV3Segment();
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_RAW);
assertNotNull(columnMetadata);
checkTextIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0);
// Create a segment in V1, enable text index on the existing column
constructV1Segment();
segmentMetadata = new SegmentMetadataImpl(_indexDir);
columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_RAW);
assertNotNull(columnMetadata);
checkTextIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0);
}
/**
* Test to check text index creation during segment load after text index
* creation is enabled on an existing dictionary encoded column.
* This will exercise the SegmentPreProcessor code path during segment load.
* @throws Exception
*/
@Test
public void testEnableTextIndexOnExistingDictEncodedColumn()
throws Exception {
Set<String> textIndexColumns = new HashSet<>();
textIndexColumns.add(EXISTING_STRING_COL_DICT);
_indexLoadingConfig.setTextIndexColumns(textIndexColumns);
// Create a segment in V3, enable text index on existing column
constructV3Segment();
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_RAW);
assertNotNull(columnMetadata);
// SegmentPreProcessor should have created the text index using TextIndexHandler
checkTextIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _schema, false, true, false, 26);
// Create a segment in V1, enable text index on the existing column
constructV1Segment();
segmentMetadata = new SegmentMetadataImpl(_indexDir);
columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_RAW);
assertNotNull(columnMetadata);
// SegmentPreProcessor should have created the text index using TextIndexHandler
checkTextIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _schema, false, true, false, 26);
}
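/**
 * Runs the pre-processor with the given schema and validates that the FST index was created along
 * with the expected column metadata.
 */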
private void checkFSTIndexCreation(String column, int cardinality, int bits, Schema schema, boolean isAutoGenerated,
boolean isSorted, int dictionaryElementSize)
throws Exception {
createAndValidateIndex(ColumnIndexType.FST_INDEX, column, cardinality, bits, schema, isAutoGenerated, true,
isSorted, dictionaryElementSize, true, 0, null);
}
private void checkTextIndexCreation(String column, int cardinality, int bits, Schema schema, boolean isAutoGenerated,
boolean hasDictionary, boolean isSorted, int dictionaryElementSize)
throws Exception {
createAndValidateIndex(ColumnIndexType.TEXT_INDEX, column, cardinality, bits, schema, isAutoGenerated,
hasDictionary, isSorted, dictionaryElementSize, true, 0, null);
}
private void checkTextIndexCreation(String column, int cardinality, int bits, Schema schema, boolean isAutoGenerated,
boolean hasDictionary, boolean isSorted, int dictionaryElementSize, boolean isSingleValue,
int maxNumberOfMultiValues)
throws Exception {
createAndValidateIndex(ColumnIndexType.TEXT_INDEX, column, cardinality, bits, schema, isAutoGenerated,
hasDictionary, isSorted, dictionaryElementSize, isSingleValue, maxNumberOfMultiValues, null);
}
private void checkForwardIndexCreation(String column, int cardinality, int bits, Schema schema,
boolean isAutoGenerated, boolean hasDictionary, boolean isSorted, int dictionaryElementSize,
ChunkCompressionType expectedCompressionType)
throws Exception {
createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, column, cardinality, bits, schema, isAutoGenerated,
hasDictionary, isSorted, dictionaryElementSize, true, 0, expectedCompressionType);
}
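/**
 * Loads the segment directory, runs the pre-processor with the given schema, and validates the
 * requested index and the column metadata.
 */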
private void createAndValidateIndex(ColumnIndexType indexType, String column, int cardinality, int bits,
Schema schema, boolean isAutoGenerated, boolean hasDictionary, boolean isSorted, int dictionaryElementSize,
boolean isSingleValued, int maxNumberOfMultiValues, ChunkCompressionType expectedCompressionType)
throws Exception {
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, schema)) {
processor.process();
validateIndex(indexType, column, cardinality, bits, schema, isAutoGenerated, hasDictionary, isSorted,
dictionaryElementSize, isSingleValued, maxNumberOfMultiValues, expectedCompressionType);
}
}
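/**
 * Validates the column metadata and verifies that the requested index (and the forward index)
 * exists, optionally checking the raw forward index compression type and the dictionary.
 */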
private void validateIndex(ColumnIndexType indexType, String column, int cardinality, int bits, Schema schema,
boolean isAutoGenerated, boolean hasDictionary, boolean isSorted, int dictionaryElementSize,
boolean isSingleValued, int maxNumberOfMultiValues, ChunkCompressionType expectedCompressionType)
throws Exception {
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
assertEquals(columnMetadata.getFieldSpec(), new DimensionFieldSpec(column, DataType.STRING, isSingleValued));
assertEquals(columnMetadata.getCardinality(), cardinality);
assertEquals(columnMetadata.getTotalDocs(), 100000);
assertEquals(columnMetadata.getBitsPerElement(), bits);
assertEquals(columnMetadata.getColumnMaxLength(), dictionaryElementSize);
assertEquals(columnMetadata.isSorted(), isSorted);
assertEquals(columnMetadata.hasDictionary(), hasDictionary);
assertEquals(columnMetadata.getMaxNumberOfMultiValues(), maxNumberOfMultiValues);
assertEquals(columnMetadata.getTotalNumberOfEntries(), 100000);
assertEquals(columnMetadata.isAutoGenerated(), isAutoGenerated);
try (SegmentDirectory segmentDirectory1 = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentDirectory.Reader reader = segmentDirectory1.createReader()) {
assertTrue(reader.hasIndexFor(column, indexType));
assertTrue(reader.hasIndexFor(column, ColumnIndexType.FORWARD_INDEX));
// Check if the raw forward index compressionType is correct.
if (expectedCompressionType != null) {
try (ForwardIndexReader fwdIndexReader = LoaderUtils.getForwardIndexReader(reader, columnMetadata)) {
ChunkCompressionType compressionType = fwdIndexReader.getCompressionType();
assertEquals(compressionType, expectedCompressionType);
}
File inProgressFile = new File(_indexDir, column + ".fwd.inprogress");
assertFalse(inProgressFile.exists());
File v1FwdIndexFile = new File(_indexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
if (segmentMetadata.getVersion() == SegmentVersion.v3) {
assertFalse(v1FwdIndexFile.exists());
} else {
assertTrue(v1FwdIndexFile.exists());
}
}
// If the text index is enabled on a new column with a dictionary, then the
// dictionary should have been created by the default column handler.
if (hasDictionary) {
assertTrue(reader.hasIndexFor(column, ColumnIndexType.DICTIONARY));
} else {
assertFalse(reader.hasIndexFor(column, ColumnIndexType.DICTIONARY));
}
}
}
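/**
 * Test inverted index creation during segment load for a v1 segment: the pre-processor should
 * create the missing inverted indices, skip the non-existing column, and leave existing index
 * files (and their last modified times) untouched on a second run.
 */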
@Test
public void testV1CreateInvertedIndices()
throws Exception {
constructV1Segment();
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
assertEquals(segmentMetadata.getVersion(), SegmentVersion.v1);
String col1FileName = COLUMN1_NAME + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION;
String col7FileName = COLUMN7_NAME + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION;
String col13FileName = COLUMN13_NAME + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION;
String badColFileName = NO_SUCH_COLUMN_NAME + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION;
File col1File = new File(_indexDir, col1FileName);
File col7File = new File(_indexDir, col7FileName);
File col13File = new File(_indexDir, col13FileName);
File badColFile = new File(_indexDir, badColFileName);
assertFalse(col1File.exists());
assertTrue(col7File.exists());
assertFalse(col13File.exists());
assertFalse(badColFile.exists());
FileTime col7LastModifiedTime = Files.getLastModifiedTime(col7File.toPath());
// Sleep for 2 seconds so that a subsequent modification produces a different last modified time.
Thread.sleep(2000);
// Create inverted index the first time.
checkInvertedIndexCreation(false);
assertTrue(col1File.exists());
assertTrue(col7File.exists());
assertTrue(col13File.exists());
assertFalse(badColFile.exists());
assertEquals(Files.getLastModifiedTime(col7File.toPath()), col7LastModifiedTime);
// Capture the last modified times of the newly created inverted index files.
FileTime col1LastModifiedTime = Files.getLastModifiedTime(col1File.toPath());
FileTime col13LastModifiedTime = Files.getLastModifiedTime(col13File.toPath());
// Sleep for 2 seconds so that a subsequent modification produces a different last modified time.
Thread.sleep(2000);
// Create inverted index the second time.
checkInvertedIndexCreation(true);
assertTrue(col1File.exists());
assertTrue(col7File.exists());
assertTrue(col13File.exists());
assertFalse(badColFile.exists());
assertEquals(Files.getLastModifiedTime(col1File.toPath()), col1LastModifiedTime);
assertEquals(Files.getLastModifiedTime(col7File.toPath()), col7LastModifiedTime);
assertEquals(Files.getLastModifiedTime(col13File.toPath()), col13LastModifiedTime);
}
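/**
 * Test inverted index creation during segment load for a v3 segment: the new indices should be
 * appended to the single index file (columns.psf), growing it by the index sizes plus an 8-byte
 * integrity-check overhead each, and a second run should leave the file untouched.
 */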
@Test
public void testV3CreateInvertedIndices()
throws Exception {
constructV3Segment();
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
assertEquals(segmentMetadata.getVersion(), SegmentVersion.v3);
File segmentDirectoryPath = SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3);
File singleFileIndex = new File(segmentDirectoryPath, "columns.psf");
FileTime lastModifiedTime = Files.getLastModifiedTime(singleFileIndex.toPath());
long fileSize = singleFileIndex.length();
// Sleep for 2 seconds so that a subsequent modification produces a different last modified time.
Thread.sleep(2000);
// Create inverted index the first time.
checkInvertedIndexCreation(false);
long addedLength = 0L;
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
// The 8-byte overhead per index buffer is for the segment integrity check.
addedLength += reader.getIndexFor(COLUMN1_NAME, ColumnIndexType.INVERTED_INDEX).size() + 8;
addedLength += reader.getIndexFor(COLUMN13_NAME, ColumnIndexType.INVERTED_INDEX).size() + 8;
}
FileTime newLastModifiedTime = Files.getLastModifiedTime(singleFileIndex.toPath());
assertTrue(newLastModifiedTime.compareTo(lastModifiedTime) > 0);
long newFileSize = singleFileIndex.length();
assertEquals(fileSize + addedLength, newFileSize);
// Sleep for 2 seconds so that a subsequent modification produces a different last modified time.
Thread.sleep(2000);
// Create inverted index the second time.
checkInvertedIndexCreation(true);
assertEquals(Files.getLastModifiedTime(singleFileIndex.toPath()), newLastModifiedTime);
assertEquals(singleFileIndex.length(), newFileSize);
}
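/**
 * Runs the pre-processor and verifies the inverted index state before and after processing. When
 * {@code reCreate} is true all the indices already exist; otherwise only 'column7' (created at
 * segment build time) has one beforehand.
 */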
private void checkInvertedIndexCreation(boolean reCreate)
throws Exception {
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
if (reCreate) {
assertTrue(reader.hasIndexFor(COLUMN1_NAME, ColumnIndexType.INVERTED_INDEX));
assertTrue(reader.hasIndexFor(COLUMN13_NAME, ColumnIndexType.INVERTED_INDEX));
assertTrue(reader.hasIndexFor(COLUMN7_NAME, ColumnIndexType.INVERTED_INDEX));
assertFalse(reader.hasIndexFor(NO_SUCH_COLUMN_NAME, ColumnIndexType.INVERTED_INDEX));
} else {
assertFalse(reader.hasIndexFor(COLUMN1_NAME, ColumnIndexType.INVERTED_INDEX));
assertTrue(reader.hasIndexFor(COLUMN7_NAME, ColumnIndexType.INVERTED_INDEX));
assertFalse(reader.hasIndexFor(COLUMN13_NAME, ColumnIndexType.INVERTED_INDEX));
assertFalse(reader.hasIndexFor(NO_SUCH_COLUMN_NAME, ColumnIndexType.INVERTED_INDEX));
}
}
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, null)) {
processor.process();
}
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
assertTrue(reader.hasIndexFor(COLUMN1_NAME, ColumnIndexType.INVERTED_INDEX));
assertTrue(reader.hasIndexFor(COLUMN13_NAME, ColumnIndexType.INVERTED_INDEX));
assertTrue(reader.hasIndexFor(COLUMN7_NAME, ColumnIndexType.INVERTED_INDEX));
assertFalse(reader.hasIndexFor(NO_SUCH_COLUMN_NAME, ColumnIndexType.INVERTED_INDEX));
}
}
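/**
 * Test default column handling during segment load for a v1 segment: new columns (including a
 * derived column computed via a transform function) are filled with default values, and columns
 * removed from the schema are cleaned up.
 */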
@Test
public void testV1UpdateDefaultColumns()
throws Exception {
constructV1Segment();
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setTransformConfigs(
Collections.singletonList(new TransformConfig(NEW_INT_SV_DIMENSION_COLUMN_NAME, "plus(column1, 1)")));
_tableConfig.setIngestionConfig(ingestionConfig);
_indexLoadingConfig.getInvertedIndexColumns().add(NEW_COLUMN_INVERTED_INDEX);
checkUpdateDefaultColumns();
// Try to use the third schema and update the default values again.
// For the third schema, we changed the default value for column 'newStringMVDimension' to 'notSameLength',
// which is not the same length as before. This should be fine for segment format v1.
// We added two new columns and also removed NEW_INT_SV_DIMENSION_COLUMN_NAME from the schema.
// NEW_INT_SV_DIMENSION_COLUMN_NAME exists before processing but is removed afterwards.
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
assertNotNull(segmentMetadata.getColumnMetadataFor(NEW_INT_SV_DIMENSION_COLUMN_NAME));
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig,
_newColumnsSchema3)) {
processor.process();
}
segmentMetadata = new SegmentMetadataImpl(_indexDir);
assertNull(segmentMetadata.getColumnMetadataFor(NEW_INT_SV_DIMENSION_COLUMN_NAME));
ColumnMetadata hllMetricMetadata = segmentMetadata.getColumnMetadataFor(NEW_HLL_BYTE_METRIC_COLUMN_NAME);
FieldSpec expectedHllMetricFieldSpec = _newColumnsSchema3.getFieldSpecFor(NEW_HLL_BYTE_METRIC_COLUMN_NAME);
assertEquals(hllMetricMetadata.getFieldSpec(), expectedHllMetricFieldSpec);
ByteArray expectedDefaultValue = new ByteArray((byte[]) expectedHllMetricFieldSpec.getDefaultNullValue());
assertEquals(hllMetricMetadata.getMinValue(), expectedDefaultValue);
assertEquals(hllMetricMetadata.getMaxValue(), expectedDefaultValue);
ColumnMetadata tDigestMetricMetadata = segmentMetadata.getColumnMetadataFor(NEW_TDIGEST_BYTE_METRIC_COLUMN_NAME);
FieldSpec expectedTDigestMetricFieldSpec = _newColumnsSchema3.getFieldSpecFor(NEW_TDIGEST_BYTE_METRIC_COLUMN_NAME);
assertEquals(tDigestMetricMetadata.getFieldSpec(), expectedTDigestMetricFieldSpec);
expectedDefaultValue = new ByteArray((byte[]) expectedTDigestMetricFieldSpec.getDefaultNullValue());
assertEquals(tDigestMetricMetadata.getMinValue(), expectedDefaultValue);
assertEquals(tDigestMetricMetadata.getMaxValue(), expectedDefaultValue);
}
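/**
 * Same as {@link #testV1UpdateDefaultColumns()}, but for a v3 segment.
 */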
@Test
public void testV3UpdateDefaultColumns()
throws Exception {
constructV3Segment();
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
assertEquals(segmentMetadata.getVersion(), SegmentVersion.v3);
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setTransformConfigs(
Collections.singletonList(new TransformConfig(NEW_INT_SV_DIMENSION_COLUMN_NAME, "plus(column1, 1)")));
_tableConfig.setIngestionConfig(ingestionConfig);
_indexLoadingConfig.getInvertedIndexColumns().add(NEW_COLUMN_INVERTED_INDEX);
checkUpdateDefaultColumns();
// Try to use the third schema and update the default values again.
// For the third schema, we changed the default value for column 'newStringMVDimension' to 'notSameLength', which
// is not the same length as before. This should be fine for segment format v3 as well.
// We added two new columns and also removed NEW_INT_SV_DIMENSION_COLUMN_NAME from the schema.
// NEW_INT_SV_DIMENSION_COLUMN_NAME exists before processing but is removed afterwards.
segmentMetadata = new SegmentMetadataImpl(_indexDir);
assertNotNull(segmentMetadata.getColumnMetadataFor(NEW_INT_SV_DIMENSION_COLUMN_NAME));
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig,
_newColumnsSchema3)) {
processor.process();
}
segmentMetadata = new SegmentMetadataImpl(_indexDir);
assertNull(segmentMetadata.getColumnMetadataFor(NEW_INT_SV_DIMENSION_COLUMN_NAME));
ColumnMetadata hllMetricMetadata = segmentMetadata.getColumnMetadataFor(NEW_HLL_BYTE_METRIC_COLUMN_NAME);
FieldSpec expectedHllMetricFieldSpec = _newColumnsSchema3.getFieldSpecFor(NEW_HLL_BYTE_METRIC_COLUMN_NAME);
assertEquals(hllMetricMetadata.getFieldSpec(), expectedHllMetricFieldSpec);
ByteArray expectedDefaultValue = new ByteArray((byte[]) expectedHllMetricFieldSpec.getDefaultNullValue());
assertEquals(hllMetricMetadata.getMinValue(), expectedDefaultValue);
assertEquals(hllMetricMetadata.getMaxValue(), expectedDefaultValue);
ColumnMetadata tDigestMetricMetadata = segmentMetadata.getColumnMetadataFor(NEW_TDIGEST_BYTE_METRIC_COLUMN_NAME);
FieldSpec expectedTDigestMetricFieldSpec = _newColumnsSchema3.getFieldSpecFor(NEW_TDIGEST_BYTE_METRIC_COLUMN_NAME);
assertEquals(tDigestMetricMetadata.getFieldSpec(), expectedTDigestMetricFieldSpec);
expectedDefaultValue = new ByteArray((byte[]) expectedTDigestMetricFieldSpec.getDefaultNullValue());
assertEquals(tDigestMetricMetadata.getMinValue(), expectedDefaultValue);
assertEquals(tDigestMetricMetadata.getMaxValue(), expectedDefaultValue);
}
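/**
 * Runs the pre-processor with newColumnsSchema1 and then newColumnsSchema2, validating the
 * metadata, default values, dictionaries and forward indexes of the newly added columns.
 */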
private void checkUpdateDefaultColumns()
throws Exception {
// Update default value.
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig,
_newColumnsSchema1)) {
processor.process();
}
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
// Check column metadata.
// Check all fields for one column, and do the necessary checks for the other columns.
ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(NEW_INT_METRIC_COLUMN_NAME);
assertEquals(columnMetadata.getFieldSpec(), _newColumnsSchema1.getFieldSpecFor(NEW_INT_METRIC_COLUMN_NAME));
assertEquals(columnMetadata.getCardinality(), 1);
assertEquals(columnMetadata.getTotalDocs(), 100000);
assertEquals(columnMetadata.getBitsPerElement(), 1);
assertEquals(columnMetadata.getColumnMaxLength(), 0);
assertTrue(columnMetadata.isSorted());
assertTrue(columnMetadata.hasDictionary());
assertEquals(columnMetadata.getMaxNumberOfMultiValues(), 0);
assertEquals(columnMetadata.getTotalNumberOfEntries(), 100000);
assertTrue(columnMetadata.isAutoGenerated());
assertEquals(columnMetadata.getMinValue(), 1);
assertEquals(columnMetadata.getMaxValue(), 1);
columnMetadata = segmentMetadata.getColumnMetadataFor(NEW_LONG_METRIC_COLUMN_NAME);
assertEquals(columnMetadata.getFieldSpec(), _newColumnsSchema1.getFieldSpecFor(NEW_LONG_METRIC_COLUMN_NAME));
assertEquals(columnMetadata.getMinValue(), 0L);
assertEquals(columnMetadata.getMaxValue(), 0L);
columnMetadata = segmentMetadata.getColumnMetadataFor(NEW_FLOAT_METRIC_COLUMN_NAME);
assertEquals(columnMetadata.getFieldSpec(), _newColumnsSchema1.getFieldSpecFor(NEW_FLOAT_METRIC_COLUMN_NAME));
assertEquals(columnMetadata.getMinValue(), 0f);
assertEquals(columnMetadata.getMaxValue(), 0f);
columnMetadata = segmentMetadata.getColumnMetadataFor(NEW_DOUBLE_METRIC_COLUMN_NAME);
assertEquals(columnMetadata.getFieldSpec(), _newColumnsSchema1.getFieldSpecFor(NEW_DOUBLE_METRIC_COLUMN_NAME));
assertEquals(columnMetadata.getMinValue(), 0.0);
assertEquals(columnMetadata.getMaxValue(), 0.0);
columnMetadata = segmentMetadata.getColumnMetadataFor(NEW_BOOLEAN_SV_DIMENSION_COLUMN_NAME);
assertEquals(columnMetadata.getFieldSpec(),
_newColumnsSchema1.getFieldSpecFor(NEW_BOOLEAN_SV_DIMENSION_COLUMN_NAME));
assertEquals(columnMetadata.getColumnMaxLength(), 0);
assertEquals(columnMetadata.getMinValue(), 0);
assertEquals(columnMetadata.getMaxValue(), 0);
columnMetadata = segmentMetadata.getColumnMetadataFor(NEW_STRING_MV_DIMENSION_COLUMN_NAME);
assertEquals(columnMetadata.getFieldSpec(),
_newColumnsSchema1.getFieldSpecFor(NEW_STRING_MV_DIMENSION_COLUMN_NAME));
assertEquals(columnMetadata.getColumnMaxLength(), 4);
assertFalse(columnMetadata.isSorted());
assertEquals(columnMetadata.getMaxNumberOfMultiValues(), 1);
assertEquals(columnMetadata.getTotalNumberOfEntries(), 100000);
assertEquals(columnMetadata.getMinValue(), "null");
assertEquals(columnMetadata.getMaxValue(), "null");
// Derived column
columnMetadata = segmentMetadata.getColumnMetadataFor(NEW_INT_SV_DIMENSION_COLUMN_NAME);
assertEquals(columnMetadata.getFieldSpec(), _newColumnsSchema1.getFieldSpecFor(NEW_INT_SV_DIMENSION_COLUMN_NAME));
assertTrue(columnMetadata.isAutoGenerated());
ColumnMetadata originalColumnMetadata = segmentMetadata.getColumnMetadataFor(COLUMN1_NAME);
assertEquals(columnMetadata.getCardinality(), originalColumnMetadata.getCardinality());
assertEquals(columnMetadata.getBitsPerElement(), originalColumnMetadata.getBitsPerElement());
assertEquals(columnMetadata.isSorted(), originalColumnMetadata.isSorted());
assertEquals(columnMetadata.getMinValue(), (int) originalColumnMetadata.getMinValue() + 1);
assertEquals(columnMetadata.getMaxValue(), (int) originalColumnMetadata.getMaxValue() + 1);
// Check dictionary and forward index exist.
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
assertTrue(reader.hasIndexFor(NEW_INT_METRIC_COLUMN_NAME, ColumnIndexType.DICTIONARY));
assertTrue(reader.hasIndexFor(NEW_INT_METRIC_COLUMN_NAME, ColumnIndexType.FORWARD_INDEX));
assertTrue(reader.hasIndexFor(NEW_LONG_METRIC_COLUMN_NAME, ColumnIndexType.DICTIONARY));
assertTrue(reader.hasIndexFor(NEW_LONG_METRIC_COLUMN_NAME, ColumnIndexType.FORWARD_INDEX));
assertTrue(reader.hasIndexFor(NEW_FLOAT_METRIC_COLUMN_NAME, ColumnIndexType.DICTIONARY));
assertTrue(reader.hasIndexFor(NEW_FLOAT_METRIC_COLUMN_NAME, ColumnIndexType.FORWARD_INDEX));
assertTrue(reader.hasIndexFor(NEW_DOUBLE_METRIC_COLUMN_NAME, ColumnIndexType.DICTIONARY));
assertTrue(reader.hasIndexFor(NEW_DOUBLE_METRIC_COLUMN_NAME, ColumnIndexType.FORWARD_INDEX));
assertTrue(reader.hasIndexFor(NEW_BOOLEAN_SV_DIMENSION_COLUMN_NAME, ColumnIndexType.DICTIONARY));
assertTrue(reader.hasIndexFor(NEW_BOOLEAN_SV_DIMENSION_COLUMN_NAME, ColumnIndexType.FORWARD_INDEX));
assertTrue(reader.hasIndexFor(NEW_INT_SV_DIMENSION_COLUMN_NAME, ColumnIndexType.DICTIONARY));
assertTrue(reader.hasIndexFor(NEW_INT_SV_DIMENSION_COLUMN_NAME, ColumnIndexType.FORWARD_INDEX));
assertTrue(reader.hasIndexFor(NEW_STRING_MV_DIMENSION_COLUMN_NAME, ColumnIndexType.DICTIONARY));
assertTrue(reader.hasIndexFor(NEW_STRING_MV_DIMENSION_COLUMN_NAME, ColumnIndexType.FORWARD_INDEX));
}
// Use the second schema and update the default values again.
// For the second schema, we changed the default value for column 'newIntMetric' to 2, and added default value
// 'abcd' (keeping the same length as 'null') to column 'newStringMVDimension'.
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig,
_newColumnsSchema2)) {
processor.process();
}
segmentMetadata = new SegmentMetadataImpl(_indexDir);
// Check column metadata.
columnMetadata = segmentMetadata.getColumnMetadataFor(NEW_INT_METRIC_COLUMN_NAME);
assertEquals(columnMetadata.getMinValue(), 2);
assertEquals(columnMetadata.getMaxValue(), 2);
assertEquals(columnMetadata.getFieldSpec().getDefaultNullValue(), 2);
columnMetadata = segmentMetadata.getColumnMetadataFor(NEW_STRING_MV_DIMENSION_COLUMN_NAME);
assertEquals(columnMetadata.getMinValue(), "abcd");
assertEquals(columnMetadata.getMaxValue(), "abcd");
assertEquals(columnMetadata.getFieldSpec().getDefaultNullValue(), "abcd");
}
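/**
 * Test column min/max value generation during segment load for each generator mode: NONE generates
 * nothing, TIME covers only the time column, NON_METRIC adds the dimension columns, and ALL covers
 * the metric columns as well.
 */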
@Test
public void testColumnMinMaxValue()
throws Exception {
constructV1Segment();
// Remove min/max value from the metadata
removeMinMaxValuesFromMetadataFile(_indexDir);
IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
indexLoadingConfig.setColumnMinMaxValueGeneratorMode(ColumnMinMaxValueGeneratorMode.NONE);
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, indexLoadingConfig, null)) {
processor.process();
}
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
ColumnMetadata timeColumnMetadata = segmentMetadata.getColumnMetadataFor("daysSinceEpoch");
ColumnMetadata dimensionColumnMetadata = segmentMetadata.getColumnMetadataFor("column1");
ColumnMetadata metricColumnMetadata = segmentMetadata.getColumnMetadataFor("count");
assertNull(timeColumnMetadata.getMinValue());
assertNull(timeColumnMetadata.getMaxValue());
assertNull(dimensionColumnMetadata.getMinValue());
assertNull(dimensionColumnMetadata.getMaxValue());
assertNull(metricColumnMetadata.getMinValue());
assertNull(metricColumnMetadata.getMaxValue());
indexLoadingConfig.setColumnMinMaxValueGeneratorMode(ColumnMinMaxValueGeneratorMode.TIME);
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, indexLoadingConfig, null)) {
processor.process();
}
segmentMetadata = new SegmentMetadataImpl(_indexDir);
timeColumnMetadata = segmentMetadata.getColumnMetadataFor("daysSinceEpoch");
dimensionColumnMetadata = segmentMetadata.getColumnMetadataFor("column5");
metricColumnMetadata = segmentMetadata.getColumnMetadataFor("count");
assertEquals(timeColumnMetadata.getMinValue(), 1756015683);
assertEquals(timeColumnMetadata.getMaxValue(), 1756015683);
assertNull(dimensionColumnMetadata.getMinValue());
assertNull(dimensionColumnMetadata.getMaxValue());
assertNull(metricColumnMetadata.getMinValue());
assertNull(metricColumnMetadata.getMaxValue());
indexLoadingConfig.setColumnMinMaxValueGeneratorMode(ColumnMinMaxValueGeneratorMode.NON_METRIC);
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, indexLoadingConfig, null)) {
processor.process();
}
segmentMetadata = new SegmentMetadataImpl(_indexDir);
timeColumnMetadata = segmentMetadata.getColumnMetadataFor("daysSinceEpoch");
dimensionColumnMetadata = segmentMetadata.getColumnMetadataFor("column5");
metricColumnMetadata = segmentMetadata.getColumnMetadataFor("count");
assertEquals(timeColumnMetadata.getMinValue(), 1756015683);
assertEquals(timeColumnMetadata.getMaxValue(), 1756015683);
assertEquals(dimensionColumnMetadata.getMinValue(), "AKXcXcIqsqOJFsdwxZ");
assertEquals(dimensionColumnMetadata.getMaxValue(), "yQkJTLOQoOqqhkAClgC");
assertNull(metricColumnMetadata.getMinValue());
assertNull(metricColumnMetadata.getMaxValue());
indexLoadingConfig.setColumnMinMaxValueGeneratorMode(ColumnMinMaxValueGeneratorMode.ALL);
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, indexLoadingConfig, null)) {
processor.process();
}
segmentMetadata = new SegmentMetadataImpl(_indexDir);
timeColumnMetadata = segmentMetadata.getColumnMetadataFor("daysSinceEpoch");
dimensionColumnMetadata = segmentMetadata.getColumnMetadataFor("column5");
metricColumnMetadata = segmentMetadata.getColumnMetadataFor("count");
assertEquals(timeColumnMetadata.getMinValue(), 1756015683);
assertEquals(timeColumnMetadata.getMaxValue(), 1756015683);
assertEquals(dimensionColumnMetadata.getMinValue(), "AKXcXcIqsqOJFsdwxZ");
assertEquals(dimensionColumnMetadata.getMaxValue(), "yQkJTLOQoOqqhkAClgC");
assertEquals(metricColumnMetadata.getMinValue(), 890662862);
assertEquals(metricColumnMetadata.getMaxValue(), 890662862);
}
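/**
 * Test index cleanup during segment load for a v1 segment: indices created by the pre-processor
 * are written to per-column files, and reloading with an empty index config removes those files.
 */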
@Test
public void testV1CleanupIndices()
throws Exception {
constructV1Segment();
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
assertEquals(segmentMetadata.getVersion(), SegmentVersion.v1);
// We would need to create two default columns (BYTES and JSON string) to test the H3 and JSON indices.
// All other kinds of indices can be put on 'column3', which holds String values.
String strColumn = "column3";
_indexLoadingConfig = new IndexLoadingConfig();
_indexLoadingConfig.setInvertedIndexColumns(new HashSet<>(Collections.singletonList(strColumn)));
_indexLoadingConfig.setRangeIndexColumns(new HashSet<>(Collections.singletonList(strColumn)));
_indexLoadingConfig.setTextIndexColumns(new HashSet<>(Collections.singletonList(strColumn)));
_indexLoadingConfig.setFSTIndexColumns(new HashSet<>(Collections.singletonList(strColumn)));
_indexLoadingConfig.setBloomFilterConfigs(ImmutableMap.of(strColumn, new BloomFilterConfig(0.1, 1024, true)));
// V1 uses a separate file for each column index.
File iiFile = new File(_indexDir, strColumn + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION);
File rgFile = new File(_indexDir, strColumn + V1Constants.Indexes.BITMAP_RANGE_INDEX_FILE_EXTENSION);
File txtFile = new File(_indexDir, strColumn + V1Constants.Indexes.LUCENE_TEXT_INDEX_FILE_EXTENSION);
File fstFile = new File(_indexDir, strColumn + V1Constants.Indexes.FST_INDEX_FILE_EXTENSION);
File bfFile = new File(_indexDir, strColumn + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION);
assertFalse(iiFile.exists());
assertFalse(rgFile.exists());
assertFalse(txtFile.exists());
assertFalse(fstFile.exists());
assertFalse(bfFile.exists());
// Create all kinds of indices.
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, null)) {
processor.process();
}
assertTrue(iiFile.exists());
assertTrue(rgFile.exists());
assertTrue(txtFile.exists());
assertTrue(fstFile.exists());
assertTrue(bfFile.exists());
// Remove all kinds of indices.
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, new IndexLoadingConfig(), null)) {
processor.process();
}
assertFalse(iiFile.exists());
assertFalse(rgFile.exists());
assertFalse(txtFile.exists());
assertFalse(fstFile.exists());
assertFalse(bfFile.exists());
}
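/**
 * Test index cleanup during segment load for a v3 segment: creating indices grows the single index
 * file by the index sizes plus the 8-byte integrity-check overhead each, and reloading with an
 * empty index config shrinks it back to the initial size.
 */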
@Test
public void testV3CleanupIndices()
throws Exception {
constructV3Segment();
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
assertEquals(segmentMetadata.getVersion(), SegmentVersion.v3);
// V3 uses a single file for all column indices.
File segmentDirectoryPath = SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3);
File singleFileIndex = new File(segmentDirectoryPath, "columns.psf");
// A few indices exist initially. Remove them to prepare a clean initial state.
long initFileSize = singleFileIndex.length();
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, new IndexLoadingConfig(), null)) {
processor.process();
}
assertTrue(singleFileIndex.length() < initFileSize);
initFileSize = singleFileIndex.length();
// We would need to create two default columns (BYTES and JSON string) to test the H3 and JSON indices.
// All other kinds of indices can be put on 'column3', which holds String values.
String strColumn = "column3";
_indexLoadingConfig = new IndexLoadingConfig();
_indexLoadingConfig.setInvertedIndexColumns(new HashSet<>(Collections.singletonList(strColumn)));
_indexLoadingConfig.setRangeIndexColumns(new HashSet<>(Collections.singletonList(strColumn)));
_indexLoadingConfig.setTextIndexColumns(new HashSet<>(Collections.singletonList(strColumn)));
_indexLoadingConfig.setFSTIndexColumns(new HashSet<>(Collections.singletonList(strColumn)));
_indexLoadingConfig.setBloomFilterConfigs(ImmutableMap.of(strColumn, new BloomFilterConfig(0.1, 1024, true)));
// Create all kinds of indices.
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, null)) {
processor.process();
}
long addedLength = 0;
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
addedLength += reader.getIndexFor(strColumn, ColumnIndexType.INVERTED_INDEX).size() + 8;
addedLength += reader.getIndexFor(strColumn, ColumnIndexType.RANGE_INDEX).size() + 8;
addedLength += reader.getIndexFor(strColumn, ColumnIndexType.FST_INDEX).size() + 8;
addedLength += reader.getIndexFor(strColumn, ColumnIndexType.BLOOM_FILTER).size() + 8;
assertTrue(reader.hasIndexFor(strColumn, ColumnIndexType.TEXT_INDEX));
}
assertEquals(singleFileIndex.length(), initFileSize + addedLength);
// Remove all kinds of indices; the file size should return to the initial size.
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, new IndexLoadingConfig(), null)) {
processor.process();
}
assertEquals(singleFileIndex.length(), initFileSize);
}
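/**
 * Test H3 and JSON index creation and cleanup during segment load for a v1 segment, using derived
 * default columns added from the H3/JSON schema.
 */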
@Test
public void testV1CleanupH3AndTextIndices()
throws Exception {
constructV1Segment();
// Remove all indices and add the two derived columns for the H3 and JSON indices.
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, new IndexLoadingConfig(),
_newColumnsSchemaWithH3Json)) {
processor.process();
}
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
assertNotNull(segmentMetadata.getColumnMetadataFor("newH3Col"));
assertNotNull(segmentMetadata.getColumnMetadataFor("newJsonCol"));
_indexLoadingConfig = new IndexLoadingConfig();
_indexLoadingConfig.setH3IndexConfigs(
ImmutableMap.of("newH3Col", new H3IndexConfig(ImmutableMap.of("resolutions", "5"))));
_indexLoadingConfig.setJsonIndexColumns(new HashSet<>(Collections.singletonList("newJsonCol")));
// V1 uses a separate file for each column index.
File h3File = new File(_indexDir, "newH3Col" + V1Constants.Indexes.H3_INDEX_FILE_EXTENSION);
File jsFile = new File(_indexDir, "newJsonCol" + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
assertFalse(h3File.exists());
assertFalse(jsFile.exists());
// Create the H3 and JSON indices.
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, null)) {
processor.process();
}
assertTrue(h3File.exists());
assertTrue(jsFile.exists());
// Remove the H3 and JSON indices.
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, new IndexLoadingConfig(), null)) {
processor.process();
}
assertFalse(h3File.exists());
assertFalse(jsFile.exists());
}

@Test
public void testV3CleanupH3AndTextIndices()
throws Exception {
constructV3Segment();
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
assertEquals(segmentMetadata.getVersion(), SegmentVersion.v3);
// V3 uses a single file for all column indices.
File segmentDirectoryPath = SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3);
File singleFileIndex = new File(segmentDirectoryPath, "columns.psf");
// The segment starts with a few indices; remove them to prepare a clean initial state.
// Also load the schema that declares the H3 and JSON columns so they get added as default columns.
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, new IndexLoadingConfig(),
_newColumnsSchemaWithH3Json)) {
processor.process();
}
segmentMetadata = new SegmentMetadataImpl(_indexDir);
assertNotNull(segmentMetadata.getColumnMetadataFor("newH3Col"));
assertNotNull(segmentMetadata.getColumnMetadataFor("newJsonCol"));
long initFileSize = singleFileIndex.length();
_indexLoadingConfig = new IndexLoadingConfig();
_indexLoadingConfig.setH3IndexConfigs(
ImmutableMap.of("newH3Col", new H3IndexConfig(ImmutableMap.of("resolutions", "5"))));
_indexLoadingConfig.setJsonIndexColumns(new HashSet<>(Collections.singletonList("newJsonCol")));
// Create the H3 and JSON indices.
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, null)) {
processor.process();
}
long addedLength = 0;
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
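// As above, each index buffer appended to the single file carries an 8-byte marker, hence the +8.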
addedLength += reader.getIndexFor("newH3Col", ColumnIndexType.H3_INDEX).size() + 8;
addedLength += reader.getIndexFor("newJsonCol", ColumnIndexType.JSON_INDEX).size() + 8;
}
assertEquals(singleFileIndex.length(), initFileSize + addedLength);
// Remove the H3 and JSON indices; the file size should return to its initial value.
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, new IndexLoadingConfig(), null)) {
processor.process();
}
assertEquals(singleFileIndex.length(), initFileSize);
}

@Test
public void testV1IfNeedProcess()
throws Exception {
constructV1Segment();
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
assertEquals(segmentMetadata.getVersion(), SegmentVersion.v1);
testIfNeedProcess();
}

@Test
public void testV3IfNeedProcess()
throws Exception {
constructV3Segment();
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
assertEquals(segmentMetadata.getVersion(), SegmentVersion.v3);
testIfNeedProcess();
}

private void testIfNeedProcess()
throws Exception {
// The segment starts with a few indices; removing them with an empty IndexLoadingConfig requires processing.
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, new IndexLoadingConfig(), null)) {
assertTrue(processor.needProcess());
processor.process();
assertFalse(processor.needProcess());
}
// Adding default columns from the new schema requires processing.
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, new IndexLoadingConfig(),
_newColumnsSchemaWithH3Json)) {
assertTrue(processor.needProcess());
processor.process();
assertFalse(processor.needProcess());
}
// No preprocessing is needed when an index is requested on a non-existing, sorted, or non-dictionary column.
for (Map.Entry<String, Consumer<IndexLoadingConfig>> entry : createConfigPrepFunctionNeedNoops().entrySet()) {
String testCase = entry.getKey();
IndexLoadingConfig config = new IndexLoadingConfig();
entry.getValue().accept(config);
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, config,
_newColumnsSchemaWithH3Json)) {
assertFalse(processor.needProcess(), testCase);
}
}
// Adding different types of indices requires processing. Add one new index at a time
// to exercise each index handler separately.
IndexLoadingConfig config = new IndexLoadingConfig();
for (Map.Entry<String, Consumer<IndexLoadingConfig>> entry : createConfigPrepFunctions().entrySet()) {
String testCase = entry.getKey();
entry.getValue().accept(config);
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, config,
_newColumnsSchemaWithH3Json)) {
assertTrue(processor.needProcess(), testCase);
processor.process();
assertFalse(processor.needProcess(), testCase);
}
}
// Adding a star-tree index requires processing.
IndexingConfig indexingConfig = new IndexingConfig();
indexingConfig.setEnableDynamicStarTreeCreation(true);
indexingConfig.setEnableDefaultStarTree(true);
_tableConfig.setIndexingConfig(indexingConfig);
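// Derive the loading config from the table config (no instance-level config) so the star-tree settings above
// take effect.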
IndexLoadingConfig configWithStarTreeIndex = new IndexLoadingConfig(null, _tableConfig);
createConfigPrepFunctions().forEach((k, v) -> v.accept(configWithStarTreeIndex));
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, configWithStarTreeIndex,
_newColumnsSchemaWithH3Json)) {
assertTrue(processor.needProcess());
processor.process();
assertFalse(processor.needProcess());
}
// Updating missing min/max values requires processing.
removeMinMaxValuesFromMetadataFile(_indexDir);
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
segmentMetadata.getColumnMetadataMap().forEach((k, v) -> {
assertNull(v.getMinValue(), "checking column: " + k);
assertNull(v.getMaxValue(), "checking column: " + k);
});
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, configWithStarTreeIndex,
_newColumnsSchemaWithH3Json)) {
assertTrue(processor.needProcess());
processor.process();
}
segmentMetadata = new SegmentMetadataImpl(_indexDir);
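// Min/max generation reads each column's dictionary, so only dictionary-encoded columns regain values;
// raw-encoded columns stay null.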
segmentMetadata.getColumnMetadataMap().forEach((k, v) -> {
if (v.hasDictionary()) {
assertNotNull(v.getMinValue(), "checking column: " + k);
assertNotNull(v.getMaxValue(), "checking column: " + k);
} else {
assertNull(v.getMinValue(), "checking column: " + k);
assertNull(v.getMaxValue(), "checking column: " + k);
}
});
}

@Test
public void testNeedAddMinMaxValue()
throws Exception {
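// Comma is the list delimiter in metadata.properties, so min/max values containing commas cannot be stored
// faithfully; segments built from such values carry the min/max-invalid flag instead.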
String[] stringValuesInvalid = {"A,", "B,", "C,", "D,", "E"};
String[] stringValuesValid = {"A", "B", "C", "D", "E"};
long[] longValues = {1588316400000L, 1588489200000L, 1588662000000L, 1588834800000L, 1589007600000L};
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("stringCol", FieldSpec.DataType.STRING)
.addMetric("longCol", FieldSpec.DataType.LONG).build();
FileUtils.deleteQuietly(INDEX_DIR);
// Build a valid segment; no preprocessing is needed.
File segment = buildTestSegmentForMinMax(tableConfig, schema, "validSegment", stringValuesValid, longValues);
SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(segment.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
indexLoadingConfig.setColumnMinMaxValueGeneratorMode(ColumnMinMaxValueGeneratorMode.ALL);
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, indexLoadingConfig, schema);
assertFalse(processor.needProcess());
// Build a segment with invalid values; still no preprocessing needed, since the min/max-invalid flag should be set.
FileUtils.deleteQuietly(INDEX_DIR);
segment = buildTestSegmentForMinMax(tableConfig, schema, "invalidSegment", stringValuesInvalid, longValues);
segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader().load(segment.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
indexLoadingConfig = new IndexLoadingConfig();
indexLoadingConfig.setColumnMinMaxValueGeneratorMode(ColumnMinMaxValueGeneratorMode.NONE);
processor = new SegmentPreProcessor(segmentDirectory, indexLoadingConfig, schema);
assertFalse(processor.needProcess());
indexLoadingConfig.setColumnMinMaxValueGeneratorMode(ColumnMinMaxValueGeneratorMode.ALL);
processor = new SegmentPreProcessor(segmentDirectory, indexLoadingConfig, schema);
assertFalse(processor.needProcess());
// Modify the metadata to remove min/max values; now preprocessing is needed.
removeMinMaxValuesFromMetadataFile(segment);
segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader().load(segment.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
processor = new SegmentPreProcessor(segmentDirectory, indexLoadingConfig, schema);
assertTrue(processor.needProcess());
FileUtils.deleteQuietly(INDEX_DIR);
}

private File buildTestSegmentForMinMax(final TableConfig tableConfig, final Schema schema, String segmentName,
String[] stringValues, long[] longValues)
throws Exception {
SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
config.setOutDir(INDEX_DIR.getAbsolutePath());
config.setSegmentName(segmentName);
List<GenericRow> rows = new ArrayList<>(5);
for (int i = 0; i < 5; i++) {
GenericRow row = new GenericRow();
row.putValue("stringCol", stringValues[i]);
row.putValue("longCol", longValues[i]);
rows.add(row);
}
SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
driver.init(config, new GenericRowRecordReader(rows));
driver.build();
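// Note: deleteOnExit only removes the directory at JVM exit if it is empty by then; the explicit
// FileUtils.deleteQuietly calls in the test do the real cleanup.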
File outputDir = driver.getOutputDirectory();
outputDir.deleteOnExit();
return outputDir;
}

private static void removeMinMaxValuesFromMetadataFile(File indexDir)
throws Exception {
PropertiesConfiguration configuration = SegmentMetadataImpl.getPropertiesConfiguration(indexDir);
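// Drop every per-column min/max entry, plus the min/max-invalid flag, so the preprocessor must regenerate them.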
Iterator<String> keys = configuration.getKeys();
while (keys.hasNext()) {
String key = keys.next();
if (key.endsWith(V1Constants.MetadataKeys.Column.MIN_VALUE) || key.endsWith(
V1Constants.MetadataKeys.Column.MAX_VALUE) || key.endsWith(
V1Constants.MetadataKeys.Column.MIN_MAX_VALUE_INVALID)) {
configuration.clearProperty(key);
}
}
configuration.save();
}
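
// Each test case enables one index type on the loading config; the map key is the test-case name used in
// assertion messages.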
private static Map<String, Consumer<IndexLoadingConfig>> createConfigPrepFunctions() {
Map<String, Consumer<IndexLoadingConfig>> testCases = new HashMap<>();
testCases.put("addInvertedIndex", (IndexLoadingConfig config) -> config.setInvertedIndexColumns(
new HashSet<>(Collections.singletonList("column3"))));
testCases.put("addRangeIndex", (IndexLoadingConfig config) -> config.setRangeIndexColumns(
new HashSet<>(Collections.singletonList("column3"))));
testCases.put("addTextIndex",
(IndexLoadingConfig config) -> config.setTextIndexColumns(new HashSet<>(Collections.singletonList("column3"))));
testCases.put("addFSTIndex",
(IndexLoadingConfig config) -> config.setFSTIndexColumns(new HashSet<>(Collections.singletonList("column3"))));
testCases.put("addBloomFilter", (IndexLoadingConfig config) -> config.setBloomFilterConfigs(
ImmutableMap.of("column3", new BloomFilterConfig(0.1, 1024, true))));
testCases.put("addH3Index", (IndexLoadingConfig config) -> config.setH3IndexConfigs(
ImmutableMap.of("newH3Col", new H3IndexConfig(ImmutableMap.of("resolutions", "5")))));
testCases.put("addJsonIndex", (IndexLoadingConfig config) -> config.setJsonIndexColumns(
new HashSet<>(Collections.singletonList("newJsonCol"))));
return testCases;
}
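
// Config tweaks that should NOT trigger preprocessing: indices requested on sorted, non-dictionary, or
// absent columns.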
private static Map<String, Consumer<IndexLoadingConfig>> createConfigPrepFunctionNeedNoops() {
Map<String, Consumer<IndexLoadingConfig>> testCases = new HashMap<>();
// daysSinceEpoch is a sorted column, so the inverted index and range index handlers skip it.
testCases.put("addInvertedIndexOnSortedColumn", (IndexLoadingConfig config) -> config.setInvertedIndexColumns(
new HashSet<>(Collections.singletonList("daysSinceEpoch"))));
testCases.put("addRangeIndexOnSortedColumn", (IndexLoadingConfig config) -> config.setRangeIndexColumns(
new HashSet<>(Collections.singletonList("daysSinceEpoch"))));
// column4 is an unsorted, non-dictionary-encoded column, so the inverted index and bloom filter handlers skip it.
// In fact, the validation logic when updating index configs already blocks this from happening.
testCases.put("addInvertedIndexOnNonDictColumn", (IndexLoadingConfig config) -> config.setInvertedIndexColumns(
new HashSet<>(Collections.singletonList("column4"))));
// No index is added on non-existing columns.
// The validation logic when updating index configs already blocks this from happening.
testCases.put("addInvertedIndexOnAbsentColumn", (IndexLoadingConfig config) -> config.setInvertedIndexColumns(
new HashSet<>(Collections.singletonList("newColumnX"))));
testCases.put("addRangeIndexOnAbsentColumn", (IndexLoadingConfig config) -> config.setRangeIndexColumns(
new HashSet<>(Collections.singletonList("newColumnX"))));
testCases.put("addTextIndexOnAbsentColumn", (IndexLoadingConfig config) -> config.setTextIndexColumns(
new HashSet<>(Collections.singletonList("newColumnX"))));
testCases.put("addFSTIndexOnAbsentColumn", (IndexLoadingConfig config) -> config.setFSTIndexColumns(
new HashSet<>(Collections.singletonList("newColumnX"))));
testCases.put("addBloomFilterOnAbsentColumn", (IndexLoadingConfig config) -> config.setBloomFilterConfigs(
ImmutableMap.of("newColumnX", new BloomFilterConfig(0.1, 1024, true))));
testCases.put("addH3IndexOnAbsentColumn", (IndexLoadingConfig config) -> config.setH3IndexConfigs(
ImmutableMap.of("newColumnX", new H3IndexConfig(ImmutableMap.of("resolutions", "5")))));
testCases.put("addJsonIndexOnAbsentColumn", (IndexLoadingConfig config) -> config.setJsonIndexColumns(
new HashSet<>(Collections.singletonList("newColumnX"))));
return testCases;
}
}