blob: afa4cd7bce7a7005a1ce85466fce1bd1d79ac8b0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.processing.sort.sortdata;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.NonDictionaryUtil;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
/**
 * Holds one sort temp file during the merge-sort phase. It reads rows back from
 * the file written while sorting, optionally prefetching batches on a background
 * thread and optionally honouring sort-temp-file compression batching, and
 * exposes the current row so holders can be ordered in a priority queue for
 * k-way merging. Not thread-safe; intended for use by a single merge thread.
 */
public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHolder> {

  /**
   * LOGGER
   */
  private static final LogService LOGGER =
      LogServiceFactory.getLogService(SortTempFileChunkHolder.class.getName());

  /**
   * temp file this holder reads rows from
   */
  private File tempFile;

  /**
   * read stream over tempFile
   */
  private DataInputStream stream;

  /**
   * total number of rows in the file (stored by the writer as the file header)
   */
  private int entryCount;

  /**
   * number of rows read from the stream so far (plain, non-prefetch path)
   */
  private int numberOfObjectRead;

  /**
   * row most recently produced by readRow(); returned by getRow()
   */
  private Object[] returnRow;

  /**
   * number of measure columns
   */
  private int measureCount;

  /**
   * number of dimension columns (dictionary + no-dictionary)
   */
  private int dimensionCount;

  /**
   * number of complex dimension columns
   */
  private int complexDimensionCount;

  /**
   * buffer size for the file reader stream
   */
  private int fileBufferSize;

  /**
   * buffer currently being served to the caller (prefetch/compression mode)
   */
  private Object[][] currentBuffer;

  /**
   * buffer filled in the background while currentBuffer is being consumed
   */
  private Object[][] backupBuffer;

  /**
   * whether backupBuffer currently holds unread rows
   */
  private boolean isBackupFilled;

  /**
   * whether background prefetching is enabled
   */
  private boolean prefetch;

  /**
   * number of rows fetched per batch
   */
  private int bufferSize;

  /**
   * index of the next row to serve from currentBuffer
   */
  private int bufferRowCounter;

  /**
   * single-thread executor used to fill backupBuffer asynchronously
   */
  private ExecutorService executorService;

  /**
   * handle of the in-flight backup fetch; waited on before a buffer swap
   */
  private Future<Void> submit;

  /**
   * number of rows handed out in prefetch/compression mode
   */
  private int prefetchRecordsProcessed;

  /**
   * batch size when sort temp file compression is enabled
   */
  private int sortTempFileNoOFRecordsInCompression;

  /**
   * whether the sort temp file was written with compression
   */
  private boolean isSortTempFileCompressionEnabled;

  /**
   * total number of rows fetched into buffers so far
   */
  private int totalRecordFetch;

  /**
   * number of no-dictionary dimension columns
   */
  private int noDictionaryCount;

  /**
   * data type of each measure column
   */
  private DataType[] aggType;

  /**
   * per dimension: true when the dimension is stored without a dictionary
   */
  private boolean[] isNoDictionaryDimensionColumn;

  /**
   * per sort column: true when the sort column is stored without a dictionary
   */
  private boolean[] isNoDictionarySortColumn;

  /**
   * Constructor to initialize the holder for one sort temp file.
   *
   * @param tempFile sort temp file to read
   * @param dimensionCount total dimension column count
   * @param complexDimensionCount complex dimension column count
   * @param measureCount measure column count
   * @param fileBufferSize buffer size for the file reader stream
   * @param noDictionaryCount no-dictionary dimension column count
   * @param aggType data type of each measure column
   * @param isNoDictionaryDimensionColumn per-dimension no-dictionary flags
   * @param isNoDictionarySortColumn per-sort-column no-dictionary flags
   * @param tableName table name, used to label the prefetch thread pool
   */
  public SortTempFileChunkHolder(File tempFile, int dimensionCount, int complexDimensionCount,
      int measureCount, int fileBufferSize, int noDictionaryCount, DataType[] aggType,
      boolean[] isNoDictionaryDimensionColumn, boolean[] isNoDictionarySortColumn,
      String tableName) {
    // set temp file
    this.tempFile = tempFile;
    // set measure and dimension count
    this.measureCount = measureCount;
    this.dimensionCount = dimensionCount;
    this.complexDimensionCount = complexDimensionCount;
    this.noDictionaryCount = noDictionaryCount;
    this.fileBufferSize = fileBufferSize;
    // one dedicated thread per holder fills the backup buffer in the background
    this.executorService = Executors
        .newFixedThreadPool(1, new CarbonThreadFactory("SafeSortTempChunkHolderPool:" + tableName));
    this.aggType = aggType;
    this.isNoDictionaryDimensionColumn = isNoDictionaryDimensionColumn;
    this.isNoDictionarySortColumn = isNoDictionarySortColumn;
  }

  /**
   * Reads the prefetch/compression configuration from CarbonProperties and
   * opens the temp file for reading.
   *
   * @throws CarbonSortKeyAndGroupByException problem while initializing
   */
  public void initialize() throws CarbonSortKeyAndGroupByException {
    prefetch = Boolean.parseBoolean(CarbonProperties.getInstance()
        .getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH,
            CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH_DEFAULT));
    bufferSize = Integer.parseInt(CarbonProperties.getInstance()
        .getProperty(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
            CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT));
    this.isSortTempFileCompressionEnabled = Boolean.parseBoolean(CarbonProperties.getInstance()
        .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
            CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE));
    if (this.isSortTempFileCompressionEnabled) {
      LOGGER.info("Compression was used while writing the sortTempFile");
    }
    try {
      this.sortTempFileNoOFRecordsInCompression = Integer.parseInt(CarbonProperties.getInstance()
          .getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION,
              CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE));
      if (this.sortTempFileNoOFRecordsInCompression < 1) {
        LOGGER.error("Invalid value for: "
            + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
            + ": Only Positive Integer value(greater than zero) is allowed.Default value will"
            + " be used");
        this.sortTempFileNoOFRecordsInCompression = Integer.parseInt(
            CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
      }
    } catch (NumberFormatException e) {
      LOGGER.error(
          "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
              + ", only Positive Integer value is allowed.Default value will be used");
      this.sortTempFileNoOFRecordsInCompression = Integer
          .parseInt(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
    }
    initialise();
  }

  /**
   * Opens the temp file, reads the row-count header and, when prefetch or
   * compression is enabled, fills the first buffer.
   *
   * @throws CarbonSortKeyAndGroupByException if the file is missing or unreadable
   */
  private void initialise() throws CarbonSortKeyAndGroupByException {
    try {
      if (isSortTempFileCompressionEnabled) {
        // in compression mode rows are batched per compression unit
        this.bufferSize = sortTempFileNoOFRecordsInCompression;
      }
      stream = new DataInputStream(
          new BufferedInputStream(new FileInputStream(tempFile), this.fileBufferSize));
      // the writer stores the total row count as the file header
      this.entryCount = stream.readInt();
      if (prefetch) {
        // fill the first buffer synchronously, then start fetching the next batch
        // in the background if more rows remain
        new DataFetcher(false).call();
        totalRecordFetch += currentBuffer.length;
        if (totalRecordFetch < this.entryCount) {
          submit = executorService.submit(new DataFetcher(true));
        }
      } else if (isSortTempFileCompressionEnabled) {
        // no prefetch, but compression mode still reads a whole batch at once
        new DataFetcher(false).call();
      }
    } catch (FileNotFoundException e) {
      LOGGER.error(e);
      throw new CarbonSortKeyAndGroupByException(tempFile + " not found", e);
    } catch (IOException e) {
      LOGGER.error(e);
      throw new CarbonSortKeyAndGroupByException("Problem while reading " + tempFile, e);
    } catch (Exception e) {
      LOGGER.error(e);
      throw new CarbonSortKeyAndGroupByException("Problem while reading " + tempFile, e);
    }
  }

  /**
   * Advances the holder to the next row. The row is available via getRow().
   *
   * @throws CarbonSortKeyAndGroupByException problem while reading
   */
  public void readRow() throws CarbonSortKeyAndGroupByException {
    if (prefetch) {
      fillDataForPrefetch();
    } else if (isSortTempFileCompressionEnabled) {
      // compression mode without prefetch: refill the buffer synchronously
      // whenever it is exhausted
      if (bufferRowCounter >= bufferSize) {
        try {
          new DataFetcher(false).call();
          bufferRowCounter = 0;
        } catch (Exception e) {
          LOGGER.error(e);
          throw new CarbonSortKeyAndGroupByException("Problem while reading " + tempFile, e);
        }
      }
      prefetchRecordsProcessed++;
      returnRow = currentBuffer[bufferRowCounter++];
    } else {
      // plain mode: read one row straight off the stream
      this.returnRow = getRowFromStream();
    }
  }

  /**
   * Serves the next prefetched row, swapping in the backup buffer (and kicking
   * off the next background fetch) when the current buffer is exhausted.
   */
  private void fillDataForPrefetch() {
    if (bufferRowCounter >= bufferSize) {
      if (!isBackupFilled) {
        // backup not ready yet: wait for the in-flight background fetch
        try {
          submit.get();
        } catch (Exception e) {
          LOGGER.error(e);
        }
      }
      // swap the backup buffer in as current and fetch the next batch if
      // any rows remain in the file
      bufferRowCounter = 0;
      currentBuffer = backupBuffer;
      isBackupFilled = false;
      totalRecordFetch += currentBuffer.length;
      if (totalRecordFetch < this.entryCount) {
        submit = executorService.submit(new DataFetcher(true));
      }
    }
    prefetchRecordsProcessed++;
    returnRow = currentBuffer[bufferRowCounter++];
  }

  /**
   * Reads one row from the stream in the layout the sort writer used:
   * dictionary dimensions as int, no-dictionary and complex dimensions as
   * length-prefixed byte arrays, then measures with a null-indicator byte each.
   *
   * @return row as [int[] dictDims, byte[][] nonDictDims, Object[] measures]
   * @throws CarbonSortKeyAndGroupByException problem while reading
   */
  private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException {
    // create new row of size 3 (1 for dims, 1 for high card, 1 for measures)
    Object[] holder = new Object[3];
    int index = 0;
    int nonDicIndex = 0;
    int[] dim = new int[this.dimensionCount - this.noDictionaryCount];
    byte[][] nonDicArray = new byte[this.noDictionaryCount + this.complexDimensionCount][];
    Object[] measures = new Object[this.measureCount];
    try {
      // read dimension values: dictionary dims as surrogate ints,
      // no-dictionary dims as short-length-prefixed bytes
      for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
        if (isNoDictionaryDimensionColumn[i]) {
          short len = stream.readShort();
          byte[] array = new byte[len];
          stream.readFully(array);
          nonDicArray[nonDicIndex++] = array;
        } else {
          dim[index++] = stream.readInt();
        }
      }
      // complex dimensions follow, also as length-prefixed bytes
      for (int i = 0; i < complexDimensionCount; i++) {
        short len = stream.readShort();
        byte[] array = new byte[len];
        stream.readFully(array);
        nonDicArray[nonDicIndex++] = array;
      }
      index = 0;
      // read measure values; a leading byte of 1 means non-null
      for (int i = 0; i < this.measureCount; i++) {
        if (stream.readByte() == 1) {
          DataType dataType = aggType[i];
          if (dataType == DataTypes.BOOLEAN) {
            measures[index++] = stream.readBoolean();
          } else if (dataType == DataTypes.SHORT) {
            measures[index++] = stream.readShort();
          } else if (dataType == DataTypes.INT) {
            measures[index++] = stream.readInt();
          } else if (dataType == DataTypes.LONG) {
            measures[index++] = stream.readLong();
          } else if (dataType == DataTypes.DOUBLE) {
            measures[index++] = stream.readDouble();
          } else if (dataType == DataTypes.DECIMAL) {
            // decimals are stored as int-length-prefixed byte arrays
            int len = stream.readInt();
            byte[] buff = new byte[len];
            stream.readFully(buff);
            measures[index++] = DataTypeUtil.byteToBigDecimal(buff);
          } else {
            throw new IllegalArgumentException("unsupported data type:" + aggType[i]);
          }
        } else {
          measures[index++] = null;
        }
      }
      NonDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures);
      // increment number of records read
      this.numberOfObjectRead++;
    } catch (IOException e) {
      LOGGER.error("Problem while reading the mdkey from sort temp file");
      throw new CarbonSortKeyAndGroupByException("Problem while reading the sort temp file ", e);
    }
    // return out row
    return holder;
  }

  /**
   * Returns the row most recently read by readRow().
   *
   * @return current row
   */
  public Object[] getRow() {
    return this.returnRow;
  }

  /**
   * Checks whether any more records are present in the file.
   *
   * @return true when more rows remain to be served
   */
  public boolean hasNext() {
    if (prefetch || isSortTempFileCompressionEnabled) {
      return this.prefetchRecordsProcessed < this.entryCount;
    }
    return this.numberOfObjectRead < this.entryCount;
  }

  /**
   * Closes the stream, shuts down the prefetch executor and releases buffers.
   */
  public void closeStream() {
    CarbonUtil.closeStreams(stream);
    if (null != executorService) {
      executorService.shutdownNow();
    }
    this.backupBuffer = null;
    this.currentBuffer = null;
  }

  /**
   * Returns the total number of rows stored in the file.
   *
   * @return entryCount
   */
  public int getEntryCount() {
    return entryCount;
  }

  /**
   * Orders holders by their current row, comparing sort columns in order:
   * no-dictionary columns byte-wise, dictionary columns by surrogate key.
   */
  @Override public int compareTo(SortTempFileChunkHolder other) {
    int diff = 0;
    int index = 0;
    int noDictionaryIndex = 0;
    int[] leftMdkArray = (int[]) returnRow[0];
    int[] rightMdkArray = (int[]) other.returnRow[0];
    byte[][] leftNonDictArray = (byte[][]) returnRow[1];
    byte[][] rightNonDictArray = (byte[][]) other.returnRow[1];
    for (boolean isNoDictionary : isNoDictionarySortColumn) {
      if (isNoDictionary) {
        diff = UnsafeComparer.INSTANCE
            .compareTo(leftNonDictArray[noDictionaryIndex], rightNonDictArray[noDictionaryIndex]);
        if (diff != 0) {
          return diff;
        }
        noDictionaryIndex++;
      } else {
        // Integer.compare is overflow-safe, unlike subtraction
        diff = Integer.compare(leftMdkArray[index], rightMdkArray[index]);
        if (diff != 0) {
          return diff;
        }
        index++;
      }
    }
    return diff;
  }

  /**
   * Identity-based equality: two holders are equal only when they are the same
   * instance. Note this is deliberately NOT consistent with compareTo, which
   * orders holders by their current row.
   */
  @Override public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (!(obj instanceof SortTempFileChunkHolder)) {
      return false;
    }
    SortTempFileChunkHolder o = (SortTempFileChunkHolder) obj;
    return this == o;
  }

  @Override public int hashCode() {
    int hash = 0;
    hash += 31 * measureCount;
    hash += 31 * dimensionCount;
    hash += 31 * complexDimensionCount;
    hash += 31 * noDictionaryCount;
    hash += tempFile.hashCode();
    return hash;
  }

  /**
   * Task that reads one batch of rows from the stream into either the current
   * or the backup buffer. Exceptions are logged, not rethrown, so a failed
   * background fetch surfaces later as an empty/old buffer rather than killing
   * the executor thread.
   */
  private final class DataFetcher implements Callable<Void> {
    /** true when this task fills backupBuffer instead of currentBuffer */
    private boolean isBackUpFilling;

    /** number of rows this task will read */
    private int numberOfRecords;

    private DataFetcher(boolean backUp) {
      isBackUpFilling = backUp;
      calculateNumberOfRecordsToBeFetched();
    }

    /**
     * Caps the batch at the number of rows still unread in the file.
     */
    private void calculateNumberOfRecordsToBeFetched() {
      int numberOfRecordsLeftToBeRead = entryCount - totalRecordFetch;
      numberOfRecords =
          bufferSize < numberOfRecordsLeftToBeRead ? bufferSize : numberOfRecordsLeftToBeRead;
    }

    @Override public Void call() throws Exception {
      try {
        if (isBackUpFilling) {
          backupBuffer = prefetchRecordsFromFile(numberOfRecords);
          isBackupFilled = true;
        } else {
          currentBuffer = prefetchRecordsFromFile(numberOfRecords);
        }
      } catch (Exception e) {
        LOGGER.error(e);
      }
      return null;
    }
  }

  /**
   * Reads the given number of rows from the sort temp file into a buffer.
   *
   * @param numberOfRecords rows to read
   * @return buffer of rows
   * @throws CarbonSortKeyAndGroupByException problem while reading
   */
  private Object[][] prefetchRecordsFromFile(int numberOfRecords)
      throws CarbonSortKeyAndGroupByException {
    Object[][] records = new Object[numberOfRecords][];
    for (int i = 0; i < numberOfRecords; i++) {
      records[i] = getRowFromStream();
    }
    return records;
  }
}