/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql;
import java.math.BigInteger;
import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
import org.apache.carbondata.core.scan.scanner.LazyPageLoader;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
/**
 * Adapter class which handles columnar vector reading of carbondata
 * through the Spark ColumnVector and ColumnarBatch API. This proxy class
 * hides the complexity of the Spark 2.3 API changes, since the Spark
 * ColumnVector and ColumnarBatch interfaces are still evolving.
 */
public class CarbonVectorProxy {
private ColumnarBatch columnarBatch;
private ColumnVectorProxy[] columnVectorProxies;
/**
 * Creates the proxy: allocates one writable column vector per field of
 * {@code outputSchema}, wraps each vector in a {@link ColumnVectorProxy}
 * (or the lazy-loading variant), and assembles them into a ColumnarBatch.
 *
 * @param memMode      whether the vectors are allocated on-heap or off-heap.
 * @param outputSchema metadata describing the current schema of the table.
 * @param rowNum       number of rows to reserve for vector reading.
 * @param useLazyLoad  whether to load page data lazily on first access.
 */
public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum,
boolean useLazyLoad) {
WritableColumnVector[] columnVectors = ColumnVectorFactory
.getColumnVector(memMode, outputSchema, rowNum);
columnVectorProxies = new ColumnVectorProxy[columnVectors.length];
for (int i = 0; i < columnVectorProxies.length; i++) {
if (useLazyLoad) {
columnVectorProxies[i] = new ColumnVectorProxyWithLazyLoad(columnVectors[i]);
} else {
columnVectorProxies[i] = new ColumnVectorProxy(columnVectors[i]);
}
}
columnarBatch = new ColumnarBatch(columnVectorProxies);
columnarBatch.setNumRows(rowNum);
}
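// Illustrative usage (a sketch, not part of this class): a reader would
// typically allocate the proxy once per batch, write values row by row,
// and hand the wrapped batch back to Spark. `schema`, `values` and
// `rowCount` below are hypothetical.
//
//   CarbonVectorProxy proxy =
//       new CarbonVectorProxy(MemoryMode.ON_HEAP, schema, 4096, false);
//   for (int rowId = 0; rowId < rowCount; rowId++) {
//     proxy.getColumnVector(0).putRowToColumnBatch(rowId, values[rowId]);
//   }
//   proxy.setNumRows(rowCount);
//   ColumnarBatch batch = (ColumnarBatch) proxy.getColumnarBatch();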
/**
 * Returns the number of rows to read, including filtered rows.
 */
public int numRows() {
return columnarBatch.numRows();
}
/**
 * Returns the writable column vector for the given column of this batch.
 *
 * @param ordinal index of the column in the batch
 * @return the underlying WritableColumnVector at that ordinal
 */
public WritableColumnVector column(int ordinal) {
return ((ColumnVectorProxy) columnarBatch.column(ordinal)).getVector();
}
public ColumnVectorProxy getColumnVector(int ordinal) {
return columnVectorProxies[ordinal];
}
/**
 * Resets all columns in this batch for writing. The currently stored values
 * are no longer accessible.
 */
public void reset() {
for (int i = 0; i < columnarBatch.numCols(); i++) {
((ColumnVectorProxy) columnarBatch.column(i)).reset();
}
}
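// Typical reuse of the allocation across pages (a sketch; `nextRowCount` is
// hypothetical): reset() clears the writable vectors so the same batch can
// serve the next page, and close() frees everything at the very end.
//
//   proxy.reset();
//   // ... write the next page's rows ...
//   proxy.setNumRows(nextRowCount);
//   proxy.close();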
public void resetDictionaryIds(int ordinal) {
(((ColumnVectorProxy) columnarBatch.column(ordinal)).getVector()).getDictionaryIds().reset();
}
/**
* Returns the row in this batch at `rowId`. Returned row is reused across calls.
*/
public InternalRow getRow(int rowId) {
return columnarBatch.getRow(rowId);
}
/**
 * Returns the underlying ColumnarBatch that this proxy wraps.
 */
public Object getColumnarBatch() {
return columnarBatch;
}
/**
* Called to close all the columns in this batch. It is not valid to access the data after
* calling this. This must be called at the end to clean up memory allocations.
*/
public void close() {
columnarBatch.close();
}
/**
* Sets the number of rows in this batch.
*/
public void setNumRows(int numRows) {
columnarBatch.setNumRows(numRows);
}
public DataType dataType(int ordinal) {
return columnarBatch.column(ordinal).dataType();
}
/**
 * Proxy over a Spark WritableColumnVector that exposes both the read API of
 * ColumnVector and the write helpers used by the CarbonData reader.
 */
public static class ColumnVectorProxy extends ColumnVector {
private WritableColumnVector vector;
public ColumnVectorProxy(ColumnVector columnVector) {
super(columnVector.dataType());
vector = (WritableColumnVector) columnVector;
}
public void putRowToColumnBatch(int rowId, Object value) {
DataType t = vector.dataType();
if (null == value) {
putNull(rowId);
} else {
if (t == DataTypes.BooleanType) {
putBoolean(rowId, (boolean) value);
} else if (t == DataTypes.ByteType) {
putByte(rowId, (byte) value);
} else if (t == DataTypes.ShortType) {
putShort(rowId, (short) value);
} else if (t == DataTypes.IntegerType) {
putInt(rowId, (int) value);
} else if (t == DataTypes.LongType) {
putLong(rowId, (long) value);
} else if (t == DataTypes.FloatType) {
putFloat(rowId, (float) value);
} else if (t == DataTypes.DoubleType) {
putDouble(rowId, (double) value);
} else if (t == DataTypes.StringType) {
UTF8String v = (UTF8String) value;
putByteArray(rowId, v.getBytes());
} else if (t instanceof DecimalType) {
DecimalType dt = (DecimalType) t;
Decimal d = Decimal.fromDecimal(value);
if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
// Narrow decimals fit their unscaled value in an int.
putInt(rowId, (int) d.toUnscaledLong());
} else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
// Medium decimals fit their unscaled value in a long.
putLong(rowId, d.toUnscaledLong());
} else {
// Wide decimals are stored as the unscaled BigInteger's bytes.
final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
byte[] bytes = integer.toByteArray();
putByteArray(rowId, bytes, 0, bytes.length);
}
} else if (t instanceof CalendarIntervalType) {
// Intervals are stored as two child vectors: months and microseconds.
CalendarInterval c = (CalendarInterval) value;
vector.getChild(0).putInt(rowId, c.months);
vector.getChild(1).putLong(rowId, c.microseconds);
} else if (t instanceof DateType) {
putInt(rowId, (int) value);
} else if (t instanceof TimestampType) {
putLong(rowId, (long) value);
}
}
}
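// Worked example of the decimal dispatch above (a sketch; the literal is
// hypothetical): Decimal.MAX_INT_DIGITS() is 9 and Decimal.MAX_LONG_DIGITS()
// is 18, so a DECIMAL(9,2) value such as 1234567.89 is stored as the
// unscaled int 123456789, a DECIMAL(18,2) value as an unscaled long, and
// anything wider as the unscaled BigInteger's byte array.
//
//   Decimal d = Decimal.apply(new java.math.BigDecimal("1234567.89"));
//   int stored = (int) d.toUnscaledLong(); // 123456789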
public void putBoolean(int rowId, boolean value) {
vector.putBoolean(rowId, value);
}
public void putByte(int rowId, byte value) {
vector.putByte(rowId, value);
}
public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
vector.putBytes(rowId, count, src, srcIndex);
}
public void putShort(int rowId, short value) {
vector.putShort(rowId, value);
}
public void putInt(int rowId, int value) {
vector.putInt(rowId, value);
}
public void putFloat(int rowId, float value) {
vector.putFloat(rowId, value);
}
public void putFloats(int rowId, int count, float[] src, int srcIndex) {
vector.putFloats(rowId, count, src, srcIndex);
}
public void putLong(int rowId, long value) {
vector.putLong(rowId, value);
}
public void putDouble(int rowId, double value) {
vector.putDouble(rowId, value);
}
public void putByteArray(int rowId, byte[] value) {
vector.putByteArray(rowId, value);
}
public void putInts(int rowId, int count, int value) {
vector.putInts(rowId, count, value);
}
public void putInts(int rowId, int count, int[] src, int srcIndex) {
vector.putInts(rowId, count, src, srcIndex);
}
public void putShorts(int rowId, int count, short value) {
vector.putShorts(rowId, count, value);
}
public void putShorts(int rowId, int count, short[] src, int srcIndex) {
vector.putShorts(rowId, count, src, srcIndex);
}
public void putLongs(int rowId, int count, long value) {
vector.putLongs(rowId, count, value);
}
public void putLongs(int rowId, int count, long[] src, int srcIndex) {
vector.putLongs(rowId, count, src, srcIndex);
}
public void putDecimal(int rowId, Decimal value, int precision) {
vector.putDecimal(rowId, value, precision);
}
public void putDoubles(int rowId, int count, double value) {
vector.putDoubles(rowId, count, value);
}
public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
vector.putDoubles(rowId, count, src, srcIndex);
}
public void putByteArray(int rowId, byte[] value, int offset, int length) {
vector.putByteArray(rowId, value, offset, length);
}
public void putNotNull(int rowId) {
vector.putNotNull(rowId);
}
public void putNotNulls(int rowId, int count) {
vector.putNotNulls(rowId, count);
}
public void putDictionaryInt(int rowId, int value) {
vector.getDictionaryIds().putInt(rowId, value);
}
public void setDictionary(CarbonDictionary dictionary) {
if (null != dictionary) {
vector.setDictionary(new CarbonDictionaryWrapper(dictionary));
} else {
vector.setDictionary(null);
}
}
public void putNull(int rowId) {
vector.putNull(rowId);
}
public void putNulls(int rowId, int count) {
vector.putNulls(rowId, count);
}
public boolean hasDictionary() {
return vector.hasDictionary();
}
public Object reserveDictionaryIds(int capacity) {
return vector.reserveDictionaryIds(capacity);
}
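// Illustrative dictionary-encoded write path (a sketch; `dict`, `rowCount`
// and `surrogateKey` are hypothetical): the dictionary is set once per page,
// ids are reserved, and each row stores a dictionary id instead of the
// decoded value.
//
//   proxy.setDictionary(dict);                   // wraps the CarbonDictionary
//   proxy.reserveDictionaryIds(rowCount);
//   proxy.putDictionaryInt(rowId, surrogateKey); // id, not the actual value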
@Override
public boolean isNullAt(int i) {
return vector.isNullAt(i);
}
@Override
public boolean getBoolean(int i) {
return vector.getBoolean(i);
}
@Override
public byte getByte(int i) {
return vector.getByte(i);
}
@Override
public short getShort(int i) {
return vector.getShort(i);
}
@Override
public int getInt(int i) {
return vector.getInt(i);
}
@Override
public long getLong(int i) {
return vector.getLong(i);
}
@Override
public float getFloat(int i) {
return vector.getFloat(i);
}
@Override
public double getDouble(int i) {
return vector.getDouble(i);
}
@Override
public void close() {
vector.close();
}
@Override
public boolean hasNull() {
return vector.hasNull();
}
@Override
public int numNulls() {
return vector.numNulls();
}
@Override
public ColumnarArray getArray(int i) {
return vector.getArray(i);
}
@Override
public ColumnarMap getMap(int i) {
return vector.getMap(i);
}
@Override
public Decimal getDecimal(int rowId, int precision, int scale) {
return vector.getDecimal(rowId, precision, scale);
}
@Override
public UTF8String getUTF8String(int i) {
return vector.getUTF8String(i);
}
@Override
public byte[] getBinary(int i) {
return vector.getBinary(i);
}
@Override
protected ColumnVector getChild(int i) {
return vector.getChild(i);
}
public void reset() {
vector.reset();
}
public void setLazyPage(LazyPageLoader lazyPage) {
// Non-lazy proxy: decode the page immediately instead of deferring it.
lazyPage.loadPage();
}
/**
 * Appends the binary data of all rows into the shared array buffer.
 * Use together with {@link #putArray(int, int, int)} to record each row's
 * offset and length.
 */
public void putAllByteArray(byte[] data, int offset, int length) {
vector.arrayData().appendBytes(length, data, offset);
}
public void putArray(int rowId, int offset, int length) {
vector.putArray(rowId, offset, length);
}
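// Illustrative pairing of putAllByteArray and putArray (a sketch;
// `concatenated`, `offsets` and `lengths` are hypothetical): append the
// bytes of every row once, then record each row's slice.
//
//   proxy.putAllByteArray(concatenated, 0, concatenated.length);
//   for (int rowId = 0; rowId < rowCount; rowId++) {
//     proxy.putArray(rowId, offsets[rowId], lengths[rowId]);
//   }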
public WritableColumnVector getVector() {
return vector;
}
}
/**
 * Variant of ColumnVectorProxy that defers page decoding until the first
 * read; see setLazyPage and checkPageLoaded.
 */
public static class ColumnVectorProxyWithLazyLoad extends ColumnVectorProxy {
private WritableColumnVector vector;
private LazyPageLoader pageLoad;
private boolean isLoaded;
public ColumnVectorProxyWithLazyLoad(ColumnVector columnVector) {
super(columnVector);
vector = (WritableColumnVector) columnVector;
}
@Override
public boolean isNullAt(int i) {
checkPageLoaded();
return vector.isNullAt(i);
}
@Override
public boolean getBoolean(int i) {
checkPageLoaded();
return vector.getBoolean(i);
}
@Override
public byte getByte(int i) {
checkPageLoaded();
return vector.getByte(i);
}
@Override
public short getShort(int i) {
checkPageLoaded();
return vector.getShort(i);
}
@Override
public int getInt(int i) {
checkPageLoaded();
return vector.getInt(i);
}
@Override
public long getLong(int i) {
checkPageLoaded();
return vector.getLong(i);
}
@Override
public float getFloat(int i) {
checkPageLoaded();
return vector.getFloat(i);
}
@Override
public double getDouble(int i) {
checkPageLoaded();
return vector.getDouble(i);
}
@Override
public boolean hasNull() {
checkPageLoaded();
return vector.hasNull();
}
@Override
public int numNulls() {
checkPageLoaded();
return vector.numNulls();
}
@Override
public ColumnarArray getArray(int i) {
checkPageLoaded();
return vector.getArray(i);
}
@Override
public ColumnarMap getMap(int i) {
checkPageLoaded();
return vector.getMap(i);
}
@Override
public Decimal getDecimal(int rowId, int precision, int scale) {
checkPageLoaded();
return vector.getDecimal(rowId, precision, scale);
}
@Override
public UTF8String getUTF8String(int i) {
checkPageLoaded();
return vector.getUTF8String(i);
}
@Override
public byte[] getBinary(int i) {
checkPageLoaded();
return vector.getBinary(i);
}
@Override
protected ColumnVector getChild(int i) {
checkPageLoaded();
return vector.getChild(i);
}
@Override
public void reset() {
isLoaded = false;
pageLoad = null;
vector.reset();
}
private void checkPageLoaded() {
if (!isLoaded) {
if (pageLoad != null) {
pageLoad.loadPage();
}
isLoaded = true;
}
}
@Override
public void setLazyPage(LazyPageLoader lazyPage) {
// Lazy proxy: remember the loader; the page is decoded on first access.
this.pageLoad = lazyPage;
}
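// Illustrative lazy-load flow (a sketch; `lazyLoader` is hypothetical):
// the scanner registers the page without decoding it, and the first read
// through any getter triggers the actual load exactly once.
//
//   proxy.setLazyPage(lazyLoader); // no decoding yet
//   proxy.getInt(0);               // checkPageLoaded() decodes the page here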
}
}