blob: 535ae595201b1c75d60d8657e7d95022e4c599d7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.pen;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
import org.apache.pig.backend.local.executionengine.PORead;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.eval.EvalSpec;
import org.apache.pig.impl.eval.FilterSpec;
import org.apache.pig.impl.logicalLayer.LOCogroup;
import org.apache.pig.impl.logicalLayer.LOEval;
import org.apache.pig.impl.logicalLayer.LOLoad;
import org.apache.pig.impl.logicalLayer.LOSort;
import org.apache.pig.impl.logicalLayer.LOSplit;
import org.apache.pig.impl.logicalLayer.LOStore;
import org.apache.pig.impl.logicalLayer.LOUnion;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.OperatorKey;
import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
import org.apache.pig.impl.physicalLayer.PhysicalOperator;
import org.apache.pig.impl.util.IdentityHashSet;
import org.apache.pig.impl.util.LineageTracer;
/**
 * Walks a logical plan bottom-up and materializes ("derives") the intermediate
 * data produced by each operator, by running the already-compiled physical
 * operator for that logical operator over the bags derived for its inputs.
 * Optionally records tuple lineage and per-operator "equivalence classes"
 * (sets of tuples that illustrate the same behavior of an operator), which the
 * example generator uses to pick illustrative tuples.
 */
public class DerivedData {

    /**
     * Convenience overload: derives data for {@code op} and everything upstream
     * of it, without lineage tracking or equivalence-class collection.
     *
     * @param op                    root logical operator to derive data for
     * @param baseData              input bags for each LOLoad in the plan
     * @param logicalToPhysicalKeys mapping from logical to physical operator keys
     * @param physicalOpTable       table of compiled physical operators
     * @return map from each visited logical operator to its derived bag
     * @throws IOException if evaluating a physical operator fails
     */
    static Map<LogicalOperator, DataBag> CreateDerivedData(LogicalOperator op, Map<LOLoad, DataBag> baseData, Map<OperatorKey, OperatorKey> logicalToPhysicalKeys, Map<OperatorKey, ExecPhysicalOperator> physicalOpTable) throws IOException {
        return CreateDerivedData(op, baseData, null, null, null, logicalToPhysicalKeys, physicalOpTable);
    }

    /**
     * Derives data for {@code op} and everything upstream of it, optionally
     * tracking lineage and collecting equivalence classes.
     *
     * @param lineageTracer        if non-null, receives every base tuple and is
     *                             attached to each evaluated physical operator
     * @param equivalenceClasses   if non-null, accumulates all equivalence
     *                             classes found (OperatorToEqClasses must then
     *                             also be non-null)
     * @param OperatorToEqClasses  per-operator view of the equivalence classes
     * @return map from each visited logical operator to its derived bag
     * @throws IOException if evaluating a physical operator fails
     */
    static Map<LogicalOperator, DataBag> CreateDerivedData(LogicalOperator op, Map<LOLoad, DataBag> baseData, LineageTracer lineageTracer, Collection<IdentityHashSet<Tuple>> equivalenceClasses, Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> OperatorToEqClasses, Map<OperatorKey, OperatorKey> logicalToPhysicalKeys, Map<OperatorKey, ExecPhysicalOperator> physicalOpTable) throws IOException {
        Map<LogicalOperator, DataBag> derivedData = new HashMap<LogicalOperator, DataBag>();
        CreateDerivedData(op, baseData, derivedData, lineageTracer, equivalenceClasses, OperatorToEqClasses, logicalToPhysicalKeys, physicalOpTable);
        return derivedData;
    }

    /**
     * Recursive worker: fills {@code derivedData} for {@code op} and all of its
     * upstream operators. Inputs are always derived before the operator itself,
     * so {@code derivedData} already holds every input bag when {@code op} is
     * evaluated.
     */
    static void CreateDerivedData(LogicalOperator op, Map<LOLoad, DataBag> baseData, Map<LogicalOperator, DataBag> derivedData, LineageTracer lineageTracer, Collection<IdentityHashSet<Tuple>> equivalenceClasses, Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> OperatorToEqClasses, Map<OperatorKey, OperatorKey> logicalToPhysicalKeys, Map<OperatorKey, ExecPhysicalOperator> physicalOpTable) throws IOException {
        // First, recurse on upstream operators (if any). getInputs() yields
        // OperatorKeys, which must be resolved through the operator table.
        for (OperatorKey input : op.getInputs()) {
            CreateDerivedData(op.getOpTable().get(input), baseData, derivedData, lineageTracer, equivalenceClasses, OperatorToEqClasses, logicalToPhysicalKeys, physicalOpTable);
        }
        if (op instanceof LOLoad) {
            // Loads produce no new data: their output IS the supplied base data.
            DataBag output = baseData.get(op);
            derivedData.put(op, output);
            // Register every base tuple with the lineage tracer, if tracking.
            if (lineageTracer != null) {
                for (Iterator<Tuple> it = output.iterator(); it.hasNext(); ) {
                    lineageTracer.insert(it.next());
                }
            }
            if (equivalenceClasses != null) {
                Collection<IdentityHashSet<Tuple>> eqClasses = GetEquivalenceClasses(op, derivedData);
                equivalenceClasses.addAll(eqClasses);
                OperatorToEqClasses.put(op, eqClasses);
            }
        } else if (op instanceof LOStore) {
            // A store is a pass-through: reuse the bag derived for its single
            // input. BUG FIX: the input is an OperatorKey and derivedData is
            // keyed by LogicalOperator, so the key must be resolved through the
            // operator table first; looking the key up directly would always
            // yield null (Map.get accepts any Object, so it compiled silently).
            derivedData.put(op, derivedData.get(op.getOpTable().get(op.getInputs().get(0))));
        } else {
            // Any other operator: run its compiled physical counterpart over
            // the already-derived input bags.
            derivedData.put(op, EvaluateOperator(op, lineageTracer, derivedData, logicalToPhysicalKeys, physicalOpTable));
            if (equivalenceClasses != null) {
                Collection<IdentityHashSet<Tuple>> eqClasses = GetEquivalenceClasses(op, derivedData);
                equivalenceClasses.addAll(eqClasses);
                OperatorToEqClasses.put(op, eqClasses);
            }
        }
    }

    /**
     * Evaluates the physical operator corresponding to {@code logOp} over the
     * bags already derived for its inputs, and returns the resulting bag.
     *
     * The physical plan is already compiled; this temporarily replaces the
     * physical operator's inputs with POReads that stream tuples out of
     * {@code derivedData}, drains the operator, then restores its original
     * inputs so the plan can be reused.
     */
    static DataBag EvaluateOperator(LogicalOperator logOp, LineageTracer lineageTracer, Map<LogicalOperator, DataBag> derivedData, Map<OperatorKey, OperatorKey> logicalToPhysicalKeys, Map<OperatorKey, ExecPhysicalOperator> physicalOpTable) throws IOException {
        PhysicalOperator pOp = (PhysicalOperator) physicalOpTable.get(logicalToPhysicalKeys.get(logOp.getOperatorKey()));
        if (lineageTracer != null) {
            pOp.setLineageTracer(lineageTracer);
        }
        // Remember the original inputs so they can be restored afterwards.
        OperatorKey[] origInputs = new OperatorKey[pOp.inputs.length];
        for (int i = 0; i < origInputs.length; ++i) {
            origInputs[i] = pOp.inputs[i];
        }
        // Splice in a PORead per input, each reading the input's derived bag.
        for (int i = 0; i < pOp.inputs.length; ++i) {
            PORead read = new PORead(logOp.getScope(),
                    NodeIdGenerator.getGenerator().getNextNodeId(logOp.getScope()),
                    physicalOpTable,
                    LogicalOperator.FIXED,
                    derivedData.get(logOp.getOpTable().get(logOp.getInputs().get(i))));
            pOp.inputs[i] = read.getOperatorKey();
        }
        // Drain the operator into a fresh bag.
        DataBag outputData = BagFactory.getInstance().newDefaultBag();
        pOp.open();
        Tuple t = pOp.getNext();
        while (t != null) {
            outputData.add(t);
            t = pOp.getNext();
        }
        pOp.close();
        // Restore the physical operator to its previous form: detach the
        // lineage tracer, drop the temporary PORead entries from the operator
        // table (PORead presumably registers itself there via its constructor
        // — TODO confirm), and reinstate the original inputs.
        pOp.setLineageTracer(null);
        for (int i = 0; i < origInputs.length; ++i) {
            physicalOpTable.remove(pOp.inputs[i]);
            pOp.inputs[i] = origInputs[i];
        }
        return outputData;
    }

    /**
     * Dispatches to the operator-specific equivalence-class computation.
     *
     * @throws RuntimeException if the operator type is not recognized
     */
    public static Collection<IdentityHashSet<Tuple>> GetEquivalenceClasses(LogicalOperator op, Map<LogicalOperator, DataBag> derivedData) throws IOException {
        if (op instanceof LOCogroup) return GetEquivalenceClasses((LOCogroup) op, derivedData);
        else if (op instanceof LOEval) return GetEquivalenceClasses((LOEval) op, derivedData);
        else if (op instanceof LOSort) return GetEquivalenceClasses((LOSort) op, derivedData);
        else if (op instanceof LOSplit) return GetEquivalenceClasses((LOSplit) op, derivedData);
        else if (op instanceof LOUnion) return GetEquivalenceClasses((LOUnion) op, derivedData);
        else if (op instanceof LOLoad) return GetEquivalenceClasses((LOLoad) op, derivedData);
        else throw new RuntimeException("Unrecognized logical operator.");
    }

    /**
     * LOAD: all tuples belong to a single equivalence class, since any loaded
     * tuple demonstrates the operator equally well.
     */
    static Collection<IdentityHashSet<Tuple>> GetEquivalenceClasses(LOLoad op, Map<LogicalOperator, DataBag> derivedData) throws IOException {
        Collection<IdentityHashSet<Tuple>> equivClasses = new LinkedList<IdentityHashSet<Tuple>>();
        IdentityHashSet<Tuple> input = new IdentityHashSet<Tuple>();
        equivClasses.add(input);
        DataBag output = derivedData.get(op);
        for (Iterator<Tuple> it = output.iterator(); it.hasNext(); ) {
            input.add(it.next());
        }
        return equivClasses;
    }

    /**
     * (CO)GROUP: a single equivalence class holding the group tuples that
     * properly illustrate grouping, i.e. those whose inner bags have adequate
     * cardinality.
     */
    static Collection<IdentityHashSet<Tuple>> GetEquivalenceClasses(LOCogroup op, Map<LogicalOperator, DataBag> derivedData) throws IOException {
        Collection<IdentityHashSet<Tuple>> equivClasses = new LinkedList<IdentityHashSet<Tuple>>();
        IdentityHashSet<Tuple> acceptableGroups = new IdentityHashSet<Tuple>();
        equivClasses.add(acceptableGroups);
        DataBag output = derivedData.get(op);
        for (Iterator<Tuple> it = output.iterator(); it.hasNext(); ) {
            Tuple group = it.next();
            boolean isAcceptable;
            if (group.arity() == 2) {
                // Single-relation group-by: need a group with at least 2 tuples.
                isAcceptable = (group.getBagField(1).cardinality() >= 2);
            } else {
                // Co-group: need at least one tuple from each input relation.
                isAcceptable = true;
                for (int field = 1; field < group.arity(); field++) {
                    DataBag bag = group.getBagField(field);
                    if (bag.cardinality() == 0) {
                        isAcceptable = false;
                        break;
                    }
                }
            }
            if (isAcceptable) acceptableGroups.add(group);
        }
        return equivClasses;
    }

    /**
     * EVAL: for a FILTER, two classes (tuples that pass and tuples that fail);
     * for a FOREACH, a single class of all output tuples, since any of them
     * makes an equally good demonstration.
     */
    static Collection<IdentityHashSet<Tuple>> GetEquivalenceClasses(LOEval op, Map<LogicalOperator, DataBag> derivedData) throws IOException {
        Collection<IdentityHashSet<Tuple>> equivClasses = new LinkedList<IdentityHashSet<Tuple>>();
        EvalSpec spec = op.getSpec();
        if (spec instanceof FilterSpec) { // it's a FILTER
            FilterSpec fSpec = (FilterSpec) spec;
            IdentityHashSet<Tuple> pass = new IdentityHashSet<Tuple>();
            IdentityHashSet<Tuple> fail = new IdentityHashSet<Tuple>();
            equivClasses.add(pass);
            equivClasses.add(fail);
            // The filter is re-evaluated over the INPUT bag (resolved through
            // the operator table) to partition tuples by outcome.
            DataBag input = derivedData.get(op.getOpTable().get(op.getInputs().get(0)));
            for (Iterator<Tuple> it = input.iterator(); it.hasNext(); ) {
                Tuple t = it.next();
                if (fSpec.cond.eval(t)) pass.add(t);
                else fail.add(t);
            }
        } else { // it's a FOREACH
            IdentityHashSet<Tuple> equivClass = new IdentityHashSet<Tuple>();
            equivClasses.add(equivClass);
            DataBag output = derivedData.get(op);
            for (Iterator<Tuple> it = output.iterator(); it.hasNext(); ) {
                equivClass.add(it.next());
            }
        }
        return equivClasses;
    }

    /** SORT/DISTINCT: contributes no new equivalence classes. */
    static Collection<IdentityHashSet<Tuple>> GetEquivalenceClasses(LOSort op, Map<LogicalOperator, DataBag> derivedData) throws IOException {
        return new LinkedList<IdentityHashSet<Tuple>>();
    }

    /** SPLIT: not yet supported by the example generator. */
    static Collection<IdentityHashSet<Tuple>> GetEquivalenceClasses(LOSplit op, Map<LogicalOperator, DataBag> derivedData) throws IOException {
        throw new RuntimeException("LOSplit not supported yet in example generator.");
    }

    /**
     * UNION: one equivalence class per input relation, each holding all of that
     * relation's tuples, so every branch of the union gets represented.
     */
    static Collection<IdentityHashSet<Tuple>> GetEquivalenceClasses(LOUnion op, Map<LogicalOperator, DataBag> derivedData) throws IOException {
        Collection<IdentityHashSet<Tuple>> equivClasses = new LinkedList<IdentityHashSet<Tuple>>();
        for (OperatorKey opKey : op.getInputs()) {
            LogicalOperator input = op.getOpTable().get(opKey);
            IdentityHashSet<Tuple> equivClass = new IdentityHashSet<Tuple>();
            DataBag inputTable = derivedData.get(input);
            for (Iterator<Tuple> it = inputTable.iterator(); it.hasNext(); ) {
                equivClass.add(it.next());
            }
            equivClasses.add(equivClass);
        }
        return equivClasses;
    }
}