blob: b7c3627320c1fddcb08313115e3ee9b3483bdc15 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.processing.loading.sort.unsafe.merger;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
import org.apache.carbondata.processing.loading.sort.CarbonPriorityQueue;
import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
import org.apache.carbondata.processing.loading.sort.unsafe.holder.SortTempChunkHolder;
import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeSortTempFileChunkHolder;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
import org.apache.carbondata.processing.sort.sortdata.SortParameters;
import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
import org.apache.log4j.Logger;
public class UnsafeIntermediateFileMerger implements Callable<Void> {
/**
* LOGGER
*/
private static final Logger LOGGER =
LogServiceFactory.getLogService(UnsafeIntermediateFileMerger.class.getName());
/**
* recordHolderHeap
*/
private CarbonPriorityQueue<SortTempChunkHolder> recordHolderHeap;
/**
* fileCounter
*/
private int fileCounter;
/**
* stream
*/
private DataOutputStream stream;
/**
* totalNumberOfRecords
*/
private int totalNumberOfRecords;
private SortParameters mergerParameters;
private TableFieldStat tableFieldStat;
private File[] intermediateFiles;
private File outPutFile;
private int writeBufferSize;
private String compressorName;
private SortStepRowHandler sortStepRowHandler;
private Throwable throwable;
/**
* IntermediateFileMerger Constructor
*/
public UnsafeIntermediateFileMerger(SortParameters mergerParameters, File[] intermediateFiles,
File outPutFile) {
this.mergerParameters = mergerParameters;
this.fileCounter = intermediateFiles.length;
this.intermediateFiles = intermediateFiles;
this.outPutFile = outPutFile;
this.writeBufferSize = mergerParameters.getBufferSize();
this.compressorName = mergerParameters.getSortTempCompressorName();
this.tableFieldStat = new TableFieldStat(mergerParameters);
this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
}
@Override
public Void call() throws Exception {
long intermediateMergeStartTime = System.currentTimeMillis();
int fileConterConst = fileCounter;
try {
startSorting();
initialize();
while (hasNext()) {
writeDataToFile(next());
}
double intermediateMergeCostTime =
(System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0;
LOGGER.info("Intermediate Merge of " + fileConterConst
+ " Sort Temp Files Cost Time: " + intermediateMergeCostTime + "(s)");
} catch (Exception e) {
LOGGER.error("Problem while intermediate merging", e);
clear();
throwable = e;
} finally {
CarbonUtil.closeStreams(this.stream);
if (null == throwable) {
try {
finish();
} catch (CarbonSortKeyAndGroupByException e) {
LOGGER.error("Problem while deleting the merge file", e);
throwable = e;
}
} else {
if (!outPutFile.delete()) {
LOGGER.error("Problem while deleting the merge file");
}
}
}
if (null != throwable) {
throw new CarbonSortKeyAndGroupByException(throwable);
}
return null;
}
/**
* This method is responsible for initializing the out stream
*
* @throws CarbonSortKeyAndGroupByException
*/
private void initialize() throws CarbonSortKeyAndGroupByException {
try {
stream =
FileFactory.getDataOutputStream(outPutFile.getPath(), writeBufferSize, compressorName);
this.stream.writeInt(this.totalNumberOfRecords);
} catch (FileNotFoundException e) {
throw new CarbonSortKeyAndGroupByException("Problem while getting the file", e);
} catch (IOException e) {
throw new CarbonSortKeyAndGroupByException("Problem while writing the data to file", e);
}
}
/**
* This method will be used to get sorted sort temp row from the sort temp files
*
* @return sorted record sorted record
* @throws CarbonSortKeyAndGroupByException
*/
private IntermediateSortTempRow getSortedRecordFromFile()
throws CarbonSortKeyAndGroupByException {
IntermediateSortTempRow row = null;
// poll the top object from heap
// heap maintains binary tree which is based on heap condition that will
// be based on comparator we are passing the heap
// when will call poll it will always delete root of the tree and then
// it does trickel down operation complexity is log(n)
SortTempChunkHolder poll = this.recordHolderHeap.peek();
// get the row from chunk
row = poll.getRow();
// check if there no entry present
if (!poll.hasNext()) {
// if chunk is empty then close the stream
poll.close();
this.recordHolderHeap.poll();
// change the file counter
--this.fileCounter;
// return row
return row;
}
// read new row
poll.readRow();
// maintain heap
this.recordHolderHeap.siftTopDown();
// return row
return row;
}
/**
* Below method will be used to start storing process This method will get
* all the temp files present in sort temp folder then it will create the
* record holder heap and then it will read first record from each file and
* initialize the heap
*
* @throws CarbonSortKeyAndGroupByException
*/
private void startSorting() throws CarbonSortKeyAndGroupByException {
LOGGER.info("Number of temp file: " + this.fileCounter);
// create record holder heap
createRecordHolderQueue(intermediateFiles);
// iterate over file list and create chunk holder and add to heap
LOGGER.info("Started adding first record from each file");
SortTempChunkHolder sortTempFileChunkHolder = null;
for (File tempFile : intermediateFiles) {
// create chunk holder
sortTempFileChunkHolder =
new UnsafeSortTempFileChunkHolder(tempFile, mergerParameters, false, tableFieldStat);
sortTempFileChunkHolder.readRow();
this.totalNumberOfRecords += sortTempFileChunkHolder.numberOfRows();
// add to heap
this.recordHolderHeap.add(sortTempFileChunkHolder);
}
LOGGER.info("Heap Size: " + this.recordHolderHeap.size());
}
/**
* This method will be used to create the heap which will be used to hold
* the chunk of data
*
* @param listFiles list of temp files
*/
private void createRecordHolderQueue(File[] listFiles) {
// creating record holder heap
this.recordHolderHeap = new CarbonPriorityQueue<>(listFiles.length);
}
/**
* This method will be used to get the sorted sort temp row
*
* @return sorted row
* @throws CarbonSortKeyAndGroupByException
*/
private IntermediateSortTempRow next() throws CarbonSortKeyAndGroupByException {
if (hasNext()) {
return getSortedRecordFromFile();
} else {
throw new NoSuchElementException("No more elements to return");
}
}
/**
* This method will be used to check whether any more element is present or
* not
*
* @return more element is present
*/
private boolean hasNext() {
return this.fileCounter > 0;
}
/**
* Below method will be used to write data to file
*
* @throws IOException problem while writing
*/
private void writeDataToFile(IntermediateSortTempRow row) throws IOException {
sortStepRowHandler.writeIntermediateSortTempRowToOutputStream(row, stream);
}
private void finish() throws CarbonSortKeyAndGroupByException {
clear();
try {
CarbonUtil.deleteFiles(intermediateFiles);
} catch (IOException e) {
throw new CarbonSortKeyAndGroupByException("Problem while deleting the intermediate files");
}
}
private void clear() {
if (null != recordHolderHeap) {
SortTempChunkHolder sortTempChunkHolder;
while (!recordHolderHeap.isEmpty()) {
sortTempChunkHolder = recordHolderHeap.poll();
if (null != sortTempChunkHolder) {
sortTempChunkHolder.close();
}
}
}
}
}