/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.processing.loading.sort.impl;
import java.io.File;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
import org.apache.carbondata.processing.loading.row.CarbonSortBatch;
import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter;
import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeSortDataRows;
import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger;
import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
import org.apache.carbondata.processing.sort.sortdata.SortParameters;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
import org.apache.log4j.Logger;
/**
* It reads data in parallel from an array of iterators and performs a merge sort.
* It sorts the data in batches and sends each sorted batch to the next step.
*/
public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter {
private static final Logger LOGGER =
LogServiceFactory.getLogService(UnsafeBatchParallelReadMergeSorterImpl.class.getName());
private SortParameters sortParameters;
private ExecutorService executorService;
private AtomicLong rowCounter;
/* incremented for each batch. This id is used in the sort temp file names
to identify the files belonging to that batch */
private AtomicInteger batchId;
public UnsafeBatchParallelReadMergeSorterImpl(AtomicLong rowCounter) {
this.rowCounter = rowCounter;
}
@Override
public void initialize(SortParameters sortParameters) {
this.sortParameters = sortParameters;
batchId = new AtomicInteger(0);
}
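/**
* Submits one sort thread per input iterator and returns a single iterator that
* serves the sorted batches as each batch completes.
*/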
@Override
public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
throws CarbonDataLoadingException {
this.executorService = Executors.newFixedThreadPool(iterators.length);
this.threadStatusObserver = new ThreadStatusObserver(this.executorService);
int batchSize = CarbonProperties.getInstance().getBatchSize();
final SortBatchHolder sortBatchHolder = new SortBatchHolder(sortParameters, iterators.length,
this.threadStatusObserver);
try {
for (int i = 0; i < iterators.length; i++) {
executorService.execute(
new SortIteratorThread(iterators[i], sortBatchHolder, batchSize, rowCounter,
this.threadStatusObserver));
}
} catch (Exception e) {
checkError();
throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
}
checkError();
// Creates the iterator to read from merge sorter.
Iterator<CarbonSortBatch> batchIterator = new CarbonIterator<CarbonSortBatch>() {
@Override
public boolean hasNext() {
return sortBatchHolder.hasNext();
}
@Override
public CarbonSortBatch next() {
return new CarbonSortBatch(sortBatchHolder.next());
}
};
return new Iterator[] { batchIterator };
}
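/**
* Shuts down the executor service and waits for the submitted sort threads to complete.
*/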
@Override
public void close() {
executorService.shutdown();
try {
executorService.awaitTermination(2, TimeUnit.DAYS);
} catch (InterruptedException e) {
LOGGER.error(e.getMessage(), e);
}
}
/**
* This thread iterates over its input iterator and adds the rows to the shared sort batch holder.
*/
private static class SortIteratorThread implements Runnable {
private Iterator<CarbonRowBatch> iterator;
private SortBatchHolder sortDataRows;
private Object[][] buffer;
private AtomicLong rowCounter;
private ThreadStatusObserver threadStatusObserver;
public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortBatchHolder sortDataRows,
int batchSize, AtomicLong rowCounter, ThreadStatusObserver threadStatusObserver) {
this.iterator = iterator;
this.sortDataRows = sortDataRows;
this.buffer = new Object[batchSize][];
this.rowCounter = rowCounter;
this.threadStatusObserver = threadStatusObserver;
}
@Override
public void run() {
try {
while (iterator.hasNext()) {
CarbonRowBatch batch = iterator.next();
int i = 0;
while (batch.hasNext()) {
CarbonRow row = batch.next();
if (row != null) {
buffer[i++] = row.getData();
}
}
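// flush the buffered rows into the current batch's UnsafeSortDataRows; when it can no
// longer accept rows, finish this batch and start a new one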
if (i > 0) {
synchronized (sortDataRows) {
sortDataRows.getSortDataRow().addRowBatchWithOutSync(buffer, i);
rowCounter.getAndAdd(i);
if (!sortDataRows.getSortDataRow().canAdd()) {
sortDataRows.finish(false);
sortDataRows.createSortDataRows();
}
}
}
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
this.threadStatusObserver.notifyFailed(e);
} finally {
synchronized (sortDataRows) {
sortDataRows.finishThread();
}
}
}
}
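/**
* Holds the sort data rows, intermediate merger and final merger of the current batch and
* hands the final merger of each finished batch to the consumer through a blocking queue.
*/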
private class SortBatchHolder
extends CarbonIterator<UnsafeSingleThreadFinalSortFilesMerger> {
private SortParameters sortParameters;
private UnsafeSingleThreadFinalSortFilesMerger finalMerger;
private UnsafeIntermediateMerger unsafeIntermediateFileMerger;
private UnsafeSortDataRows sortDataRow;
private final BlockingQueue<UnsafeSingleThreadFinalSortFilesMerger> mergerQueue;
private AtomicInteger iteratorCount;
private int batchCount;
private ThreadStatusObserver threadStatusObserver;
private final Object lock = new Object();
SortBatchHolder(SortParameters sortParameters, int numberOfThreads,
ThreadStatusObserver threadStatusObserver) {
this.sortParameters = sortParameters.getCopy();
this.iteratorCount = new AtomicInteger(numberOfThreads);
this.mergerQueue = new LinkedBlockingQueue<>(1);
this.threadStatusObserver = threadStatusObserver;
createSortDataRows();
}
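/**
* Prepares the sort data rows, intermediate merger and final merger for the next batch.
*/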
private void createSortDataRows() {
// createSortDataRows() is called once per batch.
// Files saved to disk while sorting a previous batch must not be considered for this batch,
// so the batch id is used as the rangeId of the sort temp files and
// getFilesToMergeSort() then selects only the files of this batch.
this.sortParameters.setRangeId(batchId.incrementAndGet());
int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
setTempLocation(sortParameters);
this.finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters,
sortParameters.getTempFileLocation());
unsafeIntermediateFileMerger = new UnsafeIntermediateMerger(sortParameters);
sortDataRow = new UnsafeSortDataRows(sortParameters, unsafeIntermediateFileMerger,
inMemoryChunkSizeInMB);
try {
sortDataRow.initialize();
} catch (Exception e) {
throw new CarbonDataLoadingException(e);
}
batchCount++;
}
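/**
* Derives the local sort temp directories for this task and sets them on the sort parameters.
*/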
private void setTempLocation(SortParameters parameters) {
String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
.getLocalDataFolderLocation(parameters.getCarbonTable(), parameters.getTaskNo(),
parameters.getSegmentId(), false, false);
String[] tempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
parameters.setTempFileLocation(tempDirs);
}
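/**
* Blocks until the final merger of the next batch is available; if the stop-process flag is
* set, rethrows the failure captured by the thread status observer.
*/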
@Override
public UnsafeSingleThreadFinalSortFilesMerger next() {
try {
UnsafeSingleThreadFinalSortFilesMerger unsafeSingleThreadFinalSortFilesMerger =
mergerQueue.take();
if (unsafeSingleThreadFinalSortFilesMerger.isStopProcess()) {
throw new RuntimeException(threadStatusObserver.getThrowable());
}
return unsafeSingleThreadFinalSortFilesMerger;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public UnsafeSortDataRows getSortDataRow() {
return sortDataRow;
}
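/**
* Finishes the current batch: sorts the remaining rows, merges the intermediate results and
* publishes the final merger to the queue for consumption by the next step.
*/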
public void finish(boolean isFinalAttempt) {
try {
// if the mergerQueue is empty and a CarbonDataLoadingException has occurred,
// set the stop-process flag on the finalMerger instance
if (mergerQueue.isEmpty() && threadStatusObserver != null
&& threadStatusObserver.getThrowable() != null && threadStatusObserver
.getThrowable() instanceof CarbonDataLoadingException) {
finalMerger.setStopProcess(true);
if (isFinalAttempt) {
iteratorCount.decrementAndGet();
}
mergerQueue.put(finalMerger);
return;
}
processRowToNextStep(sortDataRow, sortParameters);
unsafeIntermediateFileMerger.finish();
List<UnsafeCarbonRowPage> rowPages = unsafeIntermediateFileMerger.getRowPages();
finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
unsafeIntermediateFileMerger.getMergedPages());
unsafeIntermediateFileMerger.close();
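// decrement the iterator count before publishing the final merger so that hasNext()
// does not report another batch once the consumer drains the queue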
if (isFinalAttempt) {
iteratorCount.decrementAndGet();
}
mergerQueue.put(finalMerger);
sortDataRow = null;
unsafeIntermediateFileMerger = null;
finalMerger = null;
} catch (CarbonDataWriterException e) {
throw new CarbonDataLoadingException(e);
} catch (CarbonSortKeyAndGroupByException e) {
throw new CarbonDataLoadingException(e);
} catch (InterruptedException e) {
// if the put into the queue fails because of an interruption, offer the merger instead
// to free the main thread from waiting.
if (finalMerger != null) {
finalMerger.setStopProcess(true);
boolean offered = mergerQueue.offer(finalMerger);
if (!offered) {
throw new CarbonDataLoadingException(e);
}
}
throw new CarbonDataLoadingException(e);
}
}
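/**
* Called by each sort thread when it completes; the last thread to finish also finishes
* the final batch.
*/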
public void finishThread() {
synchronized (lock) {
if (iteratorCount.get() <= 1) {
finish(true);
} else {
iteratorCount.decrementAndGet();
}
}
}
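/**
* There is more output as long as a sort thread is still running or a merger is queued.
*/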
public boolean hasNext() {
return iteratorCount.get() > 0 || !mergerQueue.isEmpty();
}
/**
* Starts sorting the rows collected for the current batch and records the step statistics.
*/
private boolean processRowToNextStep(UnsafeSortDataRows sortDataRows, SortParameters parameters)
throws CarbonDataLoadingException {
try {
// start sorting
sortDataRows.startSorting();
// log completion of row processing for this batch
LOGGER.info("Record Processed For table: " + parameters.getTableName());
CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
.recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
.recordDictionaryValuesTotalTime(parameters.getPartitionID(),
System.currentTimeMillis());
return false;
} catch (Exception e) {
throw new CarbonDataLoadingException(e);
}
}
}
}