/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pinot.segment.local.segment.index.loader;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.index.IndexingOverrides;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
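import org.testng.annotations.AfterClass;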
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
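
/**
 * Tests for {@link ForwardIndexHandler}: builds a segment with raw (no-dictionary) columns in every supported
 * compression codec, then verifies that computeOperation() flags only raw forward index compression changes
 * and that updateIndices() rewrites the forward index with the new compression type.
 */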
public class ForwardIndexHandlerTest {
private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "ForwardIndexHandlerTest");
private static final String TABLE_NAME = "myTable";
private static final String SEGMENT_NAME = "testSegment";
// TODO:
// 1. Add other datatypes (double, float, bigdecimal, bytes). Also add MV columns.
// 2. Add text index and other index types for raw columns.
private static final String DIM_SNAPPY_STRING = "DIM_SNAPPY_STRING";
private static final String DIM_PASS_THROUGH_STRING = "DIM_PASS_THROUGH_STRING";
private static final String DIM_ZSTANDARD_STRING = "DIM_ZSTANDARD_STRING";
private static final String DIM_LZ4_STRING = "DIM_LZ4_STRING";
private static final String DIM_SNAPPY_LONG = "DIM_SNAPPY_LONG";
private static final String DIM_PASS_THROUGH_LONG = "DIM_PASS_THROUGH_LONG";
private static final String DIM_ZSTANDARD_LONG = "DIM_ZSTANDARD_LONG";
private static final String DIM_LZ4_LONG = "DIM_LZ4_LONG";
private static final String DIM_SNAPPY_INTEGER = "DIM_SNAPPY_INTEGER";
private static final String DIM_PASS_THROUGH_INTEGER = "DIM_PASS_THROUGH_INTEGER";
private static final String DIM_ZSTANDARD_INTEGER = "DIM_ZSTANDARD_INTEGER";
private static final String DIM_LZ4_INTEGER = "DIM_LZ4_INTEGER";
private static final String DIM_DICT_INTEGER = "DIM_DICT_INTEGER";
private static final String DIM_DICT_STRING = "DIM_DICT_STRING";
private static final String DIM_DICT_LONG = "DIM_DICT_LONG";
private static final String METRIC_PASSTHROUGH_INTEGER = "METRIC_PASSTHROUGH_INTEGER";
private static final String METRIC_SNAPPY_INTEGER = "METRIC_SNAPPY_INTEGER";
private static final String METRIC_ZSTANDARD_INTEGER = "METRIC_ZSTANDARD_INTEGER";
private static final String METRIC_LZ4_INTEGER = "METRIC_LZ4_INTEGER";
private static final List<String> RAW_SNAPPY_INDEX_COLUMNS =
Arrays.asList(DIM_SNAPPY_STRING, DIM_SNAPPY_LONG, DIM_SNAPPY_INTEGER, METRIC_SNAPPY_INTEGER);
private static final List<String> RAW_ZSTANDARD_INDEX_COLUMNS =
Arrays.asList(DIM_ZSTANDARD_STRING, DIM_ZSTANDARD_LONG, DIM_ZSTANDARD_INTEGER, METRIC_ZSTANDARD_INTEGER);
private static final List<String> RAW_PASS_THROUGH_INDEX_COLUMNS =
Arrays.asList(DIM_PASS_THROUGH_STRING, DIM_PASS_THROUGH_LONG, DIM_PASS_THROUGH_INTEGER,
METRIC_PASSTHROUGH_INTEGER);
private static final List<String> RAW_LZ4_INDEX_COLUMNS =
Arrays.asList(DIM_LZ4_STRING, DIM_LZ4_LONG, DIM_LZ4_INTEGER, METRIC_LZ4_INTEGER);
  private final List<String> _noDictionaryColumns = new ArrayList<>();
  private TableConfig _tableConfig;
  private Schema _schema;
  private File _segmentDirectory;
  private final List<FieldConfig.CompressionCodec> _allCompressionTypes =
      Arrays.asList(FieldConfig.CompressionCodec.values());

@BeforeClass
public void setUp()
throws Exception {
    // Delete the index directory if it already exists.
FileUtils.deleteQuietly(INDEX_DIR);
buildSegment();
}
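
  // NOTE: The original class only cleans up lazily at the start of setUp(); this @AfterClass hook is an
  // addition mirroring that cleanup, so the generated segment directory is removed once all tests have run.
  @AfterClass
  public void tearDown() {
    FileUtils.deleteQuietly(INDEX_DIR);
  }
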
private void buildSegment()
throws Exception {
List<GenericRow> rows = createTestData();
List<FieldConfig> fieldConfigs = new ArrayList<>(
RAW_SNAPPY_INDEX_COLUMNS.size() + RAW_ZSTANDARD_INDEX_COLUMNS.size() + RAW_PASS_THROUGH_INDEX_COLUMNS.size()
+ RAW_LZ4_INDEX_COLUMNS.size());
for (String indexColumn : RAW_SNAPPY_INDEX_COLUMNS) {
fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(),
FieldConfig.CompressionCodec.SNAPPY, null));
}
for (String indexColumn : RAW_ZSTANDARD_INDEX_COLUMNS) {
fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(),
FieldConfig.CompressionCodec.ZSTANDARD, null));
}
for (String indexColumn : RAW_PASS_THROUGH_INDEX_COLUMNS) {
fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(),
FieldConfig.CompressionCodec.PASS_THROUGH, null));
}
for (String indexColumn : RAW_LZ4_INDEX_COLUMNS) {
fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(),
FieldConfig.CompressionCodec.LZ4, null));
}
_noDictionaryColumns.addAll(RAW_SNAPPY_INDEX_COLUMNS);
_noDictionaryColumns.addAll(RAW_ZSTANDARD_INDEX_COLUMNS);
_noDictionaryColumns.addAll(RAW_PASS_THROUGH_INDEX_COLUMNS);
_noDictionaryColumns.addAll(RAW_LZ4_INDEX_COLUMNS);
_tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNoDictionaryColumns(_noDictionaryColumns)
.setFieldConfigList(fieldConfigs).build();
_schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
.addSingleValueDimension(DIM_SNAPPY_STRING, FieldSpec.DataType.STRING)
.addSingleValueDimension(DIM_PASS_THROUGH_STRING, FieldSpec.DataType.STRING)
.addSingleValueDimension(DIM_ZSTANDARD_STRING, FieldSpec.DataType.STRING)
.addSingleValueDimension(DIM_LZ4_STRING, FieldSpec.DataType.STRING)
.addSingleValueDimension(DIM_SNAPPY_INTEGER, FieldSpec.DataType.INT)
.addSingleValueDimension(DIM_ZSTANDARD_INTEGER, FieldSpec.DataType.INT)
.addSingleValueDimension(DIM_PASS_THROUGH_INTEGER, FieldSpec.DataType.INT)
.addSingleValueDimension(DIM_LZ4_INTEGER, FieldSpec.DataType.INT)
.addSingleValueDimension(DIM_SNAPPY_LONG, FieldSpec.DataType.LONG)
.addSingleValueDimension(DIM_ZSTANDARD_LONG, FieldSpec.DataType.LONG)
.addSingleValueDimension(DIM_PASS_THROUGH_LONG, FieldSpec.DataType.LONG)
.addSingleValueDimension(DIM_LZ4_LONG, FieldSpec.DataType.LONG)
.addSingleValueDimension(DIM_DICT_INTEGER, FieldSpec.DataType.INT)
.addSingleValueDimension(DIM_DICT_LONG, FieldSpec.DataType.LONG)
.addSingleValueDimension(DIM_DICT_STRING, FieldSpec.DataType.STRING)
.addMetric(METRIC_PASSTHROUGH_INTEGER, FieldSpec.DataType.INT)
.addMetric(METRIC_SNAPPY_INTEGER, FieldSpec.DataType.INT).addMetric(METRIC_LZ4_INTEGER, FieldSpec.DataType.INT)
.addMetric(METRIC_ZSTANDARD_INTEGER, FieldSpec.DataType.INT)
.build();
SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, _schema);
config.setOutDir(INDEX_DIR.getPath());
config.setTableName(TABLE_NAME);
config.setSegmentName(SEGMENT_NAME);
SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
driver.init(config, recordReader);
driver.build();
}
_segmentDirectory = new File(INDEX_DIR, driver.getSegmentName());
  }

private List<GenericRow> createTestData() {
List<GenericRow> rows = new ArrayList<>();
    // Generate deterministic test data: every 10th row holds a fixed sentinel value, the rest are unique.
    int rowLength = 1000;
    String[] tempStringRows = new String[rowLength];
    Integer[] tempIntRows = new Integer[rowLength];
    Long[] tempLongRows = new Long[rowLength];
for (int i = 0; i < rowLength; i++) {
      // Every 10th row gets a fixed value so that validation and filter queries can check for it.
if (i % 10 == 0) {
tempStringRows[i] = "testRow";
tempIntRows[i] = 1001;
tempLongRows[i] = 1001L;
} else {
tempStringRows[i] = "n" + i;
tempIntRows[i] = i;
tempLongRows[i] = (long) i;
}
}
for (int i = 0; i < rowLength; i++) {
GenericRow row = new GenericRow();
// Raw String columns
row.putValue(DIM_SNAPPY_STRING, tempStringRows[i]);
row.putValue(DIM_ZSTANDARD_STRING, tempStringRows[i]);
row.putValue(DIM_PASS_THROUGH_STRING, tempStringRows[i]);
row.putValue(DIM_LZ4_STRING, tempStringRows[i]);
// Raw integer columns
row.putValue(DIM_SNAPPY_INTEGER, tempIntRows[i]);
row.putValue(DIM_ZSTANDARD_INTEGER, tempIntRows[i]);
row.putValue(DIM_PASS_THROUGH_INTEGER, tempIntRows[i]);
row.putValue(DIM_LZ4_INTEGER, tempIntRows[i]);
row.putValue(METRIC_LZ4_INTEGER, tempIntRows[i]);
row.putValue(METRIC_PASSTHROUGH_INTEGER, tempIntRows[i]);
row.putValue(METRIC_ZSTANDARD_INTEGER, tempIntRows[i]);
row.putValue(METRIC_SNAPPY_INTEGER, tempIntRows[i]);
// Raw long columns
row.putValue(DIM_SNAPPY_LONG, tempLongRows[i]);
row.putValue(DIM_ZSTANDARD_LONG, tempLongRows[i]);
row.putValue(DIM_PASS_THROUGH_LONG, tempLongRows[i]);
row.putValue(DIM_LZ4_LONG, tempLongRows[i]);
// Dictionary columns
row.putValue(DIM_DICT_INTEGER, tempIntRows[i]);
row.putValue(DIM_DICT_LONG, tempLongRows[i]);
row.putValue(DIM_DICT_STRING, tempStringRows[i]);
rows.add(row);
}
return rows;
  }

@Test
public void testComputeOperation()
throws Exception {
// Setup
SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
SegmentDirectory segmentLocalFSDirectory =
new SegmentLocalFSDirectory(_segmentDirectory, existingSegmentMetadata, ReadMode.mmap);
SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter();

    // TEST1: Validate with zero changes. ForwardIndexHandler should be a no-op.
    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
    ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
    Map<String, ForwardIndexHandler.Operation> operationMap = fwdIndexHandler.computeOperation(writer);
    assertEquals(operationMap, Collections.EMPTY_MAP);

    // TEST2: Enable dictionary for a RAW_ZSTANDARD_INDEX_COLUMN. ForwardIndexHandler should be a no-op.
indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
indexLoadingConfig.getNoDictionaryColumns().remove(DIM_ZSTANDARD_STRING);
fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
operationMap = fwdIndexHandler.computeOperation(writer);
assertEquals(operationMap, Collections.EMPTY_MAP);

    // TEST3: Disable dictionary for DIM_DICT_INTEGER. ForwardIndexHandler should be a no-op.
indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
indexLoadingConfig.getNoDictionaryColumns().add(DIM_DICT_INTEGER);
fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
operationMap = fwdIndexHandler.computeOperation(writer);
assertEquals(operationMap, Collections.EMPTY_MAP);

    // TEST4: Add a text index on a dictionary column and a raw column. ForwardIndexHandler should be a no-op.
indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
indexLoadingConfig.getTextIndexColumns().add(DIM_DICT_INTEGER);
indexLoadingConfig.getTextIndexColumns().add(DIM_LZ4_INTEGER);
fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
operationMap = fwdIndexHandler.computeOperation(writer);
assertEquals(operationMap, Collections.EMPTY_MAP);

    // TEST5: Change the compression type for a single column.
// Create new tableConfig with the modified fieldConfigs.
List<FieldConfig> fieldConfigs = new ArrayList<>(_tableConfig.getFieldConfigList());
FieldConfig config = fieldConfigs.remove(0);
FieldConfig newConfig = new FieldConfig(config.getName(), FieldConfig.EncodingType.RAW, Collections.emptyList(),
FieldConfig.CompressionCodec.ZSTANDARD, null);
fieldConfigs.add(newConfig);
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNoDictionaryColumns(_noDictionaryColumns)
.setFieldConfigList(fieldConfigs).build();
indexLoadingConfig = new IndexLoadingConfig(null, tableConfig);
fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
operationMap = fwdIndexHandler.computeOperation(writer);
assertEquals(operationMap.size(), 1);
assertEquals(operationMap.get(config.getName()), ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE);

    // TEST6: Change the compression type for more than one column, and add text and inverted indexes on one of them.
fieldConfigs = new ArrayList<>(_tableConfig.getFieldConfigList());
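    // NOTE: remove(0) followed by remove(1) picks the elements at original indices 0 and 2, which is fine
    // here since the test only needs two distinct columns.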
FieldConfig config1 = fieldConfigs.remove(0);
FieldConfig config2 = fieldConfigs.remove(1);
FieldConfig newConfig1 = new FieldConfig(config1.getName(), FieldConfig.EncodingType.RAW, Collections.emptyList(),
FieldConfig.CompressionCodec.ZSTANDARD, null);
fieldConfigs.add(newConfig1);
FieldConfig newConfig2 = new FieldConfig(config2.getName(), FieldConfig.EncodingType.RAW, Collections.emptyList(),
FieldConfig.CompressionCodec.ZSTANDARD, null);
fieldConfigs.add(newConfig2);
tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNoDictionaryColumns(_noDictionaryColumns)
.setFieldConfigList(fieldConfigs).build();
indexLoadingConfig = new IndexLoadingConfig(null, tableConfig);
indexLoadingConfig.getTextIndexColumns().add(config1.getName());
indexLoadingConfig.getInvertedIndexColumns().add(config1.getName());
fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
operationMap = fwdIndexHandler.computeOperation(writer);
assertEquals(operationMap.size(), 2);
assertEquals(operationMap.get(config1.getName()), ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE);
assertEquals(operationMap.get(config2.getName()), ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE);

    // Tear down
segmentLocalFSDirectory.close();
  }

@Test
public void testRewriteRawForwardIndexForSingleColumn()
throws Exception {
for (int i = 0; i < _noDictionaryColumns.size(); i++) {
// For every noDictionaryColumn, change the compressionType to all available types, one by one.
for (FieldConfig.CompressionCodec compressionType : _allCompressionTypes) {
// Setup
SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
SegmentDirectory segmentLocalFSDirectory =
new SegmentLocalFSDirectory(_segmentDirectory, existingSegmentMetadata, ReadMode.mmap);
SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter();
List<FieldConfig> fieldConfigs = new ArrayList<>(_tableConfig.getFieldConfigList());
FieldConfig config = fieldConfigs.remove(i);
String columnName = config.getName();
        FieldConfig newConfig =
            new FieldConfig(columnName, FieldConfig.EncodingType.RAW, Collections.emptyList(), compressionType, null);
fieldConfigs.add(newConfig);
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setNoDictionaryColumns(_noDictionaryColumns).setFieldConfigList(fieldConfigs).build();
IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, tableConfig);
IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider();
ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
fwdIndexHandler.updateIndices(writer, indexCreatorProvider);
        // Tear down before validation, because the columns.psf and index map cleanup happens at
        // segmentDirectory.close().
segmentLocalFSDirectory.close();
// Validation
validateIndexMap();
        validateForwardIndex(columnName, compressionType);
}
}
  }

@Test
public void testRewriteRawForwardIndexForMultipleColumns()
throws Exception {
// Setup
SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
SegmentDirectory segmentLocalFSDirectory =
new SegmentLocalFSDirectory(_segmentDirectory, existingSegmentMetadata, ReadMode.mmap);
SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter();
List<FieldConfig> fieldConfigs = new ArrayList<>(_tableConfig.getFieldConfigList());
Random rand = new Random();
int randomIdx = rand.nextInt(_allCompressionTypes.size());
FieldConfig.CompressionCodec newCompressionType = _allCompressionTypes.get(randomIdx);
// Column 1
randomIdx = rand.nextInt(fieldConfigs.size());
FieldConfig config1 = fieldConfigs.remove(randomIdx);
String column1 = config1.getName();
FieldConfig newConfig1 =
new FieldConfig(column1, FieldConfig.EncodingType.RAW, Collections.emptyList(), newCompressionType, null);
fieldConfigs.add(newConfig1);
// Column 2
randomIdx = rand.nextInt(fieldConfigs.size());
FieldConfig config2 = fieldConfigs.remove(randomIdx);
String column2 = config2.getName();
FieldConfig newConfig2 =
new FieldConfig(column2, FieldConfig.EncodingType.RAW, Collections.emptyList(), newCompressionType, null);
fieldConfigs.add(newConfig2);
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNoDictionaryColumns(_noDictionaryColumns)
.setFieldConfigList(fieldConfigs).build();
IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, tableConfig);
IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider();
ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
fwdIndexHandler.updateIndices(writer, indexCreatorProvider);
    // Tear down before validation, because the columns.psf and index map cleanup happens at
    // segmentDirectory.close().
segmentLocalFSDirectory.close();
validateIndexMap();
validateForwardIndex(column1, newCompressionType);
validateForwardIndex(column2, newCompressionType);
}
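
  /**
   * Re-reads the column's forward index and verifies both the compression type recorded in the header and
   * the fixed sentinel values written by createTestData() on every 10th row.
   */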
private void validateForwardIndex(String columnName, FieldConfig.CompressionCodec expectedCompressionType)
throws IOException {
// Setup
SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
SegmentDirectory segmentLocalFSDirectory =
new SegmentLocalFSDirectory(_segmentDirectory, existingSegmentMetadata, ReadMode.mmap);
SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter();
    ColumnMetadata columnMetadata = existingSegmentMetadata.getColumnMetadataFor(columnName);
    try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(writer, columnMetadata)) {
      // Check the compression type in the forward index header. Reusing this single reader (instead of
      // opening a second one that is never closed) avoids leaking a reader.
      ChunkCompressionType fwdIndexCompressionType = forwardIndexReader.getCompressionType();
      assertEquals(fwdIndexCompressionType.name(), expectedCompressionType.name());
PinotSegmentColumnReader columnReader =
new PinotSegmentColumnReader(forwardIndexReader, null, null, columnMetadata.getMaxNumberOfMultiValues());
for (int rowIdx = 0; rowIdx < columnMetadata.getTotalDocs(); rowIdx++) {
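        // Only every 10th row holds the fixed sentinel written by createTestData(); other rows are not validated.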
if (rowIdx % 10 == 0) {
Object val = columnReader.getValue(rowIdx);
FieldSpec.DataType dataType = forwardIndexReader.getStoredType();
if (dataType == FieldSpec.DataType.STRING) {
assertEquals((String) val, "testRow");
} else if (dataType == FieldSpec.DataType.INT) {
assertEquals((int) val, 1001, columnName + " " + rowIdx + " " + expectedCompressionType);
} else if (dataType == FieldSpec.DataType.LONG) {
assertEquals((long) val, 1001L, columnName + " " + rowIdx + " " + expectedCompressionType);
}
}
}
}
    // Close the segment directory opened above (mirrors the tear-down in testComputeOperation()).
    segmentLocalFSDirectory.close();
  }

private void validateIndexMap()
throws IOException {
    // Sanity check to make sure every raw column has exactly one forward index entry in the index map.
    String segmentDir = INDEX_DIR + "/" + SEGMENT_NAME + "/v3";
    File idxMapFile = new File(segmentDir, V1Constants.INDEX_MAP_FILE_NAME);
    String indexMapStr = FileUtils.readFileToString(idxMapFile, StandardCharsets.UTF_8);
    for (String columnName : _noDictionaryColumns) {
      assertEquals(StringUtils.countMatches(indexMapStr, columnName + ".forward_index" + ".startOffset"), 1);
      assertEquals(StringUtils.countMatches(indexMapStr, columnName + ".forward_index" + ".size"), 1);
    }
}
}