/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.processing.loading.sort;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.CarbonUnsafeUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.NonDictionaryUtil;
import org.apache.carbondata.core.util.ReUsableByteArrayDataOutputStream;
import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
import org.apache.carbondata.processing.sort.SortTempRowUpdater;
import org.apache.carbondata.processing.sort.sortdata.SortParameters;
import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
/**
* This class is used to convert/write/read rows in the sort step in CarbonData.
* It provides the following functions:
* 1. convert raw row & intermediate sort temp row to 3-parted row
* 2. read/write intermediate sort temp row to sort temp file & unsafe memory
* 3. write raw row directly to sort temp file & unsafe memory as intermediate sort temp row
*/
public class SortStepRowHandler implements Serializable {
private static final long serialVersionUID = 1L;
private int dictSortDimCnt = 0;
private int dictNoSortDimCnt = 0;
private int noDictSortDimCnt = 0;
private int noDictNoSortDimCnt = 0;
private int varcharDimCnt = 0;
private int complexDimCnt = 0;
private int measureCnt;
// indices for dict & sort dimension columns
private int[] dictSortDimIdx;
// indices for dict & no-sort dimension columns
private int[] dictNoSortDimIdx;
// indices for no-dict & sort dimension columns
private int[] noDictSortDimIdx;
// indices for no-dict & no-sort dimension columns, excluding complex/varchar columns
private int[] noDictNoSortDimIdx;
private int[] varcharDimIdx;
private int[] complexDimIdx;
// indices for measure columns
private int[] measureIdx;
// measure data types
private DataType[] dataTypes;
// data types of the no-dict & sort dimension columns
private DataType[] noDictSortDataTypes;
// true if the corresponding no-dict & sort column is a primitive type, false if it is byte[]
private boolean[] noDictSortColMapping;
// data types of the no-dict & no-sort dimension columns
private DataType[] noDictNoSortDataTypes;
// true if the corresponding no-dict & no-sort column is a primitive type
private boolean[] noDictNoSortColMapping;
private SortTempRowUpdater sortTempRowUpdater;
/**
* constructor
* @param tableFieldStat table field stat
*/
public SortStepRowHandler(TableFieldStat tableFieldStat) {
this.dictSortDimCnt = tableFieldStat.getDictSortDimCnt();
this.dictNoSortDimCnt = tableFieldStat.getDictNoSortDimCnt();
this.noDictSortDimCnt = tableFieldStat.getNoDictSortDimCnt();
this.noDictNoSortDimCnt = tableFieldStat.getNoDictNoSortDimCnt();
this.varcharDimCnt = tableFieldStat.getVarcharDimCnt();
this.complexDimCnt = tableFieldStat.getComplexDimCnt();
this.measureCnt = tableFieldStat.getMeasureCnt();
this.dictSortDimIdx = tableFieldStat.getDictSortDimIdx();
this.dictNoSortDimIdx = tableFieldStat.getDictNoSortDimIdx();
this.noDictSortDimIdx = tableFieldStat.getNoDictSortDimIdx();
this.noDictNoSortDimIdx = tableFieldStat.getNoDictNoSortDimIdx();
this.varcharDimIdx = tableFieldStat.getVarcharDimIdx();
this.complexDimIdx = tableFieldStat.getComplexDimIdx();
this.measureIdx = tableFieldStat.getMeasureIdx();
this.dataTypes = tableFieldStat.getMeasureDataType();
this.noDictSortDataTypes = tableFieldStat.getNoDictSortDataType();
noDictSortColMapping = new boolean[noDictSortDataTypes.length];
for (int i = 0; i < noDictSortDataTypes.length; i++) {
noDictSortColMapping[i] = DataTypeUtil.isPrimitiveColumn(noDictSortDataTypes[i]);
}
this.noDictNoSortDataTypes = tableFieldStat.getNoDictNoSortDataType();
noDictNoSortColMapping = new boolean[noDictNoSortDataTypes.length];
for (int i = 0; i < noDictNoSortDataTypes.length; i++) {
noDictNoSortColMapping[i] = DataTypeUtil.isPrimitiveColumn(noDictNoSortDataTypes[i]);
}
this.sortTempRowUpdater = tableFieldStat.getSortTempRowUpdater();
}
/**
* constructor
* @param sortParameters sort parameters
*/
public SortStepRowHandler(SortParameters sortParameters) {
this(new TableFieldStat(sortParameters));
}
/**
* Convert carbon row from raw format to 3-parted format.
* This method is used in global-sort.
*
* @param row raw row whose length equals the number of fields
* @return 3-parted row of length 3 (1 for dict dims, 1 for no-dict and complex dims,
* 1 for measures)
*/
public Object[] convertRawRowTo3Parts(Object[] row) {
Object[] holder = new Object[3];
try {
int[] dictDims = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
Object[] nonDictArray = new Object[this.noDictSortDimCnt + this.noDictNoSortDimCnt
+ this.varcharDimCnt + this.complexDimCnt];
Object[] measures = new Object[this.measureCnt];
// convert dict & sort
int idxAcc = 0;
for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
dictDims[idxAcc++] = (int) row[this.dictSortDimIdx[idx]];
}
// convert dict & no-sort
for (int idx = 0; idx < this.dictNoSortDimCnt; idx++) {
dictDims[idxAcc++] = (int) row[this.dictNoSortDimIdx[idx]];
}
// convert no-dict & sort
idxAcc = 0;
for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
nonDictArray[idxAcc++] = row[this.noDictSortDimIdx[idx]];
}
// convert no-dict & no-sort
for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) {
nonDictArray[idxAcc++] = row[this.noDictNoSortDimIdx[idx]];
}
// convert varchar dims
for (int idx = 0; idx < this.varcharDimCnt; idx++) {
nonDictArray[idxAcc++] = row[this.varcharDimIdx[idx]];
}
// convert complex dims
for (int idx = 0; idx < this.complexDimCnt; idx++) {
nonDictArray[idxAcc++] = row[this.complexDimIdx[idx]];
}
// convert measure data
for (int idx = 0; idx < this.measureCnt; idx++) {
measures[idx] = row[this.measureIdx[idx]];
}
sortTempRowUpdater.updateOutputRow(holder, dictDims, nonDictArray, measures);
} catch (Exception e) {
throw new RuntimeException("Problem while converting row to 3 parts", e);
}
return holder;
}
/**
* Convert intermediate sort temp row to 3-parted row.
* This method is used in the final merge sort to feed rows to the next write step.
*
* @param sortTempRow intermediate sort temp row
* @return 3-parted row
*/
public Object[] convertIntermediateSortTempRowTo3Parted(IntermediateSortTempRow sortTempRow) {
Object[] out = new Object[3];
NonDictionaryUtil
.prepareOutObj(out, sortTempRow.getDictSortDims(), sortTempRow.getNoDictSortDims(),
sortTempRow.getMeasures());
return out;
}
/**
* Read an intermediate sort temp row from the input stream.
* This method is used during the intermediate merge sort phase to read rows from sort temp
* files.
*
* @param inputStream input stream
* @return a row that contains three parts
* @throws IOException if an error occurs while reading from the stream
*/
public IntermediateSortTempRow readWithoutNoSortFieldConvert(
DataInputStream inputStream) throws IOException {
int[] dictSortDims = new int[this.dictSortDimCnt];
Object[] noDictSortDims = new Object[this.noDictSortDimCnt];
// read dict & sort dim data
for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
dictSortDims[idx] = inputStream.readInt();
}
// read no-dict & sort data
for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
// for no-dict columns of primitive types, read back the original typed value
noDictSortDims[idx] = getDataForNoDictSortColumn(inputStream, idx);
}
// read no-dict dims & measures
int len = inputStream.readInt();
byte[] noSortDimsAndMeasures = new byte[len];
inputStream.readFully(noSortDimsAndMeasures);
// keep the no-sort fields and measures in a packed byte array as they do not participate in sort
return new IntermediateSortTempRow(dictSortDims, noDictSortDims, noSortDimsAndMeasures);
}
/**
* Read an intermediate sort temp row from the input stream.
* This method is used during the final merge sort phase to read rows from sort temp files and
* merged sort temp files.
*
* @param inputStream input stream
* @return a row that contains three parts
* @throws IOException if an error occurs while reading from the stream
*/
public IntermediateSortTempRow readWithNoSortFieldConvert(
DataInputStream inputStream) throws IOException {
int[] dictSortDims = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
Object[] noDictSortDims =
new Object[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt
+ this.complexDimCnt];
// read dict & sort dim data
for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
dictSortDims[idx] = inputStream.readInt();
}
// read no-dict & sort data
for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
// for no-dict columns of primitive types, read back the original typed value
noDictSortDims[idx] = getDataForNoDictSortColumn(inputStream, idx);
}
// read no-dict dims & measures
int len = inputStream.readInt();
byte[] noSortDimsAndMeasures = new byte[len];
inputStream.readFully(noSortDimsAndMeasures);
Object[] measure = new Object[this.measureCnt];
// unpack the no sort fields and measure fields
unpackNoSortFromBytes(noSortDimsAndMeasures, dictSortDims, noDictSortDims, measure);
return new IntermediateSortTempRow(dictSortDims, noDictSortDims, measure);
}
/**
* Read a no-dict sort column value from the stream according to the column type.
*
* @param inputStream input stream
* @param idx index of the no-dict sort column
* @return the typed value for primitive columns, otherwise the raw byte[]
* @throws IOException if an error occurs while reading from the stream
*/
private Object getDataForNoDictSortColumn(DataInputStream inputStream, int idx)
throws IOException {
if (this.noDictSortColMapping[idx]) {
return readDataFromStream(inputStream, idx);
} else {
short len = inputStream.readShort();
byte[] bytes = new byte[len];
inputStream.readFully(bytes);
return bytes;
}
}
/**
* Read a primitive no-dict sort column value from the stream according to its data type.
*
* @param inputStream input stream
* @param idx index of the no-dict sort column
* @return the value read, or null if the null flag indicates a null value
* @throws IOException if an error occurs while reading from the stream
*/
private Object readDataFromStream(DataInputStream inputStream, int idx) throws IOException {
DataType dataType = noDictSortDataTypes[idx];
Object data = null;
if (!inputStream.readBoolean()) {
return null;
}
if (dataType == DataTypes.BOOLEAN) {
data = inputStream.readBoolean();
} else if (dataType == DataTypes.BYTE) {
data = inputStream.readByte();
} else if (dataType == DataTypes.SHORT) {
data = inputStream.readShort();
} else if (dataType == DataTypes.INT) {
data = inputStream.readInt();
} else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
data = inputStream.readLong();
} else if (dataType == DataTypes.DOUBLE) {
data = inputStream.readDouble();
} else if (dataType == DataTypes.FLOAT) {
data = inputStream.readFloat();
} else if (dataType == DataTypes.BYTE_ARRAY || DataTypes.isDecimal(dataType)) {
byte[] bytes =
inputStream.readUTF().getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
data = bytes;
}
return data;
}
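/**
* Unpack the no-sort dims and measures from the packed byte array produced by
* packNoSortFieldsToBytes and fill them into the given arrays. The dict and no-dict arrays
* are filled starting after the sort columns that the caller has already populated.
*/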
private void unpackNoSortFromBytes(byte[] noSortDimsAndMeasures, int[] dictDims,
Object[] noDictDims, Object[] measures) {
ByteBuffer rowBuffer = ByteBuffer.wrap(noSortDimsAndMeasures);
// read dict_no_sort
for (int i = dictSortDimCnt; i < dictDims.length; i++) {
dictDims[i] = rowBuffer.getInt();
}
int noDictIndex = noDictSortDimCnt;
// read no_dict_no_sort
for (int i = 0; i < noDictNoSortDimCnt; i++) {
// for no-dict columns of primitive types, read back the original typed value
if (this.noDictNoSortColMapping[i]) {
noDictDims[noDictIndex++] = getDataFromRowBuffer(noDictNoSortDataTypes[i], rowBuffer);
} else {
short len = rowBuffer.getShort();
byte[] bytes = new byte[len];
rowBuffer.get(bytes);
noDictDims[noDictIndex++] = bytes;
}
}
// read varchar dims
for (int i = 0; i < varcharDimCnt; i++) {
int len = rowBuffer.getInt();
byte[] bytes = new byte[len];
rowBuffer.get(bytes);
noDictDims[noDictIndex++] = bytes;
}
// read complex dims
for (int i = 0; i < complexDimCnt; i++) {
int len = rowBuffer.getInt();
byte[] bytes = new byte[len];
rowBuffer.get(bytes);
noDictDims[noDictIndex++] = bytes;
}
// read measure
int measureCnt = measures.length;
Object tmpContent;
for (short idx = 0; idx < measureCnt; idx++) {
tmpContent = getDataFromRowBuffer(dataTypes[idx], rowBuffer);
measures[idx] = tmpContent;
}
}
/**
* Read a value from the packed row buffer according to its data type.
* The first byte is the null flag written by putDataToRowBuffer.
*
* @param tmpDataType data type of the value
* @param rowBuffer buffer containing the packed no-sort fields and measures
* @return the value read, or null if the null flag is 0
*/
private Object getDataFromRowBuffer(DataType tmpDataType, ByteBuffer rowBuffer) {
Object tmpContent;
if ((byte) 0 == rowBuffer.get()) {
return null;
}
if (DataTypes.BOOLEAN == tmpDataType) {
if ((byte) 1 == rowBuffer.get()) {
tmpContent = true;
} else {
tmpContent = false;
}
} else if (DataTypes.SHORT == tmpDataType) {
tmpContent = rowBuffer.getShort();
} else if (DataTypes.INT == tmpDataType) {
tmpContent = rowBuffer.getInt();
} else if (DataTypes.LONG == tmpDataType || DataTypes.TIMESTAMP == tmpDataType) {
tmpContent = rowBuffer.getLong();
} else if (DataTypes.DOUBLE == tmpDataType) {
tmpContent = rowBuffer.getDouble();
} else if (DataTypes.FLOAT == tmpDataType) {
tmpContent = rowBuffer.getFloat();
} else if (DataTypes.BYTE == tmpDataType) {
tmpContent = rowBuffer.get();
} else if (DataTypes.isDecimal(tmpDataType)) {
short len = rowBuffer.getShort();
byte[] decimalBytes = new byte[len];
rowBuffer.get(decimalBytes);
tmpContent = DataTypeUtil.byteToBigDecimal(decimalBytes);
} else if (DataTypes.BINARY == tmpDataType) {
int len = rowBuffer.getInt();
byte[] bytes = new byte[len];
rowBuffer.get(bytes);
tmpContent = bytes;
} else {
throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
}
return tmpContent;
}
/**
* Write an intermediate sort temp row to the output stream.
* This method is used during the merge sort phase to write rows to sort temp files.
*
* @param sortTempRow intermediate sort temp row
* @param outputStream output stream
* @throws IOException if error occurs while writing to stream
*/
public void writeIntermediateSortTempRowToOutputStream(IntermediateSortTempRow sortTempRow,
DataOutputStream outputStream) throws IOException {
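// the format written here matches writeRawRowAsIntermediateSortTempRowToOutputStream and is
// read back by readWithoutNoSortFieldConvert / readWithNoSortFieldConvert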
// write dict & sort dim
for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
outputStream.writeInt(sortTempRow.getDictSortDims()[idx]);
}
// write no-dict & sort dim
for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
if (this.noDictSortColMapping[idx]) {
// write the original data to the stream
writeDataToStream(sortTempRow.getNoDictSortDims()[idx], outputStream, idx);
} else {
byte[] bytes = (byte[]) sortTempRow.getNoDictSortDims()[idx];
outputStream.writeShort(bytes.length);
outputStream.write(bytes);
}
}
// write packed no-sort dim & measure
outputStream.writeInt(sortTempRow.getNoSortDimsAndMeasures().length);
outputStream.write(sortTempRow.getNoSortDimsAndMeasures());
}
/**
* Write raw row as an intermediate sort temp row to sort temp file.
* This method is used at the beginning of the sort phase. Compared with converting the raw row
* to an intermediate sort temp row and then writing the converted row, writing the raw row
* directly avoids the intermediate conversion overhead.
* This method uses an array-backed buffer to avoid repeated memory allocation; the buffer is
* reused for all rows (per thread).
*
* @param row raw row
* @param outputStream output stream
* @param reUsableByteArrayDataOutputStream reusable DataOutputStream backed by a ByteArrayOutputStream
* @throws IOException if error occurs while writing to stream
*/
public void writeRawRowAsIntermediateSortTempRowToOutputStream(Object[] row,
DataOutputStream outputStream,
ReUsableByteArrayDataOutputStream reUsableByteArrayDataOutputStream) throws IOException {
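// Serialized layout of one intermediate sort temp row:
//   [int] per dict & sort dim
//   per no-dict & sort dim: primitive column -> [boolean notNull][typed value]
//                           byte[] column    -> [short length][bytes]
//   [int packSize][packed no-sort dims & measures] (see packNoSortFieldsToBytes)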
// write dict & sort
for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
outputStream.writeInt((int) row[this.dictSortDimIdx[idx]]);
}
// write no-dict & sort
for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
if (this.noDictSortColMapping[idx]) {
// write the original data to the stream
writeDataToStream(row[this.noDictSortDimIdx[idx]], outputStream, idx);
} else {
byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]];
outputStream.writeShort(bytes.length);
outputStream.write(bytes);
}
}
// pack no-sort
reUsableByteArrayDataOutputStream.reset();
packNoSortFieldsToBytes(row, reUsableByteArrayDataOutputStream);
int packSize = reUsableByteArrayDataOutputStream.getSize();
// write no-sort
outputStream.writeInt(packSize);
outputStream.write(reUsableByteArrayDataOutputStream.getByteArray(), 0, packSize);
}
/**
* Write a primitive no-dict sort column value to the stream.
* A null flag is written first; non-null values are then written according to their data type.
*
* @param data value to write, may be null
* @param outputStream output stream
* @param idx index of the no-dict sort column
* @throws IOException if an error occurs while writing to the stream
*/
private void writeDataToStream(Object data, DataOutputStream outputStream, int idx)
throws IOException {
DataType dataType = noDictSortDataTypes[idx];
if (null == data) {
outputStream.writeBoolean(false);
} else {
outputStream.writeBoolean(true);
if (dataType == DataTypes.BOOLEAN) {
outputStream.writeBoolean((boolean) data);
} else if (dataType == DataTypes.BYTE) {
outputStream.writeByte((byte) data);
} else if (dataType == DataTypes.SHORT) {
outputStream.writeShort((short) data);
} else if (dataType == DataTypes.INT) {
outputStream.writeInt((int) data);
} else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
outputStream.writeLong((long) data);
} else if (dataType == DataTypes.DOUBLE) {
outputStream.writeDouble((double) data);
} else if (DataTypes.isDecimal(dataType)) {
BigDecimal val = (BigDecimal) data;
byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
outputStream.writeShort(bigDecimalInBytes.length);
outputStream.write(bigDecimalInBytes);
} else if (dataType == DataTypes.FLOAT) {
outputStream.writeFloat((float) data);
} else if (dataType == DataTypes.BYTE_ARRAY) {
outputStream.writeUTF(data.toString());
}
}
}
/**
* Read intermediate sort temp row from unsafe memory.
* This method is used during the merge sort phase for off-heap sort.
*
* @param baseObject base object of memory block
* @param address address of the row
* @return intermediate sort temp row
*/
public IntermediateSortTempRow readFromMemoryWithoutNoSortFieldConvert(Object baseObject,
long address) {
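// reads the unsafe layout produced by writeRawRowAsIntermediateSortTempRowToUnsafeMemory;
// the packed no-sort dims & measures are kept as raw bytes and are not converted here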
int size = 0;
int[] dictSortDims = new int[this.dictSortDimCnt];
byte[][] noDictSortDims = new byte[this.noDictSortDimCnt][];
// read dict & sort dim
for (int idx = 0; idx < dictSortDims.length; idx++) {
dictSortDims[idx] = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
size += 4;
}
// read no-dict & sort dim
for (int idx = 0; idx < noDictSortDims.length; idx++) {
short length = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
size += 2;
byte[] bytes = new byte[length];
CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
size += length;
noDictSortDims[idx] = bytes;
}
// read no-sort dims & measures
int len = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
size += 4;
byte[] noSortDimsAndMeasures = new byte[len];
CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
noSortDimsAndMeasures, CarbonUnsafe.BYTE_ARRAY_OFFSET, len);
return new IntermediateSortTempRow(dictSortDims, noDictSortDims, noSortDimsAndMeasures);
}
/**
* Read intermediate sort temp row from unsafe memory.
* This method is used during the merge sort phase for off-heap sort.
*
* @param baseObject base object of memory block
* @param address address of the row
* @return intermediate sort temp row
*/
public IntermediateSortTempRow readRowFromMemoryWithNoSortFieldConvert(Object baseObject,
long address) {
int size = 0;
int[] dictSortDims = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
Object[] noDictSortDims =
new Object[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt
+ this.complexDimCnt];
// read dict & sort dim
for (int idx = 0; idx < dictSortDimCnt; idx++) {
dictSortDims[idx] = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
size += 4;
}
// read no-dict & sort dim
for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
short length = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
size += 2;
if (this.noDictSortColMapping[idx]) {
// get the original data from the unsafe memory
if (0 == length) {
// if the length is 0, then the data is null
noDictSortDims[idx] = null;
} else {
Object data = CarbonUnsafeUtil
.getDataFromUnsafe(noDictSortDataTypes[idx], baseObject, address, size, length);
size += length;
noDictSortDims[idx] = data;
}
} else {
byte[] bytes = new byte[length];
CarbonUnsafe.getUnsafe()
.copyMemory(baseObject, address + size, bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
size += length;
noDictSortDims[idx] = bytes;
}
}
// read no-sort dims & measures
int len = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
size += 4;
byte[] noSortDimsAndMeasures = new byte[len];
CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
noSortDimsAndMeasures, CarbonUnsafe.BYTE_ARRAY_OFFSET, len);
Object[] measures = new Object[measureCnt];
unpackNoSortFromBytes(noSortDimsAndMeasures, dictSortDims, noDictSortDims, measures);
return new IntermediateSortTempRow(dictSortDims, noDictSortDims, measures);
}
/**
* Write an intermediate sort temp row directly from unsafe memory to a stream.
* This method is used near the beginning of the sort phase to write in-memory pages
* to sort temp files. Compared with reading the intermediate sort temp row from memory and
* then writing it, writing directly from memory to the stream avoids the intermediate
* conversion overhead.
*
* @param baseObject base object of the memory block
* @param address base address of the row
* @param outputStream output stream
* @param unsafeRemainingLength remaining length of the unsafe memory block
* @param unsafeTotalLength total length of the unsafe memory block
* @throws IOException if an error occurs while writing to the stream
* @throws MemoryException if the unsafe memory block cannot hold the row
*/
public void writeIntermediateSortTempRowFromUnsafeMemoryToStream(Object baseObject, long address,
DataOutputStream outputStream, long unsafeRemainingLength, long unsafeTotalLength)
throws IOException, MemoryException {
int size = 0;
// dict & sort
for (int idx = 0; idx < dictSortDimCnt; idx++) {
outputStream.writeInt(CarbonUnsafe.getUnsafe().getInt(baseObject, address + size));
size += 4;
}
// no-dict & sort
for (int idx = 0; idx < noDictSortDimCnt; idx++) {
short length = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
size += 2;
if (this.noDictSortColMapping[idx]) {
// get the original data from unsafe memory
if (0 == length) {
// if the length is 0, then the data is null
writeDataToStream(null, outputStream, idx);
} else {
Object data = CarbonUnsafeUtil
.getDataFromUnsafe(noDictSortDataTypes[idx], baseObject, address, size, length);
size += length;
writeDataToStream(data, outputStream, idx);
}
} else {
validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, length, unsafeTotalLength);
byte[] bytes = new byte[length];
CarbonUnsafe.getUnsafe()
.copyMemory(baseObject, address + size, bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
size += length;
outputStream.writeShort(length);
outputStream.write(bytes);
}
}
// packed no-sort & measure
int len = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
size += 4;
validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, len, unsafeTotalLength);
byte[] noSortDimsAndMeasures = new byte[len];
CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
noSortDimsAndMeasures, CarbonUnsafe.BYTE_ARRAY_OFFSET, len);
outputStream.writeInt(len);
outputStream.write(noSortDimsAndMeasures);
}
/**
* Write a raw row as an intermediate sort temp row to unsafe memory.
* This method is used at the beginning of the off-heap sort phase. Compared with converting
* the raw row to an intermediate sort temp row and then writing the converted row, writing
* the raw row directly avoids the intermediate conversion overhead.
* This method uses an array-backed buffer to avoid repeated memory allocation; the buffer is
* reused for all rows (per thread).
*
* @param row raw row
* @param baseObject base object of the memory block
* @param address base address for the row
* @param reUsableByteArrayDataOutputStream reusable DataOutputStream backed by a
*        ByteArrayOutputStream, used to pack the no-sort fields
* @param unsafeRemainingLength remaining length of the unsafe memory block
* @param unsafeTotalLength total length of the unsafe memory block
* @return number of bytes written to memory
* @throws MemoryException if the unsafe memory block cannot hold the row
* @throws IOException if an error occurs while packing the no-sort fields
*/
public int writeRawRowAsIntermediateSortTempRowToUnsafeMemory(Object[] row, Object baseObject,
long address, ReUsableByteArrayDataOutputStream reUsableByteArrayDataOutputStream,
long unsafeRemainingLength, long unsafeTotalLength) throws MemoryException, IOException {
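// Unsafe layout of one intermediate sort temp row:
//   [int] per dict & sort dim
//   per no-dict & sort dim: [short length][value bytes] (length 0 means null for primitives)
//   [int packSize][packed no-sort dims & measures] (see packNoSortFieldsToBytes)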
int size = 0;
// write dict & sort
for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, 4, unsafeTotalLength);
CarbonUnsafe.getUnsafe()
.putInt(baseObject, address + size, (int) row[this.dictSortDimIdx[idx]]);
size += 4;
}
// write no-dict & sort
for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
if (this.noDictSortColMapping[idx]) {
Object data = row[this.noDictSortDimIdx[idx]];
if (null == data) {
// if the data is null, then write only the length as 0.
CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) 0);
size += 2;
} else {
int sizeInBytes = this.noDictSortDataTypes[idx].getSizeInBytes();
if (this.noDictSortDataTypes[idx] == DataTypes.TIMESTAMP) {
sizeInBytes = DataTypes.LONG.getSizeInBytes();
}
CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) sizeInBytes);
size += 2;
// put data to unsafe according to the data types
CarbonUnsafeUtil
.putDataToUnsafe(noDictSortDataTypes[idx], data, baseObject, address, size,
sizeInBytes);
size += sizeInBytes;
}
} else {
byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]];
validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, 2 + bytes.length,
unsafeTotalLength);
CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) bytes.length);
size += 2;
CarbonUnsafe.getUnsafe()
.copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, address + size,
bytes.length);
size += bytes.length;
}
}
// convert pack no-sort
reUsableByteArrayDataOutputStream.reset();
packNoSortFieldsToBytes(row, reUsableByteArrayDataOutputStream);
int packSize = reUsableByteArrayDataOutputStream.getSize();
validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, 4 + packSize, unsafeTotalLength);
// write no-sort
CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, packSize);
size += 4;
CarbonUnsafe.getUnsafe().copyMemory(reUsableByteArrayDataOutputStream.getByteArray(),
CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, address + size, packSize);
size += packSize;
return size;
}
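/**
* Validate that the requested size fits in the unsafe memory block.
* Throws a MemoryException if the whole block is too small for the request, or if the
* remaining space in the current block is insufficient (the caller should create a new page).
*/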
private void validateUnsafeMemoryBlockSizeLimit(long unsafeRemainingLength, int requestedSize,
long unsafeTotalLength) throws MemoryException {
if (unsafeTotalLength <= requestedSize) {
throw new MemoryException(
"not enough unsafe memory for sort: increase the 'offheap.sort.chunk.size.inmb' ");
} else if (unsafeRemainingLength <= requestedSize) {
throw new MemoryException("cannot handle this row. create new page");
}
}
/**
* Pack the no-sort fields and measures into a byte array.
*
* @param row raw row
* @param reUsableByteArrayDataOutputStream reusable DataOutputStream backed by a
*        ByteArrayOutputStream
* @throws IOException if an error occurs while writing to the buffer
*/
private void packNoSortFieldsToBytes(Object[] row,
ReUsableByteArrayDataOutputStream reUsableByteArrayDataOutputStream) throws IOException {
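// Packed layout:
//   [int] per dict & no-sort dim
//   per no-dict & no-sort dim: primitive -> [null flag][typed value] (see putDataToRowBuffer)
//                              byte[]    -> [short length][bytes]
//   per varchar dim and complex dim: [int length][bytes]
//   per measure: [null flag][typed value] (see putDataToRowBuffer)
// unpackNoSortFromBytes reads this layout back.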
// convert dict & no-sort
for (int idx = 0; idx < this.dictNoSortDimCnt; idx++) {
reUsableByteArrayDataOutputStream.writeInt((int) row[this.dictNoSortDimIdx[idx]]);
}
// convert no-dict & no-sort
for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) {
if (this.noDictNoSortColMapping[idx]) {
// put the original data to buffer
putDataToRowBuffer(this.noDictNoSortDataTypes[idx], row[this.noDictNoSortDimIdx[idx]],
reUsableByteArrayDataOutputStream);
} else {
byte[] bytes = (byte[]) row[this.noDictNoSortDimIdx[idx]];
reUsableByteArrayDataOutputStream.writeShort((short) bytes.length);
reUsableByteArrayDataOutputStream.write(bytes);
}
}
// convert varchar dims
for (int idx = 0; idx < this.varcharDimCnt; idx++) {
byte[] bytes = (byte[]) row[this.varcharDimIdx[idx]];
reUsableByteArrayDataOutputStream.writeInt(bytes.length);
reUsableByteArrayDataOutputStream.write(bytes);
}
// convert complex dims
for (int idx = 0; idx < this.complexDimCnt; idx++) {
byte[] bytes = (byte[]) row[this.complexDimIdx[idx]];
reUsableByteArrayDataOutputStream.writeInt(bytes.length);
reUsableByteArrayDataOutputStream.write(bytes);
}
// convert measure
for (int idx = 0; idx < this.measureCnt; idx++) {
putDataToRowBuffer(this.dataTypes[idx], row[this.measureIdx[idx]],
reUsableByteArrayDataOutputStream);
}
}
/**
* Write a value into the packed row buffer.
* The first byte written is a null flag; if the value is non-null it is followed by the value
* encoded according to its data type.
*
* @param tmpDataType data type of the value
* @param tmpValue value to write, may be null
* @param reUsableByteArrayDataOutputStream buffer to write into
* @throws IOException if an error occurs while writing to the buffer
*/
private void putDataToRowBuffer(DataType tmpDataType, Object tmpValue,
ReUsableByteArrayDataOutputStream reUsableByteArrayDataOutputStream) throws IOException {
if (null == tmpValue) {
reUsableByteArrayDataOutputStream.write((byte) 0);
return;
}
reUsableByteArrayDataOutputStream.write((byte) 1);
if (DataTypes.BOOLEAN == tmpDataType) {
if ((boolean) tmpValue) {
reUsableByteArrayDataOutputStream.write((byte) 1);
} else {
reUsableByteArrayDataOutputStream.write((byte) 0);
}
} else if (DataTypes.SHORT == tmpDataType) {
reUsableByteArrayDataOutputStream.writeShort((Short) tmpValue);
} else if (DataTypes.INT == tmpDataType) {
reUsableByteArrayDataOutputStream.writeInt((Integer) tmpValue);
} else if (DataTypes.LONG == tmpDataType || DataTypes.TIMESTAMP == tmpDataType) {
reUsableByteArrayDataOutputStream.writeLong((Long) tmpValue);
} else if (DataTypes.DOUBLE == tmpDataType) {
reUsableByteArrayDataOutputStream.writeDouble((Double) tmpValue);
} else if (DataTypes.FLOAT == tmpDataType) {
reUsableByteArrayDataOutputStream.writeFloat((Float) tmpValue);
} else if (DataTypes.BYTE == tmpDataType) {
reUsableByteArrayDataOutputStream.write((byte) tmpValue);
} else if (DataTypes.isDecimal(tmpDataType)) {
byte[] decimalBytes = DataTypeUtil.bigDecimalToByte((BigDecimal) tmpValue);
reUsableByteArrayDataOutputStream.writeShort((short) decimalBytes.length);
reUsableByteArrayDataOutputStream.write(decimalBytes);
} else if (DataTypes.BINARY == tmpDataType) {
byte[] bytes = (byte[]) tmpValue;
reUsableByteArrayDataOutputStream.writeInt(bytes.length);
reUsableByteArrayDataOutputStream.write(bytes);
} else {
throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
}
}
}