blob: fa50f1c7a19397f563109132530bfc9550137398 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.processing.sort.sortdata;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.AbstractQueue;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.NonDictionaryUtil;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
/**
 * Performs a k-way merge of sorted intermediate sort-temp files into a single
 * sorted output file. Rows are pulled from a min-heap of per-file readers so
 * the globally smallest row is always written next.
 *
 * <p>Not thread-safe; each instance is intended to be executed once.
 */
public class IntermediateFileMerger implements Callable<Void> {

  /** Logger for this class. */
  private static final LogService LOGGER =
      LogServiceFactory.getLogService(IntermediateFileMerger.class.getName());

  /** Min-heap of per-file chunk readers; the root holds the smallest pending row. */
  private AbstractQueue<SortTempFileChunkHolder> recordHolderHeap;

  /** Number of input files that still have unread rows. */
  private int fileCounter;

  /** Output stream used when neither compression nor prefetch is enabled. */
  private DataOutputStream stream;

  /** Total record count across all input files; written as the output file header. */
  private int totalNumberOfRecords;

  /** In-memory row buffer used in compression/prefetch mode between flushes. */
  private Object[][] records;

  /** Number of rows currently held in {@link #records}. */
  private int entryCount;

  /** Writer used in compression/prefetch mode instead of {@link #stream}. */
  private TempSortFileWriter writer;

  /** Capacity of {@link #records} in compression/prefetch mode. */
  private int totalSize;

  private SortParameters mergerParameters;

  private File[] intermediateFiles;

  private File outPutFile;

  /** Per-dimension flag: true means the column is a no-dictionary dimension. */
  private boolean[] noDictionarycolumnMapping;

  /** First failure observed during the merge; rethrown from {@link #call()}. */
  private Throwable throwable;

  /**
   * IntermediateFileMerger Constructor
   *
   * @param mergerParameters sort configuration shared by all merge tasks
   * @param intermediateFiles sorted temp files to merge
   * @param outPutFile destination file for the merged, sorted output
   */
  public IntermediateFileMerger(SortParameters mergerParameters, File[] intermediateFiles,
      File outPutFile) {
    this.mergerParameters = mergerParameters;
    this.fileCounter = intermediateFiles.length;
    this.intermediateFiles = intermediateFiles;
    this.outPutFile = outPutFile;
    noDictionarycolumnMapping = mergerParameters.getNoDictionaryDimnesionColumn();
  }

  /**
   * Runs the merge: builds the heap, drains it row by row into the output,
   * flushes any buffered rows, and cleans up. On success the input files are
   * deleted; on failure the partial output file is deleted instead.
   *
   * @return always {@code null}
   * @throws CarbonSortKeyAndGroupByException wrapping the first failure, if any
   */
  @Override public Void call() throws Exception {
    long intermediateMergeStartTime = System.currentTimeMillis();
    int fileConterConst = fileCounter;
    try {
      startSorting();
      initialize();
      while (hasNext()) {
        writeDataToFile(next());
      }
      if (mergerParameters.isSortFileCompressionEnabled() || mergerParameters.isPrefetch()) {
        // flush rows still buffered in memory after the merge loop
        if (entryCount > 0) {
          if (entryCount < totalSize) {
            // buffer only partially filled: write a right-sized copy
            Object[][] temp = new Object[entryCount][];
            System.arraycopy(records, 0, temp, 0, entryCount);
            this.writer.writeSortTempFile(temp);
          } else {
            this.writer.writeSortTempFile(records);
          }
        }
      }
      double intermediateMergeCostTime =
          (System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0;
      LOGGER.info("============================== Intermediate Merge of " + fileConterConst +
          " Sort Temp Files Cost Time: " + intermediateMergeCostTime + "(s)");
    } catch (Exception e) {
      LOGGER.error(e, "Problem while intermediate merging");
      // close any readers still held by the heap before recording the failure
      clear();
      throwable = e;
    } finally {
      records = null;
      CarbonUtil.closeStreams(this.stream);
      if (null != writer) {
        writer.finish();
      }
      if (null == throwable) {
        try {
          // success path: delete the now-merged intermediate files
          finish();
        } catch (CarbonSortKeyAndGroupByException e) {
          LOGGER.error(e, "Problem while deleting the merge file");
          throwable = e;
        }
      } else {
        // failure path: discard the partially written output file
        if (!outPutFile.delete()) {
          LOGGER.error("Problem while deleting the merge file");
        }
      }
    }
    if (null != throwable) {
      throw new CarbonSortKeyAndGroupByException(throwable);
    }
    return null;
  }

  /**
   * This method is responsible for initializing the out stream.
   * In plain mode a raw record-count-prefixed stream is opened; in
   * compression/prefetch mode a {@link TempSortFileWriter} and the in-memory
   * buffer size are set up instead.
   *
   * @throws CarbonSortKeyAndGroupByException if the output file cannot be opened or written
   */
  private void initialize() throws CarbonSortKeyAndGroupByException {
    if (!mergerParameters.isSortFileCompressionEnabled() && !mergerParameters.isPrefetch()) {
      try {
        this.stream = new DataOutputStream(
            new BufferedOutputStream(new FileOutputStream(outPutFile),
                mergerParameters.getFileWriteBufferSize()));
        // header: total number of records that will follow
        this.stream.writeInt(this.totalNumberOfRecords);
      } catch (FileNotFoundException e) {
        throw new CarbonSortKeyAndGroupByException("Problem while getting the file", e);
      } catch (IOException e) {
        throw new CarbonSortKeyAndGroupByException("Problem while writing the data to file", e);
      }
    } else {
      writer = TempSortFileWriterFactory.getInstance()
          .getTempSortFileWriter(mergerParameters.isSortFileCompressionEnabled(),
              mergerParameters.getDimColCount(), mergerParameters.getComplexDimColCount(),
              mergerParameters.getMeasureColCount(), mergerParameters.getNoDictionaryCount(),
              mergerParameters.getFileWriteBufferSize());
      writer.initiaize(outPutFile, totalNumberOfRecords);
      if (mergerParameters.isPrefetch()) {
        totalSize = mergerParameters.getBufferSize();
      } else {
        totalSize = mergerParameters.getSortTempFileNoOFRecordsInCompression();
      }
    }
  }

  /**
   * This method will be used to get the sorted record from file.
   *
   * @return the globally smallest pending row
   * @throws CarbonSortKeyAndGroupByException if reading the next row of a file fails
   */
  private Object[] getSortedRecordFromFile() throws CarbonSortKeyAndGroupByException {
    // poll the top object from heap
    // heap maintains binary tree which is based on heap condition that will
    // be based on comparator we are passing the heap
    // when will call poll it will always delete root of the tree and then
    // it does trickle down operation complexity is log(n)
    SortTempFileChunkHolder poll = this.recordHolderHeap.poll();
    // get the row from chunk
    Object[] row = poll.getRow();
    // check if there no entry present
    if (!poll.hasNext()) {
      // if chunk is empty then close the stream
      poll.closeStream();
      // change the file counter
      --this.fileCounter;
      // return row
      return row;
    }
    // read new row
    poll.readRow();
    // add to heap
    this.recordHolderHeap.add(poll);
    // return row
    return row;
  }

  /**
   * Below method will be used to start storing process This method will get
   * all the temp files present in sort temp folder then it will create the
   * record holder heap and then it will read first record from each file and
   * initialize the heap
   *
   * @throws CarbonSortKeyAndGroupByException if a temp file cannot be opened or read
   */
  private void startSorting() throws CarbonSortKeyAndGroupByException {
    LOGGER.info("Number of temp file: " + this.fileCounter);
    // create record holder heap
    createRecordHolderQueue(intermediateFiles);
    // iterate over file list and create chunk holder and add to heap
    LOGGER.info("Started adding first record from each file");
    SortTempFileChunkHolder sortTempFileChunkHolder = null;
    for (File tempFile : intermediateFiles) {
      // create chunk holder
      sortTempFileChunkHolder =
          new SortTempFileChunkHolder(tempFile, mergerParameters.getDimColCount(),
              mergerParameters.getComplexDimColCount(), mergerParameters.getMeasureColCount(),
              mergerParameters.getFileBufferSize(), mergerParameters.getNoDictionaryCount(),
              mergerParameters.getMeasureDataType(),
              mergerParameters.getNoDictionaryDimnesionColumn(),
              mergerParameters.getNoDictionarySortColumn(), mergerParameters.getTableName());
      // initialize
      sortTempFileChunkHolder.initialize();
      sortTempFileChunkHolder.readRow();
      this.totalNumberOfRecords += sortTempFileChunkHolder.getEntryCount();
      // add to heap
      this.recordHolderHeap.add(sortTempFileChunkHolder);
    }
    LOGGER.info("Heap Size" + this.recordHolderHeap.size());
  }

  /**
   * This method will be used to create the heap which will be used to hold
   * the chunk of data
   *
   * @param listFiles list of temp files
   */
  private void createRecordHolderQueue(File[] listFiles) {
    // creating record holder heap
    this.recordHolderHeap = new PriorityQueue<>(listFiles.length);
  }

  /**
   * This method will be used to get the sorted row
   *
   * @return sorted row
   * @throws CarbonSortKeyAndGroupByException if reading from a temp file fails
   */
  private Object[] next() throws CarbonSortKeyAndGroupByException {
    return getSortedRecordFromFile();
  }

  /**
   * This method will be used to check whether any more element is present or
   * not
   *
   * @return true while at least one input file still has unread rows
   */
  private boolean hasNext() {
    return this.fileCounter > 0;
  }

  /**
   * Below method will be used to write data to file. In compression/prefetch
   * mode the row is buffered in memory and flushed when the buffer fills;
   * otherwise the row is serialized directly to the output stream as
   * dimensions followed by null-marker-prefixed measures.
   *
   * @param row merged row: [0] dictionary key ints, [1] no-dictionary byte arrays, then measures
   * @throws CarbonSortKeyAndGroupByException problem while writing
   */
  private void writeDataToFile(Object[] row) throws CarbonSortKeyAndGroupByException {
    if (mergerParameters.isSortFileCompressionEnabled() || mergerParameters.isPrefetch()) {
      if (entryCount == 0) {
        // lazily allocate the buffer on the first row
        records = new Object[totalSize][];
        records[entryCount++] = row;
        return;
      }
      records[entryCount++] = row;
      if (entryCount == totalSize) {
        // buffer full: flush it and start a fresh one
        this.writer.writeSortTempFile(records);
        entryCount = 0;
        records = new Object[totalSize][];
      }
      return;
    }
    try {
      DataType[] aggType = mergerParameters.getMeasureDataType();
      int[] mdkArray = (int[]) row[0];
      byte[][] nonDictArray = (byte[][]) row[1];
      int mdkIndex = 0;
      int nonDictKeyIndex = 0;
      // write dictionary and non dictionary dimensions here.
      for (boolean isNoDictionary : noDictionarycolumnMapping) {
        if (isNoDictionary) {
          // no-dictionary column: length-prefixed raw bytes
          byte[] col = nonDictArray[nonDictKeyIndex++];
          stream.writeShort(col.length);
          stream.write(col);
        } else {
          stream.writeInt(mdkArray[mdkIndex++]);
        }
      }
      // write measures: 1-byte presence marker followed by the typed value.
      // Read each measure once instead of once for the null check and again
      // for every typed write.
      for (int counter = 0; counter < mergerParameters.getMeasureColCount(); counter++) {
        Object measure = NonDictionaryUtil.getMeasure(counter, row);
        if (null != measure) {
          stream.write((byte) 1);
          DataType dataType = aggType[counter];
          if (dataType == DataTypes.BOOLEAN) {
            stream.writeBoolean((boolean) measure);
          } else if (dataType == DataTypes.SHORT) {
            stream.writeShort((short) measure);
          } else if (dataType == DataTypes.INT) {
            stream.writeInt((int) measure);
          } else if (dataType == DataTypes.LONG) {
            stream.writeLong((long) measure);
          } else if (dataType == DataTypes.DOUBLE) {
            stream.writeDouble((Double) measure);
          } else if (dataType == DataTypes.DECIMAL) {
            // decimals are serialized as length-prefixed bytes
            byte[] bigDecimalInBytes = (byte[]) measure;
            stream.writeInt(bigDecimalInBytes.length);
            stream.write(bigDecimalInBytes);
          } else {
            throw new IllegalArgumentException("unsupported data type:" + aggType[counter]);
          }
        } else {
          // null measure: marker byte only
          stream.write((byte) 0);
        }
      }
    } catch (IOException e) {
      throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
    }
  }

  /**
   * Releases all readers and deletes the fully merged intermediate files.
   *
   * @throws CarbonSortKeyAndGroupByException if the intermediate files cannot be deleted
   */
  private void finish() throws CarbonSortKeyAndGroupByException {
    clear();
    try {
      CarbonUtil.deleteFiles(intermediateFiles);
    } catch (IOException e) {
      throw new CarbonSortKeyAndGroupByException("Problem while deleting the intermediate files");
    }
  }

  /**
   * Closes every reader still held by the heap and drops the heap reference.
   * Safe to call more than once.
   */
  private void clear() {
    if (recordHolderHeap != null) {
      SortTempFileChunkHolder sortTempFileChunkHolder;
      while (!recordHolderHeap.isEmpty()) {
        sortTempFileChunkHolder = recordHolderHeap.poll();
        if (null != sortTempFileChunkHolder) {
          sortTempFileChunkHolder.closeStream();
        }
      }
    }
    recordHolderHeap = null;
  }
}