blob: d153f90e9ed06c2b2899b0dd73e412d0ee724df8 [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
import java.util.LinkedList;
import java.util.List;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
/**
* Left input is broadcast and preserves its local properties. Right input can
* be partitioned in any way.
*/
public class NLJoinPOperator extends AbstractJoinPOperator {
private final int memSize;
public NLJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType, int memSize) {
super(kind, partitioningType);
this.memSize = memSize;
}
@Override
public PhysicalOperatorTag getOperatorTag() {
return PhysicalOperatorTag.NESTED_LOOP;
}
@Override
public boolean isMicroOperator() {
return false;
}
@Override
public void computeDeliveredProperties(ILogicalOperator iop, IOptimizationContext context) {
if (partitioningType != JoinPartitioningType.BROADCAST) {
throw new NotImplementedException(partitioningType + " nested loop joins are not implemented.");
}
IPartitioningProperty pp;
AbstractLogicalOperator op = (AbstractLogicalOperator) iop;
if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(1).getValue();
IPhysicalPropertiesVector pv1 = op2.getPhysicalOperator().getDeliveredProperties();
if (pv1 == null) {
pp = null;
} else {
pp = pv1.getPartitioningProperty();
}
} else {
pp = IPartitioningProperty.UNPARTITIONED;
}
List<ILocalStructuralProperty> localProps = new LinkedList<ILocalStructuralProperty>();
this.deliveredProperties = new StructuralPropertiesVector(pp, localProps);
}
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
IPhysicalPropertiesVector reqdByParent) {
if (partitioningType != JoinPartitioningType.BROADCAST) {
throw new NotImplementedException(partitioningType + " nested loop joins are not implemented.");
}
StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2];
pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(null), null);
pv[1] = new StructuralPropertiesVector(null, null);
return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
}
@Override
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException {
AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op;
RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
propagatedSchema, context);
IOperatorSchema[] conditionInputSchemas = new IOperatorSchema[1];
conditionInputSchemas[0] = propagatedSchema;
IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
IScalarEvaluatorFactory cond = expressionRuntimeProvider.createEvaluatorFactory(join.getCondition().getValue(),
context.getTypeEnvironment(op), conditionInputSchemas, context);
ITuplePairComparatorFactory comparatorFactory = new TuplePairEvaluatorFactory(cond,
context.getBinaryBooleanInspectorFactory());
IOperatorDescriptorRegistry spec = builder.getJobSpec();
IOperatorDescriptor opDesc = null;
switch (kind) {
case INNER: {
opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, false,
null);
break;
}
case LEFT_OUTER: {
INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()];
for (int j = 0; j < nullWriterFactories.length; j++) {
nullWriterFactories[j] = context.getNullWriterFactory();
}
opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, true,
nullWriterFactories);
break;
}
default: {
throw new NotImplementedException();
}
}
contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
ILogicalOperator src1 = op.getInputs().get(0).getValue();
builder.contributeGraphEdge(src1, 0, op, 0);
ILogicalOperator src2 = op.getInputs().get(1).getValue();
builder.contributeGraphEdge(src2, 0, op, 1);
}
public static class TuplePairEvaluatorFactory implements ITuplePairComparatorFactory {
private static final long serialVersionUID = 1L;
private final IScalarEvaluatorFactory cond;
private final IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
public TuplePairEvaluatorFactory(IScalarEvaluatorFactory cond,
IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory) {
this.cond = cond;
this.binaryBooleanInspectorFactory = binaryBooleanInspectorFactory;
}
@Override
public synchronized ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
return new TuplePairEvaluator(ctx, cond, binaryBooleanInspectorFactory.createBinaryBooleanInspector(ctx));
}
}
public static class TuplePairEvaluator implements ITuplePairComparator {
private final IHyracksTaskContext ctx;
private IScalarEvaluator condEvaluator;
private final IScalarEvaluatorFactory condFactory;
private final IPointable p;
private final CompositeFrameTupleReference compositeTupleRef;
private final FrameTupleReference leftRef;
private final FrameTupleReference rightRef;
private final IBinaryBooleanInspector binaryBooleanInspector;
public TuplePairEvaluator(IHyracksTaskContext ctx, IScalarEvaluatorFactory condFactory,
IBinaryBooleanInspector binaryBooleanInspector) {
this.ctx = ctx;
this.condFactory = condFactory;
this.binaryBooleanInspector = binaryBooleanInspector;
this.leftRef = new FrameTupleReference();
this.p = VoidPointable.FACTORY.createPointable();
this.rightRef = new FrameTupleReference();
this.compositeTupleRef = new CompositeFrameTupleReference(leftRef, rightRef);
}
@Override
public int compare(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor,
int innerIndex) throws HyracksDataException {
if (condEvaluator == null) {
try {
this.condEvaluator = condFactory.createScalarEvaluator(ctx);
} catch (AlgebricksException ae) {
throw new HyracksDataException(ae);
}
}
compositeTupleRef.reset(outerAccessor, outerIndex, innerAccessor, innerIndex);
try {
condEvaluator.evaluate(compositeTupleRef, p);
} catch (AlgebricksException ae) {
throw new HyracksDataException(ae);
}
boolean result = binaryBooleanInspector
.getBooleanValue(p.getByteArray(), p.getStartOffset(), p.getLength());
if (result)
return 0;
else
return 1;
}
}
public static class CompositeFrameTupleReference implements IFrameTupleReference {
private final FrameTupleReference refLeft;
private final FrameTupleReference refRight;
public CompositeFrameTupleReference(FrameTupleReference refLeft, FrameTupleReference refRight) {
this.refLeft = refLeft;
this.refRight = refRight;
}
public void reset(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor,
int innerIndex) {
refLeft.reset(outerAccessor, outerIndex);
refRight.reset(innerAccessor, innerIndex);
}
@Override
public int getFieldCount() {
return refLeft.getFieldCount() + refRight.getFieldCount();
}
@Override
public byte[] getFieldData(int fIdx) {
int leftFieldCount = refLeft.getFieldCount();
if (fIdx < leftFieldCount)
return refLeft.getFieldData(fIdx);
else
return refRight.getFieldData(fIdx - leftFieldCount);
}
@Override
public int getFieldStart(int fIdx) {
int leftFieldCount = refLeft.getFieldCount();
if (fIdx < leftFieldCount)
return refLeft.getFieldStart(fIdx);
else
return refRight.getFieldStart(fIdx - leftFieldCount);
}
@Override
public int getFieldLength(int fIdx) {
int leftFieldCount = refLeft.getFieldCount();
if (fIdx < leftFieldCount)
return refLeft.getFieldLength(fIdx);
else
return refRight.getFieldLength(fIdx - leftFieldCount);
}
@Override
public IFrameTupleAccessor getFrameTupleAccessor() {
throw new NotImplementedException();
}
@Override
public int getTupleIndex() {
throw new NotImplementedException();
}
}
}