/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql;
import java.lang.reflect.Field;
import java.math.BigInteger;
import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
import org.apache.carbondata.core.scan.scanner.LazyPageLoader;
import org.apache.parquet.column.Encoding;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
import org.apache.spark.sql.types.CalendarIntervalType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
/**
 * Adapter class which handles columnar vector reading of CarbonData on top of
 * Spark's ColumnVector and ColumnarBatch APIs. This proxy class absorbs the
 * Spark 2.3 specific API changes, since the Spark ColumnVector and
 * ColumnarBatch interfaces are still evolving.
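 *
 * <p>A minimal usage sketch (the schema, capacity and values below are
 * illustrative only):
 * <pre>{@code
 * StructType schema = new StructType()
 *     .add("id", DataTypes.IntegerType)
 *     .add("name", DataTypes.StringType);
 * CarbonVectorProxy proxy =
 *     new CarbonVectorProxy(MemoryMode.ON_HEAP, schema, 1024, false);
 * proxy.getColumnVector(0).putInt(0, 42);
 * proxy.getColumnVector(1).putRowToColumnBatch(0, UTF8String.fromString("carbon"));
 * proxy.setNumRows(1);
 * InternalRow row = proxy.getRow(0);
 * proxy.close();
 * }</pre>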
*/
public class CarbonVectorProxy {
private ColumnarBatch columnarBatch;
private ColumnVectorProxy[] columnVectorProxies;
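  /**
   * Overwrites the batch's private "columns" field with the proxy vectors via
   * reflection, since Spark 2.3's ColumnarBatch exposes no public API for
   * replacing its column vectors.
   */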
private void updateColumnVectors() {
try {
Field field = columnarBatch.getClass().getDeclaredField("columns");
field.setAccessible(true);
field.set(columnarBatch, columnVectorProxies);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
  /**
   * Allocates the underlying ColumnarBatch and wraps each of its column
   * vectors in a ColumnVectorProxy (or its lazy-loading variant).
   *
   * @param memMode      whether the vectors are allocated on-heap or off-heap.
   * @param outputSchema metadata describing the current schema of the table.
   * @param rowNum       number of rows for vector reading.
   * @param useLazyLoad  whether to use lazy loading while getting the data.
   */
public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum,
boolean useLazyLoad) {
columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum);
columnVectorProxies = new ColumnVectorProxy[columnarBatch.numCols()];
for (int i = 0; i < columnVectorProxies.length; i++) {
if (useLazyLoad) {
columnVectorProxies[i] =
new ColumnVectorProxyWithLazyLoad(columnarBatch.column(i), rowNum, memMode);
} else {
columnVectorProxies[i] = new ColumnVectorProxy(columnarBatch.column(i), rowNum, memMode);
}
}
updateColumnVectors();
}
public ColumnVectorProxy getColumnVector(int ordinal) {
return columnVectorProxies[ordinal];
}
  /**
   * Returns the number of rows available to read, including filtered rows.
   */
public int numRows() {
return columnarBatch.capacity();
}
  /**
   * Returns the column vector at the given ordinal from this batch of
   * column vectors.
   *
   * @param ordinal index of the column within the batch.
   * @return the Spark ColumnVector at that ordinal.
   */
public ColumnVector column(int ordinal) {
return columnarBatch.column(ordinal);
}
  /**
   * Resets the columns of this batch for writing. The currently stored values
   * are no longer accessible.
   */
public void reset() {
for (int i = 0; i < columnarBatch.numCols(); i++) {
((ColumnVectorProxy) columnarBatch.column(i)).reset();
}
}
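  /**
   * Resets the dictionary-id vector of the column at the given ordinal.
   */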
public void resetDictionaryIds(int ordinal) {
(((ColumnVectorProxy) columnarBatch.column(ordinal)).getVector()).getDictionaryIds().reset();
}
/**
* Returns the row in this batch at `rowId`. Returned row is reused across calls.
*/
public InternalRow getRow(int rowId) {
return columnarBatch.getRow(rowId);
}
  /**
   * Returns the underlying ColumnarBatch of this proxy.
   */
public Object getColumnarBatch() {
return columnarBatch;
}
/**
* Called to close all the columns in this batch. It is not valid to access the data after
* calling this. This must be called at the end to clean up memory allocations.
*/
public void close() {
columnarBatch.close();
}
/**
* Sets the number of rows in this batch.
*/
public void setNumRows(int numRows) {
columnarBatch.setNumRows(numRows);
}
public DataType dataType(int ordinal) {
return columnarBatch.column(ordinal).dataType();
}
public static class ColumnVectorProxy extends ColumnVector {
private ColumnVector vector;
public ColumnVectorProxy(ColumnVector columnVector, int capacity, MemoryMode mode) {
super(capacity, columnVector.dataType(), mode);
      try {
        // Share the wrapped vector's private "childColumns" and "resultArray"
        // fields with this proxy via reflection, so that nested reads through
        // the proxy resolve against the wrapped vector's state; Spark 2.3
        // exposes no public setter for either field.
        Field childColumns =
            columnVector.getClass().getSuperclass().getDeclaredField("childColumns");
        childColumns.setAccessible(true);
        childColumns.set(this, childColumns.get(columnVector));
        Field resultArray =
            columnVector.getClass().getSuperclass().getDeclaredField("resultArray");
        resultArray.setAccessible(true);
        resultArray.set(this, resultArray.get(columnVector));
} catch (Exception e) {
throw new RuntimeException(e);
}
vector = columnVector;
}
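    /**
     * Writes a single value into this vector at the given row, dispatching on
     * the Spark data type of the column; a null value is recorded via putNull.
     */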
public void putRowToColumnBatch(int rowId, Object value) {
org.apache.spark.sql.types.DataType t = dataType();
if (null == value) {
putNull(rowId);
} else {
if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
putBoolean(rowId, (boolean) value);
} else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
putByte(rowId, (byte) value);
} else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
putShort(rowId, (short) value);
} else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
putInt(rowId, (int) value);
} else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
putLong(rowId, (long) value);
} else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
putFloat(rowId, (float) value);
} else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
putDouble(rowId, (double) value);
} else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
UTF8String v = (UTF8String) value;
putByteArray(rowId, v.getBytes());
        } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
          DecimalType dt = (DecimalType) t;
          Decimal d = Decimal.fromDecimal(value);
          if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
            // Small-precision decimals fit into the unscaled int representation.
            putInt(rowId, (int) d.toUnscaledLong());
          } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
            // Medium-precision decimals fit into the unscaled long representation.
            putLong(rowId, d.toUnscaledLong());
          } else {
            // Larger decimals are stored as the unscaled value's bytes.
            final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
            byte[] bytes = integer.toByteArray();
            putByteArray(rowId, bytes, 0, bytes.length);
          }
        } else if (t instanceof CalendarIntervalType) {
          // An interval spans two child columns: months (int) and microseconds (long).
          CalendarInterval c = (CalendarInterval) value;
          vector.getChildColumn(0).putInt(rowId, c.months);
          vector.getChildColumn(1).putLong(rowId, c.microseconds);
} else if (t instanceof org.apache.spark.sql.types.DateType) {
putInt(rowId, (int) value);
} else if (t instanceof org.apache.spark.sql.types.TimestampType) {
putLong(rowId, (long) value);
}
}
}
public void putBoolean(int rowId, boolean value) {
vector.putBoolean(rowId, value);
}
public void putByte(int rowId, byte value) {
vector.putByte(rowId, value);
}
public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
vector.putBytes(rowId, count, src, srcIndex);
}
public void putShort(int rowId, short value) {
vector.putShort(rowId, value);
}
public void putInt(int rowId, int value) {
vector.putInt(rowId, value);
}
public void putFloat(int rowId, float value) {
vector.putFloat(rowId, value);
}
public void putFloats(int rowId, int count, float[] src, int srcIndex) {
vector.putFloats(rowId, count, src, srcIndex);
}
public void putLong(int rowId, long value) {
vector.putLong(rowId, value);
}
public void putDouble(int rowId, double value) {
vector.putDouble(rowId, value);
}
public void putInts(int rowId, int count, int value) {
vector.putInts(rowId, count, value);
}
public void putInts(int rowId, int count, int[] src, int srcIndex) {
vector.putInts(rowId, count, src, srcIndex);
}
public void putShorts(int rowId, int count, short value) {
vector.putShorts(rowId, count, value);
}
public void putShorts(int rowId, int count, short[] src, int srcIndex) {
vector.putShorts(rowId, count, src, srcIndex);
}
public void putLongs(int rowId, int count, long value) {
vector.putLongs(rowId, count, value);
}
public void putLongs(int rowId, int count, long[] src, int srcIndex) {
vector.putLongs(rowId, count, src, srcIndex);
}
public void putDoubles(int rowId, int count, double value) {
vector.putDoubles(rowId, count, value);
}
public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
vector.putDoubles(rowId, count, src, srcIndex);
}
public DataType dataType(int ordinal) {
return vector.dataType();
}
public void putNotNull(int rowId) {
vector.putNotNull(rowId);
}
public void putNotNulls(int rowId, int count) {
vector.putNotNulls(rowId, count);
}
public void putDictionaryInt(int rowId, int value) {
vector.getDictionaryIds().putInt(rowId, value);
}
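    /**
     * Attaches or clears the dictionary. A non-null CarbonDictionary is first
     * wrapped in a CarbonDictionaryWrapper so that it can be handed to the
     * underlying Spark vector as a parquet-style dictionary.
     */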
public void setDictionary(CarbonDictionary dictionary) {
if (null != dictionary) {
CarbonDictionaryWrapper dictionaryWrapper =
new CarbonDictionaryWrapper(Encoding.PLAIN, dictionary);
vector.setDictionary(dictionaryWrapper);
this.dictionary = dictionaryWrapper;
} else {
this.dictionary = null;
vector.setDictionary(null);
}
}
public void putNull(int rowId) {
vector.putNull(rowId);
}
public void putNulls(int rowId, int count) {
vector.putNulls(rowId, count);
}
public boolean hasDictionary() {
return vector.hasDictionary();
}
public ColumnVector reserveDictionaryIds(int capacity) {
this.dictionaryIds = vector.reserveDictionaryIds(capacity);
return dictionaryIds;
}
@Override
public boolean isNullAt(int i) {
return vector.isNullAt(i);
}
@Override
public boolean getBoolean(int i) {
return vector.getBoolean(i);
}
@Override
public byte getByte(int i) {
return vector.getByte(i);
}
@Override
public short getShort(int i) {
return vector.getShort(i);
}
@Override
public int getInt(int i) {
return vector.getInt(i);
}
@Override
public long getLong(int i) {
return vector.getLong(i);
}
@Override
public float getFloat(int i) {
return vector.getFloat(i);
}
@Override
public double getDouble(int i) {
return vector.getDouble(i);
}
@Override
    protected void reserveInternal(int capacity) {
      // No-op: the wrapped vector manages its own storage; see reserve(int).
    }
@Override
public void reserve(int requiredCapacity) {
vector.reserve(requiredCapacity);
}
@Override
public long nullsNativeAddress() {
return vector.nullsNativeAddress();
}
@Override
public long valuesNativeAddress() {
return vector.valuesNativeAddress();
}
@Override
public void putBooleans(int rowId, int count, boolean value) {
vector.putBooleans(rowId, count, value);
}
@Override
public void putBytes(int rowId, int count, byte value) {
vector.putBytes(rowId, count, value);
}
@Override
public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
vector.putIntsLittleEndian(rowId, count, src, srcIndex);
}
@Override
public int getDictId(int rowId) {
return vector.getDictId(rowId);
}
@Override
public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
vector.putLongsLittleEndian(rowId, count, src, srcIndex);
}
@Override
public void putFloats(int rowId, int count, float value) {
vector.putFloats(rowId, count, value);
}
@Override
public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
vector.putFloats(rowId, count, src, srcIndex);
}
@Override
public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
vector.putDoubles(rowId, count, src, srcIndex);
}
@Override
public void putArray(int rowId, int offset, int length) {
vector.putArray(rowId, offset, length);
}
@Override
public int getArrayLength(int rowId) {
return vector.getArrayLength(rowId);
}
@Override
public int getArrayOffset(int rowId) {
return vector.getArrayOffset(rowId);
}
@Override
public void loadBytes(Array array) {
vector.loadBytes(array);
}
@Override
public int putByteArray(int rowId, byte[] value, int offset, int count) {
return vector.putByteArray(rowId, value, offset, count);
}
    /**
     * Appends the binary data of all rows to this vector's array data buffer.
     * Should be used along with {@code putArray(int rowId, int offset, int length)}
     * to record the per-row lengths and offsets.
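     *
     * <p>A minimal usage sketch (the data and offsets below are illustrative):
     * <pre>{@code
     * byte[] data = "abdef".getBytes();   // rows "ab" and "def", concatenated
     * proxy.putAllByteArray(data, 0, data.length);
     * proxy.putArray(0, 0, 2);            // row 0 -> offset 0, length 2
     * proxy.putArray(1, 2, 3);            // row 1 -> offset 2, length 3
     * }</pre>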
*/
public void putAllByteArray(byte[] data, int offset, int length) {
vector.arrayData().appendBytes(length, data, offset);
}
@Override
public void close() {
vector.close();
}
public void reset() {
if (isConstant) {
return;
}
vector.reset();
}
    public void setLazyPage(LazyPageLoader lazyPage) {
      // Non-lazy proxy: decode the page immediately instead of deferring it.
      lazyPage.loadPage();
    }
public ColumnVector getVector() {
return vector;
}
}
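  /**
   * Proxy variant that defers decoding of the underlying page until the first
   * value is read from the vector; see checkPageLoaded().
   */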
public static class ColumnVectorProxyWithLazyLoad extends ColumnVectorProxy {
private ColumnVector vector;
private LazyPageLoader pageLoad;
private boolean isLoaded;
public ColumnVectorProxyWithLazyLoad(ColumnVector columnVector, int capacity, MemoryMode mode) {
super(columnVector, capacity, mode);
vector = columnVector;
}
@Override
public boolean isNullAt(int i) {
checkPageLoaded();
return vector.isNullAt(i);
}
@Override
public boolean getBoolean(int i) {
checkPageLoaded();
return vector.getBoolean(i);
}
@Override
public byte getByte(int i) {
checkPageLoaded();
return vector.getByte(i);
}
@Override
public short getShort(int i) {
checkPageLoaded();
return vector.getShort(i);
}
@Override
public int getInt(int i) {
checkPageLoaded();
return vector.getInt(i);
}
@Override
public long getLong(int i) {
checkPageLoaded();
return vector.getLong(i);
}
@Override
public float getFloat(int i) {
checkPageLoaded();
return vector.getFloat(i);
}
@Override
public double getDouble(int i) {
checkPageLoaded();
return vector.getDouble(i);
}
@Override
public int getArrayLength(int rowId) {
checkPageLoaded();
return vector.getArrayLength(rowId);
}
@Override
public int getArrayOffset(int rowId) {
checkPageLoaded();
return vector.getArrayOffset(rowId);
}
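    /**
     * Decodes the lazily attached page on first access; subsequent reads go
     * straight to the already materialized vector.
     */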
private void checkPageLoaded() {
if (!isLoaded) {
if (pageLoad != null) {
pageLoad.loadPage();
}
isLoaded = true;
}
}
public void reset() {
if (isConstant) {
return;
}
isLoaded = false;
vector.reset();
}
    public void setLazyPage(LazyPageLoader lazyPage) {
      // Lazy proxy: remember the loader; the page is decoded on first read.
      this.pageLoad = lazyPage;
    }
}
}