blob: b3a2fed4118365847998890f65579665729a4856 [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pinot.core.indexsegment.mutable;
import it.unimi.dsi.fastutil.ints.IntArrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.core.common.DataSource;
import org.apache.pinot.core.indexsegment.IndexSegmentUtils;
import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.core.realtime.impl.dictionary.BaseMutableDictionary;
import org.apache.pinot.core.realtime.impl.dictionary.BaseOffHeapMutableDictionary;
import org.apache.pinot.core.realtime.impl.dictionary.MutableDictionaryFactory;
import org.apache.pinot.core.realtime.impl.invertedindex.RealtimeInvertedIndexReader;
import org.apache.pinot.core.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshState;
import org.apache.pinot.core.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders;
import org.apache.pinot.core.realtime.impl.invertedindex.RealtimeLuceneTextIndexReader;
import org.apache.pinot.core.realtime.impl.nullvalue.RealtimeNullValueVectorReaderWriter;
import org.apache.pinot.core.segment.creator.impl.V1Constants;
import org.apache.pinot.core.segment.index.datasource.ImmutableDataSource;
import org.apache.pinot.core.segment.index.datasource.MutableDataSource;
import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.core.segment.index.readers.BloomFilterReader;
import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
import org.apache.pinot.core.segment.index.readers.NullValueVectorReader;
import org.apache.pinot.core.segment.virtualcolumn.VirtualColumnContext;
import org.apache.pinot.core.segment.virtualcolumn.VirtualColumnProvider;
import org.apache.pinot.core.segment.virtualcolumn.VirtualColumnProviderFactory;
import org.apache.pinot.core.startree.v2.StarTreeV2;
import org.apache.pinot.core.util.FixedIntArray;
import org.apache.pinot.core.util.FixedIntArrayOffHeapIdMap;
import org.apache.pinot.core.util.IdMap;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.roaringbitmap.IntIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MutableSegmentImpl implements MutableSegment {
// For multi-valued column, forward-index.
// Maximum number of multi-values per row. We assert on this.
private static final int MAX_MULTI_VALUES_PER_ROW = 1000;
private static final String RECORD_ID_MAP = "__recordIdMap__";
private static final int EXPECTED_COMPRESSION = 1000;
private static final int MIN_ROWS_TO_INDEX = 1000_000; // Min size of recordIdMap for updatable metrics.
private static final int MIN_RECORD_ID_MAP_CACHE_SIZE = 10000; // Min overflow map size for updatable metrics.
private final Logger _logger;
private final long _startTimeMillis = System.currentTimeMillis();
private final String _segmentName;
private final Schema _schema;
private final String _timeColumnName;
private final int _capacity;
private final SegmentMetadata _segmentMetadata;
private final boolean _offHeap;
private final PinotDataBufferMemoryManager _memoryManager;
private final RealtimeSegmentStatsHistory _statsHistory;
private final String _partitionColumn;
private final PartitionFunction _partitionFunction;
private final int _partitionId;
private final boolean _nullHandlingEnabled;
// TODO: Keep one map to store all these info
private final Map<String, NumValuesInfo> _numValuesInfoMap = new HashMap<>();
private final Map<String, BaseMutableDictionary> _dictionaryMap = new HashMap<>();
private final Map<String, DataFileReader> _indexReaderWriterMap = new HashMap<>();
private final Map<String, InvertedIndexReader> _invertedIndexMap = new HashMap<>();
private final Map<String, InvertedIndexReader> _rangeIndexMap = new HashMap<>();
private final Map<String, BloomFilterReader> _bloomFilterMap = new HashMap<>();
private final Map<String, RealtimeNullValueVectorReaderWriter> _nullValueVectorMap = new HashMap<>();
private final IdMap<FixedIntArray> _recordIdMap;
private boolean _aggregateMetrics;
private volatile int _numDocsIndexed = 0;
// to compute the rolling interval
private volatile long _minTime = Long.MAX_VALUE;
private volatile long _maxTime = Long.MIN_VALUE;
private final int _numKeyColumns;
// Cache the physical (non-virtual) field specs
private final Collection<FieldSpec> _physicalFieldSpecs;
private final Collection<DimensionFieldSpec> _physicalDimensionFieldSpecs;
private final Collection<MetricFieldSpec> _physicalMetricFieldSpecs;
private final Collection<String> _physicalTimeColumnNames;
// default message metadata
private volatile long _lastIndexedTimeMs = Long.MIN_VALUE;
private volatile long _latestIngestionTimeMs = Long.MIN_VALUE;
private RealtimeLuceneReaders _realtimeLuceneReaders;
// If the table schema is changed before the consuming segment is committed, newly added columns would appear in _newlyAddedColumnsFieldMap.
private final Map<String, FieldSpec> _newlyAddedColumnsFieldMap = new ConcurrentHashMap();
private final Map<String, FieldSpec> _newlyAddedPhysicalColumnsFieldMap = new ConcurrentHashMap();
/**
 * Creates a mutable (consuming) segment from the given realtime segment config.
 *
 * <p>Copies the basic settings out of {@code config}, caches the physical (non-virtual) field specs, and then sets
 * up the per-column state for every physical column: a {@link NumValuesInfo}, a mutable dictionary for dictionary
 * encoded columns, a forward index reader/writer, and -- where configured -- an inverted index, a text index and a
 * null value vector. The last step tries to enable metrics aggregation.
 *
 * @param config realtime segment config the segment settings are read from
 */
public MutableSegmentImpl(RealtimeSegmentConfig config) {
_segmentName = config.getSegmentName();
_schema = config.getSchema();
_timeColumnName = config.getTimeColumnName();
_capacity = config.getCapacity();
// Segment metadata whose doc count and timestamps are answered from this segment's own (live) fields
_segmentMetadata = new SegmentMetadataImpl(config.getRealtimeSegmentZKMetadata(), _schema) {
public int getTotalDocs() {
return _numDocsIndexed;
public long getLastIndexedTimestamp() {
return _lastIndexedTimeMs;
public long getLatestIngestionTimestamp() {
return _latestIngestionTimeMs;
_offHeap = config.isOffHeap();
_memoryManager = config.getMemoryManager();
_statsHistory = config.getStatsHistory();
_partitionColumn = config.getPartitionColumn();
_partitionFunction = config.getPartitionFunction();
_partitionId = config.getPartitionId();
_nullHandlingEnabled = config.isNullHandlingEnabled();
_aggregateMetrics = config.aggregateMetrics();
// Split the non-virtual field specs of the schema by field type (dimension / metric / time)
Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs();
List<FieldSpec> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size());
List<DimensionFieldSpec> physicalDimensionFieldSpecs = new ArrayList<>(_schema.getDimensionNames().size());
List<MetricFieldSpec> physicalMetricFieldSpecs = new ArrayList<>(_schema.getMetricNames().size());
List<String> physicalTimeColumnNames = new ArrayList<>();
for (FieldSpec fieldSpec : allFieldSpecs) {
if (!fieldSpec.isVirtualColumn()) {
FieldSpec.FieldType fieldType = fieldSpec.getFieldType();
if (fieldType == FieldSpec.FieldType.DIMENSION) {
physicalDimensionFieldSpecs.add((DimensionFieldSpec) fieldSpec);
} else if (fieldType == FieldSpec.FieldType.METRIC) {
physicalMetricFieldSpecs.add((MetricFieldSpec) fieldSpec);
// NOTE(review): no statement is visible for the DATE_TIME/TIME branch, and nothing in the visible lines adds to
// physicalFieldSpecs or physicalTimeColumnNames -- confirm against the upstream file
} else if (fieldType == FieldSpec.FieldType.DATE_TIME || fieldType == FieldSpec.FieldType.TIME) {
// Publish read-only views of the collected lists
_physicalFieldSpecs = Collections.unmodifiableCollection(physicalFieldSpecs);
_physicalDimensionFieldSpecs = Collections.unmodifiableCollection(physicalDimensionFieldSpecs);
_physicalMetricFieldSpecs = Collections.unmodifiableCollection(physicalMetricFieldSpecs);
_physicalTimeColumnNames = Collections.unmodifiableCollection(physicalTimeColumnNames);
// Key columns (used as the key for metrics aggregation) are the dimensions plus the time columns
_numKeyColumns = _physicalDimensionFieldSpecs.size() + _physicalTimeColumnNames.size();
// Logger name carries the segment name and the stream name
_logger =
LoggerFactory.getLogger(MutableSegmentImpl.class.getName() + "_" + _segmentName + "_" + config.getStreamName());
Set<String> noDictionaryColumns = config.getNoDictionaryColumns();
Set<String> invertedIndexColumns = config.getInvertedIndexColumns();
Set<String> textIndexColumns = config.getTextIndexColumns();
int avgNumMultiValues = config.getAvgNumMultiValues();
// Initialize for each column
for (FieldSpec fieldSpec : _physicalFieldSpecs) {
String column = fieldSpec.getName();
_numValuesInfoMap.put(column, new NumValuesInfo());
// Check whether to generate raw index for the column while consuming
// Only support generating raw index on single-value columns that do not have inverted index while
// consuming. After consumption completes and the segment is built, all single-value columns can have raw index
FieldSpec.DataType dataType = fieldSpec.getDataType();
boolean isFixedWidthColumn = dataType.isFixedWidth();
// -1 means "size of a forward index entry not known"; checked further down to pick the var-byte writer
int forwardIndexColumnSize = -1;
if (isNoDictionaryColumn(noDictionaryColumns, invertedIndexColumns, fieldSpec, column)) {
// no dictionary
// each forward index entry will be equal to size of data for that row
// For INT, LONG, FLOAT, DOUBLE it is equal to the number of fixed bytes used to store the value,
if (isFixedWidthColumn) {
forwardIndexColumnSize = dataType.size();
} else {
// dictionary encoded index
// each forward index entry will contain a 4 byte dictionary ID
forwardIndexColumnSize = FieldSpec.DataType.INT.size();
int dictionaryColumnSize;
if (isFixedWidthColumn) {
dictionaryColumnSize = dataType.size();
} else {
// Variable width: size the dictionary from the stats history estimate for this column
dictionaryColumnSize = _statsHistory.getEstimatedAvgColSize(column);
// NOTE: preserve 10% buffer for cardinality to reduce the chance of re-sizing the dictionary
int estimatedCardinality = (int) (_statsHistory.getEstimatedCardinality(column) * 1.1);
String allocationContext = buildAllocationContext(_segmentName, column, V1Constants.Dict.FILE_EXTENSION);
// The cardinality handed to the dictionary is capped at the segment capacity
BaseMutableDictionary dictionary = MutableDictionaryFactory
.getMutableDictionary(dataType, _offHeap, _memoryManager, dictionaryColumnSize,
Math.min(estimatedCardinality, _capacity), allocationContext);
_dictionaryMap.put(column, dictionary);
// Even though the column is defined as 'no-dictionary' in the config, we did create dictionary for consuming segment.
DataFileReader indexReaderWriter;
// create forward index reader/writer
if (forwardIndexColumnSize == -1) {
// for STRING/BYTES SV column, we support raw index in consuming segments
// RealtimeSegmentStatsHistory does not have the stats for no-dictionary columns
// from previous consuming segments
// TODO: Add support for updating RealtimeSegmentStatsHistory with average column value size for no dictionary columns as well
// TODO: Use the stats to get estimated average length
// Use a smaller capacity as opposed to segment flush size
String allocationContext =
buildAllocationContext(_segmentName, column, V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION);
// NOTE(review): 'initialCapacity' is not declared in the visible lines and the argument list below is cut off --
// confirm against the upstream file
indexReaderWriter =
new VarByteSingleColumnSingleValueReaderWriter(_memoryManager, allocationContext, initialCapacity,
} else {
// two possible cases can lead here:
// (1) dictionary encoded forward index
// (2) raw forward index for fixed width types -- INT, LONG, FLOAT, DOUBLE
if (fieldSpec.isSingleValueField()) {
// SV column -- both dictionary encoded and raw index are supported on SV
// columns for both fixed and variable width types
// NOTE(review): the buildAllocationContext(...) call and the constructor call below are cut off in this listing
String allocationContext = buildAllocationContext(_segmentName, column,
indexReaderWriter =
new FixedByteSingleColumnSingleValueReaderWriter(_capacity, forwardIndexColumnSize, _memoryManager,
} else {
// MV column -- only dictionary encoded index is supported on MV columns
// for both fixed and variable width types
// TODO: Start with a smaller capacity on FixedByteSingleColumnMultiValueReaderWriter and let it expand
// NOTE(review): the buildAllocationContext(...) call below is cut off in this listing
String allocationContext = buildAllocationContext(_segmentName, column,
indexReaderWriter =
new FixedByteSingleColumnMultiValueReaderWriter(MAX_MULTI_VALUES_PER_ROW, avgNumMultiValues, _capacity,
forwardIndexColumnSize, _memoryManager, allocationContext);
_indexReaderWriterMap.put(column, indexReaderWriter);
if (invertedIndexColumns.contains(column)) {
_invertedIndexMap.put(column, new RealtimeInvertedIndexReader());
if (_nullHandlingEnabled) {
_nullValueVectorMap.put(column, new RealtimeNullValueVectorReaderWriter());
// Text index columns store a Lucene based reader in the same map as the regular inverted indexes
if (textIndexColumns.contains(column)) {
RealtimeLuceneTextIndexReader realtimeLuceneIndexReader =
new RealtimeLuceneTextIndexReader(column, new File(config.getConsumerDir()), _segmentName);
_invertedIndexMap.put(column, realtimeLuceneIndexReader);
// Lazily create the holder for this segment's Lucene readers on the first text index column
if (_realtimeLuceneReaders == null) {
_realtimeLuceneReaders = new RealtimeLuceneReaders(_segmentName);
if (_realtimeLuceneReaders != null) {
// add the realtime lucene index readers to the global queue for refresh task to pick up
// NOTE(review): the statement that hands the readers to the refresh state is not visible in this listing
RealtimeLuceneIndexRefreshState realtimeLuceneIndexRefreshState = RealtimeLuceneIndexRefreshState.getInstance();
// Metric aggregation can be enabled only if config is specified, and all dimensions have dictionary,
// and no metrics have dictionary. If not enabled, the map returned is null.
_recordIdMap = enableMetricsAggregationIfPossible(config, noDictionaryColumns);
/**
 * Decide whether a given column should be dictionary encoded or not
 * @param noDictionaryColumns no dictionary column set
 * @param invertedIndexColumns inverted index column set
 * @param fieldSpec field spec of column
 * @param column column name
 * @return true if column is no-dictionary, false if dictionary encoded
 */
private boolean isNoDictionaryColumn(Set<String> noDictionaryColumns, Set<String> invertedIndexColumns,
FieldSpec fieldSpec, String column) {
// Returns true when the column gets a raw (no-dictionary) forward index, false when it is dictionary encoded.
FieldSpec.DataType dataType = fieldSpec.getDataType();
if (noDictionaryColumns.contains(column)) {
// Earlier we didn't support noDict in consuming segments for STRING and BYTES columns.
// So even if the user had the column in noDictionaryColumns set in table config, we still
// created dictionary in consuming segments.
// Later on we added this support. There is a particular impact of this change on the use cases
// that have set noDict on their STRING dimension columns for other performance
// reasons and also want metricsAggregation. These use cases don't get to
// aggregateMetrics because the new implementation is able to honor their table config setting
// of noDict on STRING/BYTES. Without metrics aggregation, memory pressure increases.
// So to continue aggregating metrics for such cases, we will create dictionary even
// if the column is part of noDictionary set from table config
// NOTE(review): the receiver of the log call that follows the condition is missing in this listing
if (fieldSpec instanceof DimensionFieldSpec && _aggregateMetrics && (dataType == FieldSpec.DataType.STRING ||
dataType == FieldSpec.DataType.BYTES)) {"Aggregate metrics is enabled. Will create dictionary in consuming segment for column {} of type {}",
column, dataType.toString());
// STRING/BYTES dimension with metrics aggregation requested: force a dictionary
return false;
// So don't create dictionary if the column is member of noDictionary, is single-value
// and doesn't have an inverted index
return fieldSpec.isSingleValueField() && !invertedIndexColumns.contains(column);
// column is not a part of noDictionary set, so create dictionary
return false;
public SegmentPartitionConfig getSegmentPartitionConfig() {
if (_partitionColumn != null) {
return new SegmentPartitionConfig(Collections.singletonMap(_partitionColumn,
new ColumnPartitionConfig(_partitionFunction.toString(), _partitionFunction.getNumPartitions())));
} else {
return null;
/**
 * Returns the smallest time column value recorded so far. The field starts at {@code Long.MAX_VALUE} and is only
 * lowered when a numeric time value is indexed.
 */
public long getMinTime() {
return _minTime;
/**
 * Returns the largest time column value recorded so far. The field starts at {@code Long.MIN_VALUE} and is only
 * raised when a numeric time value is indexed.
 */
public long getMaxTime() {
return _maxTime;
/**
 * Registers every column of {@code newSchema} that is not part of this segment's schema as a newly added column.
 *
 * @param newSchema schema to compare against the schema this segment was created with
 */
public void addExtraColumns(Schema newSchema) {
for (String columnName : newSchema.getColumnNames()) {
if (!_schema.getColumnNames().contains(columnName)) {
FieldSpec fieldSpec = newSchema.getFieldSpecFor(columnName);
_newlyAddedColumnsFieldMap.put(columnName, fieldSpec);
// Non-virtual columns are additionally tracked in the physical-columns map
if (!fieldSpec.isVirtualColumn()) {
_newlyAddedPhysicalColumnsFieldMap.put(columnName, fieldSpec);
// NOTE(review): the receiver of the log call below is missing in this listing
}"Newly added columns: " + _newlyAddedColumnsFieldMap.toString());
/**
 * Indexes one row into this segment.
 *
 * <p>The dictionaries are updated first, then a doc id is resolved. A new doc id gets forward / inverted index
 * entries (and null handling when enabled); an existing doc id -- only possible with metrics aggregation -- has the
 * row's metrics merged into it. Afterwards the last-indexed and latest-ingestion timestamps are refreshed.
 *
 * @param row row to index
 * @param rowMetadata optional metadata of the row; its ingestion time is used when present and set
 * @return for a new document, whether the doc count before this row was below the capacity; for an aggregated row,
 *         the result of {@code aggregateMetrics}
 */
public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata) {
boolean canTakeMore;
// Update dictionary first
Map<String, Object> dictIdMap = updateDictionary(row);
int numDocs = _numDocsIndexed;
// If metrics aggregation is enabled and if the dimension values were already seen, this will return existing docId,
// else this will return a new docId.
int docId = getOrCreateDocId(dictIdMap);
// docId == numDocs implies new docId.
if (docId == numDocs) {
// Add forward and inverted indices for new document.
addForwardIndex(row, docId, dictIdMap);
addInvertedIndex(row, docId, dictIdMap);
if (_nullHandlingEnabled) {
handleNullValues(row, docId);
// Update number of document indexed at last to make the latest record queryable
// (post-increment: the comparison uses the doc count from before this row)
canTakeMore = _numDocsIndexed++ < _capacity;
} else {
// NOTE(review): the receiver of checkState(...) is missing in this listing
.checkState(_aggregateMetrics, "Invalid document-id during indexing: " + docId + " expected: " + numDocs);
// Update metrics for existing document.
canTakeMore = aggregateMetrics(row, docId);
_lastIndexedTimeMs = System.currentTimeMillis();
// Long.MIN_VALUE is treated as "ingestion time not set"
if (rowMetadata != null && rowMetadata.getIngestionTimeMs() != Long.MIN_VALUE) {
_latestIngestionTimeMs = Math.max(_latestIngestionTimeMs, rowMetadata.getIngestionTimeMs());
return canTakeMore;
/**
 * Adds the row's values to the dictionaries of all dictionary encoded physical columns and tracks the min/max
 * value of the time column.
 *
 * @param row row being indexed
 * @return map from column name to the value returned by the dictionary: a single dictionary id for single-value
 *         columns, an {@code int[]} of dictionary ids for multi-value columns; columns without dictionary are absent
 */
private Map<String, Object> updateDictionary(GenericRow row) {
Map<String, Object> dictIdMap = new HashMap<>();
for (FieldSpec fieldSpec : _physicalFieldSpecs) {
String column = fieldSpec.getName();
Object value = row.getValue(column);
BaseMutableDictionary dictionary = _dictionaryMap.get(column);
if (dictionary != null) {
if (fieldSpec.isSingleValueField()) {
dictIdMap.put(column, dictionary.index(value));
} else {
int[] dictIds = dictionary.index((Object[]) value);
dictIdMap.put(column, dictIds);
// No need to update min/max time value as time column cannot be multi-valued
// NOTE(review): no statement that skips the time handling below is visible after this comment -- confirm
// Update min/max value for time column
if (column.equals(_timeColumnName)) {
long timeValue;
if (value instanceof Number) {
timeValue = ((Number) value).longValue();
_minTime = Math.min(_minTime, timeValue);
_maxTime = Math.max(_maxTime, timeValue);
} else {
// Non-number time value: only used when its string form is purely numeric, otherwise ignored
String stringValue = value.toString();
if (StringUtils.isNumeric(stringValue)) {
timeValue = Long.parseLong(stringValue);
_minTime = Math.min(_minTime, timeValue);
_maxTime = Math.max(_maxTime, timeValue);
return dictIdMap;
private void addForwardIndex(GenericRow row, int docId, Map<String, Object> dictIdMap) {
// Store dictionary Id(s) for columns with dictionary
for (FieldSpec fieldSpec : _physicalFieldSpecs) {
String column = fieldSpec.getName();
Object value = row.getValue(column);
NumValuesInfo numValuesInfo = _numValuesInfoMap.get(column);
if (fieldSpec.isSingleValueField()) {
// SV column
BaseSingleColumnSingleValueReaderWriter indexReaderWriter =
(BaseSingleColumnSingleValueReaderWriter) _indexReaderWriterMap.get(column);
Integer dictId = (Integer) dictIdMap.get(column);
if (dictId != null) {
// SV Column with dictionary
indexReaderWriter.setInt(docId, dictId);
} else {
// No-dictionary SV column
FieldSpec.DataType dataType = fieldSpec.getDataType();
switch (dataType) {
case INT:
indexReaderWriter.setInt(docId, (Integer) value);
case LONG:
indexReaderWriter.setLong(docId, (Long) value);
case FLOAT:
indexReaderWriter.setFloat(docId, (Float) value);
case DOUBLE:
indexReaderWriter.setDouble(docId, (Double) value);
case STRING:
indexReaderWriter.setString(docId, (String) value);
case BYTES:
indexReaderWriter.setBytes(docId, (byte[]) value);
throw new UnsupportedOperationException(
"Unsupported data type: " + dataType + " for no-dictionary column: " + column);
} else {
// MV column: always dictionary encoded
int[] dictIds = (int[]) dictIdMap.get(column);
((FixedByteSingleColumnMultiValueReaderWriter) _indexReaderWriterMap.get(column)).setIntArray(docId, dictIds);
private void addInvertedIndex(GenericRow row, int docId, Map<String, Object> dictIdMap) {
// Update inverted index at last
// NOTE: inverted index have to be updated at last because once it gets updated, the latest record will become
// queryable
for (FieldSpec fieldSpec : _physicalFieldSpecs) {
String column = fieldSpec.getName();
InvertedIndexReader invertedIndex = _invertedIndexMap.get(column);
if (invertedIndex != null) {
if (invertedIndex instanceof RealtimeLuceneTextIndexReader) {
((RealtimeLuceneTextIndexReader) invertedIndex).addDoc(row.getValue(column), docId);
} else {
RealtimeInvertedIndexReader realtimeInvertedIndexReader = (RealtimeInvertedIndexReader) invertedIndex;
if (fieldSpec.isSingleValueField()) {
realtimeInvertedIndexReader.add(((Integer) dictIdMap.get(column)), docId);
} else {
int[] dictIds = (int[]) dictIdMap.get(column);
for (int dictId : dictIds) {
realtimeInvertedIndexReader.add(dictId, docId);
/**
 * Check if the row has any null fields and update the
 * column null value vectors accordingly
 * @param row specifies row being ingested
 * @param docId specified docId for this row
 */
private void handleNullValues(GenericRow row, int docId) {
// Handles the null fields of the given row for document docId.
// NOTE(review): neither the body of this guard nor the body of the loop below is visible in this listing --
// confirm against the upstream file
if (!row.hasNullValues()) {
for (String columnName : row.getNullValueFields()) {
private boolean aggregateMetrics(GenericRow row, int docId) {
for (MetricFieldSpec metricFieldSpec : _physicalMetricFieldSpecs) {
String column = metricFieldSpec.getName();
Object value = row.getValue(column);
FixedByteSingleColumnSingleValueReaderWriter indexReaderWriter =
(FixedByteSingleColumnSingleValueReaderWriter) _indexReaderWriterMap.get(column);
FieldSpec.DataType dataType = metricFieldSpec.getDataType();
switch (dataType) {
case INT:
indexReaderWriter.setInt(docId, (Integer) value + indexReaderWriter.getInt(docId));
case LONG:
indexReaderWriter.setLong(docId, (Long) value + indexReaderWriter.getLong(docId));
case FLOAT:
indexReaderWriter.setFloat(docId, (Float) value + indexReaderWriter.getFloat(docId));
case DOUBLE:
indexReaderWriter.setDouble(docId, (Double) value + indexReaderWriter.getDouble(docId));
throw new UnsupportedOperationException(
"Unsupported data type: " + dataType + " for no-dictionary column: " + column);
return true;
/** Returns the current value of the indexed-document counter of this segment. */
public int getNumDocsIndexed() {
return _numDocsIndexed;
/** Returns the segment name taken from the segment config at construction time. */
public String getSegmentName() {
return _segmentName;
/** Returns the segment metadata object created in the constructor. */
public SegmentMetadata getSegmentMetadata() {
return _segmentMetadata;
/**
 * Returns the names of all columns of this segment: the columns of the schema plus the columns that were added
 * through {@code addExtraColumns}.
 */
public Set<String> getColumnNames() {
// Return all column names, virtual and physical.
return Sets.union(_schema.getColumnNames(), _newlyAddedColumnsFieldMap.keySet());
/**
 * Returns the union of the names collected in {@code physicalColumnNames} and the newly added physical columns.
 */
public Set<String> getPhysicalColumnNames() {
HashSet<String> physicalColumnNames = new HashSet<>();
// NOTE(review): the body of this loop is not visible in this listing, so nothing visibly fills
// physicalColumnNames -- confirm against the upstream file
for (FieldSpec fieldSpec : _physicalFieldSpecs) {
// We should include newly added columns in the physical columns
return Sets.union(physicalColumnNames, _newlyAddedPhysicalColumnsFieldMap.keySet());
/**
 * Returns the data source for the given column.
 *
 * <p>Columns that are unknown to the segment schema (added during ingestion) and virtual columns are served
 * through a virtual column provider; all other columns get a {@code MutableDataSource} built from the per-column
 * readers of this segment.
 *
 * @param column column name
 * @return data source for the column
 */
public DataSource getDataSource(String column) {
FieldSpec fieldSpec = _schema.getFieldSpecFor(column);
if (fieldSpec == null || fieldSpec.isVirtualColumn()) {
// Column is either added during ingestion, or was initiated with a virtual column provider
if (fieldSpec == null) {
// If the column was added during ingestion, we will construct the column provider based on its fieldSpec to provide values
fieldSpec = _newlyAddedColumnsFieldMap.get(column);
Preconditions.checkNotNull(fieldSpec, "FieldSpec for " + column + " should not be null");
// TODO: Refactor virtual column provider to directly generate data source
VirtualColumnContext virtualColumnContext = new VirtualColumnContext(fieldSpec, _numDocsIndexed);
VirtualColumnProvider virtualColumnProvider = VirtualColumnProviderFactory.buildProvider(virtualColumnContext);
// NOTE(review): the remaining constructor arguments are cut off in this listing
return new ImmutableDataSource(virtualColumnProvider.buildMetadata(virtualColumnContext),
} else {
// Partition info is only attached when the column is the partition column; otherwise null / 0 are passed
PartitionFunction partitionFunction = null;
int partitionId = 0;
if (column.equals(_partitionColumn)) {
partitionFunction = _partitionFunction;
partitionId = _partitionId;
// Per-column state; any of the map lookups below may yield null when the column has no such entry
NumValuesInfo numValuesInfo = _numValuesInfoMap.get(column);
DataFileReader forwardIndex = _indexReaderWriterMap.get(column);
BaseMutableDictionary dictionary = _dictionaryMap.get(column);
InvertedIndexReader invertedIndex = _invertedIndexMap.get(column);
InvertedIndexReader rangeIndex = _rangeIndexMap.get(column);
BloomFilterReader bloomFilter = _bloomFilterMap.get(column);
RealtimeNullValueVectorReaderWriter nullValueVector = _nullValueVectorMap.get(column);
return new MutableDataSource(fieldSpec, _numDocsIndexed, numValuesInfo.getNumValues(),
numValuesInfo.getMaxNumValuesPerMVEntry(), partitionFunction, partitionId, forwardIndex, dictionary,
invertedIndex, rangeIndex, bloomFilter, nullValueVector);
/** Always returns {@code null}. */
public List<StarTreeV2> getStarTrees() {
return null;
/**
 * Returns a record that contains only physical columns
 * @param docId document ID
 * @param reuse a GenericRow object that will be re-used if provided. Otherwise, this method will allocate a new one
 * @return Generic row with physical columns of the specified row.
 */
public GenericRow getRecord(int docId, GenericRow reuse) {
// Fills 'reuse' with the value of every physical column for the given doc id and returns it.
// NOTE(review): 'reuse' is dereferenced without a null check in the visible lines -- confirm that callers
// always pass a row
for (FieldSpec fieldSpec : _physicalFieldSpecs) {
String column = fieldSpec.getName();
// NOTE(review): the argument list of getValue(...) is cut off in this listing
Object value = IndexSegmentUtils
.getValue(docId, fieldSpec, _indexReaderWriterMap.get(column), _dictionaryMap.get(column),
reuse.putValue(column, value);
if (_nullHandlingEnabled) {
NullValueVectorReader reader = _nullValueVectorMap.get(column);
// If column has null value for this docId, set that accordingly in GenericRow
if (reader.isNull(docId)) {
reuse.putDefaultNullValue(column, value);
return reuse;
/**
 * Releases the resources held by this segment: gathers memory statistics in off-heap mode, then closes the forward
 * indexes, inverted indexes, Lucene readers, dictionaries, the record id map and finally the memory manager.
 * Failures while closing are logged and do not stop the remaining steps.
 *
 * <p>NOTE(review): the receivers of two log calls and the bodies of the {@code try} blocks are not visible in this
 * listing -- confirm the exact close calls against the upstream file.
 */
public void destroy() {"Trying to close RealtimeSegmentImpl : {}", _segmentName);
// Gather statistics for off-heap mode
if (_offHeap) {
if (_numDocsIndexed > 0) {
int numSeconds = (int) ((System.currentTimeMillis() - _startTimeMillis) / 1000);
long totalMemBytes = _memoryManager.getTotalAllocatedBytes();
.info("Segment used {} bytes of memory for {} rows consumed in {} seconds", totalMemBytes, _numDocsIndexed,
RealtimeSegmentStatsHistory.SegmentStats segmentStats = new RealtimeSegmentStatsHistory.SegmentStats();
// One ColumnStats per dictionary encoded column; the dictionaries are cast to their off-heap base type here
for (Map.Entry<String, BaseMutableDictionary> entry : _dictionaryMap.entrySet()) {
String columnName = entry.getKey();
BaseOffHeapMutableDictionary dictionary = (BaseOffHeapMutableDictionary) entry.getValue();
RealtimeSegmentStatsHistory.ColumnStats columnStats = new RealtimeSegmentStatsHistory.ColumnStats();
segmentStats.setColumnStats(columnName, columnStats);
// Forward indexes
for (DataFileReader dfReader : _indexReaderWriterMap.values()) {
try {
} catch (IOException e) {
_logger.error("Failed to close index. Service will continue with potential memory leak, error: ", e);
// fall through to close other segments
// clear map now that index is closed to prevent accidental usage
// Inverted indexes (only the RealtimeInvertedIndexReader instances are closed in this loop)
for (InvertedIndexReader index : _invertedIndexMap.values()) {
if (index instanceof RealtimeInvertedIndexReader) {
((RealtimeInvertedIndexReader) index).close();
// Lucene text index readers, present only when the segment has text index columns
if (_realtimeLuceneReaders != null) {
// set this to true as a way of signalling the refresh task thread to
// not attempt refresh on this segment here onwards
try {
for (RealtimeLuceneTextIndexReader realtimeLuceneReader : _realtimeLuceneReaders.getRealtimeLuceneReaders()) {
// close each realtime lucene reader for this segment
// clear the list.
} finally {
// Dictionaries
for (Map.Entry<String, BaseMutableDictionary> entry : _dictionaryMap.entrySet()) {
try {
} catch (IOException e) {
_logger.error("Failed to close the dictionary for column: {}. Continuing with error.", entry.getKey(), e);
// Record id map exists only when metrics aggregation was enabled
if (_recordIdMap != null) {
try {
} catch (IOException e) {
_logger.error("Failed to close the record id map. Continuing with error.", e);
// NOTE: Close the memory manager as the last step. It will release all the PinotDataBuffers allocated.
try {
} catch (IOException e) {
_logger.error("Failed to close the memory manager", e);
/**
 * Returns the docIds to use for iteration when the data is sorted by the given column.
 * <p>Called only by realtime record reader.
 * @param column The column to use for sorting
 * @return The docIds to use for iteration
 */
public int[] getSortedDocIdIterationOrderWithSortedColumn(String column) {
// Sorts the dictionary ids of the column, then walks the inverted index in that order to collect the doc ids.
BaseMutableDictionary dictionary = _dictionaryMap.get(column);
int numValues = dictionary.length();
// Start from the identity order 0..numValues-1 and sort it with the comparator below
int[] dictIds = new int[numValues];
for (int i = 0; i < numValues; i++) {
dictIds[i] = i;
// NOTE(review): the comparator expression is partly missing in this listing
IntArrays.quickSort(dictIds, (dictId1, dictId2) ->, dictId2));
// The column is expected to have a RealtimeInvertedIndexReader; any other reader type fails the cast
RealtimeInvertedIndexReader invertedIndex = (RealtimeInvertedIndexReader) _invertedIndexMap.get(column);
int[] docIds = new int[_numDocsIndexed];
int docIdIndex = 0;
for (int dictId : dictIds) {
IntIterator intIterator = invertedIndex.getDocIds(dictId).getIntIterator();
while (intIterator.hasNext()) {
// NOTE(review): the right-hand side of this assignment is missing in this listing
docIds[docIdIndex++] =;
// Sanity check
Preconditions.checkState(_numDocsIndexed == docIdIndex,
"The number of documents indexed: %s is not equal to the number of sorted documents: %s", _numDocsIndexed,
return docIds;
/**
 * Helper method that builds allocation context that includes segment name, column name, and index type.
 * @param segmentName Name of segment.
 * @param columnName Name of column.
 * @param indexType Index type.
 * @return Allocation context built from segment name, column name and index type.
 */
private String buildAllocationContext(String segmentName, String columnName, String indexType) {
// Result has the form "<segmentName>:<columnName><indexType>"; no separator is added before indexType
// (callers in this file pass file-extension constants -- presumably those carry their own leading separator)
return segmentName + ":" + columnName + indexType;
private int getOrCreateDocId(Map<String, Object> dictIdMap) {
if (!_aggregateMetrics) {
return _numDocsIndexed;
int i = 0;
int[] dictIds = new int[_numKeyColumns]; // dimensions + date time columns + time column.
// FIXME: this for loop breaks for multi value dimensions.
for (FieldSpec fieldSpec : _physicalDimensionFieldSpecs) {
dictIds[i++] = (Integer) dictIdMap.get(fieldSpec.getName());
for (String timeColumnName : _physicalTimeColumnNames) {
dictIds[i++] = (Integer) dictIdMap.get(timeColumnName);
return _recordIdMap.put(new FixedIntArray(dictIds));
/**
 * Helper method to enable/initialize aggregation of metrics, based on following conditions:
 * <ul>
 *   <li> Config to enable aggregation of metrics is specified. </li>
 *   <li> All dimensions and time are dictionary encoded. This is because an integer array containing dictionary id's
 *        is used as key for dimensions to record Id map. </li>
 *   <li> None of the metrics are dictionary encoded. </li>
 *   <li> All columns should be single-valued. </li>
 * </ul>
 *
 * TODO: Eliminate the requirement on dictionary encoding for dimension and metric columns.
 *
 * @param config Segment config.
 * @param noDictionaryColumns Set of no dictionary columns.
 * @return Map from dictionary id array to doc id, null if metrics aggregation cannot be enabled.
 */
private IdMap<FixedIntArray> enableMetricsAggregationIfPossible(RealtimeSegmentConfig config,
Set<String> noDictionaryColumns) {
// Checks whether the columns qualify for metrics aggregation; switches _aggregateMetrics off when they do not.
// NOTE(review): the receivers of several log calls and some trailing arguments are missing in this listing
if (!_aggregateMetrics) {"Metrics aggregation is disabled.");
return null;
// All metric columns should have no-dictionary index.
// All metric columns must be single value
for (FieldSpec fieldSpec : _physicalMetricFieldSpecs) {
String metric = fieldSpec.getName();
if (!noDictionaryColumns.contains(metric)) {
.warn("Metrics aggregation cannot be turned ON in presence of dictionary encoded metrics, eg: {}", metric);
_aggregateMetrics = false;
if (!fieldSpec.isSingleValueField()) {
.warn("Metrics aggregation cannot be turned ON in presence of multi-value metric columns, eg: {}", metric);
_aggregateMetrics = false;
// All dimension columns should be dictionary encoded.
// All dimension columns must be single value
for (FieldSpec fieldSpec : _physicalDimensionFieldSpecs) {
String dimension = fieldSpec.getName();
if (noDictionaryColumns.contains(dimension)) {
.warn("Metrics aggregation cannot be turned ON in presence of no-dictionary dimensions, eg: {}", dimension);
_aggregateMetrics = false;
if (!fieldSpec.isSingleValueField()) {
_logger.warn("Metrics aggregation cannot be turned ON in presence of multi-value dimension columns, eg: {}",
_aggregateMetrics = false;
// Time columns should be dictionary encoded.
for (String timeColumnName : _physicalTimeColumnNames) {
if (noDictionaryColumns.contains(timeColumnName)) {
.warn("Metrics aggregation cannot be turned ON in presence of no-dictionary datetime/time columns, eg: {}",
_aggregateMetrics = false;
// Any of the checks above may have switched aggregation off
if (!_aggregateMetrics) {
return null;
// Size the record id map; MIN_ROWS_TO_INDEX is the lower bound in both cases
int estimatedRowsToIndex;
if (_statsHistory.isEmpty()) {
// Choose estimated rows to index as maxNumRowsPerSegment / EXPECTED_COMPRESSION (1000, to be conservative in size).
// These are just heuristics at the moment, and can be refined based on experimental results.
estimatedRowsToIndex = Math.max(config.getCapacity() / EXPECTED_COMPRESSION, MIN_ROWS_TO_INDEX);
} else {
estimatedRowsToIndex = Math.max(_statsHistory.getEstimatedRowsToIndex(), MIN_ROWS_TO_INDEX);
// Compute size of overflow map.
int maxOverFlowHashSize = Math.max(estimatedRowsToIndex / 1000, MIN_RECORD_ID_MAP_CACHE_SIZE);"Initializing metrics update: estimatedRowsToIndex:{}, cacheSize:{}", estimatedRowsToIndex,
return new FixedIntArrayOffHeapIdMap(estimatedRowsToIndex, maxOverFlowHashSize, _numKeyColumns, _memoryManager,
// NOTE: Okay for single-writer
private static class NumValuesInfo {
volatile int _numValues = 0;
volatile int _maxNumValuesPerMVEntry = -1;
void updateSVEntry() {
void updateMVEntry(int numValuesInMVEntry) {
_numValues += numValuesInMVEntry;
_maxNumValuesPerMVEntry = Math.max(_maxNumValuesPerMVEntry, numValuesInMVEntry);
int getNumValues() {
return _numValues;
int getMaxNumValuesPerMVEntry() {
return _maxNumValuesPerMVEntry;