blob: 8428fedfff6127dbafddb38da55551ed9f25094b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.sources.parquet;
import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.bitpacking.BytePacker;
import org.apache.parquet.column.values.bitpacking.Packer;
import org.apache.parquet.io.ParquetDecodingException;
/**
* Base class for {@link VectorizedRleValuesReader} and {@link VectorizedDefValuesReader}.
* This is based off of the version in parquet-mr with these changes:
* - Supports the vectorized interface.
* - Works on byte arrays(byte[]) instead of making byte streams.
*
*<p>This encoding is used in multiple places:
* - Definition/Repetition levels
* - Dictionary ids.
*/
public abstract class VectorizedRleValuesReaderBase extends ValuesReader {
// Current decoding mode. The encoded data contains groups of either run length encoded data
// (RLE) or bit packed data. Each group contains a header that indicates which group it is and
// the number of values in the group.
// More details here: https://github.com/Parquet/parquet-format/blob/master/Encodings.md
/**
* Encode mode.
*/
public enum MODE {
RLE,
PACKED
}
// Encoded data.
private byte[] in;
private int end;
private int offset;
// bit/byte width of decoded data and utility to batch unpack them.
private int bitWidth;
private int bytesWidth;
private BytePacker packer;
// Current decoding mode and values
protected MODE mode;
int currentCount;
int currentValue;
// Buffer of decoded values if the values are PACKED.
int[] currentBuffer = new int[16];
int currentBufferIdx = 0;
// If true, the bit width is fixed. This decoder is used in different places and this also
// controls if we need to read the bitwidth from the beginning of the data stream.
private boolean fixedWidth;
VectorizedRleValuesReaderBase() {
fixedWidth = false;
}
VectorizedRleValuesReaderBase(int bitWidth) {
fixedWidth = true;
init(bitWidth);
}
@Override
public void initFromPage(int valueCount, byte[] page, int start) {
this.offset = start;
this.in = page;
if (fixedWidth) {
if (bitWidth != 0) {
int length = readIntLittleEndian();
this.end = this.offset + length;
}
} else {
this.end = page.length;
if (this.end != this.offset) {
init(page[this.offset++] & 255);
}
}
if (bitWidth == 0) {
// 0 bit width, treat this as an RLE run of valueCount number of 0's.
this.mode = MODE.RLE;
this.currentCount = valueCount;
this.currentValue = 0;
} else {
this.currentCount = 0;
}
}
// Initialize the reader from a buffer. This is used for the V2 page encoding where the
// definition are in its own buffer.
public void initFromBuffer(int valueCount, byte[] data) {
this.offset = 0;
this.in = data;
this.end = data.length;
if (bitWidth == 0) {
// 0 bit width, treat this as an RLE run of valueCount number of 0's.
this.mode = MODE.RLE;
this.currentCount = valueCount;
this.currentValue = 0;
} else {
this.currentCount = 0;
}
}
/**
* Initializes the internal state for decoding ints of `bitWidth`.
*/
protected void init(int bitWidth) {
Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
this.bitWidth = bitWidth;
this.bytesWidth = BytesUtils.paddedByteCountFromBits(bitWidth);
this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
}
@Override
public int getNextOffset() {
return this.end;
}
@Override
public void skip() {
this.readInteger();
}
@Override
public int readInteger() {
if (this.currentCount == 0) {
this.readNextGroup();
}
this.currentCount--;
switch (mode) {
case RLE:
return this.currentValue;
case PACKED:
return this.currentBuffer[currentBufferIdx++];
}
throw new RuntimeException("Unreachable");
}
/**
* Reads the next 4 byte little endian int.
*/
private int readIntLittleEndian() {
int ch4 = in[offset] & 255;
int ch3 = in[offset + 1] & 255;
int ch2 = in[offset + 2] & 255;
int ch1 = in[offset + 3] & 255;
offset += 4;
return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4));
}
/**
* Reads the next byteWidth little endian int.
*/
private int readIntLittleEndianPaddedOnBitWidth() {
switch (bytesWidth) {
case 0:
return 0;
case 1:
return in[offset++] & 255;
case 2: {
int ch2 = in[offset] & 255;
int ch1 = in[offset + 1] & 255;
offset += 2;
return (ch1 << 8) + ch2;
}
case 3: {
int ch3 = in[offset] & 255;
int ch2 = in[offset + 1] & 255;
int ch1 = in[offset + 2] & 255;
offset += 3;
return (ch1 << 16) + (ch2 << 8) + (ch3);
}
case 4: {
return readIntLittleEndian();
}
}
throw new RuntimeException("Unreachable");
}
private int ceil8(int value) {
return (value + 7) / 8;
}
/**
* Reads the next varint encoded int.
*/
private int readUnsignedVarInt() {
int value = 0;
int shift = 0;
int b;
do {
b = in[offset++] & 255;
value |= (b & 0x7F) << shift;
shift += 7;
} while ((b & 0x80) != 0);
return value;
}
/**
* Reads the next group.
*/
void readNextGroup() {
int header = readUnsignedVarInt();
this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
switch (mode) {
case RLE:
this.currentCount = header >>> 1;
this.currentValue = readIntLittleEndianPaddedOnBitWidth();
return;
case PACKED:
int numGroups = header >>> 1;
this.currentCount = numGroups * 8;
int bytesToRead = ceil8(this.currentCount * this.bitWidth);
if (this.currentBuffer.length < this.currentCount) {
this.currentBuffer = new int[this.currentCount];
}
currentBufferIdx = 0;
int valueIndex = 0;
for (int byteIndex = offset; valueIndex < this.currentCount; byteIndex += this.bitWidth) {
this.packer.unpack8Values(in, byteIndex, this.currentBuffer, valueIndex);
valueIndex += 8;
}
offset += bytesToRead;
return;
default:
throw new ParquetDecodingException("not a valid mode " + this.mode);
}
}
}