package edu.uci.ics.hyracks.algebricks.runtime.operators.std;

import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
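
/**
 * Operator that splits its single input stream across multiple output branches.
 * For every incoming tuple the branch condition evaluators are applied in order,
 * and the tuple is written to the output of the first condition that evaluates
 * to true. Tuples that satisfy no condition are sent to the default branch, if
 * one was configured (see {@link #NO_DEFAULT_BRANCH}).
 */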
public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
    private static final long serialVersionUID = 1L;
    public static final int NO_DEFAULT_BRANCH = -1;

    private final ICopyEvaluatorFactory[] evalFactories;
    private final IBinaryBooleanInspector boolInspector;
    private final int defaultBranchIndex;

    public PartitioningSplitOperatorDescriptor(IOperatorDescriptorRegistry spec, ICopyEvaluatorFactory[] evalFactories,
            IBinaryBooleanInspector boolInspector, int defaultBranchIndex, RecordDescriptor rDesc) {
        // One output per condition evaluator, plus a dedicated default-branch output
        // when defaultBranchIndex equals evalFactories.length.
        super(spec, 1, (defaultBranchIndex == evalFactories.length) ? evalFactories.length + 1 : evalFactories.length);
        // All outputs, including the extra default branch (if any), use the same record descriptor.
        for (int i = 0; i < recordDescriptors.length; i++) {
            recordDescriptors[i] = rDesc;
        }
        this.evalFactories = evalFactories;
        this.boolInspector = boolInspector;
        this.defaultBranchIndex = defaultBranchIndex;
    }
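
    // The per-partition push runtime below keeps one output frame and writer per
    // branch; a single tuple appender is re-pointed at the target branch's frame
    // whenever a tuple is routed to it.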
    @Override
    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
            throws HyracksDataException {
        return new AbstractUnaryInputOperatorNodePushable() {
            private final IFrameWriter[] writers = new IFrameWriter[outputArity];
            private final ByteBuffer[] writeBuffers = new ByteBuffer[outputArity];
            // Only the condition outputs have evaluators; the extra default branch (if any) has none.
            private final ICopyEvaluator[] evals = new ICopyEvaluator[evalFactories.length];
            private final ArrayBackedValueStorage evalBuf = new ArrayBackedValueStorage();
            private final RecordDescriptor inOutRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
            private final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inOutRecDesc);
            private final FrameTupleReference frameTuple = new FrameTupleReference();
            private final FrameTupleAppender tupleAppender = new FrameTupleAppender(ctx.getFrameSize());
            private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(inOutRecDesc.getFieldCount());
            private final DataOutput tupleDos = tupleBuilder.getDataOutput();

            @Override
            public void close() throws HyracksDataException {
                // Flush (possibly not full) buffers that have data, and close writers.
                for (int i = 0; i < outputArity; i++) {
                    tupleAppender.reset(writeBuffers[i], false);
                    if (tupleAppender.getTupleCount() > 0) {
                        FrameUtils.flushFrame(writeBuffers[i], writers[i]);
                    }
                    writers[i].close();
                }
            }
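
            // Propagate the failure notification to every output branch.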
            @Override
            public void fail() throws HyracksDataException {
                for (IFrameWriter writer : writers) {
                    writer.fail();
                }
            }
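
            // For each tuple in the incoming frame, evaluate the branch conditions in
            // order and route the tuple to the first branch whose condition is true;
            // tuples matching no condition go to the default branch, if configured.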
            @Override
            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                accessor.reset(buffer);
                int tupleCount = accessor.getTupleCount();
                for (int i = 0; i < tupleCount; i++) {
                    frameTuple.reset(accessor, i);
                    boolean found = false;
                    for (int j = 0; j < evals.length; j++) {
                        try {
                            evalBuf.reset();
                            evals[j].evaluate(frameTuple);
                        } catch (AlgebricksException e) {
                            throw new HyracksDataException(e);
                        }
                        found = boolInspector.getBooleanValue(evalBuf.getByteArray(), 0, 1);
                        if (found) {
                            copyAndAppendTuple(j);
                            break;
                        }
                    }
                    // Optionally write to default output branch.
                    if (!found && defaultBranchIndex != NO_DEFAULT_BRANCH) {
                        copyAndAppendTuple(defaultBranchIndex);
                    }
                }
            }

            private void copyAndAppendTuple(int outputIndex) throws HyracksDataException {
                // Copy tuple into tuple builder.
                try {
                    tupleBuilder.reset();
                    for (int i = 0; i < frameTuple.getFieldCount(); i++) {
                        tupleDos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i),
                                frameTuple.getFieldLength(i));
                        tupleBuilder.addFieldEndOffset();
                    }
                } catch (IOException e) {
                    throw new HyracksDataException(e);
                }
                // Append to frame.
                tupleAppender.reset(writeBuffers[outputIndex], false);
                if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
                    FrameUtils.flushFrame(writeBuffers[outputIndex], writers[outputIndex]);
                    tupleAppender.reset(writeBuffers[outputIndex], true);
                    if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
                        throw new IllegalStateException();
                    }
                }
            }
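
            // Open all output writers, allocate one output frame per branch, and
            // instantiate the branch condition evaluators.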
            @Override
            public void open() throws HyracksDataException {
                for (IFrameWriter writer : writers) {
                    writer.open();
                }
                // Create write buffers.
                for (int i = 0; i < outputArity; i++) {
                    writeBuffers[i] = ctx.allocateFrame();
                    // Make sure to clear all buffers, since we are reusing the tupleAppender.
                    tupleAppender.reset(writeBuffers[i], true);
                }
                // Create evaluators for partitioning.
                try {
                    for (int i = 0; i < evalFactories.length; i++) {
                        evals[i] = evalFactories[i].createEvaluator(evalBuf);
                    }
                } catch (AlgebricksException e) {
                    throw new HyracksDataException(e);
                }
            }

            @Override
            public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
                writers[index] = writer;
            }
        };
    }
}