blob: b77d65dbf6ed51b67e91f63967a9c1c334aa1a36 [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.algebricks.core.jobgen.impl;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.lang3.mutable.Mutable;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
import edu.uci.ics.hyracks.api.job.JobSpecification;
public class PlanCompiler {
private JobGenContext context;
private Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> operatorVisitedToParents = new HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>>();
public PlanCompiler(JobGenContext context) {
this.context = context;
}
public JobGenContext getContext() {
return context;
}
public JobSpecification compilePlan(ILogicalPlan plan, IOperatorSchema outerPlanSchema, IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException {
JobSpecification spec = new JobSpecification();
if (jobEventListenerFactory != null) {
spec.setJobletEventListenerFactory(jobEventListenerFactory);
}
List<ILogicalOperator> rootOps = new ArrayList<ILogicalOperator>();
IHyracksJobBuilder builder = new JobBuilder(spec, context.getClusterLocations());
for (Mutable<ILogicalOperator> opRef : plan.getRoots()) {
compileOpRef(opRef, spec, builder, outerPlanSchema);
rootOps.add(opRef.getValue());
}
reviseEdges(builder);
operatorVisitedToParents.clear();
builder.buildSpec(rootOps);
spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
// Do not do activity cluster planning because it is slow on large clusters
spec.setUseConnectorPolicyForScheduling(false);
return spec;
}
private void compileOpRef(Mutable<ILogicalOperator> opRef, IOperatorDescriptorRegistry spec, IHyracksJobBuilder builder,
IOperatorSchema outerPlanSchema) throws AlgebricksException {
ILogicalOperator op = opRef.getValue();
int n = op.getInputs().size();
IOperatorSchema[] schemas = new IOperatorSchema[n];
int i = 0;
for (Mutable<ILogicalOperator> opRef2 : op.getInputs()) {
List<Mutable<ILogicalOperator>> parents = operatorVisitedToParents.get(opRef2);
if (parents == null) {
parents = new ArrayList<Mutable<ILogicalOperator>>();
operatorVisitedToParents.put(opRef2, parents);
parents.add(opRef);
compileOpRef(opRef2, spec, builder, outerPlanSchema);
schemas[i++] = context.getSchema(opRef2.getValue());
} else {
if (!parents.contains(opRef))
parents.add(opRef);
schemas[i++] = context.getSchema(opRef2.getValue());
continue;
}
}
IOperatorSchema opSchema = new OperatorSchemaImpl();
context.putSchema(op, opSchema);
op.getVariablePropagationPolicy().propagateVariables(opSchema, schemas);
op.contributeRuntimeOperator(builder, context, opSchema, schemas, outerPlanSchema);
}
private void reviseEdges(IHyracksJobBuilder builder) {
/**
* revise the edges for the case of replicate operator
*/
for (Entry<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> entry : operatorVisitedToParents
.entrySet()) {
Mutable<ILogicalOperator> child = entry.getKey();
List<Mutable<ILogicalOperator>> parents = entry.getValue();
if (parents.size() > 1) {
int i = 0;
for (Mutable<ILogicalOperator> parent : parents) {
builder.contributeGraphEdge(child.getValue(), i, parent.getValue(), 0);
i++;
}
}
}
}
}