blob: d9fabbd319e4c27dfe3e91604a3cac2ebf177e85 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.sources.parquet;
import org.apache.flink.core.memory.MemoryUtils;
import org.apache.flink.table.dataformat.vector.BooleanColumnVector;
import org.apache.flink.table.dataformat.vector.ByteColumnVector;
import org.apache.flink.table.dataformat.vector.BytesColumnVector;
import org.apache.flink.table.dataformat.vector.DoubleColumnVector;
import org.apache.flink.table.dataformat.vector.FloatColumnVector;
import org.apache.flink.table.dataformat.vector.IntegerColumnVector;
import org.apache.flink.table.dataformat.vector.LongColumnVector;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
/**
 * An implementation of the Parquet PLAIN decoder that supports the vectorized interface.
 *
 * <p>Values are decoded directly from the byte array handed to
 * {@link #initFromPage(int, byte[], int)}. The read position is kept as an {@code Unsafe}
 * style offset, i.e. the index into the array plus the base offset of {@code byte[]}.
 * Multi-byte values are interpreted as little endian; on big endian platforms the bytes
 * are swapped (integers/longs) or read through a little endian {@link ByteBuffer} view
 * (floats/doubles).
 *
 * <p>No bounds checks are performed by the reads in this class.
 */
public class VectorizedPlainValuesReader extends ValuesReader implements VectorizedValuesReader {

    private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;

    /** Base offsets of the first element of the respective primitive arrays. */
    private static final int BYTE_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
    private static final int FLOAT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(float[].class);
    private static final int DOUBLE_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(double[].class);

    private static final ByteOrder NATIVE_BYTE_ORDER = ByteOrder.nativeOrder();
    private static final boolean BIG_ENDIAN_PLATFORM = NATIVE_BYTE_ORDER.equals(ByteOrder.BIG_ENDIAN);

    /** Maximum number of bytes handed to a single {@code Unsafe.copyMemory} call. */
    private static final long UNSAFE_COPY_THRESHOLD = 1024L * 1024L;

    /** Backing array of the current page. */
    private byte[] buffer;

    /** Current read position: array index plus {@link #BYTE_ARRAY_OFFSET}. */
    private int offset;

    /** Index of the next bit to read inside the current byte. Only used for booleans. */
    private int bitOffset;

    /** Little endian view of {@link #buffer}; only created on big endian platforms. */
    private ByteBuffer byteBuffer;

    /**
     * Points this reader at the start of a page.
     *
     * @param valueCount number of values in the page (not used by this reader)
     * @param bytes array holding the page data
     * @param offset index into {@code bytes} at which the values start
     */
    @Override
    public void initFromPage(int valueCount, byte[] bytes, int offset) throws IOException {
        this.buffer = bytes;
        this.offset = offset + BYTE_ARRAY_OFFSET;
        // A page starts at a byte offset, so any bit position left over from a previously
        // decoded page must not leak into this one.
        this.bitOffset = 0;
        if (BIG_ENDIAN_PLATFORM) {
            byteBuffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
        }
    }

    /**
     * Skipping values is not supported by this reader.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void skip() {
        throw new UnsupportedOperationException();
    }

    /**
     * Reads {@code total} 4-byte integers into {@code c.vector}, starting at index {@code rowId}.
     */
    @Override
    public final void readIntegers(int total, IntegerColumnVector c, int rowId) {
        for (int i = 0; i < total; ++i, offset += 4) {
            int value = UNSAFE.getInt(buffer, (long) offset);
            c.vector[i + rowId] = BIG_ENDIAN_PLATFORM ? Integer.reverseBytes(value) : value;
        }
    }

    /**
     * Reads {@code total} 8-byte longs into {@code c.vector}, starting at index {@code rowId}.
     */
    @Override
    public void readLongs(int total, LongColumnVector c, int rowId) {
        for (int i = 0; i < total; ++i, offset += 8) {
            long value = UNSAFE.getLong(buffer, (long) offset);
            c.vector[i + rowId] = BIG_ENDIAN_PLATFORM ? Long.reverseBytes(value) : value;
        }
    }

    /** Reads one 8-byte little endian long and advances the read position. */
    @Override
    public final long readLong() {
        long v = UNSAFE.getLong(buffer, (long) offset);
        if (BIG_ENDIAN_PLATFORM) {
            v = Long.reverseBytes(v);
        }
        offset += 8;
        return v;
    }

    /**
     * Reads {@code total} 4-byte floats into {@code c.vector}, starting at index {@code rowId}.
     *
     * <p>On little endian platforms the bytes are bulk-copied into the destination array.
     * NOTE(review): the bulk copy assumes {@code c.vector} is a {@code float[]} - confirm
     * against {@code FloatColumnVector}.
     */
    @Override
    public void readFloats(int total, FloatColumnVector c, int rowId) {
        if (!BIG_ENDIAN_PLATFORM) {
            // Use the float[] base offset (not the double[] one) and long arithmetic so the
            // destination address is correct on every JVM layout and cannot overflow.
            copyMemory(buffer, offset, c.vector,
                FLOAT_ARRAY_OFFSET + 4L * rowId, 4L * total);
        } else {
            final int start = offset - BYTE_ARRAY_OFFSET;
            for (int i = 0; i < total; ++i) {
                // Absolute get: does not move the position of the shared view.
                c.vector[i + rowId] = byteBuffer.getFloat(start + (4 * i));
            }
        }
        offset += 4 * total;
    }

    /** Reads one 4-byte little endian float and advances the read position. */
    @Override
    public final float readFloat() {
        float v;
        if (!BIG_ENDIAN_PLATFORM) {
            v = UNSAFE.getFloat(buffer, (long) offset);
        } else {
            v = byteBuffer.getFloat(offset - BYTE_ARRAY_OFFSET);
        }
        offset += 4;
        return v;
    }

    /**
     * Reads {@code total} 8-byte doubles into {@code c.vector}, starting at index {@code rowId}.
     *
     * <p>On little endian platforms the bytes are bulk-copied into the destination array.
     */
    @Override
    public void readDoubles(int total, DoubleColumnVector c, int rowId) {
        if (!BIG_ENDIAN_PLATFORM) {
            // Long arithmetic keeps the destination offset and length from overflowing int.
            copyMemory(buffer, offset, c.vector,
                DOUBLE_ARRAY_OFFSET + 8L * rowId, 8L * total);
        } else {
            final int start = offset - BYTE_ARRAY_OFFSET;
            for (int i = 0; i < total; ++i) {
                // Absolute get: does not move the position of the shared view.
                c.vector[i + rowId] = byteBuffer.getDouble(start + (8 * i));
            }
        }
        offset += 8 * total;
    }

    /** Reads one value as a 4-byte integer and narrows it to a byte. */
    @Override
    public byte readByte() {
        return (byte) readInteger();
    }

    /** Reads one 4-byte little endian integer and advances the read position. */
    @Override
    public final int readInteger() {
        int v = UNSAFE.getInt(buffer, (long) offset);
        if (BIG_ENDIAN_PLATFORM) {
            v = Integer.reverseBytes(v);
        }
        offset += 4;
        return v;
    }

    /** Reads one 8-byte little endian double and advances the read position. */
    @Override
    public final double readDouble() {
        double v;
        if (!BIG_ENDIAN_PLATFORM) {
            v = UNSAFE.getDouble(buffer, (long) offset);
        } else {
            v = byteBuffer.getDouble(offset - BYTE_ARRAY_OFFSET);
        }
        offset += 8;
        return v;
    }

    /**
     * Reads {@code total} booleans into {@code c.vector}, starting at index {@code rowId}.
     */
    @Override
    public void readBooleans(int total, BooleanColumnVector c, int rowId) {
        for (int i = 0; i < total; i++) {
            c.vector[rowId + i] = readBoolean();
        }
    }

    /**
     * Reads one bit-packed boolean. Bits are consumed from the least significant bit of the
     * current byte upwards; the read position moves to the next byte after eight bits.
     */
    @Override
    public final boolean readBoolean() {
        byte b = UNSAFE.getByte(buffer, (long) offset);
        boolean v = (b & (1 << bitOffset)) != 0;
        bitOffset += 1;
        if (bitOffset == 8) {
            bitOffset = 0;
            offset++;
        }
        return v;
    }

    /**
     * Reads {@code total} length-prefixed binaries. Each value is a 4-byte length followed by
     * that many bytes; the bytes are passed to {@code v.setVal} as a slice of the page array.
     */
    @Override
    public final void readBinaries(int total, BytesColumnVector v, int rowId) {
        for (int i = 0; i < total; i++) {
            int len = readInteger();
            v.setVal(rowId + i, buffer, offset - BYTE_ARRAY_OFFSET, len);
            offset += len;
        }
    }

    /**
     * Returns a {@link Binary} over the next {@code len} bytes of the page array and advances
     * the read position by {@code len}.
     */
    @Override
    public final Binary readBinary(int len) {
        Binary result = Binary.fromConstantByteArray(buffer, offset - BYTE_ARRAY_OFFSET, len);
        offset += len;
        return result;
    }

    /**
     * Reads {@code total} byte values into {@code c.vector}, starting at index {@code rowId}.
     */
    public final void readBytes(int total, ByteColumnVector c, int rowId) {
        for (int i = 0; i < total; i++) {
            // Bytes are stored as a 4-byte little endian int. Just read the first byte.
            c.vector[rowId + i] = UNSAFE.getByte(buffer, (long) offset);
            offset += 4;
        }
    }

    /**
     * Copies {@code length} bytes between two objects in chunks of at most
     * {@link #UNSAFE_COPY_THRESHOLD} bytes per {@code Unsafe.copyMemory} call.
     */
    private void copyMemory(Object src, long srcOffset, Object dst, long dstOffset, long length) {
        while (length > 0) {
            long size = Math.min(length, UNSAFE_COPY_THRESHOLD);
            UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size);
            length -= size;
            srcOffset += size;
            dstOffset += size;
        }
    }
}