blob: 198f68c190cd11b1b434830927c766cd2abf4263 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.util.List;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
/**
* This is an implementation of the Filter operator. It has an Expression Plan
* that decides whether the input tuple should be filtered or passed through. To
* avoid many function calls, the filter operator, stores the Comparison
* Operator that is the root of the Expression Plan and uses its getNext
* directly.
*
* Since the filter is supposed to return tuples only, getNext is not supported
* on any other data type.
*
*/
public class POFilter extends PhysicalOperator {
/**
*
*/
private static final long serialVersionUID = 1L;
// The expression plan
private PhysicalPlan plan;
// The root comparison operator of the expression plan
// ComparisonOperator comOp;
private PhysicalOperator comOp;
// The operand type for the comparison operator needed
// to call the comparison operators getNext with the
// appropriate type
// private byte compOperandType;
public POFilter(OperatorKey k) {
this(k, -1, null);
}
public POFilter(OperatorKey k, int rp) {
this(k, rp, null);
}
public POFilter(OperatorKey k, List<PhysicalOperator> inputs) {
this(k, -1, inputs);
}
public POFilter(OperatorKey k, int rp, List<PhysicalOperator> inputs) {
super(k, rp, inputs);
}
/**
* Attaches the proccesed input tuple to the expression plan and checks if
* comparison operator returns a true. If so the tuple is not filtered and
* let to pass through. Else, further input is processed till a tuple that
* can be passed through is found or EOP is reached.
*/
@Override
public Result getNextTuple() throws ExecException {
Result res = null;
Result inp = null;
while (true) {
inp = processInput();
if (inp.returnStatus == POStatus.STATUS_EOP
|| inp.returnStatus == POStatus.STATUS_ERR)
break;
if (inp.returnStatus == POStatus.STATUS_NULL) {
continue;
}
plan.attachInput((Tuple) inp.result);
/*
switch (compOperandType) {
case DataType.BOOLEAN:
res = comOp.getNext(dummyBool);
if (res.returnStatus != POStatus.STATUS_OK)
continue;
break;
case DataType.BYTEARRAY:
res = comOp.getNext(dummyDBA);
if (res.returnStatus != POStatus.STATUS_OK)
continue;
break;
case DataType.CHARARRAY:
res = comOp.getNext(dummyString);
if (res.returnStatus != POStatus.STATUS_OK)
continue;
break;
case DataType.DOUBLE:
res = comOp.getNext(dummyDouble);
if (res.returnStatus != POStatus.STATUS_OK)
continue;
break;
case DataType.FLOAT:
res = comOp.getNext(dummyFloat);
if (res.returnStatus != POStatus.STATUS_OK)
continue;
break;
case DataType.INTEGER:
res = comOp.getNext(dummyInt);
if (res.returnStatus != POStatus.STATUS_OK)
continue;
break;
case DataType.LONG:
res = comOp.getNext(dummyLong);
if (res.returnStatus != POStatus.STATUS_OK)
continue;
break;
default:
throw new RuntimeException("Unexpected type " +
DataType.findTypeName(compOperandType));
}
*/
res = comOp.getNextBoolean();
plan.detachInput();
if (res.returnStatus != POStatus.STATUS_OK)
return res;
if (res.result != null) {
illustratorMarkup(inp.result, inp.result, (Boolean) res.result ? 0 : 1);
if ((Boolean) res.result)
return inp;
}
}
return inp;
}
@Override
public String name() {
return getAliasString() + "Filter" + "["
+ DataType.findTypeName(resultType) + "]" + " - "
+ mKey.toString();
}
@Override
public boolean supportsMultipleInputs() {
return false;
}
@Override
public boolean supportsMultipleOutputs() {
return false;
}
@Override
public void visit(PhyPlanVisitor v) throws VisitorException {
v.visitFilter(this);
}
public void setPlan(PhysicalPlan plan) {
this.plan = plan;
comOp = plan.getLeaves().get(0);
// compOperandType = comOp.getOperandType();
}
public PhysicalPlan getPlan() {
return plan;
}
@Override
public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
if (illustrator != null) {
int index = 0;
for (int i = 0; i < illustrator.getSubExpResults().size(); ++i) {
if (!illustrator.getSubExpResults().get(i)[0])
index += (1 << i);
}
if (index < illustrator.getEquivalenceClasses().size())
illustrator.getEquivalenceClasses().get(index).add((Tuple) in);
if (eqClassIndex == 0) // add only qualified record
illustrator.addData((Tuple) out);
}
return (Tuple) out;
}
@Override
public PhysicalOperator clone() throws CloneNotSupportedException {
POFilter opClone = (POFilter) super.clone();
opClone.setPlan(plan.clone());
return opClone;
}
}