blob: 209f2040e816bab7662f66a0ec399ea583d1abab [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.segment.index.column;
import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.readers.BaseImmutableDictionary;
import org.apache.pinot.segment.local.segment.index.readers.BigDecimalDictionary;
import org.apache.pinot.segment.local.segment.index.readers.BytesDictionary;
import org.apache.pinot.segment.local.segment.index.readers.DoubleDictionary;
import org.apache.pinot.segment.local.segment.index.readers.FloatDictionary;
import org.apache.pinot.segment.local.segment.index.readers.IntDictionary;
import org.apache.pinot.segment.local.segment.index.readers.LongDictionary;
import org.apache.pinot.segment.local.segment.index.readers.NullValueVectorReaderImpl;
import org.apache.pinot.segment.local.segment.index.readers.OnHeapBigDecimalDictionary;
import org.apache.pinot.segment.local.segment.index.readers.OnHeapBytesDictionary;
import org.apache.pinot.segment.local.segment.index.readers.OnHeapDoubleDictionary;
import org.apache.pinot.segment.local.segment.index.readers.OnHeapFloatDictionary;
import org.apache.pinot.segment.local.segment.index.readers.OnHeapIntDictionary;
import org.apache.pinot.segment.local.segment.index.readers.OnHeapLongDictionary;
import org.apache.pinot.segment.local.segment.index.readers.OnHeapStringDictionary;
import org.apache.pinot.segment.local.segment.index.readers.StringDictionary;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
import org.apache.pinot.segment.spi.index.reader.BloomFilterReader;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.H3IndexReader;
import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
import org.apache.pinot.segment.spi.index.reader.RangeIndexReader;
import org.apache.pinot.segment.spi.index.reader.SortedIndexReader;
import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
import org.apache.pinot.segment.spi.index.reader.provider.IndexReaderProvider;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.spi.config.table.BloomFilterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.pinot.spi.data.FieldSpec.DataType;
public final class PhysicalColumnIndexContainer implements ColumnIndexContainer {
private static final Logger LOGGER = LoggerFactory.getLogger(PhysicalColumnIndexContainer.class);
private final ForwardIndexReader<?> _forwardIndex;
private final InvertedIndexReader<?> _invertedIndex;
private final RangeIndexReader<?> _rangeIndex;
private final TextIndexReader _textIndex;
private final TextIndexReader _fstIndex;
private final JsonIndexReader _jsonIndex;
private final H3IndexReader _h3Index;
private final BaseImmutableDictionary _dictionary;
private final BloomFilterReader _bloomFilter;
private final NullValueVectorReaderImpl _nullValueVectorReader;
public PhysicalColumnIndexContainer(SegmentDirectory.Reader segmentReader, ColumnMetadata metadata,
IndexLoadingConfig indexLoadingConfig, File segmentIndexDir, IndexReaderProvider indexReaderProvider)
throws IOException {
String columnName = metadata.getColumnName();
boolean loadInvertedIndex = indexLoadingConfig.getInvertedIndexColumns().contains(columnName);
boolean loadRangeIndex = indexLoadingConfig.getRangeIndexColumns().contains(columnName);
boolean loadTextIndex = indexLoadingConfig.getTextIndexColumns().contains(columnName);
boolean loadFSTIndex = indexLoadingConfig.getFSTIndexColumns().contains(columnName);
boolean loadJsonIndex = indexLoadingConfig.getJsonIndexColumns().contains(columnName);
boolean loadH3Index = indexLoadingConfig.getH3IndexConfigs().containsKey(columnName);
boolean loadOnHeapDictionary = indexLoadingConfig.getOnHeapDictionaryColumns().contains(columnName);
BloomFilterConfig bloomFilterConfig = indexLoadingConfig.getBloomFilterConfigs().get(columnName);
if (segmentReader.hasIndexFor(columnName, ColumnIndexType.NULLVALUE_VECTOR)) {
PinotDataBuffer nullValueVectorBuffer = segmentReader.getIndexFor(columnName, ColumnIndexType.NULLVALUE_VECTOR);
_nullValueVectorReader = new NullValueVectorReaderImpl(nullValueVectorBuffer);
} else {
_nullValueVectorReader = null;
}
if (loadTextIndex && segmentIndexDir != null) {
Preconditions.checkState(segmentReader.hasIndexFor(columnName, ColumnIndexType.TEXT_INDEX));
Map<String, Map<String, String>> columnProperties = indexLoadingConfig.getColumnProperties();
_textIndex = indexReaderProvider.newTextIndexReader(segmentIndexDir, metadata, columnProperties.get(columnName));
} else {
_textIndex = null;
}
if (loadJsonIndex) {
Preconditions.checkState(segmentReader.hasIndexFor(columnName, ColumnIndexType.JSON_INDEX));
PinotDataBuffer jsonIndexBuffer = segmentReader.getIndexFor(columnName, ColumnIndexType.JSON_INDEX);
_jsonIndex = indexReaderProvider.newJsonIndexReader(jsonIndexBuffer, metadata);
} else {
_jsonIndex = null;
}
if (loadH3Index) {
Preconditions.checkState(segmentReader.hasIndexFor(columnName, ColumnIndexType.H3_INDEX));
PinotDataBuffer h3IndexBuffer = segmentReader.getIndexFor(columnName, ColumnIndexType.H3_INDEX);
_h3Index = indexReaderProvider.newGeospatialIndexReader(h3IndexBuffer, metadata);
} else {
_h3Index = null;
}
if (bloomFilterConfig != null) {
PinotDataBuffer bloomFilterBuffer = segmentReader.getIndexFor(columnName, ColumnIndexType.BLOOM_FILTER);
_bloomFilter = indexReaderProvider.newBloomFilterReader(bloomFilterBuffer, bloomFilterConfig.isLoadOnHeap());
} else {
_bloomFilter = null;
}
if (loadRangeIndex && !metadata.isSorted()) {
PinotDataBuffer buffer = segmentReader.getIndexFor(columnName, ColumnIndexType.RANGE_INDEX);
_rangeIndex = indexReaderProvider.newRangeIndexReader(buffer, metadata);
} else {
_rangeIndex = null;
}
PinotDataBuffer fwdIndexBuffer = segmentReader.getIndexFor(columnName, ColumnIndexType.FORWARD_INDEX);
if (metadata.hasDictionary()) {
// Dictionary-based index
_dictionary = loadDictionary(segmentReader.getIndexFor(columnName, ColumnIndexType.DICTIONARY), metadata,
loadOnHeapDictionary);
if (metadata.isSingleValue()) {
// Single-value
if (metadata.isSorted()) {
// Sorted
SortedIndexReader<?> sortedIndexReader = indexReaderProvider.newSortedIndexReader(fwdIndexBuffer, metadata);
_forwardIndex = sortedIndexReader;
_invertedIndex = sortedIndexReader;
_fstIndex = null;
return;
}
}
_forwardIndex = indexReaderProvider.newForwardIndexReader(fwdIndexBuffer, metadata);
if (loadInvertedIndex) {
_invertedIndex = indexReaderProvider.newInvertedIndexReader(
segmentReader.getIndexFor(columnName, ColumnIndexType.INVERTED_INDEX), metadata);
} else {
_invertedIndex = null;
}
if (loadFSTIndex) {
PinotDataBuffer buffer = segmentReader.getIndexFor(columnName, ColumnIndexType.FST_INDEX);
_fstIndex = indexReaderProvider.newFSTIndexReader(buffer, metadata);
} else {
_fstIndex = null;
}
} else {
// Raw index
_forwardIndex = indexReaderProvider.newForwardIndexReader(fwdIndexBuffer, metadata);
_dictionary = null;
_invertedIndex = null;
_fstIndex = null;
}
}
@Override
public ForwardIndexReader<?> getForwardIndex() {
return _forwardIndex;
}
@Override
public InvertedIndexReader<?> getInvertedIndex() {
return _invertedIndex;
}
@Override
public RangeIndexReader<?> getRangeIndex() {
return _rangeIndex;
}
@Override
public TextIndexReader getTextIndex() {
return _textIndex;
}
@Override
public JsonIndexReader getJsonIndex() {
return _jsonIndex;
}
@Override
public H3IndexReader getH3Index() {
return _h3Index;
}
@Override
public BaseImmutableDictionary getDictionary() {
return _dictionary;
}
@Override
public BloomFilterReader getBloomFilter() {
return _bloomFilter;
}
@Override
public TextIndexReader getFSTIndex() {
return _fstIndex;
}
@Override
public NullValueVectorReader getNullValueVector() {
return _nullValueVectorReader;
}
//TODO: move this to a DictionaryLoader class
public static BaseImmutableDictionary loadDictionary(PinotDataBuffer dictionaryBuffer, ColumnMetadata metadata,
boolean loadOnHeap) {
DataType dataType = metadata.getDataType();
if (loadOnHeap) {
String columnName = metadata.getColumnName();
LOGGER.info("Loading on-heap dictionary for column: {}", columnName);
}
int length = metadata.getCardinality();
switch (dataType.getStoredType()) {
case INT:
return loadOnHeap ? new OnHeapIntDictionary(dictionaryBuffer, length)
: new IntDictionary(dictionaryBuffer, length);
case LONG:
return loadOnHeap ? new OnHeapLongDictionary(dictionaryBuffer, length)
: new LongDictionary(dictionaryBuffer, length);
case FLOAT:
return loadOnHeap ? new OnHeapFloatDictionary(dictionaryBuffer, length)
: new FloatDictionary(dictionaryBuffer, length);
case DOUBLE:
return loadOnHeap ? new OnHeapDoubleDictionary(dictionaryBuffer, length)
: new DoubleDictionary(dictionaryBuffer, length);
case BIG_DECIMAL:
int numBytesPerValue = metadata.getColumnMaxLength();
return loadOnHeap ? new OnHeapBigDecimalDictionary(dictionaryBuffer, length, numBytesPerValue)
: new BigDecimalDictionary(dictionaryBuffer, length, numBytesPerValue);
case STRING:
numBytesPerValue = metadata.getColumnMaxLength();
byte paddingByte = (byte) metadata.getPaddingCharacter();
return loadOnHeap ? new OnHeapStringDictionary(dictionaryBuffer, length, numBytesPerValue, paddingByte)
: new StringDictionary(dictionaryBuffer, length, numBytesPerValue, paddingByte);
case BYTES:
numBytesPerValue = metadata.getColumnMaxLength();
return loadOnHeap ? new OnHeapBytesDictionary(dictionaryBuffer, length, numBytesPerValue)
: new BytesDictionary(dictionaryBuffer, length, numBytesPerValue);
default:
throw new IllegalStateException("Unsupported data type for dictionary: " + dataType);
}
}
@Override
public void close()
throws IOException {
_forwardIndex.close();
if (_invertedIndex != null) {
_invertedIndex.close();
}
if (_rangeIndex != null) {
_rangeIndex.close();
}
if (_dictionary != null) {
_dictionary.close();
}
if (_textIndex != null) {
_textIndex.close();
}
if (_fstIndex != null) {
_fstIndex.close();
}
if (_jsonIndex != null) {
_jsonIndex.close();
}
if (_h3Index != null) {
_h3Index.close();
}
if (_bloomFilter != null) {
_bloomFilter.close();
}
}
}