| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.store.parquet.columnreaders.batchsizing; |
| |
| import io.netty.buffer.DrillBuf; |
| import java.util.List; |
| import java.util.Map; |
| import org.apache.drill.common.map.CaseInsensitiveMap; |
| import org.apache.drill.exec.memory.BufferAllocator; |
| import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowDefinition; |
| import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowEntry; |
| import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.RecordOverflowContainer; |
| import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.RecordOverflowDefinition; |
| import org.apache.drill.exec.util.record.RecordBatchStats; |
| import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext; |
| import org.apache.drill.exec.vector.UInt1Vector; |
| import org.apache.drill.exec.vector.UInt4Vector; |
| |
| /** |
| * Field overflow SERDE utility; note that overflow data is serialized as a way to minimize |
| * memory usage. This information is deserialized back to ValueVectors when it is needed in |
| * the next batch. |
| * |
| * <p><b>NOTE -</b>We use a specialized implementation for overflow SERDE (instead of reusing |
| * existing ones) because of the following reasons: |
| * <ul> |
| * <li>We want to only serialize a subset of the VV data |
| * <li>Other SERDE methods will not copy the data contiguously and instead rely on the |
| * RPC layer to write the drill buffers in the correct order so that they are |
| * de-serialized as a single contiguous drill buffer |
| * </ul> |
| */ |
| final class OverflowSerDeUtil { |
| |
| /** |
| * Serializes a collection of overflow fields into a memory buffer: |
| * <ul> |
| * <li>Serialization logic can handle a subset of values (should be contiguous) |
| * <li>Serialized data is copied into a single DrillBuf |
| * <li>Currently, only variable length data is supported |
| * </ul> |
| * |
| * @param fieldOverflowEntries input collection of field overflow entries |
| * @param allocator buffer allocator |
| * @param batchStatsContext batch statistics context object |
| * @return record overflow container; null if the input buffer is empty |
| */ |
| static RecordOverflowContainer serialize(List<FieldOverflowEntry> fieldOverflowEntries, |
| BufferAllocator allocator, |
| RecordBatchStatsContext batchStatsContext) { |
| |
| if (fieldOverflowEntries == null || fieldOverflowEntries.isEmpty()) { |
| return null; |
| } |
| |
| // We need to: |
| // - Construct a map of VLVectorSerDe for each overflow field |
| // - Compute the total space required for efficient serialization of all overflow data |
| final Map<String, VLVectorSerializer> fieldSerDeMap = CaseInsensitiveMap.newHashMap(); |
| int bufferLength = 0; |
| |
| for (FieldOverflowEntry fieldOverflowEntry : fieldOverflowEntries) { |
| final VLVectorSerializer fieldVLSerDe = new VLVectorSerializer(fieldOverflowEntry); |
| fieldSerDeMap.put(fieldOverflowEntry.vector.getField().getName(), fieldVLSerDe); |
| |
| bufferLength += fieldVLSerDe.getBytesUsed(fieldOverflowEntry.firstValueIdx, fieldOverflowEntry.numValues); |
| } |
| assert bufferLength >= 0; |
| |
| // Allocate the required memory to serialize the overflow fields |
| final DrillBuf buffer = allocator.buffer(bufferLength); |
| |
| RecordBatchStats.logRecordBatchStats(batchStatsContext, |
| "Allocated a buffer of length [%d] to handle overflow", bufferLength); |
| |
| // Create the result object |
| final RecordOverflowContainer recordOverflowContainer = new RecordOverflowContainer(); |
| final RecordOverflowDefinition recordOverflowDef = recordOverflowContainer.recordOverflowDef; |
| |
| // Now serialize field overflow into the drill buffer |
| int bufferOffset = 0; |
| FieldSerializerContainer fieldSerializerContainer = new FieldSerializerContainer(); |
| |
| for (FieldOverflowEntry fieldOverflowEntry : fieldOverflowEntries) { |
| fieldSerializerContainer.clear(); |
| |
| // Serialize the field overflow data into the buffer |
| VLVectorSerializer fieldSerDe = fieldSerDeMap.get(fieldOverflowEntry.vector.getField().getName()); |
| assert fieldSerDe != null; |
| |
| fieldSerDe.copyValueVector(fieldOverflowEntry.firstValueIdx, |
| fieldOverflowEntry.numValues, |
| buffer, |
| bufferOffset, |
| fieldSerializerContainer); |
| |
| // Create a view DrillBuf for isolating this field overflow data |
| DrillBuf fieldBuffer = buffer.slice(bufferOffset, fieldSerializerContainer.totalByteLength); |
| fieldBuffer.retain(1); // Increase the reference count |
| fieldBuffer.writerIndex(fieldSerializerContainer.totalByteLength); |
| |
| // Enqueue a field overflow definition object for the current field |
| FieldOverflowDefinition fieldOverflowDef = new FieldOverflowDefinition( |
| fieldOverflowEntry.vector.getField(), |
| fieldOverflowEntry.numValues, |
| fieldSerializerContainer.dataByteLen, |
| fieldBuffer); |
| |
| recordOverflowDef.getFieldOverflowDefs().put(fieldOverflowEntry.vector.getField().getName(), fieldOverflowDef); |
| |
| // Update this drill buffer current offset |
| bufferOffset += fieldSerializerContainer.totalByteLength; |
| } |
| |
| // Finally, release the original buffer |
| boolean isReleased = buffer.release(); |
| assert !isReleased; // the reference count should still be higher than zero |
| |
| return recordOverflowContainer; |
| } |
| |
| /** Disabling object instantiation */ |
| private OverflowSerDeUtil() { |
| // NOOP |
| } |
| |
| // ---------------------------------------------------------------------------- |
| // Internal Data Structure |
| // ---------------------------------------------------------------------------- |
| |
| /** Container class to store the result of field overflow serialization */ |
| private static final class FieldSerializerContainer { |
| /** Data byte length */ |
| int dataByteLen; |
| /** Total byte length */ |
| int totalByteLength; |
| |
| void clear() { |
| dataByteLen = 0; |
| totalByteLength = 0; |
| } |
| } |
| |
| /** |
| * Helper class for handling variable length {@link ValueVector} overflow serialization logic |
| */ |
| private static final class VLVectorSerializer { |
| private static final int BYTE_VALUE_WIDTH = UInt1Vector.VALUE_WIDTH; |
| private static final int INT_VALUE_WIDTH = UInt4Vector.VALUE_WIDTH; |
| |
| /** Field overflow entry */ |
| private final FieldOverflowEntry fieldOverflowEntry; |
| |
| /** Set of DrillBuf's that make up the underlying ValueVector. Only nullable |
| * (VarChar or VarBinary) vectors have three entries. The format is |
| * ["bits-vector"] "offsets-vector" "values-vector" |
| */ |
| private final DrillBuf[] buffers; |
| |
| /** |
| * Constructor. |
| * @param fieldOverflowEntry field overflow entry |
| */ |
| private VLVectorSerializer(FieldOverflowEntry fieldOverflowEntry) { |
| this.fieldOverflowEntry = fieldOverflowEntry; |
| this.buffers = this.fieldOverflowEntry.vector.getBuffers(false); |
| } |
| |
| /** |
| * The number of bytes used (by the {@link ValueVector}) to store a value range delimited |
| * by the parameters "firstValueIdx" and "numValues". |
| * |
| * @param firstValueIdx start index of the first value to copy |
| * @param numValues number of values to copy |
| */ |
| private int getBytesUsed(int firstValueIdx, int numValues) { |
| int bytesUsed = 0; |
| |
| // Only nullable (VarChar or VarBinary) vectors have three entries. The format is |
| // ["bits-vector"] "offsets-vector" "values-vector" |
| if (isNullable()) { |
| bytesUsed += numValues * BYTE_VALUE_WIDTH; |
| } |
| |
| // Add the length of the "offsets" vector for the requested range |
| bytesUsed += (numValues + 1) * INT_VALUE_WIDTH; |
| |
| // Add the length of the "values" vector for the requested range |
| bytesUsed += getDataLen(firstValueIdx, numValues); |
| |
| return bytesUsed; |
| } |
| |
| private void copyValueVector(int firstValueIdx, |
| int numValues, |
| DrillBuf targetBuffer, |
| int targetStartIdx, |
| FieldSerializerContainer fieldSerializerContainer) { |
| |
| int len = 0; |
| int totalLen = 0; |
| |
| // First copy the "bits" vector |
| len = copyBitsVector(firstValueIdx, numValues, targetBuffer, targetStartIdx); |
| assert len >= 0; |
| |
| // Then copy the "offsets" vector |
| totalLen += len; |
| len = copyOffsetVector(firstValueIdx, numValues, targetBuffer, targetStartIdx + totalLen); |
| assert len >= 0; |
| |
| // Then copy the "values" vector |
| totalLen += len; |
| len = copyValuesVector(firstValueIdx, numValues, targetBuffer, targetStartIdx + totalLen); |
| assert len >= 0; |
| |
| // Finally, update the field serializer container |
| fieldSerializerContainer.dataByteLen = len; |
| fieldSerializerContainer.totalByteLength = (totalLen + len); |
| } |
| |
| /** |
| * Copy the "bits" vector if the {@link ValueVector} is nullable; NOOP otherwise. |
| * |
| * @param firstValueIdx start index of the first value to copy |
| * @param numValues number of values to copy |
| * @param targetBuffer target buffer |
| * @param targetStartIdx target buffer start index |
| * |
| * @return number of bytes written |
| */ |
| private int copyBitsVector(int firstValueIdx, int numValues, DrillBuf targetBuffer, int targetStartIdx) { |
| int bytesCopied = 0; |
| |
| if (!isNullable()) { |
| return bytesCopied; |
| } |
| |
| DrillBuf srcBuffer = getBitsBuffer(); |
| assert srcBuffer != null; |
| |
| bytesCopied = numValues * BYTE_VALUE_WIDTH; |
| |
| // Now copy the bits data into the target buffer |
| srcBuffer.getBytes(firstValueIdx * BYTE_VALUE_WIDTH, |
| targetBuffer, |
| targetStartIdx, |
| bytesCopied); |
| |
| return bytesCopied; |
| } |
| |
| /** |
| * Copy the "offset" vector if the {@link ValueVector}; note that no adjustment of the offsets will be done. |
| * This task will be done during overflow data de-serialization. |
| * |
| * @param firstValueIdx start index of the first value to copy |
| * @param numValues number of values to copy |
| * @param targetBuffer target buffer |
| * @param targetStartIdx target buffer start index |
| * |
| * @return number of bytes written |
| */ |
| private int copyOffsetVector(int firstValueIdx, |
| int numValues, |
| DrillBuf targetBuffer, |
| int targetStartIdx) { |
| |
| final int bytesCopied = (numValues + 1) * INT_VALUE_WIDTH; |
| |
| DrillBuf srcBuffer = getOffsetsBuffer(); |
| assert srcBuffer != null; |
| |
| // Now copy the bits data into the target buffer |
| srcBuffer.getBytes(firstValueIdx * INT_VALUE_WIDTH, |
| targetBuffer, |
| targetStartIdx, |
| bytesCopied); |
| |
| return bytesCopied; |
| } |
| |
| /** |
| * Copy the "values" vector if the {@link ValueVector}. |
| * |
| * @param firstValueIdx start index of the first value to copy |
| * @param numValues number of values to copy |
| * @param targetBuffer target buffer |
| * @param targetStartIdx target buffer start index |
| * |
| * @return number of bytes written |
| */ |
| private int copyValuesVector(int firstValueIdx, |
| int numValues, |
| DrillBuf targetBuffer, |
| int targetStartIdx) { |
| |
| final DrillBuf offsets = getOffsetsBuffer(); |
| final int startDataOffset = offsets.getInt(firstValueIdx * INT_VALUE_WIDTH); |
| final int bytesCopied = getDataLen(firstValueIdx, numValues); |
| final DrillBuf srcBuffer = getValuesBuffer(); |
| assert srcBuffer != null; |
| |
| // Now copy the bits data into the target buffer |
| srcBuffer.getBytes(startDataOffset, |
| targetBuffer, |
| targetStartIdx, |
| bytesCopied); |
| |
| return bytesCopied; |
| } |
| |
| /** |
| * |
| * @param firstValueIdx start index of the first value to copy |
| * @param numValues number of values to copy |
| * |
| * @return data length contained in the range [first-val, (first-val+range-len)] |
| */ |
| private int getDataLen(int firstValueIdx, int numValues) { |
| // Data values for a range can be copied by the following formula: |
| // offsets[firstValueIdx+numValues] - offsets[firstValueIdx] |
| final DrillBuf offsets = getOffsetsBuffer(); |
| final int endDataOffset = offsets.getInt((firstValueIdx + numValues) * INT_VALUE_WIDTH); |
| final int startDataOffset = offsets.getInt(firstValueIdx * INT_VALUE_WIDTH); |
| |
| return endDataOffset - startDataOffset; |
| |
| } |
| |
| /** @return "bits" buffer; null if the associated {@link ValueVector} is not nullable */ |
| private DrillBuf getBitsBuffer() { |
| return buffers.length == 3 ? buffers[0] : null; |
| } |
| |
| /** @return "offsets" buffer */ |
| private DrillBuf getOffsetsBuffer() { |
| return buffers.length == 3 ? buffers[1] : buffers[0]; |
| } |
| |
| /** @return "values" buffer */ |
| private DrillBuf getValuesBuffer() { |
| return buffers.length == 3 ? buffers[2] : buffers[1]; |
| } |
| |
| /** @return whether this vector is nullable */ |
| private boolean isNullable() { |
| return getBitsBuffer() != null; |
| } |
| |
| } // End of VLVectorSerDe class |
| |
| } |