| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tajo.engine.planner.physical; |
| |
| import com.google.common.base.Preconditions; |
| import com.google.common.primitives.*; |
| import com.google.common.util.concurrent.SettableFuture; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.fs.LocalDirAllocator; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.RawLocalFileSystem; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.tajo.BuiltinStorages; |
| import org.apache.tajo.SessionVars; |
| import org.apache.tajo.catalog.*; |
| import org.apache.tajo.catalog.proto.CatalogProtos; |
| import org.apache.tajo.catalog.statistics.TableStats; |
| import org.apache.tajo.common.TajoDataTypes; |
| import org.apache.tajo.conf.TajoConf.ConfVars; |
| import org.apache.tajo.datum.TextDatum; |
| import org.apache.tajo.engine.planner.PhysicalPlanningException; |
| import org.apache.tajo.engine.query.QueryContext; |
| import org.apache.tajo.exception.TajoRuntimeException; |
| import org.apache.tajo.exception.UnsupportedException; |
| import org.apache.tajo.plan.logical.ScanNode; |
| import org.apache.tajo.plan.logical.SortNode; |
| import org.apache.tajo.storage.*; |
| import org.apache.tajo.storage.Scanner; |
| import org.apache.tajo.storage.fragment.FileFragment; |
| import org.apache.tajo.storage.fragment.FragmentConvertor; |
| import org.apache.tajo.storage.rawfile.DirectRawFileWriter; |
| import org.apache.tajo.tuple.memory.OffHeapRowBlockUtils; |
| import org.apache.tajo.tuple.memory.UnSafeTuple; |
| import org.apache.tajo.tuple.memory.UnSafeTupleList; |
| import org.apache.tajo.unit.StorageUnit; |
| import org.apache.tajo.util.FileUtil; |
| import org.apache.tajo.worker.TaskAttemptContext; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.*; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| |
| /** |
| * This external sort algorithm can be characterized by the followings: |
| * |
| * <ul> |
| * <li>in-memory sort if input data size fits a sort buffer</li> |
| * <li>k-way merge sort if input data size exceeds the size of sort buffer</li> |
| * <li>parallel merge</li> |
| * <li>final merge avoidance</li> |
| * <li>Unbalance merge if needed</li> |
| * </ul> |
| */ |
| public class ExternalSortExec extends SortExec { |
| |
  /** In-memory sort algorithms selectable via {@code SessionVars.SORT_ALGORITHM}. */
  enum SortAlgorithm{
    /** Tim sort; the default, applicable to every key type. */
    TIM,
    /** MSD radix sort; used only when every sort key type passes RadixSort.isApplicableType(). */
    MSD_RADIX,
  }
| |
  /** Class logger */
  private static final Log LOG = LogFactory.getLog(ExternalSortExec.class);
  /** The prefix of fragment name for intermediate files; used in externalMergeAndSort() to
   *  recognize (and safely delete) files this operator created itself. */
  private static final String INTERMEDIATE_FILE_PREFIX = "@interFile_";

  /** the sort plan node; nulled out in close() */
  private SortNode plan;
  /** the data format of intermediate file*/
  private TableMeta intermediateMeta;
  /** the defaultFanout of external sort */
  private final int defaultFanout;
  /** It's the size of in-memory table. If memory consumption exceeds it, store the memory table into a disk. */
  private final long sortBufferBytesNum;
  /** the number of available cores */
  private final int allocatedCoreNum;
  /** If there are available multiple cores, it tries parallel merge. Created lazily in init(). */
  private ExecutorService executorService;
  /** used for in-memory sort of each chunk. */
  private UnSafeTupleList inMemoryTable;
  /** for zero copy tuple comparison */
  private Comparator<UnSafeTuple> unSafeComparator;
  /** for other type tuple comparison */
  private Comparator<Tuple> primitiveComparator;
  /** temporal dir */
  private Path sortTmpDir;
  /** It enables round-robin disks allocation */
  private final LocalDirAllocator localDirAllocator;
  /** local file system */
  private final RawLocalFileSystem localFS;
  /** final output files which are used for cleaning */
  private List<Chunk> finalOutputFiles = null;
  /** for directly merging sorted inputs; non-null only when the fragment-based constructor is used */
  private List<Chunk> mergedInputFragments = null;

  ///////////////////////////////////////////////////
  // transient variables
  ///////////////////////////////////////////////////
  /** already sorted or not */
  private boolean sorted = false;
  /** the final result */
  private Scanner result;
  /** total bytes of input data */
  private long inputBytes;

  /** the sort algorithm chosen in the constructor; see getSortAlgorithm() */
  private final SortAlgorithm sortAlgorithm;
| |
| private ExternalSortExec(final TaskAttemptContext context, final SortNode plan) |
| throws PhysicalPlanningException { |
| super(context, plan.getInSchema(), plan.getOutSchema(), null, plan.getSortKeys()); |
| |
| this.plan = plan; |
| this.defaultFanout = context.getConf().getIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT); |
| if (defaultFanout < 2) { |
| throw new PhysicalPlanningException(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT.varname + " cannot be lower than 2"); |
| } |
| // TODO - sort buffer and core num should be changed to use the allocated container resource. |
| this.sortBufferBytesNum = context.getQueryContext().getInt(SessionVars.EXTSORT_BUFFER_SIZE) * StorageUnit.MB; |
| this.allocatedCoreNum = context.getConf().getIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_THREAD_NUM); |
| this.localDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); |
| this.localFS = new RawLocalFileSystem(); |
| this.intermediateMeta = CatalogUtil.newTableMeta(BuiltinStorages.DRAW, context.getConf()); |
| this.inputStats = new TableStats(); |
| this.sortAlgorithm = getSortAlgorithm(context.getQueryContext(), sortSpecs); |
| LOG.info(sortAlgorithm.name() + " sort is selected"); |
| } |
| |
| private static SortAlgorithm getSortAlgorithm(QueryContext context, SortSpec[] sortSpecs) { |
| String sortAlgorithm = context.get(SessionVars.SORT_ALGORITHM, SortAlgorithm.TIM.name()); |
| if (Arrays.stream(sortSpecs) |
| .filter(sortSpec -> !RadixSort.isApplicableType(sortSpec)).count() > 0) { |
| if (sortAlgorithm.equalsIgnoreCase(SortAlgorithm.MSD_RADIX.name())) { |
| LOG.warn("Non-applicable types exist. Falling back to " + SortAlgorithm.TIM.name() + " sort"); |
| } |
| return SortAlgorithm.TIM; |
| } |
| if (sortAlgorithm.equalsIgnoreCase(SortAlgorithm.TIM.name())) { |
| return SortAlgorithm.TIM; |
| } else if (sortAlgorithm.equalsIgnoreCase(SortAlgorithm.MSD_RADIX.name())) { |
| return SortAlgorithm.MSD_RADIX; |
| } else { |
| LOG.warn("Unknown sort type: " + sortAlgorithm); |
| LOG.warn("Falling back to " + SortAlgorithm.TIM.name() + " sort"); |
| return SortAlgorithm.TIM; |
| } |
| } |
| |
  /**
   * Constructor used when already-sorted input fragments are given; next() then skips the
   * load-and-sort phase and merges the fragments directly (see externalMergeAndSort()).
   *
   * @param context the task attempt context
   * @param plan the sort plan node
   * @param scanNode the scan node supplying the table meta of the input fragments
   * @param fragments sorted input fragments to merge
   */
  public ExternalSortExec(final TaskAttemptContext context,final SortNode plan, final ScanNode scanNode,
                          final CatalogProtos.FragmentProto[] fragments) throws PhysicalPlanningException {
    this(context, plan);

    mergedInputFragments = new ArrayList<>();
    for (CatalogProtos.FragmentProto proto : fragments) {
      FileFragment fragment = FragmentConvertor.convert(FileFragment.class, proto);
      mergedInputFragments.add(new Chunk(inSchema, fragment, scanNode.getTableDesc().getMeta()));
    }
  }
| |
  /**
   * Constructor used when unsorted input comes from a child operator; next() will
   * load, sort, and (if necessary) spill the child's tuples before merging.
   *
   * @param context the task attempt context
   * @param plan the sort plan node
   * @param child the child physical operator producing the input tuples
   */
  public ExternalSortExec(final TaskAttemptContext context, final SortNode plan, final PhysicalExec child)
      throws IOException {
    this(context, plan);
    setChild(child);
  }
| |
| @Override |
| public void init() throws IOException { |
| if(allocatedCoreNum > 1) { |
| this.executorService = Executors.newFixedThreadPool(this.allocatedCoreNum); |
| } |
| |
| this.sortTmpDir = getExecutorTmpDir(); |
| |
| int initialArraySize = context.getQueryContext().getInt(SessionVars.SORT_LIST_SIZE); |
| this.inMemoryTable = new UnSafeTupleList(SchemaUtil.toDataTypes(inSchema), initialArraySize); |
| this.unSafeComparator = new UnSafeComparator(inSchema, sortSpecs); |
| this.primitiveComparator = new PrimitiveComparator(inSchema, sortSpecs); |
| |
| super.init(); |
| } |
| |
| public SortNode getPlan() { |
| return this.plan; |
| } |
| |
| private List<UnSafeTuple> sort(UnSafeTupleList tupleBlock) { |
| switch (sortAlgorithm) { |
| case TIM: |
| return OffHeapRowBlockUtils.sort(tupleBlock, unSafeComparator); |
| case MSD_RADIX: |
| return RadixSort.sort(context.getQueryContext(), tupleBlock, inSchema, sortSpecs, unSafeComparator); |
| default: |
| // The below line is not reachable. So, an exception should be thrown if it is executed. |
| throw new TajoRuntimeException(new UnsupportedException(sortAlgorithm.name())); |
| } |
| } |
| |
| /** |
| * Sort a tuple block and store them into a chunk file |
| */ |
| private Chunk sortAndStoreChunk(int chunkId, UnSafeTupleList tupleBlock) |
| throws IOException { |
| int rowNum = tupleBlock.size(); |
| |
| long sortStart = System.currentTimeMillis(); |
| this.sort(tupleBlock); |
| long sortEnd = System.currentTimeMillis(); |
| |
| long chunkWriteStart = System.currentTimeMillis(); |
| Path outputPath = getChunkPathForWrite(0, chunkId); |
| final DirectRawFileWriter appender = |
| new DirectRawFileWriter(context.getConf(), null, inSchema, intermediateMeta, outputPath); |
| appender.init(); |
| for (Tuple t : tupleBlock) { |
| appender.addTuple(t); |
| } |
| appender.close(); |
| long chunkWriteEnd = System.currentTimeMillis(); |
| |
| info(LOG, "Chunk #" + chunkId + " sort and written (" + |
| FileUtil.humanReadableByteCount(appender.getOffset(), false) + " bytes, " + rowNum + " rows, " + |
| "sort time: " + (sortEnd - sortStart) + " msec, " + |
| "write time: " + (chunkWriteEnd - chunkWriteStart) + " msec)"); |
| |
| FileFragment frag = new FileFragment("", outputPath, 0, |
| new File(localFS.makeQualified(outputPath).toUri()).length()); |
| return new Chunk(inSchema, frag, intermediateMeta); |
| } |
| |
| /** |
| * It divides all tuples into a number of chunks, then sort for each chunk. |
| * |
| * @return All paths of chunks |
| * @throws java.io.IOException |
| */ |
| private List<Chunk> sortAndStoreAllChunks() throws IOException { |
| Tuple tuple; |
| List<Chunk> chunkPaths = new ArrayList<>(); |
| |
| int chunkId = 0; |
| long runStartTime = System.currentTimeMillis(); |
| |
| while (!context.isStopped() && (tuple = child.next()) != null) { // partition sort start |
| inMemoryTable.addTuple(tuple); |
| |
| if (inMemoryTable.usedMem() > sortBufferBytesNum) { // if input data exceeds main-memory at least once |
| long runEndTime = System.currentTimeMillis(); |
| info(LOG, "Chunk #" + chunkId + " run loading time: " + (runEndTime - runStartTime) + " msec"); |
| runStartTime = runEndTime; |
| |
| info(LOG, "Memory consumption exceeds " + FileUtil.humanReadableByteCount(inMemoryTable.usedMem(), false)); |
| |
| chunkPaths.add(sortAndStoreChunk(chunkId, inMemoryTable)); |
| inMemoryTable.clear(); |
| chunkId++; |
| |
| // When the volume of sorting data once exceed the size of sort buffer, |
| // the total progress of this external sort is divided into two parts. |
| // In contrast, if the data fits in memory, the progress is only one part. |
| // |
| // When the progress is divided into two parts, the first part sorts tuples on memory and stores them |
| // into a chunk. The second part merges stored chunks into fewer chunks, and it continues until the number |
| // of merged chunks is fewer than the default fanout. |
| // |
| // The fact that the code reach here means that the first chunk has been just stored. |
| // That is, the progress was divided into two parts. |
| // So, it multiply the progress of the children operator and 0.5f. |
| progress = child.getProgress() * 0.5f; |
| } |
| } |
| |
| if(inMemoryTable.size() > 0) { //if there are at least one or more input tuples |
| //store the remain data into a memory chunk. |
| chunkPaths.add(new Chunk(inSchema, inMemoryTable, intermediateMeta)); |
| } |
| |
| // get total loaded (or stored) bytes and total row numbers |
| TableStats childTableStats = child.getInputStats(); |
| if (childTableStats != null) { |
| inputBytes = childTableStats.getNumBytes(); |
| } |
| return chunkPaths; |
| } |
| |
| /** |
| * Get a local path from all temporal paths in round-robin manner. |
| */ |
| private synchronized Path getChunkPathForWrite(int level, int chunkId) throws IOException { |
| return localFS.makeQualified(localDirAllocator.getLocalPathForWrite( |
| sortTmpDir + "/" + level + "_" + chunkId, context.getConf())); |
| } |
| |
  /**
   * On the first call, runs the whole sort (load + spill + merge) and builds the
   * {@code result} scanner; afterwards it simply streams tuples from that scanner.
   */
  @Override
  public Tuple next() throws IOException {

    if (!sorted) { // if not sorted, first sort all data

      // if input files are given, it starts merging directly.
      if (mergedInputFragments != null) {
        try {
          this.result = externalMergeAndSort(mergedInputFragments);
          this.inputBytes = result.getInputStats().getNumBytes();
        } catch (Exception e) {
          throw new PhysicalPlanningException(e);
        }
      } else {
        // Try to sort all data, and store them as multiple chunks if memory exceeds
        long startTimeOfChunkSplit = System.currentTimeMillis();
        List<Chunk> chunks = sortAndStoreAllChunks();
        long endTimeOfChunkSplit = System.currentTimeMillis();
        info(LOG, chunks.size() + " Chunks creation time: " + (endTimeOfChunkSplit - startTimeOfChunkSplit) + " msec");

        if(chunks.size() == 0) {
          // empty input: a NullScanner returns no rows
          this.result = new NullScanner(context.getConf(), inSchema, intermediateMeta, null);
        } else {
          try {
            this.result = externalMergeAndSort(chunks);
          } catch (Exception e) {
            throw new PhysicalPlanningException(e);
          }
        }
      }

      sorted = true;
      result.init();

      // if loaded and sorted, we assume that it proceeds the half of one entire external sort operation.
      progress = 0.5f;
    }

    return result.next();
  }
| |
  /**
   * Computes the fanout for the next merge step, possibly reducing it so that an
   * unbalanced final merge becomes possible earlier.
   *
   * @param remainInputChunks number of input chunks not yet consumed in this level
   * @param inputNum total number of input chunks at this level
   * @param outputNum number of output chunks produced so far at this level
   * @param startIdx index of the first chunk this merge step would consume
   * @return the fanout to use (bounded above by defaultFanout)
   */
  private int calculateFanout(int remainInputChunks, int inputNum, int outputNum, int startIdx) {
    int computedFanout = Math.min(remainInputChunks, defaultFanout);

    // Why should we detect an opportunity for unbalanced merge?
    //
    // Assume that a fanout is given by 8 and there are 10 chunks.
    // If we firstly merge 3 chunks into one chunk, there remain only 8 chunks.
    // Then, we can just finish the merge phase even though we don't complete merge phase on all chunks.
    if (checkIfCanBeUnbalancedMerged(inputNum - (startIdx + computedFanout), outputNum + 1)) {
      // Shrink the candidate fanout until the unbalanced-merge condition no longer holds;
      // candidateFanout + 1 is then the smallest fanout that still enables it.
      int candidateFanout = computedFanout;
      while (checkIfCanBeUnbalancedMerged(inputNum - (startIdx + candidateFanout), outputNum + 1)) {
        candidateFanout--;
      }
      int beforeFanout = computedFanout;
      if (computedFanout > candidateFanout + 1) {
        computedFanout = candidateFanout + 1;
        info(LOG, "Fanout reduced for unbalanced merge: " + beforeFanout + " -> " + computedFanout);
      }
    }

    return computedFanout;
  }
| |
  /**
   * Repeatedly k-way merges the given chunks level by level (in parallel when multiple
   * cores are allocated) until at most {@code defaultFanout} runs remain, then returns a
   * final merging scanner over those runs. Intermediate files created by earlier levels
   * are deleted as soon as they have been merged.
   *
   * @param chunks the initial sorted runs (disk chunks and/or one memory chunk)
   * @return a scanner producing the fully sorted output
   */
  private Scanner externalMergeAndSort(List<Chunk> chunks) throws Exception {
    int level = 0;
    final List<Chunk> inputFiles = new ArrayList<>(chunks);
    final List<Chunk> outputFiles = new ArrayList<>();
    int remainRun = inputFiles.size();
    int chunksSize = chunks.size();

    long mergeStart = System.currentTimeMillis();

    // continue until the remain runs are larger than defaultFanout
    while (remainRun > defaultFanout) {

      // reset outChunkId
      int remainInputRuns = inputFiles.size();
      int outChunkId = 0;
      int outputFileNum = 0;
      List<Future<Chunk>> futures = new ArrayList<>();
      // the number of files being merged in threads.
      List<Integer> numberOfMergingFiles = new ArrayList<>();

      for (int startIdx = 0; startIdx < inputFiles.size();) {

        // calculate proper fanout
        int fanout = calculateFanout(remainInputRuns, inputFiles.size(), outputFileNum, startIdx);
        // how many files are merged in ith thread?
        numberOfMergingFiles.add(fanout);
        // launch a merger runner
        if(allocatedCoreNum > 1) {
          futures.add(executorService.submit(
              new KWayMergerCaller(level, outChunkId++, inputFiles, startIdx, fanout, false)));
        } else {
          // single core: run the merge inline but keep the Future-based bookkeeping uniform
          final SettableFuture<Chunk> future = SettableFuture.create();
          future.set(new KWayMergerCaller(level, outChunkId++, inputFiles, startIdx, fanout, false).call());
          futures.add(future);
        }
        outputFileNum++;

        startIdx += fanout;
        remainInputRuns = inputFiles.size() - startIdx;

        // If unbalanced merge is available, it finishes the merge phase earlier.
        if (checkIfCanBeUnbalancedMerged(remainInputRuns, outputFileNum)) {
          info(LOG, "Unbalanced merge possibility detected: number of remain input (" + remainInputRuns
              + ") and output files (" + outputFileNum + ") <= " + defaultFanout);

          List<Chunk> switched = new ArrayList<>();
          // switch the remain inputs to the next outputs
          for (int j = startIdx; j < inputFiles.size(); j++) {
            switched.add(inputFiles.get(j));
          }
          inputFiles.removeAll(switched);
          outputFiles.addAll(switched);

          break;
        }
      }

      // wait for all sort runners
      int finishedMerger = 0;
      int index = 0;
      for (Future<Chunk> future : futures) {
        outputFiles.add(future.get());
        // Getting the number of merged files
        finishedMerger += numberOfMergingFiles.get(index++);
        // progress = (# number of merged files / total number of files) * 0.5;
        progress = ((float)finishedMerger/(float)chunksSize) * 0.5f;
      }

      /*
       * delete merged intermediate files
       *
       * There may be 4 different types of file fragments in the list inputFiles
       * + A: a fragment created from fetched data from a remote host. By default, this fragment represents
       * a whole physical file (i.e., startOffset == 0 and length == length of physical file)
       * + B1: a fragment created from a local file (pseudo-fetched data from local host) in which the fragment
       * represents the whole physical file (i.e., startOffset == 0 AND length == length of physical file)
       * + B2: a fragment created from a local file (pseudo-fetched data from local host) in which the fragment
       * represents only a part of the physical file (i.e., startOffset > 0 OR length != length of physical file)
       * + C: a fragment created from merging some fragments of the above types. When this fragment is created,
       * its startOffset is set to 0 and its length is set to the length of the physical file, automatically
       *
       * Fragments of types A, B1, and B2 are inputs of ExternalSortExec. Among them, only B2-type fragments will
       * possibly be used by another task in the future. Thus, ideally, all fragments of types A, B1, and C can be
       * deleted at this point. However, for the ease of future code maintenance, we delete only type-C fragments here
       */
      int numDeletedFiles = 0;
      for (Chunk chunk : inputFiles) {
        if (chunk.isMemory()) {
          // the memory chunk has been merged into a file; release its buffer
          if (LOG.isDebugEnabled()) {
            debug(LOG, "Remove intermediate memory tuples: " + chunk.getMemoryTuples().usedMem());
          }
          chunk.getMemoryTuples().release();
        } else if (chunk.getFragment().getTableName().contains(INTERMEDIATE_FILE_PREFIX)) {
          // type-C fragment: created by this operator itself, safe to delete
          localFS.delete(chunk.getFragment().getPath(), true);
          numDeletedFiles++;

          if (LOG.isDebugEnabled()) {
            debug(LOG, "Delete merged intermediate file: " + chunk.getFragment());
          }
        }
      }
      if(LOG.isDebugEnabled()) {
        debug(LOG, numDeletedFiles + " merged intermediate files deleted");
      }

      // switch input files to output files, and then clear outputFiles
      inputFiles.clear();
      inputFiles.addAll(outputFiles);
      remainRun = inputFiles.size();
      outputFiles.clear();
      level++;
    }

    long mergeEnd = System.currentTimeMillis();
    info(LOG, "Total merge time: " + (mergeEnd - mergeStart) + " msec");

    // final result
    finalOutputFiles = inputFiles;

    result = createFinalMerger(inputFiles);
    return result;
  }
| |
| /** |
| * Merge Thread |
| */ |
| private class KWayMergerCaller implements Callable<Chunk> { |
| final int level; |
| final int nextRunId; |
| final List<Chunk> inputFiles; |
| final int startIdx; |
| final int mergeFanout; |
| final boolean updateInputStats; |
| |
| public KWayMergerCaller(final int level, final int nextRunId, final List<Chunk> inputFiles, |
| final int startIdx, final int mergeFanout, final boolean updateInputStats) { |
| this.level = level; |
| this.nextRunId = nextRunId; |
| this.inputFiles = inputFiles; |
| this.startIdx = startIdx; |
| this.mergeFanout = mergeFanout; |
| this.updateInputStats = updateInputStats; |
| } |
| |
| @Override |
| public Chunk call() throws Exception { |
| final Path outputPath = getChunkPathForWrite(level + 1, nextRunId); |
| info(LOG, mergeFanout + " files are being merged to an output file " + outputPath.getName()); |
| long mergeStartTime = System.currentTimeMillis(); |
| |
| final Scanner merger = createKWayMerger(inputFiles, startIdx, mergeFanout); |
| merger.init(); |
| |
| final DirectRawFileWriter output = |
| new DirectRawFileWriter(context.getConf(), null, inSchema, intermediateMeta, outputPath); |
| output.init(); |
| |
| Tuple mergeTuple; |
| while((mergeTuple = merger.next()) != null) { |
| output.addTuple(mergeTuple); |
| } |
| merger.close(); |
| output.close(); |
| long mergeEndTime = System.currentTimeMillis(); |
| info(LOG, outputPath.getName() + " is written to a disk. (" |
| + FileUtil.humanReadableByteCount(output.getOffset(), false) |
| + " bytes, " + (mergeEndTime - mergeStartTime) + " msec)"); |
| File f = new File(localFS.makeQualified(outputPath).toUri()); |
| FileFragment frag = new FileFragment(INTERMEDIATE_FILE_PREFIX + outputPath.getName(), outputPath, 0, f.length()); |
| return new Chunk(inSchema, frag, intermediateMeta); |
| } |
| } |
| |
| /** |
| * It checks if unbalanced merge is possible. |
| */ |
| private boolean checkIfCanBeUnbalancedMerged(int remainInputNum, int outputNum) { |
| return (remainInputNum + outputNum) <= defaultFanout; |
| } |
| |
| /** |
| * Create a merged file scanner or k-way merge scanner. |
| */ |
| private Scanner createFinalMerger(List<Chunk> inputs) throws IOException { |
| if (inputs.size() == 1) { |
| this.result = getScanner(inputs.get(0)); |
| } else { |
| this.result = createKWayMerger(inputs, 0, inputs.size()); |
| } |
| return result; |
| } |
| |
  /**
   * Creates a scanner over a single chunk: a memory-table scanner for the in-memory
   * chunk (sorting it first), or a local-filesystem scanner for a disk chunk.
   */
  private Scanner getScanner(Chunk chunk) throws IOException {
    if (chunk.isMemory()) {
      long sortStart = System.currentTimeMillis();

      // NOTE(review): this sorts the shared inMemoryTable field rather than
      // chunk.getMemoryTuples(). That is safe only while at most one memory chunk
      // (backed by inMemoryTable) ever exists — confirm before creating more.
      this.sort(inMemoryTable);
      Scanner scanner = new MemTableScanner<>(inMemoryTable, inMemoryTable.size(), inMemoryTable.usedMem());
      if(LOG.isDebugEnabled()) {
        debug(LOG, "Memory Chunk sort (" + FileUtil.humanReadableByteCount(inMemoryTable.usedMem(), false)
            + " bytes, " + inMemoryTable.size() + " rows, sort time: "
            + (System.currentTimeMillis() - sortStart) + " msec)");
      }
      return scanner;
    } else {
      return TablespaceManager.getLocalFs().getScanner(chunk.meta, chunk.schema, chunk.fragment, chunk.schema);
    }
  }
| |
| private Scanner createKWayMerger(List<Chunk> inputs, final int startChunkId, final int num) throws IOException { |
| final Scanner [] sources = new Scanner[num]; |
| for (int i = 0; i < num; i++) { |
| sources[i] = getScanner(inputs.get(startChunkId + i)); |
| } |
| return createKWayMergerInternal(sources, 0, num); |
| } |
| |
| private Scanner createKWayMergerInternal(final Scanner [] sources, final int startIdx, final int num) |
| throws IOException { |
| if (num > 1) { |
| final int mid = (int) Math.ceil((float)num / 2); |
| Scanner left = createKWayMergerInternal(sources, startIdx, mid); |
| Scanner right = createKWayMergerInternal(sources, startIdx + mid, num - mid); |
| return new PairWiseMerger(inSchema, left, right, primitiveComparator); |
| } else { |
| return sources[startIdx]; |
| } |
| } |
| |
| private static class MemTableScanner<T extends Tuple> extends AbstractScanner { |
| final Iterable<T> iterable; |
| final long sortAndStoredBytes; |
| final int totalRecords; |
| |
| Iterator<T> iterator; |
| // for input stats |
| float scannerProgress; |
| int numRecords; |
| TableStats scannerTableStats; |
| |
| public MemTableScanner(Iterable<T> iterable, int length, long inBytes) { |
| this.iterable = iterable; |
| this.totalRecords = length; |
| this.sortAndStoredBytes = inBytes; |
| } |
| |
| @Override |
| public void init() throws IOException { |
| iterator = iterable.iterator(); |
| |
| scannerProgress = 0.0f; |
| numRecords = 0; |
| |
| // it will be returned as the final stats |
| scannerTableStats = new TableStats(); |
| scannerTableStats.setNumBytes(sortAndStoredBytes); |
| scannerTableStats.setReadBytes(sortAndStoredBytes); |
| scannerTableStats.setNumRows(totalRecords); |
| } |
| |
| @Override |
| public Tuple next() throws IOException { |
| if (iterator.hasNext()) { |
| numRecords++; |
| return iterator.next(); |
| } else { |
| return null; |
| } |
| } |
| |
| @Override |
| public void reset() throws IOException { |
| init(); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| iterator = null; |
| scannerProgress = 1.0f; |
| } |
| |
| @Override |
| public float getProgress() { |
| if (iterator != null && numRecords > 0) { |
| return (float)numRecords / (float)totalRecords; |
| |
| } else { // if an input is empty |
| return scannerProgress; |
| } |
| } |
| |
| @Override |
| public TableStats getInputStats() { |
| return scannerTableStats; |
| } |
| } |
| |
  /** Lifecycle states of a {@link PairWiseMerger}; transitions are NEW -> INITED -> CLOSED. */
  enum State {
    NEW,
    INITED,
    CLOSED
  }
| |
| /** |
| * Two-way merger scanner that reads two input sources and outputs one output tuples sorted in some order. |
| */ |
| private static class PairWiseMerger extends AbstractScanner { |
| |
| protected final Schema schema; |
| protected final Comparator<Tuple> comparator; |
| |
| protected final Scanner leftScan; |
| protected final Scanner rightScan; |
| |
| private Tuple leftTuple; |
| private Tuple rightTuple; |
| private boolean leftEOF; |
| private boolean rightEOF; |
| |
| private Tuple outTuple; |
| |
| private float mergerProgress; |
| private TableStats mergerInputStats; |
| |
| private State state = State.NEW; |
| |
| public PairWiseMerger(Schema schema, Scanner leftScanner, Scanner rightScanner, Comparator<Tuple> comparator) |
| throws IOException { |
| this.schema = schema; |
| this.leftScan = leftScanner; |
| this.rightScan = rightScanner; |
| this.comparator = comparator; |
| } |
| |
| private void setState(State state) { |
| this.state = state; |
| } |
| |
| @Override |
| public void init() throws IOException { |
| if (state == State.NEW) { |
| leftScan.init(); |
| rightScan.init(); |
| |
| mergerInputStats = new TableStats(); |
| mergerProgress = 0.0f; |
| |
| setState(State.INITED); |
| } else { |
| throw new IllegalStateException("Illegal State: init() is not allowed in " + state.name()); |
| } |
| } |
| |
| protected int compare() { |
| return comparator.compare(leftTuple, rightTuple); |
| } |
| |
| @Override |
| public Tuple next() throws IOException { |
| if(!leftEOF && leftTuple == null) { |
| leftTuple = leftScan.next(); |
| } |
| |
| if(!rightEOF && rightTuple == null) { |
| rightTuple = rightScan.next(); |
| } |
| |
| if (leftTuple != null && rightTuple != null) { |
| if (compare() < 0) { |
| outTuple = leftTuple; |
| leftTuple = null; |
| } else { |
| outTuple = rightTuple; |
| rightTuple = null; |
| } |
| return outTuple; |
| } |
| |
| if (leftTuple == null) { |
| leftEOF = true; |
| |
| if (rightTuple != null) { |
| outTuple = rightTuple; |
| rightTuple = null; |
| } else { |
| rightEOF = true; |
| outTuple = null; |
| } |
| } else { |
| rightEOF = true; |
| outTuple = leftTuple; |
| leftTuple = null; |
| } |
| return outTuple; |
| } |
| |
| @Override |
| public void reset() throws IOException { |
| if (state == State.INITED) { |
| leftScan.reset(); |
| rightScan.reset(); |
| |
| leftTuple = null; |
| rightTuple = null; |
| outTuple = null; |
| |
| leftEOF = false; |
| rightEOF = false; |
| |
| } else { |
| throw new IllegalStateException("Illegal State: init() is not allowed in " + state.name()); |
| } |
| } |
| |
| @Override |
| public void close() throws IOException { |
| IOUtils.cleanup(LOG, leftScan, rightScan); |
| getInputStats(); |
| mergerProgress = 1.0f; |
| leftTuple = null; |
| rightTuple = null; |
| setState(State.CLOSED); |
| } |
| |
| @Override |
| public Schema getSchema() { |
| return schema; |
| } |
| |
| @Override |
| public float getProgress() { |
| if (leftScan == null) { |
| return mergerProgress; |
| } |
| return leftScan.getProgress() * 0.5f + rightScan.getProgress() * 0.5f; |
| } |
| |
| @Override |
| public TableStats getInputStats() { |
| if (leftScan == null) { |
| return mergerInputStats; |
| } |
| TableStats leftInputStats = leftScan.getInputStats(); |
| if (mergerInputStats == null) { |
| mergerInputStats = new TableStats(); |
| } |
| mergerInputStats.setNumBytes(0); |
| mergerInputStats.setReadBytes(0); |
| mergerInputStats.setNumRows(0); |
| |
| if (leftInputStats != null) { |
| mergerInputStats.setNumBytes(leftInputStats.getNumBytes()); |
| mergerInputStats.setReadBytes(leftInputStats.getReadBytes()); |
| mergerInputStats.setNumRows(leftInputStats.getNumRows()); |
| } |
| |
| TableStats rightInputStats = rightScan.getInputStats(); |
| if (rightInputStats != null) { |
| mergerInputStats.setNumBytes(mergerInputStats.getNumBytes() + rightInputStats.getNumBytes()); |
| mergerInputStats.setReadBytes(mergerInputStats.getReadBytes() + rightInputStats.getReadBytes()); |
| mergerInputStats.setNumRows(mergerInputStats.getNumRows() + rightInputStats.getNumRows()); |
| } |
| |
| return mergerInputStats; |
| } |
| } |
| |
  /**
   * Releases all resources: closes the result scanner, deletes intermediate files that
   * fully cover their physical file, frees the in-memory table, and shuts down the
   * merge thread pool.
   */
  @Override
  public void close() throws IOException {
    super.close();

    if (result != null) {
      result.close();
    }

    if (finalOutputFiles != null) {
      for (Chunk chunk : finalOutputFiles) {
        if (!chunk.isMemory()) {
          FileFragment frag = chunk.getFragment();
          File tmpFile = new File(localFS.makeQualified(frag.getPath()).toUri());
          // only delete fragments that span the whole physical file; partial fragments
          // may still be read by another task (see the comment in externalMergeAndSort)
          if (frag.getStartKey() == 0 && frag.getLength() == tmpFile.length()) {
            localFS.delete(frag.getPath(), true);
            if(LOG.isDebugEnabled()) {
              debug(LOG, "Delete file: " + frag);
            }
          }
        }
      }
    }

    if(inMemoryTable != null) {
      inMemoryTable.release();
      inMemoryTable = null;
    }

    if(executorService != null){
      executorService.shutdown();
      executorService = null;
    }

    plan = null;
  }
| |
  /**
   * Rewinds the operator to re-emit the sorted output without re-sorting.
   */
  @Override
  public void rescan() throws IOException {
    if (result != null) {
      result.reset();
    }
    super.rescan();
    // load-and-sort has already been done; only the merge half of the work remains
    progress = 0.5f;
  }
| |
| @Override |
| public float getProgress() { |
| if (result != null) { |
| return progress + result.getProgress() * 0.5f; |
| } else { |
| return progress; |
| } |
| } |
| |
  /**
   * Returns input statistics, combining the result scanner's row/read counts with
   * the original input byte count captured during loading.
   */
  @Override
  public TableStats getInputStats() {
    if (result != null) {

      // NOTE(review): result.getInputStats() may be null before result.init() is
      // called (e.g. MemTableScanner builds its stats in init()); confirm callers
      // invoke this only after next() has been called at least once.
      TableStats tableStats = result.getInputStats();
      inputStats.setNumRows(tableStats.getNumRows());
      // numBytes reflects the original input size, not the intermediate files
      inputStats.setNumBytes(inputBytes);
      inputStats.setReadBytes(tableStats.getReadBytes());
    }
    return inputStats;
  }
| |
| private static class Chunk { |
| private FileFragment fragment; |
| private TableMeta meta; |
| private Schema schema; |
| private UnSafeTupleList memoryTuples; |
| private boolean isMemory; |
| |
| public Chunk(Schema schema, FileFragment fragment, TableMeta meta) { |
| this.schema = schema; |
| this.fragment = fragment; |
| this.meta = meta; |
| } |
| |
| public Chunk(Schema schema, UnSafeTupleList tuples, TableMeta meta) { |
| this.memoryTuples = tuples; |
| this.isMemory = true; |
| this.schema = schema; |
| this.meta = meta; |
| } |
| |
| public FileFragment getFragment() { |
| return fragment; |
| } |
| |
| public TableMeta getMeta() { |
| return meta; |
| } |
| |
| public UnSafeTupleList getMemoryTuples() { |
| return memoryTuples; |
| } |
| |
| public boolean isMemory() { |
| return isMemory; |
| } |
| |
| public Schema getSchema() { |
| return schema; |
| } |
| } |
| |
| /** |
| * The Comparator class for UnSafeTuples |
| * |
| * @see UnSafeTuple |
| */ |
| static class UnSafeComparator implements Comparator<UnSafeTuple> { |
| private final int[] sortKeyIds; |
| private final TajoDataTypes.Type[] sortKeyTypes; |
| private final boolean[] asc; |
| private final boolean[] nullFirsts; |
| |
| /** |
| * @param schema The schema of input tuples |
| * @param sortKeys The description of sort keys |
| */ |
| public UnSafeComparator(Schema schema, SortSpec[] sortKeys) { |
| Preconditions.checkArgument(sortKeys.length > 0, |
| "At least one sort key must be specified."); |
| |
| this.sortKeyIds = new int[sortKeys.length]; |
| this.sortKeyTypes = new TajoDataTypes.Type[sortKeys.length]; |
| this.asc = new boolean[sortKeys.length]; |
| this.nullFirsts = new boolean[sortKeys.length]; |
| for (int i = 0; i < sortKeys.length; i++) { |
| if (sortKeys[i].getSortKey().hasQualifier()) { |
| this.sortKeyIds[i] = schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName()); |
| } else { |
| this.sortKeyIds[i] = schema.getColumnIdByName(sortKeys[i].getSortKey().getSimpleName()); |
| } |
| |
| this.asc[i] = sortKeys[i].isAscending(); |
| this.nullFirsts[i] = sortKeys[i].isNullsFirst(); |
| this.sortKeyTypes[i] = sortKeys[i].getSortKey().getDataType().getType(); |
| } |
| } |
| |
| @Override |
| public int compare(UnSafeTuple tuple1, UnSafeTuple tuple2) { |
| for (int i = 0; i < sortKeyIds.length; i++) { |
| int compare = OffHeapRowBlockUtils.compareColumn(tuple1, tuple2, |
| sortKeyIds[i], sortKeyTypes[i], asc[i], nullFirsts[i]); |
| |
| if (compare != 0) { |
| return compare; |
| } |
| } |
| return 0; |
| } |
| } |
| |
| /** |
| * The Comparator class for raw file |
| */ |
| static class PrimitiveComparator implements Comparator<Tuple> { |
| private final int[] sortKeyIds; |
| private final TajoDataTypes.Type[] sortKeyTypes; |
| private final boolean[] asc; |
| private final boolean[] nullFirsts; |
| |
| /** |
| * @param schema The schema of input tuples |
| * @param sortKeys The description of sort keys |
| */ |
| public PrimitiveComparator(Schema schema, SortSpec[] sortKeys) { |
| Preconditions.checkArgument(sortKeys.length > 0, |
| "At least one sort key must be specified."); |
| |
| this.sortKeyIds = new int[sortKeys.length]; |
| this.sortKeyTypes = new TajoDataTypes.Type[sortKeys.length]; |
| this.asc = new boolean[sortKeys.length]; |
| this.nullFirsts = new boolean[sortKeys.length]; |
| for (int i = 0; i < sortKeys.length; i++) { |
| if (sortKeys[i].getSortKey().hasQualifier()) { |
| this.sortKeyIds[i] = schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName()); |
| } else { |
| this.sortKeyIds[i] = schema.getColumnIdByName(sortKeys[i].getSortKey().getSimpleName()); |
| } |
| |
| this.asc[i] = sortKeys[i].isAscending(); |
| this.nullFirsts[i] = sortKeys[i].isNullsFirst(); |
| this.sortKeyTypes[i] = sortKeys[i].getSortKey().getDataType().getType(); |
| } |
| } |
| |
| @Override |
| public int compare(Tuple tuple1, Tuple tuple2) { |
| for (int i = 0; i < sortKeyIds.length; i++) { |
| int compare = compareColumn(tuple1, tuple2, |
| sortKeyIds[i], sortKeyTypes[i], asc[i], nullFirsts[i]); |
| |
| if (compare != 0) { |
| return compare; |
| } |
| } |
| return 0; |
| } |
| |
| public int compareColumn(Tuple tuple1, Tuple tuple2, int index, TajoDataTypes.Type type, |
| boolean ascending, boolean nullFirst) { |
| final boolean n1 = tuple1.isBlankOrNull(index); |
| final boolean n2 = tuple2.isBlankOrNull(index); |
| if (n1 && n2) { |
| return 0; |
| } |
| |
| if (n1 ^ n2) { |
| return nullFirst ? (n1 ? -1 : 1) : (n1 ? 1 : -1); |
| } |
| |
| int compare; |
| switch (type) { |
| case BOOLEAN: |
| compare = Booleans.compare(tuple1.getBool(index), tuple2.getBool(index)); |
| break; |
| case BIT: |
| compare = tuple1.getByte(index) - tuple2.getByte(index); |
| break; |
| case INT1: |
| case INT2: |
| compare = Shorts.compare(tuple1.getInt2(index), tuple2.getInt2(index)); |
| break; |
| case DATE: |
| case INT4: |
| compare = Ints.compare(tuple1.getInt4(index), tuple2.getInt4(index)); |
| break; |
| case INET4: |
| compare = UnsignedInts.compare(tuple1.getInt4(index), tuple2.getInt4(index)); |
| break; |
| case TIME: |
| case TIMESTAMP: |
| case INT8: |
| compare = Longs.compare(tuple1.getInt8(index), tuple2.getInt8(index)); |
| break; |
| case FLOAT4: |
| compare = Floats.compare(tuple1.getFloat4(index), tuple2.getFloat4(index)); |
| break; |
| case FLOAT8: |
| compare = Doubles.compare(tuple1.getFloat8(index), tuple2.getFloat8(index)); |
| break; |
| case CHAR: |
| case TEXT: |
| case BLOB: |
| compare = TextDatum.COMPARATOR.compare(tuple1.getBytes(index), tuple2.getBytes(index)); |
| break; |
| default: |
| throw new TajoRuntimeException( |
| new UnsupportedException("unknown data type '" + type.name() + "'")); |
| } |
| return ascending ? compare : -compare; |
| } |
| } |
| } |