| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.asterix.runtime.operators.joins.intervalindex; |
| |
| import java.util.Comparator; |
| import java.util.List; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| |
| import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker; |
| import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinCheckerFactory; |
| import org.apache.asterix.runtime.operators.joins.IntervalJoinUtil; |
| import org.apache.hyracks.api.comm.IFrameTupleAccessor; |
| import org.apache.hyracks.api.comm.IFrameWriter; |
| import org.apache.hyracks.api.context.IHyracksTaskContext; |
| import org.apache.hyracks.api.dataflow.value.RecordDescriptor; |
| import org.apache.hyracks.api.exceptions.HyracksDataException; |
| import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; |
| import org.apache.hyracks.dataflow.std.buffermanager.IPartitionedDeletableTupleBufferManager; |
| import org.apache.hyracks.dataflow.std.buffermanager.ITupleAccessor; |
| import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor; |
| import org.apache.hyracks.dataflow.std.buffermanager.TupleAccessor; |
| import org.apache.hyracks.dataflow.std.buffermanager.VPartitionDeletableTupleBufferManager; |
| import org.apache.hyracks.dataflow.std.join.AbstractMergeJoiner; |
| import org.apache.hyracks.dataflow.std.join.MergeJoinLocks; |
| import org.apache.hyracks.dataflow.std.join.MergeStatus; |
| import org.apache.hyracks.dataflow.std.join.RunFileStream; |
| import org.apache.hyracks.dataflow.std.structures.TuplePointer; |
| |
| /** |
| * Interval Index Merge Joiner takes two sorted streams of input and joins. |
| * The two sorted streams must be in a logical order and the comparator must |
| * support keeping that order so the join will work. |
| * The left stream will spill to disk when memory is full. |
| * The both right and left use memory to maintain active intervals for the join. |
| */ |
| public class IntervalIndexJoiner extends AbstractMergeJoiner { |
| |
| private static final Logger LOGGER = Logger.getLogger(IntervalIndexJoiner.class.getName()); |
| |
| private final IPartitionedDeletableTupleBufferManager bufferManager; |
| |
| private final ActiveSweepManager[] activeManager; |
| private final ITuplePointerAccessor[] memoryAccessor; |
| private final int[] streamIndex; |
| private final RunFileStream[] runFileStream; |
| |
| // private final LinkedList<TuplePointer> buffer = new LinkedList<>(); |
| |
| private final IIntervalMergeJoinChecker imjc; |
| |
| private final byte point; |
| |
| private final int leftKey; |
| private final int rightKey; |
| |
| private long joinComparisonCount = 0; |
| private long joinResultCount = 0; |
| private long leftSpillCount = 0; |
| private long rightSpillCount = 0; |
| private long[] spillFileCount = { 0, 0 }; |
| private long[] spillReadCount = { 0, 0 }; |
| private long[] spillWriteCount = { 0, 0 }; |
| |
| public IntervalIndexJoiner(IHyracksTaskContext ctx, int memorySize, int partition, MergeStatus status, |
| MergeJoinLocks locks, Comparator<EndPointIndexItem> endPointComparator, |
| IIntervalMergeJoinCheckerFactory imjcf, int[] leftKeys, int[] rightKeys, RecordDescriptor leftRd, |
| RecordDescriptor rightRd) throws HyracksDataException { |
| super(ctx, partition, status, locks, leftRd, rightRd); |
| this.point = imjcf.isOrderAsc() ? EndPointIndexItem.START_POINT : EndPointIndexItem.END_POINT; |
| |
| this.imjc = imjcf.createMergeJoinChecker(leftKeys, rightKeys, partition, ctx); |
| |
| this.leftKey = leftKeys[0]; |
| this.rightKey = rightKeys[0]; |
| |
| RecordDescriptor[] recordDescriptors = new RecordDescriptor[JOIN_PARTITIONS]; |
| recordDescriptors[LEFT_PARTITION] = leftRd; |
| recordDescriptors[RIGHT_PARTITION] = rightRd; |
| |
| streamIndex = new int[JOIN_PARTITIONS]; |
| streamIndex[LEFT_PARTITION] = TupleAccessor.UNSET; |
| streamIndex[RIGHT_PARTITION] = TupleAccessor.UNSET; |
| |
| if (memorySize < 5) { |
| throw new HyracksDataException( |
| "IntervalIndexJoiner does not have enough memory (needs > 4, got " + memorySize + ")."); |
| } |
| // bufferManager = new VPartitionDeletableTupleBufferManager(ctx, |
| // VPartitionDeletableTupleBufferManager.NO_CONSTRAIN, JOIN_PARTITIONS, |
| // (memorySize - 4) * ctx.getInitialFrameSize(), recordDescriptors); |
| bufferManager = new VPartitionDeletableTupleBufferManager(ctx, |
| VPartitionDeletableTupleBufferManager.NO_CONSTRAIN, JOIN_PARTITIONS, |
| memorySize * ctx.getInitialFrameSize(), recordDescriptors); |
| memoryAccessor = new ITuplePointerAccessor[JOIN_PARTITIONS]; |
| memoryAccessor[LEFT_PARTITION] = bufferManager.getTuplePointerAccessor(leftRd); |
| memoryAccessor[RIGHT_PARTITION] = bufferManager.getTuplePointerAccessor(rightRd); |
| |
| activeManager = new ActiveSweepManager[JOIN_PARTITIONS]; |
| activeManager[LEFT_PARTITION] = new ActiveSweepManager(bufferManager, leftKey, LEFT_PARTITION, |
| endPointComparator); |
| activeManager[RIGHT_PARTITION] = new ActiveSweepManager(bufferManager, rightKey, RIGHT_PARTITION, |
| endPointComparator); |
| |
| // Run files for both branches |
| runFileStream = new RunFileStream[JOIN_PARTITIONS]; |
| runFileStream[LEFT_PARTITION] = new RunFileStream(ctx, "left", status.branch[LEFT_PARTITION]); |
| runFileStream[RIGHT_PARTITION] = new RunFileStream(ctx, "right", status.branch[RIGHT_PARTITION]); |
| |
| LOGGER.setLevel(Level.FINE); |
| System.out.println("IntervalIndexJoiner: Logging level is: " + LOGGER.getLevel()); |
| if (LOGGER.isLoggable(Level.FINE)) { |
| LOGGER.fine("IntervalIndexJoiner has started partition " + partition + " with " + memorySize |
| + " frames of memory."); |
| } |
| } |
| |
| private void addToResult(IFrameTupleAccessor accessor1, int index1, IFrameTupleAccessor accessor2, int index2, |
| boolean reversed, IFrameWriter writer) throws HyracksDataException { |
| if (reversed) { |
| FrameUtils.appendConcatToWriter(writer, resultAppender, accessor2, index2, accessor1, index1); |
| } else { |
| FrameUtils.appendConcatToWriter(writer, resultAppender, accessor1, index1, accessor2, index2); |
| } |
| joinResultCount++; |
| } |
| |
| private void flushMemory(int partition) throws HyracksDataException { |
| activeManager[partition].clear(); |
| } |
| |
| private TupleStatus loadSpilledTuple(int partition) throws HyracksDataException { |
| if (!inputAccessor[partition].exists()) { |
| if (!runFileStream[partition].loadNextBuffer(inputAccessor[partition])) { |
| return TupleStatus.EMPTY; |
| } |
| } |
| return TupleStatus.LOADED; |
| } |
| |
| private TupleStatus loadTuple(int partition) throws HyracksDataException { |
| TupleStatus loaded; |
| if (status.branch[partition].isRunFileReading()) { |
| loaded = loadSpilledTuple(partition); |
| if (loaded.isEmpty()) { |
| continueStream(partition, inputAccessor[partition]); |
| loaded = loadTuple(partition); |
| } |
| } else { |
| loaded = loadMemoryTuple(partition); |
| } |
| return loaded; |
| } |
| |
| /** |
| * Ensures a frame exists for the right branch, either from memory or the run file. |
| * |
| * @throws HyracksDataException |
| */ |
| private TupleStatus loadRightTuple() throws HyracksDataException { |
| TupleStatus loaded = loadTuple(RIGHT_PARTITION); |
| if (loaded == TupleStatus.UNKNOWN) { |
| loaded = pauseAndLoadRightTuple(); |
| } |
| return loaded; |
| } |
| |
| /** |
| * Ensures a frame exists for the left branch, either from memory or the run file. |
| * |
| * @throws HyracksDataException |
| */ |
| private TupleStatus loadLeftTuple() throws HyracksDataException { |
| return loadTuple(LEFT_PARTITION); |
| } |
| |
| @Override |
| public void processLeftFrame(IFrameWriter writer) throws HyracksDataException { |
| TupleStatus leftTs = loadLeftTuple(); |
| TupleStatus rightTs = loadRightTuple(); |
| while (leftTs.isKnown() && checkHasMoreProcessing(leftTs, LEFT_PARTITION, RIGHT_PARTITION) |
| && checkHasMoreProcessing(rightTs, RIGHT_PARTITION, LEFT_PARTITION)) { |
| if (status.branch[RIGHT_PARTITION].isRunFileWriting()) { |
| // Right side from disk |
| rightTs = processRightTupleSpill(writer); |
| } else if (status.branch[LEFT_PARTITION].isRunFileWriting()) { |
| // Left side from disk |
| leftTs = processLeftTupleSpill(writer); |
| } else { |
| if (leftTs.isEmpty() || (rightTs.isLoaded() && checkToProcessRightTuple())) { |
| // Right side from stream |
| processRightTuple(writer); |
| rightTs = loadRightTuple(); |
| } else { |
| // Left side from stream |
| processLeftTuple(writer); |
| leftTs = loadLeftTuple(); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void processLeftClose(IFrameWriter writer) throws HyracksDataException { |
| processLeftFrame(writer); |
| |
| resultAppender.write(writer, true); |
| activeManager[LEFT_PARTITION].clear(); |
| activeManager[RIGHT_PARTITION].clear(); |
| runFileStream[LEFT_PARTITION].close(); |
| runFileStream[RIGHT_PARTITION].close(); |
| if (LOGGER.isLoggable(Level.WARNING)) { |
| LOGGER.warning("IntervalIndexJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount |
| + " results, left[" + leftSpillCount + " spills, " + runFileStream[LEFT_PARTITION].getFileCount() |
| + " files, " + runFileStream[LEFT_PARTITION].getWriteCount() + " written, " |
| + runFileStream[LEFT_PARTITION].getReadCount() + " read]. right[" + rightSpillCount + " spills, " |
| + runFileStream[RIGHT_PARTITION].getFileCount() + " files, " |
| + runFileStream[RIGHT_PARTITION].getWriteCount() + " written, " |
| + runFileStream[RIGHT_PARTITION].getReadCount() + " read]."); |
| } |
| |
| } |
| |
| private boolean checkHasMoreProcessing(TupleStatus ts, int partition, int joinPartition) { |
| return ts.isLoaded() || status.branch[partition].isRunFileWriting() |
| || (checkHasMoreTuples(joinPartition) && activeManager[partition].hasRecords()); |
| } |
| |
| private boolean checkHasMoreTuples(int partition) { |
| return status.branch[partition].hasMore() || status.branch[partition].isRunFileReading(); |
| } |
| |
| private boolean checkToProcessRightTuple() { |
| long leftStart = IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION], leftKey); |
| long rightStart = IntervalJoinUtil.getIntervalStart(inputAccessor[RIGHT_PARTITION], rightKey); |
| if (leftStart < rightStart) { |
| // Left stream has next tuple, check if right active must be updated first. |
| return activeManager[RIGHT_PARTITION].hasRecords() |
| && activeManager[RIGHT_PARTITION].getTopPoint() < leftStart; |
| } else { |
| // Right stream has next tuple, check if left active must be update first. |
| return !(activeManager[LEFT_PARTITION].hasRecords() |
| && activeManager[LEFT_PARTITION].getTopPoint() < rightStart); |
| } |
| } |
| |
| private boolean checkToProcessAdd(long startMemory, long endMemory) { |
| return startMemory < endMemory; |
| } |
| |
| private TupleStatus processLeftTupleSpill(IFrameWriter writer) throws HyracksDataException { |
| // Process left tuples one by one, check them with active memory from the right branch. |
| int count = 0; |
| TupleStatus ts = loadLeftTuple(); |
| while (ts.isLoaded() && activeManager[RIGHT_PARTITION].hasRecords()) { |
| long sweep = activeManager[RIGHT_PARTITION].getTopPoint(); |
| if (checkToProcessAdd(IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION], leftKey), sweep) |
| || !imjc.checkToRemoveRightActive()) { |
| // Add individual tuples. |
| processTupleJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION], |
| inputAccessor[LEFT_PARTITION], true, writer); |
| runFileStream[LEFT_PARTITION].addToRunFile(inputAccessor[LEFT_PARTITION]); |
| inputAccessor[LEFT_PARTITION].next(); |
| ts = loadLeftTuple(); |
| ++count; |
| } else { |
| // Remove from active. |
| activeManager[RIGHT_PARTITION].removeTop(); |
| } |
| } |
| |
| if (LOGGER.isLoggable(Level.FINE)) { |
| LOGGER.fine("Spill for " + count + " left tuples"); |
| } |
| |
| // Memory is empty and we can start processing the run file. |
| if (activeManager[RIGHT_PARTITION].isEmpty() || ts.isEmpty()) { |
| unfreezeAndContinue(LEFT_PARTITION, inputAccessor[LEFT_PARTITION]); |
| ts = loadLeftTuple(); |
| } |
| return ts; |
| } |
| |
| private TupleStatus processRightTupleSpill(IFrameWriter writer) throws HyracksDataException { |
| // Process left tuples one by one, check them with active memory from the right branch. |
| int count = 0; |
| TupleStatus ts = loadRightTuple(); |
| while (ts.isLoaded() && activeManager[LEFT_PARTITION].hasRecords() && inputAccessor[RIGHT_PARTITION].exists()) { |
| long sweep = activeManager[LEFT_PARTITION].getTopPoint(); |
| if (checkToProcessAdd(IntervalJoinUtil.getIntervalStart(inputAccessor[RIGHT_PARTITION], rightKey), sweep) |
| || !imjc.checkToRemoveLeftActive()) { |
| // Add individual tuples. |
| processTupleJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION], |
| inputAccessor[RIGHT_PARTITION], false, writer); |
| runFileStream[RIGHT_PARTITION].addToRunFile(inputAccessor[RIGHT_PARTITION]); |
| inputAccessor[RIGHT_PARTITION].next(); |
| ts = loadRightTuple(); |
| ++count; |
| } else { |
| // Remove from active. |
| activeManager[LEFT_PARTITION].removeTop(); |
| } |
| } |
| |
| if (LOGGER.isLoggable(Level.FINE)) { |
| LOGGER.fine("Spill for " + count + " right tuples"); |
| } |
| |
| // Memory is empty and we can start processing the run file. |
| if (!activeManager[LEFT_PARTITION].hasRecords() || ts.isEmpty()) { |
| unfreezeAndContinue(RIGHT_PARTITION, inputAccessor[RIGHT_PARTITION]); |
| ts = loadRightTuple(); |
| } |
| return ts; |
| } |
| |
| private void processLeftTuple(IFrameWriter writer) throws HyracksDataException { |
| // Process endpoints |
| do { |
| if ((!activeManager[LEFT_PARTITION].hasRecords() |
| || checkToProcessAdd(IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION], leftKey), |
| activeManager[LEFT_PARTITION].getTopPoint())) |
| || !imjc.checkToRemoveLeftActive()) { |
| // Add to active, end point index and buffer. |
| TuplePointer tp = new TuplePointer(); |
| if (activeManager[LEFT_PARTITION].addTuple(inputAccessor[LEFT_PARTITION], tp)) { |
| processTupleJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION], |
| inputAccessor[LEFT_PARTITION], true, writer); |
| // buffer.add(tp); |
| } else { |
| // Spill case |
| freezeAndSpill(); |
| break; |
| } |
| inputAccessor[LEFT_PARTITION].next(); |
| } else { |
| // Remove from active. |
| activeManager[LEFT_PARTITION].removeTop(); |
| } |
| } while (loadLeftTuple().isLoaded() && loadRightTuple().isLoaded() && !checkToProcessRightTuple()); |
| |
| // Add Results |
| // if (!buffer.isEmpty()) { |
| // processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION], buffer, |
| // memoryAccessor[LEFT_PARTITION], true, writer); |
| // } |
| } |
| |
| private void processRightTuple(IFrameWriter writer) throws HyracksDataException { |
| // Process endpoints |
| do { |
| if ((!activeManager[RIGHT_PARTITION].hasRecords() |
| || checkToProcessAdd(IntervalJoinUtil.getIntervalStart(inputAccessor[RIGHT_PARTITION], rightKey), |
| activeManager[RIGHT_PARTITION].getTopPoint())) |
| || !imjc.checkToRemoveRightActive()) { |
| // Add to active, end point index and buffer. |
| TuplePointer tp = new TuplePointer(); |
| if (activeManager[RIGHT_PARTITION].addTuple(inputAccessor[RIGHT_PARTITION], tp)) { |
| processTupleJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION], |
| inputAccessor[RIGHT_PARTITION], false, writer); |
| // buffer.add(tp); |
| } else { |
| // Spill case |
| freezeAndSpill(); |
| break; |
| } |
| inputAccessor[RIGHT_PARTITION].next(); |
| } else { |
| // Remove from active. |
| activeManager[RIGHT_PARTITION].removeTop(); |
| } |
| } while (loadRightTuple().isLoaded() && checkToProcessRightTuple()); |
| |
| // Add Results |
| // if (!buffer.isEmpty()) { |
| // processActiveJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION], buffer, |
| // memoryAccessor[RIGHT_PARTITION], false, writer); |
| // } |
| } |
| |
| // private void processActiveJoin(List<TuplePointer> outer, ITuplePointerAccessor outerAccessor, |
| // List<TuplePointer> inner, ITuplePointerAccessor innerAccessor, boolean reversed, IFrameWriter writer) |
| // throws HyracksDataException { |
| // for (TuplePointer outerTp : outer) { |
| // outerAccessor.reset(outerTp); |
| // for (TuplePointer innerTp : inner) { |
| // innerAccessor.reset(innerTp); |
| // if (imjc.checkToSaveInResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor, |
| // innerTp.getTupleIndex(), reversed)) { |
| // addToResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor, innerTp.getTupleIndex(), |
| // reversed, writer); |
| // } |
| // joinComparisonCount++; |
| // } |
| // } |
| // if (LOGGER.isLoggable(Level.FINE)) { |
| // LOGGER.fine("Sweep for " + buffer.size() + " tuples"); |
| // } |
| // buffer.clear(); |
| // } |
| |
| private void processTupleJoin(List<TuplePointer> outer, ITuplePointerAccessor outerAccessor, |
| ITupleAccessor tupleAccessor, boolean reversed, IFrameWriter writer) throws HyracksDataException { |
| for (TuplePointer outerTp : outer) { |
| outerAccessor.reset(outerTp); |
| if (imjc.checkToSaveInResult(outerAccessor, outerTp.getTupleIndex(), tupleAccessor, |
| tupleAccessor.getTupleId(), reversed)) { |
| addToResult(outerAccessor, outerTp.getTupleIndex(), tupleAccessor, tupleAccessor.getTupleId(), reversed, |
| writer); |
| } |
| joinComparisonCount++; |
| } |
| } |
| |
| private void freezeAndSpill() throws HyracksDataException { |
| if (LOGGER.isLoggable(Level.WARNING)) { |
| LOGGER.warning("freeze snapshot: " + frameCounts[RIGHT_PARTITION] + " right, " + frameCounts[LEFT_PARTITION] |
| + " left, left[" + bufferManager.getNumTuples(LEFT_PARTITION) + " memory]. right[" |
| + bufferManager.getNumTuples(RIGHT_PARTITION) + " memory]."); |
| } |
| if (bufferManager.getNumTuples(LEFT_PARTITION) > bufferManager.getNumTuples(RIGHT_PARTITION)) { |
| runFileStream[RIGHT_PARTITION].startRunFileWriting(); |
| if (LOGGER.isLoggable(Level.FINE)) { |
| LOGGER.fine("Memory is full. Freezing the left branch. (Left memory tuples: " |
| + bufferManager.getNumTuples(LEFT_PARTITION) + ", Right memory tuples: " |
| + bufferManager.getNumTuples(RIGHT_PARTITION) + ")"); |
| } |
| bufferManager.printStats("memory details"); |
| rightSpillCount++; |
| } else { |
| runFileStream[LEFT_PARTITION].startRunFileWriting(); |
| if (LOGGER.isLoggable(Level.FINE)) { |
| LOGGER.fine("Memory is full. Freezing the right branch. (Left memory tuples: " |
| + bufferManager.getNumTuples(LEFT_PARTITION) + ", Right memory tuples: " |
| + bufferManager.getNumTuples(RIGHT_PARTITION) + ")"); |
| } |
| bufferManager.printStats("memory details"); |
| leftSpillCount++; |
| } |
| } |
| |
| private void continueStream(int diskPartition, ITupleAccessor accessor) throws HyracksDataException { |
| runFileStream[diskPartition].closeRunFileReading(); |
| accessor.reset(inputBuffer[diskPartition]); |
| accessor.setTupleId(streamIndex[diskPartition]); |
| if (LOGGER.isLoggable(Level.FINE)) { |
| LOGGER.fine("Continue with stream (" + diskPartition + ")."); |
| } |
| } |
| |
| private void unfreezeAndContinue(int frozenPartition, ITupleAccessor accessor) throws HyracksDataException { |
| int flushPartition = frozenPartition == LEFT_PARTITION ? RIGHT_PARTITION : LEFT_PARTITION; |
| runFileStream[frozenPartition].flushAndStopRunFile(accessor); |
| if (LOGGER.isLoggable(Level.WARNING)) { |
| LOGGER.warning("snapshot(" + frozenPartition + "): " + frameCounts[RIGHT_PARTITION] + " right, " |
| + frameCounts[LEFT_PARTITION] + " left, left[" + bufferManager.getNumTuples(LEFT_PARTITION) |
| + " memory, " + leftSpillCount + " spills, " |
| + (runFileStream[LEFT_PARTITION].getFileCount() - spillFileCount[LEFT_PARTITION]) + " files, " |
| + (runFileStream[LEFT_PARTITION].getWriteCount() - spillWriteCount[LEFT_PARTITION]) + " written, " |
| + (runFileStream[LEFT_PARTITION].getReadCount() - spillReadCount[LEFT_PARTITION]) + " read]. right[" |
| + bufferManager.getNumTuples(RIGHT_PARTITION) + " memory, " + +rightSpillCount + " spills, " |
| + (runFileStream[RIGHT_PARTITION].getFileCount() - spillFileCount[RIGHT_PARTITION]) + " files, " |
| + (runFileStream[RIGHT_PARTITION].getWriteCount() - spillWriteCount[RIGHT_PARTITION]) + " written, " |
| + (runFileStream[RIGHT_PARTITION].getReadCount() - spillReadCount[RIGHT_PARTITION]) + " read]."); |
| spillFileCount[LEFT_PARTITION] = runFileStream[LEFT_PARTITION].getFileCount(); |
| spillReadCount[LEFT_PARTITION] = runFileStream[LEFT_PARTITION].getReadCount(); |
| spillWriteCount[LEFT_PARTITION] = runFileStream[LEFT_PARTITION].getWriteCount(); |
| spillFileCount[RIGHT_PARTITION] = runFileStream[RIGHT_PARTITION].getFileCount(); |
| spillReadCount[RIGHT_PARTITION] = runFileStream[RIGHT_PARTITION].getReadCount(); |
| spillWriteCount[RIGHT_PARTITION] = runFileStream[RIGHT_PARTITION].getWriteCount(); |
| } |
| flushMemory(flushPartition); |
| if ((LEFT_PARTITION == frozenPartition && !status.branch[LEFT_PARTITION].isRunFileReading()) |
| || (RIGHT_PARTITION == frozenPartition && !status.branch[RIGHT_PARTITION].isRunFileReading())) { |
| streamIndex[frozenPartition] = accessor.getTupleId(); |
| } |
| runFileStream[frozenPartition].startReadingRunFile(accessor); |
| if (LOGGER.isLoggable(Level.FINE)) { |
| LOGGER.fine("Unfreezing (" + frozenPartition + ")."); |
| } |
| } |
| |
| } |