blob: 558a40af9bbd9d563f573d6d12c6bdfbd73c88ab [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.processing.loading.sort.unsafe.merger;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.concurrent.Callable;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
import org.apache.carbondata.processing.loading.sort.CarbonPriorityQueue;
import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRowForMerge;
import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeInmemoryMergeHolder;
import org.apache.carbondata.processing.sort.sortdata.SortParameters;
import org.apache.log4j.Logger;
/**
 * Merges the rows of several in-memory {@link UnsafeCarbonRowPage}s through a priority queue of
 * per-page holders.
 *
 * <p>Depending on {@code spillDisk} the merged sequence is either
 * <ul>
 *   <li>written row by row to a newly created sort temp file, after which the memory of all
 *       source pages is freed, or</li>
 *   <li>recorded in memory as parallel arrays of row addresses and page indexes; in this mode
 *       the source pages are not freed by {@link #call()}.</li>
 * </ul>
 */
public class UnsafeInMemoryIntermediateDataMerger implements Callable<Void> {

  private static final Logger LOGGER =
      LogServiceFactory.getLogService(UnsafeInMemoryIntermediateDataMerger.class.getName());

  /**
   * heap of page holders; the holder on top supplies the next row of the merged sequence
   */
  private CarbonPriorityQueue<UnsafeInmemoryMergeHolder> recordHolderHeap;

  /**
   * number of page holders that have not been exhausted yet
   */
  private int holderCounter;

  /**
   * number of rows recorded so far by the in-memory merge
   */
  private int entryCount;

  /**
   * the pages being merged; a row's {@code index} selects its page in this array
   */
  private final UnsafeCarbonRowPage[] unsafeCarbonRowPages;

  /**
   * addresses of the merged rows, in merge order (filled only when not spilling to disk)
   */
  private final long[] mergedAddresses;

  /**
   * page index of every merged row, parallel to {@link #mergedAddresses}
   */
  private final byte[] rowPageIndexes;

  /**
   * row count supplied by the caller; sizes the result arrays and is written as the header of
   * the spill file
   */
  private final int totalSize;

  private final SortParameters sortParameters;

  private final SortStepRowHandler sortStepRowHandler;

  /**
   * true: write the merged rows to a sort temp file, false: keep the merge result in memory
   */
  private final boolean spillDisk;

  /**
   * sort temp file receiving the merged rows; assigned only when spilling to disk
   */
  private File outputFile;

  /**
   * stream onto {@link #outputFile}; reset to null once it has been closed
   */
  private DataOutputStream outputStream;

  /**
   * Creates a merger for the given row pages.
   *
   * @param unsafeCarbonRowPages pages whose rows are to be merged
   * @param totalSize row count used to size the result arrays and written as the spill file
   *                  header (presumably the total number of rows in all pages; confirm against
   *                  the caller)
   * @param sortParameters sort configuration (temp file locations, table name, range id,
   *                       write buffer size, compressor name)
   * @param spillDisk whether the merged rows are written to disk instead of kept in memory
   */
  public UnsafeInMemoryIntermediateDataMerger(UnsafeCarbonRowPage[] unsafeCarbonRowPages,
      int totalSize, SortParameters sortParameters, boolean spillDisk) {
    this.holderCounter = unsafeCarbonRowPages.length;
    this.unsafeCarbonRowPages = unsafeCarbonRowPages;
    this.mergedAddresses = new long[totalSize];
    this.rowPageIndexes = new byte[totalSize];
    this.entryCount = 0;
    this.totalSize = totalSize;
    this.sortParameters = sortParameters;
    this.sortStepRowHandler = new SortStepRowHandler(sortParameters);
    this.spillDisk = spillDisk;
  }

  /**
   * Runs the merge and drains every row either to the sort temp file or to the in-memory
   * result arrays.
   *
   * <p>When spilling to disk, the output stream is closed and the memory of all row pages is
   * freed before this method returns, whether the merge succeeded or failed.
   *
   * @return always null
   * @throws Exception any failure raised during the merge; it is logged and rethrown
   */
  @Override
  public Void call() throws Exception {
    long intermediateMergeStartTime = System.currentTimeMillis();
    // holderCounter is decremented while merging, so remember the initial value for the log
    int holderCounterConst = holderCounter;
    try {
      startSorting();
      if (spillDisk) {
        initSortTempFile();
        while (hasNext()) {
          writeDataToFile(next());
        }
      } else {
        while (hasNext()) {
          writeDataToMemory(next());
        }
      }
      double intermediateMergeCostTime =
          (System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0;
      LOGGER.info("Intermediate Merge of " + holderCounterConst
          + " in-memory sort Cost Time: " + intermediateMergeCostTime + "(s)");
      if (spillDisk) {
        // Close the stream before measuring the file: bytes still held by the stream are
        // not part of File.length() yet and the logged size would be too small.
        closeOutputStream();
        double fileSizeInMb = outputFile.length() / 1024.0 / 1024.0;
        LOGGER.info("Merge and spill in-memory pages to disk, location: "
            + outputFile.getAbsolutePath()
            + ", file size in MB: " + fileSizeInMb
            + ", containing rows: " + totalSize);
      }
    } catch (Exception e) {
      LOGGER.error("Problem while intermediate merging", e);
      throw e;
    } finally {
      if (spillDisk) {
        // no-op after a successful merge (stream already closed), cleanup on failure
        closeOutputStream();
        // the rows have been written out (or the merge failed), the pages are not needed
        close();
      }
    }
    return null;
  }

  /**
   * Closes the spill output stream, if any, and clears the field so that a later call does
   * not close the same stream twice.
   */
  private void closeOutputStream() {
    CarbonUtil.closeStreams(outputStream);
    outputStream = null;
  }

  /**
   * Returns the row of the holder on top of the heap and advances that holder.
   *
   * <p>An exhausted holder is closed and removed from the heap; otherwise its next row is read
   * and the heap is re-ordered.
   *
   * @return next row of the merged sequence
   */
  private UnsafeCarbonRowForMerge getSortedRecordFromMemory() {
    // only look at the top holder here; it is removed below solely when it is exhausted
    UnsafeInmemoryMergeHolder topHolder = this.recordHolderHeap.peek();
    // current row of that holder
    UnsafeCarbonRowForMerge row = topHolder.getRow();
    if (!topHolder.hasNext()) {
      // holder has no further rows: release it and drop it from the heap
      topHolder.close();
      this.recordHolderHeap.poll();
      --this.holderCounter;
      return row;
    }
    // move the holder to its next row
    topHolder.readRow();
    // the top element changed, so let it sink to its proper position
    this.recordHolderHeap.siftTopDown();
    return row;
  }

  /**
   * Creates the record holder heap and adds one holder per row page, each positioned on the
   * first row of its page.
   */
  private void startSorting() {
    LOGGER.info("Number of row pages in intermediate merger: " + this.holderCounter);
    createRecordHolderQueue(unsafeCarbonRowPages);
    LOGGER.info("Started adding first record from row page");
    // NOTE(review): the page index is a byte and wraps to negative values beyond 128 pages;
    // confirm that callers never pass more pages than that.
    byte index = 0;
    for (UnsafeCarbonRowPage unsafeCarbonRowPage : unsafeCarbonRowPages) {
      UnsafeInmemoryMergeHolder unsafePageHolder =
          new UnsafeInmemoryMergeHolder(unsafeCarbonRowPage, index++);
      // position the holder on its first row before it enters the heap
      unsafePageHolder.readRow();
      this.recordHolderHeap.add(unsafePageHolder);
    }
    LOGGER.info("Heap Size" + this.recordHolderHeap.size());
  }

  /**
   * Creates the heap of page holders with an initial capacity of one entry per page.
   *
   * @param pages pages that are going to be merged
   */
  private void createRecordHolderQueue(UnsafeCarbonRowPage[] pages) {
    this.recordHolderHeap = new CarbonPriorityQueue<>(pages.length);
  }

  /**
   * Returns the next row of the merged sequence.
   *
   * @return next merged row
   * @throws NoSuchElementException if all holders are exhausted
   */
  private UnsafeCarbonRowForMerge next() {
    if (hasNext()) {
      return getSortedRecordFromMemory();
    } else {
      throw new NoSuchElementException("No more elements to return");
    }
  }

  /**
   * Checks whether at least one holder still has rows to deliver.
   *
   * @return true if more rows are available
   */
  private boolean hasNext() {
    return this.holderCounter > 0;
  }

  /**
   * Records the address and page index of a merged row in the in-memory result arrays.
   *
   * @param row merged row to record
   */
  private void writeDataToMemory(UnsafeCarbonRowForMerge row) {
    mergedAddresses[entryCount] = row.address;
    rowPageIndexes[entryCount] = row.index;
    entryCount++;
  }

  /**
   * Creates the sort temp file in a randomly chosen temp location, opens the output stream on
   * it and writes {@code totalSize} as the file header.
   *
   * @throws IOException if writing the header fails (opening the stream may also fail)
   */
  private void initSortTempFile() throws IOException {
    String[] tempLocations = sortParameters.getTempFileLocation();
    String tmpDir = tempLocations[new Random().nextInt(tempLocations.length)];
    outputFile = new File(tmpDir + File.separator
        + sortParameters.getTableName() + '_'
        + sortParameters.getRangeId() + '_' + System.nanoTime()
        + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
    outputStream = FileFactory.getDataOutputStream(outputFile.getPath(),
        sortParameters.getFileWriteBufferSize(), sortParameters.getSortTempCompressorName());
    outputStream.writeInt(totalSize);
  }

  /**
   * Reads a merged row from its page and writes it to the sort temp file.
   *
   * @param row merged row, identified by page index and address
   * @throws IOException if writing the row fails
   */
  private void writeDataToFile(UnsafeCarbonRowForMerge row) throws IOException {
    IntermediateSortTempRow sortTempRow = unsafeCarbonRowPages[row.index].getRow(row.address);
    sortStepRowHandler.writeIntermediateSortTempRowToOutputStream(sortTempRow, outputStream);
  }

  /**
   * @return number of rows recorded by the in-memory merge (stays 0 when spilling to disk)
   */
  public int getEntryCount() {
    return entryCount;
  }

  /**
   * @return the row pages handed to this merger
   */
  public UnsafeCarbonRowPage[] getUnsafeCarbonRowPages() {
    return unsafeCarbonRowPages;
  }

  /**
   * @return addresses of the merged rows; the first {@link #getEntryCount()} entries are filled
   */
  public long[] getMergedAddresses() {
    return mergedAddresses;
  }

  /**
   * @return page indexes of the merged rows, parallel to {@link #getMergedAddresses()}
   */
  public byte[] getRowPageIndexes() {
    return rowPageIndexes;
  }

  /**
   * @return true if this merger writes its result to disk
   */
  public boolean isSpillDisk() {
    return spillDisk;
  }

  /**
   * Frees the memory of every row page handed to this merger.
   */
  public void close() {
    for (UnsafeCarbonRowPage rowPage : unsafeCarbonRowPages) {
      rowPage.freeMemory();
    }
  }
}