| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pinot.segment.local.segment.index.readers.forward; |
| |
| import com.google.common.base.Preconditions; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.nio.DoubleBuffer; |
| import java.nio.FloatBuffer; |
| import java.nio.IntBuffer; |
| import java.nio.LongBuffer; |
| import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory; |
| import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter; |
| import org.apache.pinot.segment.spi.compression.ChunkCompressionType; |
| import org.apache.pinot.segment.spi.compression.ChunkDecompressor; |
| import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; |
| import org.apache.pinot.segment.spi.memory.PinotDataBuffer; |
| import org.apache.pinot.spi.data.FieldSpec.DataType; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| |
| /** |
| * Base implementation for chunk-based raw (non-dictionary-encoded) forward index reader. |
| */ |
| public abstract class BaseChunkForwardIndexReader implements ForwardIndexReader<ChunkReaderContext> { |
| private static final Logger LOGGER = LoggerFactory.getLogger(BaseChunkForwardIndexReader.class); |
| |
| protected final PinotDataBuffer _dataBuffer; |
| protected final DataType _storedType; |
| protected final int _numChunks; |
| protected final int _numDocsPerChunk; |
| protected final int _lengthOfLongestEntry; |
| protected final boolean _isCompressed; |
| protected final ChunkCompressionType _compressionType; |
| protected final ChunkDecompressor _chunkDecompressor; |
| protected final PinotDataBuffer _dataHeader; |
| protected final int _headerEntryChunkOffsetSize; |
| protected final PinotDataBuffer _rawData; |
| protected final boolean _isSingleValue; |
| |
| public BaseChunkForwardIndexReader(PinotDataBuffer dataBuffer, DataType storedType, boolean isSingleValue) { |
| _dataBuffer = dataBuffer; |
| _storedType = storedType; |
| |
| int headerOffset = 0; |
| int version = _dataBuffer.getInt(headerOffset); |
| headerOffset += Integer.BYTES; |
| |
| _numChunks = _dataBuffer.getInt(headerOffset); |
| headerOffset += Integer.BYTES; |
| |
| _numDocsPerChunk = _dataBuffer.getInt(headerOffset); |
| headerOffset += Integer.BYTES; |
| |
| _lengthOfLongestEntry = _dataBuffer.getInt(headerOffset); |
| if (storedType.isFixedWidth() && isSingleValue) { |
| Preconditions.checkState(_lengthOfLongestEntry == storedType.size()); |
| } |
| headerOffset += Integer.BYTES; |
| |
| int dataHeaderStart = headerOffset; |
| if (version > 1) { |
| _dataBuffer.getInt(headerOffset); // Total docs |
| headerOffset += Integer.BYTES; |
| |
| _compressionType = ChunkCompressionType.valueOf(_dataBuffer.getInt(headerOffset)); |
| _chunkDecompressor = ChunkCompressorFactory.getDecompressor(_compressionType); |
| _isCompressed = !_compressionType.equals(ChunkCompressionType.PASS_THROUGH); |
| |
| headerOffset += Integer.BYTES; |
| dataHeaderStart = _dataBuffer.getInt(headerOffset); |
| } else { |
| _isCompressed = true; |
| _compressionType = ChunkCompressionType.SNAPPY; |
| _chunkDecompressor = ChunkCompressorFactory.getDecompressor(_compressionType); |
| } |
| |
| _headerEntryChunkOffsetSize = BaseChunkSVForwardIndexWriter.getHeaderEntryChunkOffsetSize(version); |
| |
| // Slice out the header from the data buffer. |
| int dataHeaderLength = _numChunks * _headerEntryChunkOffsetSize; |
| int rawDataStart = dataHeaderStart + dataHeaderLength; |
| _dataHeader = _dataBuffer.view(dataHeaderStart, rawDataStart); |
| |
| // Useful for uncompressed data. |
| _rawData = _dataBuffer.view(rawDataStart, _dataBuffer.size()); |
| |
| _isSingleValue = isSingleValue; |
| } |
| |
| /** |
| * Helper method to return the chunk buffer that contains the value at the given document id. |
| * <ul> |
| * <li> If the chunk already exists in the reader context, returns the same. </li> |
| * <li> Otherwise, loads the chunk for the row, and sets it in the reader context. </li> |
| * </ul> |
| * @param docId Document id |
| * @param context Reader context |
| * @return Chunk for the row |
| */ |
| protected ByteBuffer getChunkBuffer(int docId, ChunkReaderContext context) { |
| int chunkId = docId / _numDocsPerChunk; |
| if (context.getChunkId() == chunkId) { |
| return context.getChunkBuffer(); |
| } |
| return decompressChunk(chunkId, context); |
| } |
| |
| protected ByteBuffer decompressChunk(int chunkId, ChunkReaderContext context) { |
| int chunkSize; |
| long chunkPosition = getChunkPosition(chunkId); |
| |
| // Size of chunk can be determined using next chunks offset, or end of data buffer for last chunk. |
| if (chunkId == (_numChunks - 1)) { // Last chunk. |
| chunkSize = (int) (_dataBuffer.size() - chunkPosition); |
| } else { |
| long nextChunkOffset = getChunkPosition(chunkId + 1); |
| chunkSize = (int) (nextChunkOffset - chunkPosition); |
| } |
| |
| ByteBuffer decompressedBuffer = context.getChunkBuffer(); |
| decompressedBuffer.clear(); |
| |
| try { |
| _chunkDecompressor.decompress(_dataBuffer.toDirectByteBuffer(chunkPosition, chunkSize), decompressedBuffer); |
| } catch (IOException e) { |
| LOGGER.error("Exception caught while decompressing data chunk", e); |
| throw new RuntimeException(e); |
| } |
| context.setChunkId(chunkId); |
| return decompressedBuffer; |
| } |
| |
| /** |
| * Helper method to get the offset of the chunk in the data. |
| * @param chunkId Id of the chunk for which to return the position. |
| * @return Position (offset) of the chunk in the data. |
| */ |
| protected long getChunkPosition(int chunkId) { |
| if (_headerEntryChunkOffsetSize == Integer.BYTES) { |
| return _dataHeader.getInt(chunkId * _headerEntryChunkOffsetSize); |
| } else { |
| return _dataHeader.getLong(chunkId * _headerEntryChunkOffsetSize); |
| } |
| } |
| |
| @Override |
| public boolean isDictionaryEncoded() { |
| return false; |
| } |
| |
| @Override |
| public boolean isSingleValue() { |
| return _isSingleValue; |
| } |
| |
| @Override |
| public DataType getStoredType() { |
| return _storedType; |
| } |
| |
| @Override |
| public ChunkCompressionType getCompressionType() { |
| return _compressionType; |
| } |
| |
| @Override |
| public int getLengthOfLongestEntry() { |
| return _lengthOfLongestEntry; |
| } |
| |
| @Override |
| public void readValuesSV(int[] docIds, int length, int[] values, ChunkReaderContext context) { |
| if (_storedType.isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) { |
| switch (_storedType) { |
| case INT: { |
| int minOffset = docIds[0] * Integer.BYTES; |
| IntBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Integer.BYTES).asIntBuffer(); |
| buffer.get(values, 0, length); |
| } |
| break; |
| case LONG: { |
| int minOffset = docIds[0] * Long.BYTES; |
| LongBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Long.BYTES).asLongBuffer(); |
| for (int i = 0; i < buffer.limit(); i++) { |
| values[i] = (int) buffer.get(i); |
| } |
| } |
| break; |
| case FLOAT: { |
| int minOffset = docIds[0] * Float.BYTES; |
| FloatBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Float.BYTES).asFloatBuffer(); |
| for (int i = 0; i < buffer.limit(); i++) { |
| values[i] = (int) buffer.get(i); |
| } |
| } |
| break; |
| case DOUBLE: { |
| int minOffset = docIds[0] * Double.BYTES; |
| DoubleBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Double.BYTES).asDoubleBuffer(); |
| for (int i = 0; i < buffer.limit(); i++) { |
| values[i] = (int) buffer.get(i); |
| } |
| } |
| break; |
| default: |
| throw new IllegalArgumentException(); |
| } |
| } else { |
| ForwardIndexReader.super.readValuesSV(docIds, length, values, context); |
| } |
| } |
| |
| @Override |
| public void readValuesSV(int[] docIds, int length, long[] values, ChunkReaderContext context) { |
| if (_storedType.isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) { |
| switch (_storedType) { |
| case INT: { |
| int minOffset = docIds[0] * Integer.BYTES; |
| IntBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Integer.BYTES).asIntBuffer(); |
| for (int i = 0; i < buffer.limit(); i++) { |
| values[i] = buffer.get(i); |
| } |
| } |
| break; |
| case LONG: { |
| int minOffset = docIds[0] * Long.BYTES; |
| LongBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Long.BYTES).asLongBuffer(); |
| buffer.get(values, 0, length); |
| } |
| break; |
| case FLOAT: { |
| int minOffset = docIds[0] * Float.BYTES; |
| FloatBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Float.BYTES).asFloatBuffer(); |
| for (int i = 0; i < buffer.limit(); i++) { |
| values[i] = (long) buffer.get(i); |
| } |
| } |
| break; |
| case DOUBLE: { |
| int minOffset = docIds[0] * Double.BYTES; |
| DoubleBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Double.BYTES).asDoubleBuffer(); |
| for (int i = 0; i < buffer.limit(); i++) { |
| values[i] = (long) buffer.get(i); |
| } |
| } |
| break; |
| default: |
| throw new IllegalArgumentException(); |
| } |
| } else { |
| ForwardIndexReader.super.readValuesSV(docIds, length, values, context); |
| } |
| } |
| |
| @Override |
| public void readValuesSV(int[] docIds, int length, float[] values, ChunkReaderContext context) { |
| if (_storedType.isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) { |
| switch (_storedType) { |
| case INT: { |
| int minOffset = docIds[0] * Integer.BYTES; |
| IntBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Integer.BYTES).asIntBuffer(); |
| for (int i = 0; i < buffer.limit(); i++) { |
| values[i] = buffer.get(i); |
| } |
| } |
| break; |
| case LONG: { |
| int minOffset = docIds[0] * Long.BYTES; |
| LongBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Long.BYTES).asLongBuffer(); |
| for (int i = 0; i < buffer.limit(); i++) { |
| values[i] = buffer.get(i); |
| } |
| } |
| break; |
| case FLOAT: { |
| int minOffset = docIds[0] * Float.BYTES; |
| FloatBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Float.BYTES).asFloatBuffer(); |
| buffer.get(values, 0, length); |
| } |
| break; |
| case DOUBLE: { |
| int minOffset = docIds[0] * Double.BYTES; |
| DoubleBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Double.BYTES).asDoubleBuffer(); |
| for (int i = 0; i < buffer.limit(); i++) { |
| values[i] = (float) buffer.get(i); |
| } |
| } |
| break; |
| default: |
| throw new IllegalArgumentException(); |
| } |
| } else { |
| ForwardIndexReader.super.readValuesSV(docIds, length, values, context); |
| } |
| } |
| |
| @Override |
| public void readValuesSV(int[] docIds, int length, double[] values, ChunkReaderContext context) { |
| if (_storedType.isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) { |
| switch (_storedType) { |
| case INT: { |
| int minOffset = docIds[0] * Integer.BYTES; |
| IntBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Integer.BYTES).asIntBuffer(); |
| for (int i = 0; i < buffer.limit(); i++) { |
| values[i] = buffer.get(i); |
| } |
| } |
| break; |
| case LONG: { |
| int minOffset = docIds[0] * Long.BYTES; |
| getLong(0, context); |
| LongBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Long.BYTES).asLongBuffer(); |
| for (int i = 0; i < buffer.limit(); i++) { |
| values[i] = buffer.get(i); |
| } |
| } |
| break; |
| case FLOAT: { |
| int minOffset = docIds[0] * Float.BYTES; |
| FloatBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Float.BYTES).asFloatBuffer(); |
| for (int i = 0; i < buffer.limit(); i++) { |
| values[i] = buffer.get(i); |
| } |
| } |
| break; |
| case DOUBLE: { |
| int minOffset = docIds[0] * Double.BYTES; |
| DoubleBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Double.BYTES).asDoubleBuffer(); |
| buffer.get(values, 0, length); |
| } |
| break; |
| default: |
| throw new IllegalArgumentException(); |
| } |
| } else { |
| ForwardIndexReader.super.readValuesSV(docIds, length, values, context); |
| } |
| } |
| |
| @Override |
| public void close() { |
| // NOTE: DO NOT close the PinotDataBuffer here because it is tracked by the caller and might be reused later. The |
| // caller is responsible of closing the PinotDataBuffer. |
| } |
| |
| private boolean isContiguousRange(int[] docIds, int length) { |
| return docIds[length - 1] - docIds[0] == length - 1; |
| } |
| } |