blob: f57d205a68afeec9aa2293f8938a217acbd35687 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.runtime.operators.joins.intervalpartition;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.common.io.RunFileReader;
import org.apache.hyracks.dataflow.common.io.RunFileWriter;
import org.apache.hyracks.dataflow.std.buffermanager.BufferInfo;
import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager;
import org.apache.hyracks.dataflow.std.join.AbstractMergeJoiner;
import org.apache.hyracks.dataflow.std.join.MergeJoinLocks;
import org.apache.hyracks.dataflow.std.join.MergeStatus;
import org.apache.hyracks.dataflow.std.structures.RunFilePointer;
import org.apache.hyracks.dataflow.std.structures.TuplePointer;
/**
 * Joins two inputs whose tuples have been assigned to interval partitions.
 * <p>
 * The left (probe) input is spooled, frame by frame, to a run file while the position at which each
 * run of same-partition tuples begins is recorded. Once the left input is closed, the right (build)
 * input is loaded into memory in rounds; every round is joined against those probe partitions that
 * {@link IIntervalMergeJoinChecker#compareIntervalPartition} reports as candidates, re-reading the
 * probe tuples from the run file.
 */
public class IntervalPartitionJoiner extends AbstractMergeJoiner {

    private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoiner.class.getName());

    /** Run file that receives every probe (left) frame. */
    private final RunFileWriter probeRunFileWriter;
    /** Partition id of the most recently seen probe tuple; -1 before the first tuple. */
    private int probeRunFilePid = -1;

    private final ITuplePartitionComputer buildHpc;
    private final ITuplePartitionComputer probeHpc;

    private final int buildMemory;
    private final int k;
    private final int numOfPartitions;

    private long buildSize = 0;
    private long probeSize = 0;
    /** Number of tuples seen per partition id, kept for diagnostics only. */
    private final long[] buildPartitionSizes;
    private final long[] probePartitionSizes;

    /** Maps the run-file position where a run of probe tuples starts to the partition id of that run. */
    private final TreeMap<RunFilePointer, Integer> probeRunFilePointers;

    private final VPartitionTupleBufferManager buildBufferManager;
    private final TuplePointer tempPtr = new TuplePointer();
    /** Partition ids that currently hold build tuples in memory. */
    private final List<Integer> buildInMemoryPartitions;
    private final FrameTupleAccessor accessorBuild;
    private final BufferInfo bufferInfo;

    private long spillWriteCount = 0;
    private long spillReadCount = 0;
    private long joinComparisonCount = 0;
    private long joinResultCount = 0;

    private final IIntervalMergeJoinChecker imjc;
    private final FrameTupleAccessor accessorProbe;
    private final IFrame reloadBuffer;
    /** Stays true until the build input has been reported as empty. */
    private boolean moreBuildProcessing = true;
    /** Scratch list of the build partitions matched against the probe partition being processed. */
    private final List<IFrameBufferManager> fbms = new ArrayList<>();

    /**
     * Creates the joiner, allocates the build-side buffer manager and opens the probe run file.
     *
     * @param ctx task context used for frames, workspace files and I/O
     * @param memorySize number of frames given to the build side
     * @param partition partition number handed to the superclass (and logged)
     * @param k granularity passed to the partition computers and to {@link IntervalPartitionUtil}
     * @param status merge status shared with the superclass
     * @param locks locks shared with the superclass
     * @param imjc checker deciding which partitions and which tuple pairs join
     * @param leftRd record descriptor of the left (probe) input
     * @param rightRd record descriptor of the right (build) input
     * @param buildHpc partition computer for build tuples
     * @param probeHpc partition computer for probe tuples
     * @throws HyracksDataException if a frame or the run file cannot be set up
     */
    public IntervalPartitionJoiner(IHyracksTaskContext ctx, int memorySize, int partition, int k, MergeStatus status,
            MergeJoinLocks locks, IIntervalMergeJoinChecker imjc, RecordDescriptor leftRd, RecordDescriptor rightRd,
            ITuplePartitionComputer buildHpc, ITuplePartitionComputer probeHpc) throws HyracksDataException {
        super(ctx, partition, status, locks, leftRd, rightRd);
        bufferInfo = new BufferInfo(null, -1, -1);

        this.accessorProbe = new FrameTupleAccessor(leftRd);
        reloadBuffer = new VSizeFrame(ctx);

        this.numOfPartitions = IntervalPartitionUtil.getMaxPartitions(k);
        this.imjc = imjc;

        buildPartitionSizes = new long[numOfPartitions];
        probePartitionSizes = new long[numOfPartitions];

        // TODO fix available memory size
        this.buildMemory = memorySize;
        buildBufferManager = new VPartitionTupleBufferManager(ctx, VPartitionTupleBufferManager.NO_CONSTRAIN,
                numOfPartitions, buildMemory * ctx.getInitialFrameSize());

        this.k = k;
        this.buildHpc = buildHpc;
        this.probeHpc = probeHpc;

        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile("IntervalPartitionJoiner");
        probeRunFileWriter = new RunFileWriter(file, ctx.getIOManager());
        probeRunFileWriter.open();

        probeRunFilePointers = new TreeMap<>(RunFilePointer.ASC);
        buildInMemoryPartitions = new LinkedList<>();

        this.accessorBuild = new FrameTupleAccessor(rightRd);

        // The logger level is left to the deployment's logging configuration; it is not forced here.
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine("IntervalPartitionJoiner has started partition " + partition + " with " + memorySize
                    + " frames of memory.");
        }
    }

    /**
     * Records where each run of same-partition tuples starts in the current left frame and then
     * appends the whole frame to the probe run file.
     *
     * @param writer not used by this method
     * @throws HyracksDataException if partitioning or writing the frame fails
     */
    @Override
    public void processLeftFrame(IFrameWriter writer) throws HyracksDataException {
        while (inputAccessor[LEFT_PARTITION].exists()) {
            int tupleId = inputAccessor[LEFT_PARTITION].getTupleId();
            int pid = probeHpc.partition(inputAccessor[LEFT_PARTITION], tupleId, k);

            if (probeRunFilePid != pid) {
                // Log new partition locations. The frame has not been written yet, so the current
                // file size is the offset at which this frame will start.
                RunFilePointer rfp = new RunFilePointer(probeRunFileWriter.getFileSize(),
                        inputAccessor[LEFT_PARTITION].getTupleId());
                probeRunFilePointers.put(rfp, pid);
                probeRunFilePid = pid;
            }
            inputAccessor[LEFT_PARTITION].next();
            probeSize++;
            probePartitionSizes[pid]++;
        }
        inputBuffer[LEFT_PARTITION].rewind();
        probeRunFileWriter.nextFrame(inputBuffer[LEFT_PARTITION]);
        spillWriteCount++;
    }

    /**
     * Runs the join once the left input is exhausted, flushes the result appender and logs
     * statistics.
     *
     * @param writer downstream writer receiving the join results
     * @throws HyracksDataException if the join or the final flush fails
     */
    @Override
    public void processLeftClose(IFrameWriter writer) throws HyracksDataException {
        joinLoopOnMemory(writer);

        // Flush result.
        resultAppender.write(writer, true);
        if (LOGGER.isLoggable(Level.WARNING)) {
            LOGGER.warning("IntervalPartitionJoiner statitics: " + k + " k, " + joinComparisonCount + " comparisons, "
                    + joinResultCount + " results, " + spillWriteCount + " written, " + spillReadCount + " read.");
        }
        // Per-partition tuple counts are diagnostics: route them through the logger, not stderr.
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine("build: " + Arrays.toString(buildPartitionSizes));
            LOGGER.fine("probe: " + Arrays.toString(probePartitionSizes));
        }
    }

    /**
     * Repeatedly fills memory with build tuples and joins them against the probe run file until the
     * build input is reported empty.
     */
    private void joinLoopOnMemory(IFrameWriter writer) throws HyracksDataException {
        // All probe frames have been written: release the writer's handle before reading the file
        // back. NOTE(review): follows the usual Hyracks close-then-createReader pattern — confirm
        // against this fork's RunFileWriter.
        probeRunFileWriter.close();

        RunFileReader pReader = probeRunFileWriter.createDeleteOnCloseReader();
        pReader.open();
        try {
            // Load first frame.
            loadReaderNextFrame(pReader);

            while (moreBuildProcessing) {
                fillMemory();
                joinMemoryBlockWithRunFile(writer, pReader);

                // Clean up so the next round starts with empty build partitions.
                for (int pid : buildInMemoryPartitions) {
                    buildBufferManager.clearPartition(pid);
                }
                buildInMemoryPartitions.clear();
            }
        } finally {
            // Close (and thereby delete) the run file even when the join fails.
            pReader.close();
        }
    }

    /**
     * For every recorded probe partition run, collects the in-memory build partitions the checker
     * accepts and joins the run against them.
     */
    private void joinMemoryBlockWithRunFile(IFrameWriter writer, RunFileReader pReader) throws HyracksDataException {
        // Join Disk partitions with Memory partitions. entrySet() iterates in ascending key order,
        // i.e. in run-file order.
        for (Map.Entry<RunFilePointer, Integer> probeEntry : probeRunFilePointers.entrySet()) {
            int probePid = probeEntry.getValue();
            Pair<Integer, Integer> probe = IntervalPartitionUtil.getIntervalPartition(probePid, k);

            for (int buildId : buildInMemoryPartitions) {
                Pair<Integer, Integer> build = IntervalPartitionUtil.getIntervalPartition(buildId, k);
                if (imjc.compareIntervalPartition(probe.first, probe.second, build.first, build.second)) {
                    fbms.add(buildBufferManager.getPartitionFrameBufferManager(buildId));
                    if (LOGGER.isLoggable(Level.FINE)) {
                        LOGGER.fine("join " + probe + "(" + probePartitionSizes[probePid] + ") with " + build + "("
                                + buildPartitionSizes[buildId] + ")");
                    }
                }
            }

            if (!fbms.isEmpty()) {
                join(pReader, probeEntry.getKey(), fbms, writer);
            }
            fbms.clear();
        }
    }

    /**
     * Joins the probe tuples of one run-file partition run against the given build frames.
     * <p>
     * The run starts at {@code rfpStart} and ends at the next recorded pointer, or at the end of the
     * file when no later pointer exists.
     */
    private void join(RunFileReader pReader, RunFilePointer rfpStart, List<IFrameBufferManager> buildFbms,
            IFrameWriter writer) throws HyracksDataException {
        long fileOffsetStart = rfpStart.getFileOffset();
        int tupleStart = rfpStart.getTupleIndex();

        RunFilePointer rfpEnd = probeRunFilePointers.higherKey(rfpStart);
        long fileOffsetEnd;
        int tupleEnd;
        if (rfpEnd == null) {
            // Last run: it extends to the end of the file.
            // NOTE(review): Integer.MAX_VALUE would become the loop bound if the read pointer could
            // ever equal the file size — presumably it cannot; confirm against RunFileReader.
            fileOffsetEnd = pReader.getFileSize();
            tupleEnd = Integer.MAX_VALUE;
        } else {
            fileOffsetEnd = rfpEnd.getFileOffset();
            tupleEnd = rfpEnd.getTupleIndex();
        }

        // Reposition only when the reader is not already on the frame holding the first tuple.
        if (pReader.getReadPointer() != fileOffsetStart) {
            pReader.reset(fileOffsetStart);
            loadReaderNextFrame(pReader);
        }
        do {
            // Only the first and last frame of the run are processed partially.
            int start = pReader.getReadPointer() == fileOffsetStart ? tupleStart : 0;
            int end = pReader.getReadPointer() == fileOffsetEnd ? tupleEnd : accessorProbe.getTupleCount();

            for (int i = start; i < end; ++i) {
                // Tuple has potential match from build phase
                for (IFrameBufferManager fbm : buildFbms) {
                    joinTupleWithMemoryPartition(accessorProbe, i, fbm, writer);
                }
            }
        } while (pReader.getReadPointer() < fileOffsetEnd && loadReaderNextFrame(pReader));
    }

    /**
     * Reads the next run-file frame into the reload buffer and points the probe accessor at it.
     *
     * @return {@code true} if a frame was read, {@code false} if the reader had none left
     */
    private boolean loadReaderNextFrame(RunFileReader pReader) throws HyracksDataException {
        if (!pReader.nextFrame(reloadBuffer)) {
            return false;
        }
        accessorProbe.reset(reloadBuffer.getBuffer());
        spillReadCount++;
        return true;
    }

    /**
     * Compares one probe tuple with every build tuple held by {@code fbm} and appends each pair the
     * checker accepts to the result.
     *
     * @param accessorProbe accessor positioned on the frame containing the probe tuple
     * @param probeTupleIndex index of the probe tuple inside that frame
     * @param fbm frames of one in-memory build partition
     * @param writer downstream writer receiving full result frames
     * @throws HyracksDataException if reading a frame or writing a result fails
     */
    public void joinTupleWithMemoryPartition(IFrameTupleAccessor accessorProbe, int probeTupleIndex,
            IFrameBufferManager fbm, IFrameWriter writer) throws HyracksDataException {
        if (fbm.getNumFrames() == 0) {
            return;
        }
        fbm.resetIterator();
        int frameIndex = fbm.next();
        while (fbm.exists()) {
            fbm.getFrame(frameIndex, bufferInfo);
            accessorBuild.reset(bufferInfo.getBuffer());
            int buildTupleCount = accessorBuild.getTupleCount();
            for (int buildTupleIndex = 0; buildTupleIndex < buildTupleCount; ++buildTupleIndex) {
                if (imjc.checkToSaveInResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex, false)) {
                    appendToResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex, writer);
                }
                joinComparisonCount++;
            }
            frameIndex = fbm.next();
        }
    }

    /** Appends the concatenation of the build tuple and the probe tuple to the result. */
    private void appendToResult(IFrameTupleAccessor accessorBuild, int buildSidetIx, IFrameTupleAccessor accessorProbe,
            int probeSidetIx, IFrameWriter writer) throws HyracksDataException {
        FrameUtils.appendConcatToWriter(writer, resultAppender, accessorBuild, buildSidetIx, accessorProbe,
                probeSidetIx);
        joinResultCount++;
    }

    /**
     * Loads build tuples into their in-memory partitions until an insert is refused or the build
     * input has no more tuples; in the latter case build processing is marked as finished.
     *
     * @throws HyracksDataException if loading fails, or if a tuple is refused while no build tuple
     *         is held in memory (retrying could never succeed)
     */
    private void fillMemory() throws HyracksDataException {
        int buildPid = -1;
        TupleStatus ts;
        for (ts = loadRightTuple(); ts.isLoaded(); ts = loadRightTuple()) {
            int pid = buildHpc.partition(inputAccessor[RIGHT_PARTITION], inputAccessor[RIGHT_PARTITION].getTupleId(),
                    k);
            if (!buildBufferManager.insertTuple(pid, inputAccessor[RIGHT_PARTITION],
                    inputAccessor[RIGHT_PARTITION].getTupleId(), tempPtr)) {
                if (buildInMemoryPartitions.isEmpty()) {
                    // Nothing was loaded in this round, so the caller would clear nothing and retry
                    // the very same tuple forever. Fail loudly instead.
                    throw new HyracksDataException("IntervalPartitionJoiner cannot fit a build tuple into "
                            + buildMemory + " frames of memory.");
                }
                // Memory is full: the tuple stays current and is retried in the next round.
                return;
            }
            buildPartitionSizes[pid]++;

            if (buildPid != pid) {
                // Track new partitions in memory.
                // NOTE(review): assumes build tuples of one partition arrive contiguously; otherwise
                // the same id would be listed twice — confirm with the upstream operator.
                buildInMemoryPartitions.add(pid);
                buildPid = pid;
            }
            inputAccessor[RIGHT_PARTITION].next();
            buildSize++;
        }
        if (ts.isEmpty()) {
            moreBuildProcessing = false;
        }
    }

    /**
     * Makes the next right tuple available, falling back to the superclass's pause-and-load path
     * when the status of the current frame is unknown.
     */
    private TupleStatus loadRightTuple() throws HyracksDataException {
        TupleStatus loaded = loadMemoryTuple(RIGHT_PARTITION);
        if (loaded == TupleStatus.UNKNOWN) {
            loaded = pauseAndLoadRightTuple();
        }
        return loaded;
    }
}