package edu.uci.ics.hivesterix.runtime.exec;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.Serializable;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.MapRedTask;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import edu.uci.ics.hivesterix.common.config.ConfUtil;
import edu.uci.ics.hivesterix.logical.expression.HiveExpressionTypeComputer;
import edu.uci.ics.hivesterix.logical.expression.HiveMergeAggregationExpressionFactory;
import edu.uci.ics.hivesterix.logical.expression.HiveNullableTypeComputer;
import edu.uci.ics.hivesterix.logical.expression.HivePartialAggregationTypeComputer;
import edu.uci.ics.hivesterix.logical.plan.HiveAlgebricksTranslator;
import edu.uci.ics.hivesterix.logical.plan.HiveLogicalPlanAndMetaData;
import edu.uci.ics.hivesterix.optimizer.rulecollections.HiveRuleCollections;
import edu.uci.ics.hivesterix.runtime.factory.evaluator.HiveExpressionRuntimeProvider;
import edu.uci.ics.hivesterix.runtime.factory.nullwriter.HiveNullWriterFactory;
import edu.uci.ics.hivesterix.runtime.inspector.HiveBinaryBooleanInspectorFactory;
import edu.uci.ics.hivesterix.runtime.inspector.HiveBinaryIntegerInspectorFactory;
import edu.uci.ics.hivesterix.runtime.jobgen.HiveConnectorPolicyAssignmentPolicy;
import edu.uci.ics.hivesterix.runtime.jobgen.HiveConnectorPolicyAssignmentPolicy.Policy;
import edu.uci.ics.hivesterix.runtime.provider.HiveBinaryComparatorFactoryProvider;
import edu.uci.ics.hivesterix.runtime.provider.HiveBinaryHashFunctionFactoryProvider;
import edu.uci.ics.hivesterix.runtime.provider.HiveBinaryHashFunctionFamilyProvider;
import edu.uci.ics.hivesterix.runtime.provider.HiveNormalizedKeyComputerFactoryProvider;
import edu.uci.ics.hivesterix.runtime.provider.HivePrinterFactoryProvider;
import edu.uci.ics.hivesterix.runtime.provider.HiveSerializerDeserializerProvider;
import edu.uci.ics.hivesterix.runtime.provider.HiveTypeTraitProvider;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.compiler.api.HeuristicCompilerFactoryBuilder;
import edu.uci.ics.hyracks.algebricks.compiler.api.HeuristicCompilerFactoryBuilder.DefaultOptimizationContextFactory;
import edu.uci.ics.hyracks.algebricks.compiler.api.ICompiler;
import edu.uci.ics.hyracks.algebricks.compiler.api.ICompilerFactory;
import edu.uci.ics.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialFixpointRuleController;
import edu.uci.ics.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialOnceRuleController;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlanAndMetadata;
import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.AbstractRuleController;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
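/**
* A Hyracks-based execution engine for Hive: it translates the operator DAGs
* of Hive's map-reduce tasks into an Algebricks logical plan, optimizes the
* plan, generates a Hyracks job specification, and runs it on the cluster.
*/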
@SuppressWarnings({ "rawtypes", "unchecked" })
public class HyracksExecutionEngine implements IExecutionEngine {
private static final Log LOG = LogFactory.getLog(HyracksExecutionEngine.class.getName());
private static final String clusterPropertiesPath = "conf/cluster.properties";
private static final String masterFilePath = "conf/master";
private static final List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_LOGICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();
private static final List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_PHYSICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();
static {
SequentialFixpointRuleController seqCtrlNoDfs = new SequentialFixpointRuleController(false);
SequentialFixpointRuleController seqCtrlFullDfs = new SequentialFixpointRuleController(true);
SequentialOnceRuleController seqOnceCtrl = new SequentialOnceRuleController(true);
DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlFullDfs,
HiveRuleCollections.NORMALIZATION));
DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,
HiveRuleCollections.COND_PUSHDOWN_AND_JOIN_INFERENCE));
DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlFullDfs,
HiveRuleCollections.LOAD_FIELDS));
DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,
HiveRuleCollections.OP_PUSHDOWN));
DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,
HiveRuleCollections.DATA_EXCHANGE));
DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,
HiveRuleCollections.CONSOLIDATION));
DEFAULT_PHYSICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,
HiveRuleCollections.PHYSICAL_PLAN_REWRITES));
DEFAULT_PHYSICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,
HiveRuleCollections.prepareJobGenRules));
}
/**
* static configurations for compiler
*/
private HeuristicCompilerFactoryBuilder builder;
/**
* compiler
*/
private ICompiler compiler;
/**
* physical optimization config
*/
private PhysicalOptimizationConfig physicalOptimizationConfig;
/**
* leaf (final sink) operators of the translated operator DAG
*/
private List<Operator> leafOps = new ArrayList<Operator>();
/**
* tasks that are already visited
*/
private Map<Task<? extends Serializable>, Boolean> tasksVisited = new HashMap<Task<? extends Serializable>, Boolean>();
/**
* hyracks job spec
*/
private JobSpecification jobSpec;
/**
* hive configuration
*/
private HiveConf conf;
/**
* plan printer
*/
private PrintWriter planPrinter;
/**
* cluster properties, loaded from conf/cluster.properties
*/
private Properties clusterProps;
/**
* the Hyracks client connection
*/
private IHyracksClientConnection hcc;
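/**
* Create an engine that does not print the optimized plan.
*/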
public HyracksExecutionEngine(HiveConf conf) {
this.conf = conf;
init(conf);
}
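/**
* Create an engine that also writes the optimized plan to planPrinter.
*/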
public HyracksExecutionEngine(HiveConf conf, PrintWriter planPrinter) {
this.conf = conf;
this.planPrinter = planPrinter;
init(conf);
}
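/**
* Configure the compiler: rewrite rule sets, expression and type computers,
* and frame budgets for the blocking operators (external group-by and sort).
*/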
private void init(HiveConf conf) {
builder = new HeuristicCompilerFactoryBuilder(DefaultOptimizationContextFactory.INSTANCE);
builder.setLogicalRewrites(DEFAULT_LOGICAL_REWRITES);
builder.setPhysicalRewrites(DEFAULT_PHYSICAL_REWRITES);
builder.setIMergeAggregationExpressionFactory(HiveMergeAggregationExpressionFactory.INSTANCE);
builder.setExpressionTypeComputer(HiveExpressionTypeComputer.INSTANCE);
builder.setNullableTypeComputer(HiveNullableTypeComputer.INSTANCE);
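// memory budgets for blocking operators; the defaults are 256 MB for
// external group-by, 512 MB for external sort, and a 32 KB frame size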
long memSizeExternalGby = conf.getLong("hive.algebricks.groupby.external.memory", 268435456);
long memSizeExternalSort = conf.getLong("hive.algebricks.sort.memory", 536870912);
int frameSize = conf.getInt("hive.algebricks.framesize", 32768);
physicalOptimizationConfig = new PhysicalOptimizationConfig();
int frameLimitExtGby = (int) (memSizeExternalGby / frameSize);
physicalOptimizationConfig.setMaxFramesExternalGroupBy(frameLimitExtGby);
int frameLimitExtSort = (int) (memSizeExternalSort / frameSize);
physicalOptimizationConfig.setMaxFramesExternalSort(frameLimitExtSort);
builder.setPhysicalOptimizationConfig(physicalOptimizationConfig);
}
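/**
* Translate the Hive root tasks into an Algebricks logical plan, optimize it,
* and print the optimized plan. Returns 0 on success, 1 on failure.
*/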
@Override
public int compileJob(List<Task<? extends Serializable>> rootTasks) {
// clean up
leafOps.clear();
tasksVisited.clear();
jobSpec = null;
HashMap<String, PartitionDesc> aliasToPath = new HashMap<String, PartitionDesc>();
List<Operator> rootOps = generateRootOperatorDAG(rootTasks, aliasToPath);
// collect all leaf operators
getLeaves(rootOps, leafOps);
HiveAlgebricksTranslator translator = new HiveAlgebricksTranslator();
try {
translator.translate(rootOps, null, aliasToPath);
ILogicalPlan plan = translator.genLogicalPlan();
if (plan.getRoots() != null && plan.getRoots().size() > 0 && plan.getRoots().get(0).getValue() != null) {
translator.printOperators();
ILogicalPlanAndMetadata planAndMetadata = new HiveLogicalPlanAndMetaData(plan,
translator.getMetadataProvider());
ICompilerFactory compilerFactory = builder.create();
compiler = compilerFactory.createCompiler(planAndMetadata.getPlan(),
planAndMetadata.getMetadataProvider(), translator.getVariableCounter());
// run optimization and re-writing rules for Hive plan
compiler.optimize();
// print optimized plan
LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
StringBuilder buffer = new StringBuilder();
PlanPrettyPrinter.printPlan(plan, buffer, pvisitor, 0);
String planStr = buffer.toString();
System.out.println(planStr);
if (planPrinter != null)
planPrinter.print(planStr);
}
} catch (Exception e) {
LOG.error("Hive plan translation/optimization failed", e);
return 1;
}
return 0;
}
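/**
* Set the Hive runtime providers on the compiler builder and generate the
* Hyracks job specification, including the connector policy from
* hive.hyracks.connectorpolicy (default PIPELINING).
*/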
private void codeGen() throws AlgebricksException {
try {
// set the cluster locations: the node controllers (typically one slot per CPU core)
builder.setClusterLocations(new AlgebricksAbsolutePartitionConstraint(ConfUtil.getNCs()));
} catch (Exception e) {
throw new AlgebricksException(e);
}
// builder.setClusterTopology(ConfUtil.getClusterTopology());
builder.setBinaryBooleanInspectorFactory(HiveBinaryBooleanInspectorFactory.INSTANCE);
builder.setBinaryIntegerInspectorFactory(HiveBinaryIntegerInspectorFactory.INSTANCE);
builder.setComparatorFactoryProvider(HiveBinaryComparatorFactoryProvider.INSTANCE);
builder.setExpressionRuntimeProvider(HiveExpressionRuntimeProvider.INSTANCE);
builder.setHashFunctionFactoryProvider(HiveBinaryHashFunctionFactoryProvider.INSTANCE);
builder.setPrinterProvider(HivePrinterFactoryProvider.INSTANCE);
builder.setSerializerDeserializerProvider(HiveSerializerDeserializerProvider.INSTANCE);
builder.setNullWriterFactory(HiveNullWriterFactory.INSTANCE);
builder.setNormalizedKeyComputerFactoryProvider(HiveNormalizedKeyComputerFactoryProvider.INSTANCE);
builder.setPartialAggregationTypeComputer(HivePartialAggregationTypeComputer.INSTANCE);
builder.setTypeTraitProvider(HiveTypeTraitProvider.INSTANCE);
builder.setHashFunctionFamilyProvider(HiveBinaryHashFunctionFamilyProvider.INSTANCE);
jobSpec = compiler.createJob(null, null);
// set the policy
String policyStr = conf.get("hive.hyracks.connectorpolicy");
if (policyStr == null)
policyStr = "PIPELINING";
Policy policyValue = Policy.valueOf(policyStr);
jobSpec.setConnectorPolicyAssignmentPolicy(new HiveConnectorPolicyAssignmentPolicy(policyValue));
jobSpec.setUseConnectorPolicyForScheduling(false);
}
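/**
* Generate the Hyracks job for the compiled plan and execute it. Returns 0 on
* success, 1 on failure.
*/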
@Override
public int executeJob() {
try {
codeGen();
executeHyracksJob(jobSpec);
} catch (Exception e) {
LOG.error("Hyracks job generation/execution failed", e);
return 1;
}
return 0;
}
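/**
* Stitch the operator DAGs of all root map-reduce tasks into a single DAG and
* collect the alias-to-PartitionDesc mappings; the consumed map-reduce tasks
* are removed from rootTasks.
*/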
private List<Operator> generateRootOperatorDAG(List<Task<? extends Serializable>> rootTasks,
HashMap<String, PartitionDesc> aliasToPath) {
List<Operator> rootOps = new ArrayList<Operator>();
List<Task<? extends Serializable>> toDelete = new ArrayList<Task<? extends Serializable>>();
tasksVisited.clear();
for (int i = rootTasks.size() - 1; i >= 0; i--) {
/**
* only map-reduce tasks are folded into the operator DAG here
*/
Task<? extends Serializable> task = rootTasks.get(i);
if (task instanceof MapRedTask) {
List<Operator> mapRootOps = articulateMapReduceOperators(task, rootOps, aliasToPath, rootTasks);
if (i == 0)
rootOps.addAll(mapRootOps);
else {
List<Operator> leaves = new ArrayList<Operator>();
getLeaves(rootOps, leaves);
List<Operator> mapChildren = new ArrayList<Operator>();
for (Operator childMap : mapRootOps) {
if (childMap instanceof TableScanOperator) {
TableScanDesc topDesc = (TableScanDesc) childMap.getConf();
if (topDesc == null)
mapChildren.add(childMap);
else {
rootOps.add(childMap);
}
} else
mapChildren.add(childMap);
}
if (mapChildren.size() > 0) {
for (Operator leaf : leaves)
leaf.setChildOperators(mapChildren);
for (Operator child : mapChildren)
child.setParentOperators(leaves);
}
}
MapredWork mr = (MapredWork) task.getWork();
HashMap<String, PartitionDesc> map = mr.getAliasToPartnInfo();
addAliasToPartition(aliasToPath, map);
toDelete.add(task);
}
}
for (Task<? extends Serializable> task : toDelete)
rootTasks.remove(task);
return rootOps;
}
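/**
* Copy alias-to-PartitionDesc mappings into aliasToPath; a key of the source
* map may concatenate several aliases separated by ':'.
*/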
private void addAliasToPartition(HashMap<String, PartitionDesc> aliasToPath, HashMap<String, PartitionDesc> map) {
for (Entry<String, PartitionDesc> entry : map.entrySet()) {
String[] names = entry.getKey().split(":");
for (String name : names) {
aliasToPath.put(name, entry.getValue());
}
}
}
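/**
* Recursively fold a (possibly conditional) map-reduce task into one operator
* DAG: connect the map-side leaves to the reducer, recurse into child tasks,
* and return the map-side root operators. Tasks that are not map-reduce are
* added back to rootTasks for regular Hive execution.
*/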
private List<Operator> articulateMapReduceOperators(Task task, List<Operator> rootOps,
HashMap<String, PartitionDesc> aliasToPath, List<Task<? extends Serializable>> rootTasks) {
if (!(task instanceof MapRedTask)) {
if (!(task instanceof ConditionalTask)) {
rootTasks.add(task);
return null;
} else {
// for a conditional task, recurse into its map-reduce branch if one exists
ConditionalTask condition = (ConditionalTask) task;
List<Task<? extends Serializable>> branches = condition.getListTasks();
for (int i = branches.size() - 1; i >= 0; i--) {
Task branch = branches.get(i);
if (branch instanceof MapRedTask) {
return articulateMapReduceOperators(branch, rootOps, aliasToPath, rootTasks);
}
}
rootTasks.add(task);
return null;
}
}
MapredWork mr = (MapredWork) task.getWork();
HashMap<String, PartitionDesc> map = mr.getAliasToPartnInfo();
// put all aliasToPartitionDesc mappings into the map
addAliasToPartition(aliasToPath, map);
MapRedTask mrtask = (MapRedTask) task;
MapredWork work = (MapredWork) mrtask.getWork();
HashMap<String, Operator<? extends Serializable>> operators = work.getAliasToWork();
List<Operator> mapRootOps = new ArrayList<Operator>();
// get the distinct map-side root operators; the same operator can appear
// under several aliases (e.g., in the union case)
for (Operator next : operators.values()) {
if (!mapRootOps.contains(next)) {
mapRootOps.add(next);
}
}
// get map-side local work
MapredLocalWork localWork = work.getMapLocalWork();
if (localWork != null) {
for (Operator<? extends Serializable> localOp : localWork.getAliasToWork().values()) {
mapRootOps.add(localOp);
}
for (Entry<String, FetchWork> fetchEntry : localWork.getAliasToFetchWork().entrySet()) {
List<PartitionDesc> dirPart = fetchEntry.getValue().getPartDesc();
// temporary hack: put only the first PartitionDesc into the map
aliasToPath.put(fetchEntry.getKey(), dirPart.get(0));
}
}
Boolean visited = tasksVisited.get(task);
if (Boolean.TRUE.equals(visited)) {
return mapRootOps;
}
// reset parent links; this matters only in the union case
for (Operator op : mapRootOps)
if (op.getParentOperators() != null)
op.getParentOperators().clear();
List<Operator> mapLeaves = new ArrayList<Operator>();
downToLeaves(mapRootOps, mapLeaves);
List<Operator> reduceOps = new ArrayList<Operator>();
if (work.getReducer() != null)
reduceOps.add(work.getReducer());
for (Operator mapLeaf : mapLeaves) {
mapLeaf.setChildOperators(reduceOps);
}
for (Operator reduceOp : reduceOps) {
if (reduceOp != null)
reduceOp.setParentOperators(mapLeaves);
}
List<Operator> leaves = new ArrayList<Operator>();
if (reduceOps.size() > 0) {
downToLeaves(reduceOps, leaves);
} else {
leaves = mapLeaves;
}
List<Operator> mapChildren = new ArrayList<Operator>();
if (task.getChildTasks() != null && task.getChildTasks().size() > 0) {
for (Object child : task.getChildTasks()) {
List<Operator> childMapOps = articulateMapReduceOperators((Task) child, rootOps, aliasToPath, rootTasks);
if (childMapOps == null)
continue;
for (Operator childMap : childMapOps) {
if (childMap instanceof TableScanOperator) {
TableScanDesc topDesc = (TableScanDesc) childMap.getConf();
if (topDesc == null)
mapChildren.add(childMap);
else {
rootOps.add(childMap);
}
} else {
// if not table scan, add the child
mapChildren.add(childMap);
}
}
}
if (mapChildren.size() > 0) {
int i = 0;
for (Operator leaf : leaves) {
if (leaf.getChildOperators() == null || leaf.getChildOperators().size() == 0)
leaf.setChildOperators(new ArrayList<Operator>());
leaf.getChildOperators().add(mapChildren.get(i));
i++;
}
i = 0;
for (Operator child : mapChildren) {
if (child.getParentOperators() == null || child.getParentOperators().size() == 0)
child.setParentOperators(new ArrayList<Operator>());
child.getParentOperators().add(leaves.get(i));
i++;
}
}
}
// mark this task as visited
this.tasksVisited.put(task, true);
return mapRootOps;
}
/**
* Recursively descend from the given operators to their leaf descendants,
* adding each distinct leaf to the result list.
*
* @param ops
* @param leaves
*/
private void downToLeaves(List<Operator> ops, List<Operator> leaves) {
for (Operator op : ops) {
if (op != null && op.getChildOperators() != null && op.getChildOperators().size() > 0) {
downToLeaves(op.getChildOperators(), leaves);
} else {
if (op != null && !leaves.contains(op))
leaves.add(op);
}
}
}
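/**
* Collect all leaf operators reachable from the given root operators.
*/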
private void getLeaves(List<Operator> roots, List<Operator> currentLeaves) {
for (Operator op : roots) {
List<Operator> children = op.getChildOperators();
if (children == null || children.size() <= 0) {
currentLeaves.add(op);
} else {
getLeaves(children, currentLeaves);
}
}
}
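/**
* Submit the generated job to the Hyracks cluster controller and wait for it
* to complete. The master host name is read from the first line of
* conf/master, and the client port from the CC_CLIENTPORT property in
* conf/cluster.properties (for example a line like "CC_CLIENTPORT=1098";
* the concrete port value is deployment-specific).
*/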
private void executeHyracksJob(JobSpecification job) throws Exception {
/**
* load the properties file if it is not loaded
*/
if (clusterProps == null) {
clusterProps = new Properties();
InputStream confIn = new FileInputStream(clusterPropertiesPath);
clusterProps.load(confIn);
confIn.close();
}
if (hcc == null) {
// the master file's first line holds the cluster controller's host name
BufferedReader ipReader = new BufferedReader(new InputStreamReader(new FileInputStream(masterFilePath)));
String masterNode = ipReader.readLine();
ipReader.close();
InetAddress[] ips = InetAddress.getAllByName(masterNode);
int port = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));
for (InetAddress ip : ips) {
// only try IPv4 addresses
if (ip.getAddress().length <= 4) {
try {
hcc = new HyracksConnection(ip.getHostAddress(), port);
break;
} catch (Exception e) {
continue;
}
}
}
if (hcc == null)
throw new Exception("cannot connect to the cluster controller at " + masterNode + ":" + port);
}
long start = System.currentTimeMillis();
JobId jobId = hcc.startJob(job);
hcc.waitForCompletion(jobId);
// finalize all leaf (file sink) operators
for (Operator leaf : leafOps) {
jobClose(leaf);
}
long end = System.currentTimeMillis();
System.err.println("Hyracks job " + jobId + " took " + (end - start) + " ms (start=" + start + ", end=" + end + ")");
}
/**
* Move a file sink's output to its final directory on HDFS (not the real
* final location; Hive may move it once more).
*
* @param leaf
* @throws Exception
*/
private void jobClose(Operator leaf) throws Exception {
// only file sinks need finalization
if (!(leaf instanceof FileSinkOperator))
return;
FileSinkOperator fsOp = (FileSinkOperator) leaf;
FileSinkDesc desc = fsOp.getConf();
boolean isNativeTable = !desc.getTableInfo().isNonNative();
if ((conf != null) && isNativeTable) {
String specPath = desc.getDirName();
DynamicPartitionCtx dpCtx = desc.getDynPartCtx();
// for 0.7.0
fsOp.mvFileToFinalPath(specPath, conf, true, LOG, dpCtx);
// for 0.8.0
// Utilities.mvFileToFinalPath(specPath, conf, true, LOG, dpCtx,
// desc);
}
}
}
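// Example usage (a sketch, not part of this class): assumes Hive's planner has
// already produced the root tasks for a query; "rootTasks" below is hypothetical.
//
// HiveConf conf = new HiveConf();
// IExecutionEngine engine = new HyracksExecutionEngine(conf, new PrintWriter(System.out));
// List<Task<? extends Serializable>> rootTasks = ...; // obtained from the Hive planner
// if (engine.compileJob(rootTasks) == 0) {
//     engine.executeJob();
// }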