blob: 2d5c92942e47eb0026699182bb36fd2d08185d34 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hyracks.algebricks.runtime.operators.std;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
/**
 * Single-input, multi-output operator that routes each incoming tuple to the
 * first output branch whose boolean partitioning condition evaluates to true.
 * Conditions are evaluated in branch order; a tuple goes to exactly one
 * branch. If no condition matches and a default branch was configured, the
 * tuple is sent there; otherwise it is dropped.
 */
public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
    private static final long serialVersionUID = 1L;

    /** Sentinel for the defaultBranchIndex constructor argument: no default output branch. */
    public static final int NO_DEFAULT_BRANCH = -1;

    /** One boolean-valued evaluator factory per conditional output branch. */
    private final IScalarEvaluatorFactory[] evalFactories;
    /** Interprets an evaluator's serialized result as a boolean. */
    private final IBinaryBooleanInspector boolInspector;
    /** Output index receiving non-matching tuples, or NO_DEFAULT_BRANCH. */
    private final int defaultBranchIndex;

    /**
     * @param spec               the job specification this operator belongs to
     * @param evalFactories      one condition evaluator factory per branch
     * @param boolInspector      decodes evaluator output into a boolean
     * @param defaultBranchIndex index of the default branch; may equal
     *                           {@code evalFactories.length} to request an extra,
     *                           condition-less output, or NO_DEFAULT_BRANCH
     * @param rDesc              record descriptor shared by input and all outputs
     */
    public PartitioningSplitOperatorDescriptor(IOperatorDescriptorRegistry spec,
            IScalarEvaluatorFactory[] evalFactories, IBinaryBooleanInspector boolInspector, int defaultBranchIndex,
            RecordDescriptor rDesc) {
        // When the default branch is an extra output (index == evalFactories.length),
        // the operator has one more output than it has conditions.
        super(spec, 1, (defaultBranchIndex == evalFactories.length) ? evalFactories.length + 1 : evalFactories.length);
        // Fill descriptors for ALL outputs, including the extra default-branch
        // output. (Iterating only up to evalFactories.length would leave the
        // default output's record descriptor null.)
        for (int i = 0; i < recordDescriptors.length; i++) {
            recordDescriptors[i] = rDesc;
        }
        this.evalFactories = evalFactories;
        this.boolInspector = boolInspector;
        this.defaultBranchIndex = defaultBranchIndex;
    }

    @Override
    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
            throws HyracksDataException {
        return new AbstractUnaryInputOperatorNodePushable() {
            private final IFrameWriter[] writers = new IFrameWriter[outputArity];
            // Tracks which downstream writers were successfully opened so that
            // close()/fail() only touch those.
            private final boolean[] isOpen = new boolean[outputArity];
            // One staging frame per output branch; the shared tupleAppender is
            // re-bound to the relevant buffer before each append/flush.
            private final IFrame[] writeBuffers = new IFrame[outputArity];
            private final IScalarEvaluator[] evals = new IScalarEvaluator[outputArity];
            private final IPointable evalPointable = new VoidPointable();
            private final RecordDescriptor inOutRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(),
                    0);
            private final FrameTupleAccessor accessor = new FrameTupleAccessor(inOutRecDesc);
            private final FrameTupleReference frameTuple = new FrameTupleReference();
            private final FrameTupleAppender tupleAppender = new FrameTupleAppender();
            private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(inOutRecDesc.getFieldCount());
            private final DataOutput tupleDos = tupleBuilder.getDataOutput();

            @Override
            public void close() throws HyracksDataException {
                // Push out any buffered tuples, then close every opened writer.
                // The first failure becomes the cause; later ones are suppressed.
                HyracksDataException hde = null;
                for (int i = 0; i < outputArity; i++) {
                    if (isOpen[i]) {
                        try {
                            tupleAppender.reset(writeBuffers[i], false);
                            tupleAppender.write(writers[i], false);
                        } catch (Throwable th) {
                            if (hde == null) {
                                hde = new HyracksDataException(th);
                            } else {
                                hde.addSuppressed(th);
                            }
                        } finally {
                            try {
                                writers[i].close();
                            } catch (Throwable th) {
                                if (hde == null) {
                                    hde = new HyracksDataException(th);
                                } else {
                                    hde.addSuppressed(th);
                                }
                            }
                        }
                    }
                }
                if (hde != null) {
                    throw hde;
                }
            }

            @Override
            public void flush() throws HyracksDataException {
                // Forward buffered tuples on every branch without closing anything.
                for (int i = 0; i < outputArity; i++) {
                    tupleAppender.reset(writeBuffers[i], false);
                    tupleAppender.flush(writers[i]);
                }
            }

            @Override
            public void fail() throws HyracksDataException {
                // Propagate failure to all opened writers; aggregate any
                // secondary failures as suppressed exceptions.
                HyracksDataException hde = null;
                for (int i = 0; i < outputArity; i++) {
                    if (isOpen[i]) {
                        try {
                            writers[i].fail();
                        } catch (Throwable th) {
                            if (hde == null) {
                                hde = new HyracksDataException(th);
                            } else {
                                hde.addSuppressed(th);
                            }
                        }
                    }
                }
                if (hde != null) {
                    throw hde;
                }
            }

            @Override
            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                accessor.reset(buffer);
                int tupleCount = accessor.getTupleCount();
                for (int i = 0; i < tupleCount; i++) {
                    frameTuple.reset(accessor, i);
                    // Route to the FIRST branch whose condition holds.
                    boolean found = false;
                    for (int j = 0; j < evals.length; j++) {
                        try {
                            evals[j].evaluate(frameTuple, evalPointable);
                        } catch (AlgebricksException e) {
                            throw new HyracksDataException(e);
                        }
                        found = boolInspector.getBooleanValue(evalPointable.getByteArray(),
                                evalPointable.getStartOffset(), evalPointable.getLength());
                        if (found) {
                            copyAndAppendTuple(j);
                            break;
                        }
                    }
                    // No condition matched: optionally write to the default branch.
                    if (!found && defaultBranchIndex != NO_DEFAULT_BRANCH) {
                        copyAndAppendTuple(defaultBranchIndex);
                    }
                }
            }

            /**
             * Copies the current input tuple into the tuple builder and appends
             * it to the given output branch's staging buffer (flushing the
             * buffer downstream if it is full).
             */
            private void copyAndAppendTuple(int outputIndex) throws HyracksDataException {
                try {
                    tupleBuilder.reset();
                    for (int i = 0; i < frameTuple.getFieldCount(); i++) {
                        tupleDos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i),
                                frameTuple.getFieldLength(i));
                        tupleBuilder.addFieldEndOffset();
                    }
                } catch (IOException e) {
                    throw new HyracksDataException(e);
                }
                tupleAppender.reset(writeBuffers[outputIndex], false);
                FrameUtils.appendToWriter(writers[outputIndex], tupleAppender, tupleBuilder.getFieldEndOffsets(),
                        tupleBuilder.getByteArray(), 0, tupleBuilder.getSize());
            }

            @Override
            public void open() throws HyracksDataException {
                for (int i = 0; i < writers.length; i++) {
                    isOpen[i] = true;
                    writers[i].open();
                }
                // Allocate one staging frame per output; clear each because the
                // tupleAppender is shared across branches.
                for (int i = 0; i < outputArity; i++) {
                    writeBuffers[i] = new VSizeFrame(ctx);
                    tupleAppender.reset(writeBuffers[i], true);
                }
                // Instantiate the partitioning-condition evaluators.
                try {
                    for (int i = 0; i < evalFactories.length; i++) {
                        evals[i] = evalFactories[i].createScalarEvaluator(ctx);
                    }
                } catch (AlgebricksException e) {
                    throw new HyracksDataException(e);
                }
            }

            @Override
            public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
                writers[index] = writer;
            }
        };
    }
}