blob: efb375657a8a98bed838ec72a07911146bbbdca9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hyracks.dataflow.std.join;
import java.util.LinkedList;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool;
import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
import org.apache.hyracks.dataflow.std.buffermanager.IDeletableTupleBufferManager;
import org.apache.hyracks.dataflow.std.buffermanager.ITupleAccessor;
import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor;
import org.apache.hyracks.dataflow.std.buffermanager.TupleAccessor;
import org.apache.hyracks.dataflow.std.buffermanager.VariableDeletableTupleMemoryManager;
import org.apache.hyracks.dataflow.std.structures.RunFilePointer;
import org.apache.hyracks.dataflow.std.structures.TuplePointer;
/**
 * Merge Joiner takes two sorted streams of input and joins them.
 * The two sorted streams must be in a logical order and the comparator must
 * support that order for the join to work correctly.
 * The left stream will spill to disk when memory is full.
 * The right stream is buffered in memory and pauses when memory is full.
 */
public class MergeJoiner extends AbstractMergeJoiner {
    private static final Logger LOGGER = Logger.getLogger(MergeJoiner.class.getName());

    // Frame pool and buffer manager backing the right (in-memory) branch.
    private final IDeallocatableFramePool framePool;
    private final IDeletableTupleBufferManager bufferManager;
    private final ITuplePointerAccessor memoryAccessor;
    // Pointers to the right-branch tuples currently held by bufferManager.
    private final LinkedList<TuplePointer> memoryBuffer = new LinkedList<>();

    // Tuple index into the left input frame, saved while the left branch is reading a run file
    // so the stream can resume where it left off (see continueStream()).
    private int leftStreamIndex;
    // NOTE(review): the two run file streams are driven in lock step; this looks like a
    // refactoring in progress from runFileStreamOld to runFileStream -- confirm before
    // consolidating them.
    private final RunFileStream runFileStreamOld;
    private final RunFileStream runFileStream;
    private ITupleAccessor tmpAccessor;
    private final RunFilePointer runFilePointer;

    // Join condition checker supplied by the operator.
    private final IMergeJoinChecker mjc;

    // Statistics accumulated during the join and logged on close.
    private long joinComparisonCount = 0;
    private long joinResultCount = 0;
    private long spillFileCount = 0;
    private long spillWriteCount = 0;
    private long spillReadCount = 0;
    private long spillCount = 0;

    /**
     * Creates a merge joiner for one partition.
     *
     * @param memorySize number of frames available for buffering the right branch; must be at least 1
     * @throws HyracksDataException if {@code memorySize} is less than 1
     */
    public MergeJoiner(IHyracksTaskContext ctx, int memorySize, int partition, MergeStatus status, MergeJoinLocks locks,
            IMergeJoinChecker mjc, RecordDescriptor leftRd, RecordDescriptor rightRd) throws HyracksDataException {
        super(ctx, partition, status, locks, leftRd, rightRd);
        this.mjc = mjc;
        tmpAccessor = new TupleAccessor(leftRd);
        runFilePointer = new RunFilePointer();
        // Memory (right buffer)
        if (memorySize < 1) {
            throw new HyracksDataException(
                    "MergeJoiner does not have enough memory (needs > 0, got " + memorySize + ").");
        }
        framePool = new DeallocatableFramePool(ctx, memorySize * ctx.getInitialFrameSize());
        bufferManager = new VariableDeletableTupleMemoryManager(framePool, rightRd);
        memoryAccessor = bufferManager.createTuplePointerAccessor();
        // Run file and frame cache (left buffer)
        leftStreamIndex = TupleAccessor.UNSET;
        runFileStreamOld = new RunFileStream(ctx, "left", status.branch[LEFT_PARTITION]);
        runFileStream = new RunFileStream(ctx, "left", status.branch[LEFT_PARTITION]);
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine(
                    "MergeJoiner has started partition " + partition + " with " + memorySize + " frames of memory.");
        }
    }

    /**
     * Copies the accessor's current tuple into the right-branch memory.
     *
     * @return true if the tuple fit in memory, false if memory is full
     */
    private boolean addToMemory(ITupleAccessor accessor) throws HyracksDataException {
        TuplePointer tp = new TuplePointer();
        if (bufferManager.insertTuple(accessor, accessor.getTupleId(), tp)) {
            memoryBuffer.add(tp);
            return true;
        }
        return false;
    }

    /** Removes one tuple from the right-branch memory and releases its space. */
    private void removeFromMemory(TuplePointer tp) throws HyracksDataException {
        memoryBuffer.remove(tp);
        bufferManager.deleteTuple(tp);
    }

    /** Appends the concatenation of the left and right tuples to the result. */
    private void addToResult(IFrameTupleAccessor accessorLeft, int leftTupleIndex, IFrameTupleAccessor accessorRight,
            int rightTupleIndex, IFrameWriter writer) throws HyracksDataException {
        FrameUtils.appendConcatToWriter(writer, resultAppender, accessorLeft, leftTupleIndex, accessorRight,
                rightTupleIndex);
        joinResultCount++;
    }

    /** Drops all right-branch tuples currently buffered in memory. */
    private void flushMemory() throws HyracksDataException {
        memoryBuffer.clear();
        bufferManager.reset();
    }

    // memory management
    private boolean memoryHasTuples() {
        return bufferManager.getNumTuples() > 0;
    }

    /**
     * Ensures a tuple is available for the right branch, from memory or the input stream.
     *
     * @throws HyracksDataException
     */
    private TupleStatus loadRightTuple() throws HyracksDataException {
        TupleStatus loaded = loadMemoryTuple(RIGHT_PARTITION);
        if (loaded == TupleStatus.UNKNOWN) {
            loaded = pauseAndLoadRightTuple();
        }
        return loaded;
    }

    /**
     * Ensures a tuple is available for the left branch, either from memory or the run file.
     *
     * @throws HyracksDataException
     */
    private TupleStatus loadLeftTuple() throws HyracksDataException {
        TupleStatus loaded;
        if (status.branch[LEFT_PARTITION].isRunFileReading()) {
            loaded = loadSpilledTuple(LEFT_PARTITION);
            if (loaded.isEmpty()) {
                // Run file is exhausted: either replay it again (still spilling with no new
                // input) or resume the live stream, then retry the load.
                if (status.branch[LEFT_PARTITION].isRunFileWriting() && !status.branch[LEFT_PARTITION].hasMore()) {
                    unfreezeAndContinue(inputAccessor[LEFT_PARTITION]);
                } else {
                    continueStream(inputAccessor[LEFT_PARTITION]);
                }
                loaded = loadLeftTuple();
            }
        } else {
            loaded = loadMemoryTuple(LEFT_PARTITION);
        }
        return loaded;
    }

    /** Loads the next spilled tuple for the given partition, advancing both run file streams. */
    private TupleStatus loadSpilledTuple(int partition) throws HyracksDataException {
        if (!inputAccessor[partition].exists()) {
            runFileStream.loadNextBuffer(tmpAccessor);
            if (!runFileStreamOld.loadNextBuffer(inputAccessor[partition])) {
                return TupleStatus.EMPTY;
            }
        }
        return TupleStatus.LOADED;
    }

    /**
     * Processes the current left frame: interleaves consuming right tuples into memory with
     * joining left tuples (from stream or spill) against the memory buffer.
     *
     * @throws HyracksDataException
     */
    @Override
    public void processLeftFrame(IFrameWriter writer) throws HyracksDataException {
        TupleStatus leftTs = loadLeftTuple();
        TupleStatus rightTs = loadRightTuple();
        while (leftTs.isLoaded() && (status.branch[RIGHT_PARTITION].hasMore() || memoryHasTuples())) {
            if (status.branch[LEFT_PARTITION].isRunFileWriting()) {
                // Left side from disk
                leftTs = processLeftTupleSpill(writer);
            } else if (rightTs.isLoaded()
                    && mjc.checkToLoadNextRightTuple(inputAccessor[LEFT_PARTITION], inputAccessor[RIGHT_PARTITION])) {
                // Right side from stream
                processRightTuple();
                rightTs = loadRightTuple();
            } else {
                // Left side from stream
                processLeftTuple(writer);
                leftTs = loadLeftTuple();
            }
        }
    }

    /** Finishes the join: drains any active spill, flushes results, and logs statistics. */
    @Override
    public void processLeftClose(IFrameWriter writer) throws HyracksDataException {
        if (status.branch[LEFT_PARTITION].isRunFileWriting()) {
            unfreezeAndContinue(inputAccessor[LEFT_PARTITION]);
        }
        processLeftFrame(writer);
        resultAppender.write(writer, true);
        if (LOGGER.isLoggable(Level.WARNING)) {
            LOGGER.warning("MergeJoiner statistics: " + joinComparisonCount + " comparisons, " + joinResultCount
                    + " results, " + spillCount + " spills, " + runFileStreamOld.getFileCount() + " files, "
                    + runFileStreamOld.getWriteCount() + " spill frames written, " + runFileStreamOld.getReadCount()
                    + " spill frames read.");
        }
    }

    /**
     * While the right branch is frozen, appends the current left tuple to the run files and
     * joins it against memory. Unfreezes once memory has drained.
     */
    private TupleStatus processLeftTupleSpill(IFrameWriter writer) throws HyracksDataException {
        runFileStreamOld.addToRunFile(inputAccessor[LEFT_PARTITION]);
        runFileStream.addToRunFile(inputAccessor[LEFT_PARTITION]);
        processLeftTuple(writer);
        // Memory is empty and we can start processing the run file.
        if (!memoryHasTuples() && status.branch[LEFT_PARTITION].isRunFileWriting()) {
            unfreezeAndContinue(inputAccessor[LEFT_PARTITION]);
        }
        return loadLeftTuple();
    }

    /**
     * Joins the current left tuple against every buffered right tuple, emitting matches and
     * evicting right tuples the checker says can no longer match, then advances the left input.
     */
    private void processLeftTuple(IFrameWriter writer) throws HyracksDataException {
        // Check against memory (right)
        if (memoryHasTuples()) {
            // Iterate backwards so removals do not disturb the unvisited indexes.
            for (int i = memoryBuffer.size() - 1; i > -1; --i) {
                memoryAccessor.reset(memoryBuffer.get(i));
                if (mjc.checkToSaveInResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(),
                        memoryAccessor, memoryBuffer.get(i).getTupleIndex(), false)) {
                    // add to result
                    addToResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(),
                            memoryAccessor, memoryBuffer.get(i).getTupleIndex(), writer);
                }
                joinComparisonCount++;
                if (mjc.checkToRemoveInMemory(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(),
                        memoryAccessor, memoryBuffer.get(i).getTupleIndex())) {
                    // remove from memory
                    removeFromMemory(memoryBuffer.get(i));
                }
            }
        }
        inputAccessor[LEFT_PARTITION].next();
    }

    /**
     * Buffers the current right tuple in memory if the checker requires it; freezes the right
     * branch and starts spilling the left branch when memory is full.
     */
    private void processRightTuple() throws HyracksDataException {
        // append to memory
        if (mjc.checkToSaveInMemory(inputAccessor[LEFT_PARTITION], inputAccessor[RIGHT_PARTITION])) {
            if (!addToMemory(inputAccessor[RIGHT_PARTITION])) {
                // go to log saving state
                freezeAndSpill();
                return;
            }
        }
        inputAccessor[RIGHT_PARTITION].next();
    }

    /** Freezes the right branch and begins writing the left branch to run files. */
    private void freezeAndSpill() throws HyracksDataException {
        if (LOGGER.isLoggable(Level.WARNING)) {
            LOGGER.warning("freeze snapshot: " + frameCounts[RIGHT_PARTITION] + " right, " + frameCounts[LEFT_PARTITION]
                    + " left, " + joinComparisonCount + " comparisons, " + joinResultCount + " results, ["
                    + bufferManager.getNumTuples() + " tuples memory].");
        }
        // Only (re)initialize the new-style run file on the first spill; later spills append.
        if (runFilePointer.getFileOffset() <= 0) {
            runFilePointer.reset(0, 0);
            runFileStream.startRunFileWriting();
        }
        runFileStreamOld.startRunFileWriting();
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine(
                    "Memory is full. Freezing the right branch. (memory tuples: " + bufferManager.getNumTuples() + ")");
        }
        spillCount++;
    }

    /** Stops run file reading and resumes the live left stream at the saved tuple index. */
    private void continueStream(ITupleAccessor accessor) throws HyracksDataException {
        runFileStreamOld.closeRunFileReading();
        accessor.reset(inputBuffer[LEFT_PARTITION]);
        accessor.setTupleId(leftStreamIndex);
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine("Continue with left stream.");
        }
    }

    /**
     * Unfreezes the right branch: flushes and stops the run files, clears memory, remembers the
     * live stream position, and switches the left branch to replaying the run files.
     */
    private void unfreezeAndContinue(ITupleAccessor accessor) throws HyracksDataException {
        if (LOGGER.isLoggable(Level.WARNING)) {
            LOGGER.warning("snapshot: " + frameCounts[RIGHT_PARTITION] + " right, " + frameCounts[LEFT_PARTITION]
                    + " left, " + joinComparisonCount + " comparisons, " + joinResultCount + " results, ["
                    + bufferManager.getNumTuples() + " tuples memory, " + spillCount + " spills, "
                    + (runFileStreamOld.getFileCount() - spillFileCount) + " files, "
                    + (runFileStreamOld.getWriteCount() - spillWriteCount) + " written, "
                    + (runFileStreamOld.getReadCount() - spillReadCount) + " read].");
            spillFileCount = runFileStreamOld.getFileCount();
            spillReadCount = runFileStreamOld.getReadCount();
            spillWriteCount = runFileStreamOld.getWriteCount();
        }
        runFileStreamOld.flushAndStopRunFile(accessor);
        runFileStream.flushAndStopRunFile(accessor);
        flushMemory();
        if (!status.branch[LEFT_PARTITION].isRunFileReading()) {
            // Remember where the live stream stopped so continueStream() can resume it.
            leftStreamIndex = accessor.getTupleId();
        }
        runFileStreamOld.startReadingRunFile(accessor);
        runFileStream.resetReadPointer(runFilePointer.getFileOffset());
        accessor.setTupleId(runFilePointer.getTupleIndex());
        runFileStream.startReadingRunFile(accessor);
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine("Unfreezing right partition.");
        }
    }
}