| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.processing.sort.sortdata; |
| |
| import java.io.File; |
| import java.io.FileFilter; |
| import java.util.AbstractQueue; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.NoSuchElementException; |
| import java.util.PriorityQueue; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.carbondata.common.CarbonIterator; |
| import org.apache.carbondata.common.logging.LogServiceFactory; |
| import org.apache.carbondata.core.constants.CarbonCommonConstants; |
| import org.apache.carbondata.core.datastore.block.SegmentProperties; |
| import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; |
| import org.apache.carbondata.core.metadata.datatype.DataType; |
| import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; |
| import org.apache.carbondata.core.scan.result.iterator.RawResultIterator; |
| import org.apache.carbondata.core.util.CarbonProperties; |
| import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; |
| import org.apache.carbondata.processing.loading.sort.SortStepRowHandler; |
| import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException; |
| |
| import org.apache.log4j.Logger; |
| |
| public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> { |
| /** |
| * LOGGER |
| */ |
| private static final Logger LOGGER = |
| LogServiceFactory.getLogService(SingleThreadFinalSortFilesMerger.class.getName()); |
| |
| /** |
| * lockObject |
| */ |
| private static final Object LOCKOBJECT = new Object(); |
| |
| /** |
| * recordHolderHeap |
| */ |
| private AbstractQueue<SortTempFileChunkHolder> recordHolderHeapLocal; |
| |
| /** |
| * tableName |
| */ |
| private String tableName; |
| private SortParameters sortParameters; |
| private SortStepRowHandler sortStepRowHandler; |
| /** |
| * tempFileLocation |
| */ |
| private String[] tempFileLocation; |
| |
| private int maxThreadForSorting; |
| |
| private ExecutorService executorService; |
| |
| private List<Future<Void>> mergerTask; |
| |
| public SingleThreadFinalSortFilesMerger(String[] tempFileLocation, String tableName, |
| SortParameters sortParameters) { |
| this.tempFileLocation = tempFileLocation; |
| this.tableName = tableName; |
| this.sortParameters = sortParameters; |
| this.sortStepRowHandler = new SortStepRowHandler(sortParameters); |
| try { |
| maxThreadForSorting = Integer.parseInt(CarbonProperties.getInstance() |
| .getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD, |
| CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD_DEFAULTVALUE)); |
| } catch (NumberFormatException e) { |
| maxThreadForSorting = |
| Integer.parseInt(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD_DEFAULTVALUE); |
| } |
| this.mergerTask = new ArrayList<>(); |
| } |
| |
| /** |
| * This method will be used to merger the merged files |
| * |
| * @throws CarbonSortKeyAndGroupByException |
| */ |
| public void startFinalMerge() throws CarbonDataWriterException { |
| List<File> filesToMerge = getFilesToMergeSort(); |
| if (filesToMerge.size() == 0) { |
| LOGGER.info("No file to merge in final merge stage"); |
| return; |
| } |
| |
| startSorting(filesToMerge); |
| } |
| |
| /** |
| * Below method will be used to add in memory raw result iterator to priority queue. |
| * This will be called in case of compaction, when it is compacting sorted and unsorted |
| * both type of carbon data file |
| * This method will add sorted file's RawResultIterator to priority queue using |
| * InMemorySortTempChunkHolder as wrapper |
| * |
| * @param sortedRawResultMergerList |
| * @param segmentProperties |
| * @param noDicAndComplexColumns |
| */ |
| public void addInMemoryRawResultIterator(List<RawResultIterator> sortedRawResultMergerList, |
| SegmentProperties segmentProperties, CarbonColumn[] noDicAndComplexColumns, |
| DataType[] measureDataType) { |
| for (RawResultIterator rawResultIterator : sortedRawResultMergerList) { |
| InMemorySortTempChunkHolder inMemorySortTempChunkHolder = |
| new InMemorySortTempChunkHolder(rawResultIterator, segmentProperties, |
| noDicAndComplexColumns, sortParameters, measureDataType); |
| if (inMemorySortTempChunkHolder.hasNext()) { |
| inMemorySortTempChunkHolder.readRow(); |
| recordHolderHeapLocal.add(inMemorySortTempChunkHolder); |
| } |
| } |
| } |
| |
| private List<File> getFilesToMergeSort() { |
| final int rangeId = sortParameters.getRangeId(); |
| FileFilter fileFilter = new FileFilter() { |
| public boolean accept(File pathname) { |
| return pathname.getName().startsWith(tableName + '_' + rangeId); |
| } |
| }; |
| |
| // get all the merged files |
| List<File> files = new ArrayList<File>(tempFileLocation.length); |
| for (String tempLoc : tempFileLocation) { |
| File[] subFiles = new File(tempLoc).listFiles(fileFilter); |
| if (null != subFiles && subFiles.length > 0) { |
| files.addAll(Arrays.asList(subFiles)); |
| } |
| } |
| |
| return files; |
| } |
| |
| /** |
| * Below method will be used to start storing process This method will get |
| * all the temp files present in sort temp folder then it will create the |
| * record holder heap and then it will read first record from each file and |
| * initialize the heap |
| * |
| * @throws CarbonSortKeyAndGroupByException |
| */ |
| private void startSorting(List<File> files) throws CarbonDataWriterException { |
| if (files.size() == 0) { |
| LOGGER.info("No files to merge sort"); |
| return; |
| } |
| |
| LOGGER.info("Started Final Merge"); |
| |
| LOGGER.info("Number of temp file: " + files.size()); |
| |
| // create record holder heap |
| createRecordHolderQueue(files.size()); |
| |
| // iterate over file list and create chunk holder and add to heap |
| LOGGER.info("Started adding first record from each file"); |
| this.executorService = Executors.newFixedThreadPool(maxThreadForSorting); |
| |
| for (final File tempFile : files) { |
| |
| Callable<Void> callable = new Callable<Void>() { |
| @Override |
| public Void call() { |
| // create chunk holder |
| SortTempFileChunkHolder sortTempFileChunkHolder = |
| new SortTempFileChunkHolder(tempFile, sortParameters, tableName, true); |
| try { |
| // initialize |
| sortTempFileChunkHolder.initialize(); |
| sortTempFileChunkHolder.readRow(); |
| } catch (CarbonSortKeyAndGroupByException ex) { |
| sortTempFileChunkHolder.closeStream(); |
| notifyFailure(ex); |
| } |
| synchronized (LOCKOBJECT) { |
| recordHolderHeapLocal.add(sortTempFileChunkHolder); |
| } |
| return null; |
| } |
| }; |
| mergerTask.add(executorService.submit(callable)); |
| } |
| executorService.shutdown(); |
| try { |
| executorService.awaitTermination(2, TimeUnit.HOURS); |
| } catch (Exception e) { |
| throw new CarbonDataWriterException(e); |
| } |
| checkFailure(); |
| LOGGER.info("final merger Heap Size" + this.recordHolderHeapLocal.size()); |
| } |
| |
| private void checkFailure() { |
| for (int i = 0; i < mergerTask.size(); i++) { |
| try { |
| mergerTask.get(i).get(); |
| } catch (InterruptedException | ExecutionException e) { |
| throw new CarbonDataWriterException(e); |
| } |
| } |
| } |
| |
| /** |
| * This method will be used to create the heap which will be used to hold |
| * the chunk of data |
| */ |
| private void createRecordHolderQueue(int size) { |
| // creating record holder heap |
| this.recordHolderHeapLocal = new PriorityQueue<SortTempFileChunkHolder>(size); |
| } |
| |
| private synchronized void notifyFailure(Throwable throwable) { |
| close(); |
| LOGGER.error(throwable); |
| } |
| |
| /** |
| * This method will be used to get the sorted sort temp row from the sort temp files |
| * |
| * @return sorted row |
| * @throws CarbonSortKeyAndGroupByException |
| */ |
| public Object[] next() { |
| if (hasNext()) { |
| IntermediateSortTempRow sortTempRow = getSortedRecordFromFile(); |
| return sortStepRowHandler.convertIntermediateSortTempRowTo3Parted(sortTempRow); |
| } else { |
| throw new NoSuchElementException("No more elements to return"); |
| } |
| } |
| |
| /** |
| * This method will be used to get the sorted record from file |
| * |
| * @return sorted record sorted record |
| * @throws CarbonSortKeyAndGroupByException |
| */ |
| private IntermediateSortTempRow getSortedRecordFromFile() throws CarbonDataWriterException { |
| IntermediateSortTempRow row = null; |
| |
| // poll the top object from heap |
| // heap maintains binary tree which is based on heap condition that will |
| // be based on comparator we are passing the heap |
| // when will call poll it will always delete root of the tree and then |
| // it does trickel down operation complexity is log(n) |
| SortTempFileChunkHolder poll = this.recordHolderHeapLocal.poll(); |
| |
| // get the row from chunk |
| row = poll.getRow(); |
| |
| // check if there no entry present |
| if (!poll.hasNext()) { |
| // if chunk is empty then close the stream |
| poll.closeStream(); |
| |
| // reaturn row |
| return row; |
| } |
| |
| // read new row |
| try { |
| poll.readRow(); |
| } catch (CarbonSortKeyAndGroupByException e) { |
| close(); |
| throw new CarbonDataWriterException(e); |
| } |
| |
| // add to heap |
| this.recordHolderHeapLocal.add(poll); |
| |
| // return row |
| return row; |
| } |
| |
| /** |
| * This method will be used to check whether any more element is present or |
| * not |
| * |
| * @return more element is present |
| */ |
| public boolean hasNext() { |
| return this.recordHolderHeapLocal != null && this.recordHolderHeapLocal.size() > 0; |
| } |
| |
| public void close() { |
| if (null != executorService && !executorService.isShutdown()) { |
| executorService.shutdownNow(); |
| } |
| if (null != recordHolderHeapLocal) { |
| SortTempFileChunkHolder sortTempFileChunkHolder; |
| while (!recordHolderHeapLocal.isEmpty()) { |
| sortTempFileChunkHolder = recordHolderHeapLocal.poll(); |
| if (null != sortTempFileChunkHolder) { |
| sortTempFileChunkHolder.closeStream(); |
| } |
| } |
| } |
| } |
| } |