blob: 77d5d4998d934fd47416d59b9dd72334bd010883 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hyracks.dataflow.std.sort;
import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameTupleAppender;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.std.buffermanager.BufferInfo;
import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
import org.apache.hyracks.util.IntSerDeUtils;
public abstract class AbstractFrameSorter implements IFrameSorter {
protected Logger LOGGER = Logger.getLogger(AbstractFrameSorter.class.getName());
static final int PTR_SIZE = 4;
static final int ID_FRAMEID = 0;
static final int ID_TUPLE_START = 1;
static final int ID_TUPLE_END = 2;
static final int ID_NORMAL_KEY = 3;
protected final int[] sortFields;
protected final IBinaryComparator[] comparators;
protected final INormalizedKeyComputer nkc;
protected final IFrameBufferManager bufferManager;
protected final FrameTupleAccessor inputTupleAccessor;
protected final IFrameTupleAppender outputAppender;
protected final IFrame outputFrame;
protected final int outputLimit;
protected int[] tPointers;
protected int tupleCount;
private FrameTupleAccessor fta2;
private BufferInfo info = new BufferInfo(null, -1, -1);
public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDescriptor) throws HyracksDataException {
this(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
Integer.MAX_VALUE);
}
public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDescriptor, int outputLimit)
throws HyracksDataException {
this.bufferManager = bufferManager;
this.sortFields = sortFields;
this.nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
this.comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
this.inputTupleAccessor = new FrameTupleAccessor(recordDescriptor);
this.outputAppender = new FrameTupleAppender();
this.outputFrame = new VSizeFrame(ctx);
this.outputLimit = outputLimit;
this.fta2 = new FrameTupleAccessor(recordDescriptor);
}
@Override
public void reset() throws HyracksDataException {
this.tupleCount = 0;
this.bufferManager.reset();
}
@Override
public boolean insertFrame(ByteBuffer inputBuffer) throws HyracksDataException {
if (bufferManager.insertFrame(inputBuffer) >= 0) {
return true;
}
if (getFrameCount() == 0) {
throw new HyracksDataException(
"The input frame is too big for the sorting buffer, please allocate bigger buffer size");
}
return false;
}
@Override
public void sort() throws HyracksDataException {
tupleCount = 0;
for (int i = 0; i < bufferManager.getNumFrames(); ++i) {
bufferManager.getFrame(i, info);
inputTupleAccessor.reset(info.getBuffer(), info.getStartOffset(), info.getLength());
tupleCount += inputTupleAccessor.getTupleCount();
}
if (tPointers == null || tPointers.length < tupleCount * PTR_SIZE) {
tPointers = new int[tupleCount * PTR_SIZE];
}
int ptr = 0;
int sfIdx = sortFields[0];
for (int i = 0; i < bufferManager.getNumFrames(); ++i) {
bufferManager.getFrame(i, info);
inputTupleAccessor.reset(info.getBuffer(), info.getStartOffset(), info.getLength());
int tCount = inputTupleAccessor.getTupleCount();
byte[] array = inputTupleAccessor.getBuffer().array();
for (int j = 0; j < tCount; ++j) {
int tStart = inputTupleAccessor.getTupleStartOffset(j);
int tEnd = inputTupleAccessor.getTupleEndOffset(j);
tPointers[ptr * PTR_SIZE + ID_FRAMEID] = i;
tPointers[ptr * PTR_SIZE + ID_TUPLE_START] = tStart;
tPointers[ptr * PTR_SIZE + ID_TUPLE_END] = tEnd;
int f0StartRel = inputTupleAccessor.getFieldStartOffset(j, sfIdx);
int f0EndRel = inputTupleAccessor.getFieldEndOffset(j, sfIdx);
int f0Start = f0StartRel + tStart + inputTupleAccessor.getFieldSlotsLength();
tPointers[ptr * PTR_SIZE + ID_NORMAL_KEY] =
nkc == null ? 0 : nkc.normalize(array, f0Start, f0EndRel - f0StartRel);
++ptr;
}
}
if (tupleCount > 0) {
sortTupleReferences();
}
}
abstract void sortTupleReferences() throws HyracksDataException;
@Override
public int getFrameCount() {
return bufferManager.getNumFrames();
}
@Override
public boolean hasRemaining() {
return getFrameCount() > 0;
}
@Override
public int flush(IFrameWriter writer) throws HyracksDataException {
outputAppender.reset(outputFrame, true);
int maxFrameSize = outputFrame.getFrameSize();
int limit = Math.min(tupleCount, outputLimit);
int io = 0;
for (int ptr = 0; ptr < limit; ++ptr) {
int i = tPointers[ptr * PTR_SIZE + ID_FRAMEID];
int tStart = tPointers[ptr * PTR_SIZE + ID_TUPLE_START];
int tEnd = tPointers[ptr * PTR_SIZE + ID_TUPLE_END];
bufferManager.getFrame(i, info);
inputTupleAccessor.reset(info.getBuffer(), info.getStartOffset(), info.getLength());
int flushed = FrameUtils.appendToWriter(writer, outputAppender, inputTupleAccessor, tStart, tEnd);
if (flushed > 0) {
maxFrameSize = Math.max(maxFrameSize, flushed);
io++;
}
}
maxFrameSize = Math.max(maxFrameSize, outputFrame.getFrameSize());
outputAppender.write(writer, true);
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine(
"Flushed records:" + limit + " out of " + tupleCount + "; Flushed through " + (io + 1) + " frames");
}
return maxFrameSize;
}
protected final int compare(int tp1, int tp2) throws HyracksDataException {
int i1 = tPointers[tp1 * 4 + ID_FRAMEID];
int j1 = tPointers[tp1 * 4 + ID_TUPLE_START];
int v1 = tPointers[tp1 * 4 + ID_NORMAL_KEY];
int tp2i = tPointers[tp2 * 4 + ID_FRAMEID];
int tp2j = tPointers[tp2 * 4 + ID_TUPLE_START];
int tp2v = tPointers[tp2 * 4 + ID_NORMAL_KEY];
if (v1 != tp2v) {
return ((((long) v1) & 0xffffffffL) < (((long) tp2v) & 0xffffffffL)) ? -1 : 1;
}
int i2 = tp2i;
int j2 = tp2j;
bufferManager.getFrame(i1, info);
byte[] b1 = info.getBuffer().array();
inputTupleAccessor.reset(info.getBuffer(), info.getStartOffset(), info.getLength());
bufferManager.getFrame(i2, info);
byte[] b2 = info.getBuffer().array();
fta2.reset(info.getBuffer(), info.getStartOffset(), info.getLength());
for (int f = 0; f < comparators.length; ++f) {
int fIdx = sortFields[f];
int f1Start = fIdx == 0 ? 0 : IntSerDeUtils.getInt(b1, j1 + (fIdx - 1) * 4);
int f1End = IntSerDeUtils.getInt(b1, j1 + fIdx * 4);
int s1 = j1 + inputTupleAccessor.getFieldSlotsLength() + f1Start;
int l1 = f1End - f1Start;
int f2Start = fIdx == 0 ? 0 : IntSerDeUtils.getInt(b2, j2 + (fIdx - 1) * 4);
int f2End = IntSerDeUtils.getInt(b2, j2 + fIdx * 4);
int s2 = j2 + fta2.getFieldSlotsLength() + f2Start;
int l2 = f2End - f2Start;
int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
if (c != 0) {
return c;
}
}
return 0;
}
@Override
public void close() {
tupleCount = 0;
bufferManager.close();
tPointers = null;
}
}