/*
 * Copyright 2009-2013 by The Regents of the University of California
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * you may obtain a copy of the License from
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package edu.uci.ics.hivesterix.runtime.exec;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.Serializable;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.MapRedTask;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;

import edu.uci.ics.hivesterix.common.config.ConfUtil;
import edu.uci.ics.hivesterix.logical.expression.HiveExpressionTypeComputer;
import edu.uci.ics.hivesterix.logical.expression.HiveMergeAggregationExpressionFactory;
import edu.uci.ics.hivesterix.logical.expression.HiveNullableTypeComputer;
import edu.uci.ics.hivesterix.logical.expression.HivePartialAggregationTypeComputer;
import edu.uci.ics.hivesterix.logical.plan.HiveAlgebricksTranslator;
import edu.uci.ics.hivesterix.logical.plan.HiveLogicalPlanAndMetaData;
import edu.uci.ics.hivesterix.optimizer.rulecollections.HiveRuleCollections;
import edu.uci.ics.hivesterix.runtime.factory.evaluator.HiveExpressionRuntimeProvider;
import edu.uci.ics.hivesterix.runtime.factory.nullwriter.HiveNullWriterFactory;
import edu.uci.ics.hivesterix.runtime.inspector.HiveBinaryBooleanInspectorFactory;
import edu.uci.ics.hivesterix.runtime.inspector.HiveBinaryIntegerInspectorFactory;
import edu.uci.ics.hivesterix.runtime.jobgen.HiveConnectorPolicyAssignmentPolicy;
import edu.uci.ics.hivesterix.runtime.jobgen.HiveConnectorPolicyAssignmentPolicy.Policy;
import edu.uci.ics.hivesterix.runtime.provider.HiveBinaryComparatorFactoryProvider;
import edu.uci.ics.hivesterix.runtime.provider.HiveBinaryHashFunctionFactoryProvider;
import edu.uci.ics.hivesterix.runtime.provider.HiveBinaryHashFunctionFamilyProvider;
import edu.uci.ics.hivesterix.runtime.provider.HiveNormalizedKeyComputerFactoryProvider;
import edu.uci.ics.hivesterix.runtime.provider.HivePrinterFactoryProvider;
import edu.uci.ics.hivesterix.runtime.provider.HiveSerializerDeserializerProvider;
import edu.uci.ics.hivesterix.runtime.provider.HiveTypeTraitProvider;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.compiler.api.HeuristicCompilerFactoryBuilder;
import edu.uci.ics.hyracks.algebricks.compiler.api.HeuristicCompilerFactoryBuilder.DefaultOptimizationContextFactory;
import edu.uci.ics.hyracks.algebricks.compiler.api.ICompiler;
import edu.uci.ics.hyracks.algebricks.compiler.api.ICompilerFactory;
import edu.uci.ics.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialFixpointRuleController;
import edu.uci.ics.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialOnceRuleController;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlanAndMetadata;
import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.AbstractRuleController;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;

@SuppressWarnings({ "rawtypes", "unchecked" })
public class HyracksExecutionEngine implements IExecutionEngine {

    private static final Log LOG = LogFactory.getLog(HyracksExecutionEngine.class.getName());
    private static final String clusterPropertiesPath = "conf/cluster.properties";
    private static final String masterFilePath = "conf/master";

    private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_LOGICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();
    private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_PHYSICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();

    static {
        SequentialFixpointRuleController seqCtrlNoDfs = new SequentialFixpointRuleController(false);
        SequentialFixpointRuleController seqCtrlFullDfs = new SequentialFixpointRuleController(true);
        SequentialOnceRuleController seqOnceCtrl = new SequentialOnceRuleController(true);
        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlFullDfs,
                HiveRuleCollections.NORMALIZATION));
        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,
                HiveRuleCollections.COND_PUSHDOWN_AND_JOIN_INFERENCE));
        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlFullDfs,
                HiveRuleCollections.LOAD_FIELDS));
        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,
                HiveRuleCollections.OP_PUSHDOWN));
        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,
                HiveRuleCollections.DATA_EXCHANGE));
        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,
                HiveRuleCollections.CONSOLIDATION));

        DEFAULT_PHYSICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,
                HiveRuleCollections.PHYSICAL_PLAN_REWRITES));
        DEFAULT_PHYSICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,
                HiveRuleCollections.prepareJobGenRules));
    }

    /**
     * static configuration for the compiler
     */
    private HeuristicCompilerFactoryBuilder builder;

    /**
     * the Algebricks compiler
     */
    private ICompiler compiler;

    /**
     * physical optimization configuration
     */
    private PhysicalOptimizationConfig physicalOptimizationConfig;

    /**
     * the final (leaf) operators of the generated plan
     */
    private List<Operator> leaveOps = new ArrayList<Operator>();

    /**
     * tasks that have already been visited
     */
    private Map<Task<? extends Serializable>, Boolean> tasksVisited = new HashMap<Task<? extends Serializable>, Boolean>();

    /**
     * the Hyracks job specification
     */
    private JobSpecification jobSpec;

    /**
     * the Hive configuration
     */
    private HiveConf conf;

    /**
     * the plan printer
     */
    private PrintWriter planPrinter;

    /**
     * the cluster properties
     */
    private Properties clusterProps;

    /**
     * the Hyracks client connection
     */
    private IHyracksClientConnection hcc;

    public HyracksExecutionEngine(HiveConf conf) {
        this.conf = conf;
        init(conf);
    }

    public HyracksExecutionEngine(HiveConf conf, PrintWriter planPrinter) {
        this.conf = conf;
        this.planPrinter = planPrinter;
        init(conf);
    }
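
    /**
     * Set up the Algebricks compiler factory builder: install the default
     * logical and physical rewrite rule collections, the Hive expression and
     * type computers, and a PhysicalOptimizationConfig whose frame limits for
     * external group-by and external sort are derived from the Hive
     * configuration (memory budget divided by frame size).
     */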
    private void init(HiveConf conf) {
        builder = new HeuristicCompilerFactoryBuilder(DefaultOptimizationContextFactory.INSTANCE);
        builder.setLogicalRewrites(DEFAULT_LOGICAL_REWRITES);
        builder.setPhysicalRewrites(DEFAULT_PHYSICAL_REWRITES);
        builder.setIMergeAggregationExpressionFactory(HiveMergeAggregationExpressionFactory.INSTANCE);
        builder.setExpressionTypeComputer(HiveExpressionTypeComputer.INSTANCE);
        builder.setNullableTypeComputer(HiveNullableTypeComputer.INSTANCE);

        long memSizeExternalGby = conf.getLong("hive.algebricks.groupby.external.memory", 268435456);
        long memSizeExternalSort = conf.getLong("hive.algebricks.sort.memory", 536870912);
        int frameSize = conf.getInt("hive.algebricks.framesize", 32768);

        physicalOptimizationConfig = new PhysicalOptimizationConfig();
        int frameLimitExtGby = (int) (memSizeExternalGby / frameSize);
        physicalOptimizationConfig.setMaxFramesExternalGroupBy(frameLimitExtGby);
        int frameLimitExtSort = (int) (memSizeExternalSort / frameSize);
        physicalOptimizationConfig.setMaxFramesExternalSort(frameLimitExtSort);
        builder.setPhysicalOptimizationConfig(physicalOptimizationConfig);
    }
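
    /**
     * Compile a Hive query: flatten the map-reduce task DAG into a single Hive
     * operator DAG, translate it into an Algebricks logical plan, run the
     * logical and physical rewrite rules over it, and print the optimized
     * plan. Returns 0 on success and 1 on failure.
     */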
    @Override
    public int compileJob(List<Task<? extends Serializable>> rootTasks) {
        // clean up
        leaveOps.clear();
        tasksVisited.clear();
        jobSpec = null;

        HashMap<String, PartitionDesc> aliasToPath = new HashMap<String, PartitionDesc>();
        List<Operator> rootOps = generateRootOperatorDAG(rootTasks, aliasToPath);

        // get all leaf operators
        getLeaves(rootOps, leaveOps);

        HiveAlgebricksTranslator translator = new HiveAlgebricksTranslator();
        try {
            translator.translate(rootOps, null, aliasToPath);

            ILogicalPlan plan = translator.genLogicalPlan();

            if (plan.getRoots() != null && plan.getRoots().size() > 0 && plan.getRoots().get(0).getValue() != null) {
                translator.printOperators();
                ILogicalPlanAndMetadata planAndMetadata = new HiveLogicalPlanAndMetaData(plan,
                        translator.getMetadataProvider());

                ICompilerFactory compilerFactory = builder.create();
                compiler = compilerFactory.createCompiler(planAndMetadata.getPlan(),
                        planAndMetadata.getMetadataProvider(), translator.getVariableCounter());

                // run optimization and rewriting rules for the Hive plan
                compiler.optimize();

                // print the optimized plan
                LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
                StringBuilder buffer = new StringBuilder();
                PlanPrettyPrinter.printPlan(plan, buffer, pvisitor, 0);
                String planStr = buffer.toString();
                System.out.println(planStr);

                if (planPrinter != null)
                    planPrinter.print(planStr);
            }
        } catch (Exception e) {
            e.printStackTrace();
            return 1;
        }

        return 0;
    }
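
    /**
     * Generate the Hyracks job: set the cluster locations and the Hive-specific
     * runtime providers (comparators, hash functions, serializers, printers,
     * etc.) on the compiler factory builder, create the JobSpecification from
     * the optimized plan, and apply the connector policy configured by
     * hive.hyracks.connectorpolicy (PIPELINING by default).
     */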
    private void codeGen() throws AlgebricksException {
        try {
            // number of cpu cores in the cluster
            builder.setClusterLocations(new AlgebricksAbsolutePartitionConstraint(ConfUtil.getNCs()));
        } catch (Exception e) {
            throw new AlgebricksException(e);
        }
        // builder.setClusterTopology(ConfUtil.getClusterTopology());
        builder.setBinaryBooleanInspectorFactory(HiveBinaryBooleanInspectorFactory.INSTANCE);
        builder.setBinaryIntegerInspectorFactory(HiveBinaryIntegerInspectorFactory.INSTANCE);
        builder.setComparatorFactoryProvider(HiveBinaryComparatorFactoryProvider.INSTANCE);
        builder.setExpressionRuntimeProvider(HiveExpressionRuntimeProvider.INSTANCE);
        builder.setHashFunctionFactoryProvider(HiveBinaryHashFunctionFactoryProvider.INSTANCE);
        builder.setPrinterProvider(HivePrinterFactoryProvider.INSTANCE);
        builder.setSerializerDeserializerProvider(HiveSerializerDeserializerProvider.INSTANCE);
        builder.setNullWriterFactory(HiveNullWriterFactory.INSTANCE);
        builder.setNormalizedKeyComputerFactoryProvider(HiveNormalizedKeyComputerFactoryProvider.INSTANCE);
        builder.setPartialAggregationTypeComputer(HivePartialAggregationTypeComputer.INSTANCE);
        builder.setTypeTraitProvider(HiveTypeTraitProvider.INSTANCE);
        builder.setHashFunctionFamilyProvider(HiveBinaryHashFunctionFamilyProvider.INSTANCE);

        jobSpec = compiler.createJob(null, null);

        // set the connector policy
        String policyStr = conf.get("hive.hyracks.connectorpolicy");
        if (policyStr == null)
            policyStr = "PIPELINING";
        Policy policyValue = Policy.valueOf(policyStr);
        jobSpec.setConnectorPolicyAssignmentPolicy(new HiveConnectorPolicyAssignmentPolicy(policyValue));
        jobSpec.setUseConnectorPolicyForScheduling(false);
    }
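
    /**
     * Run code generation for the compiled plan and submit the resulting
     * Hyracks job. Returns 0 on success and 1 on failure.
     */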
    @Override
    public int executeJob() {
        try {
            codeGen();
            executeHyracksJob(jobSpec);
        } catch (Exception e) {
            e.printStackTrace();
            return 1;
        }
        return 0;
    }
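
    /**
     * Walk the root tasks in reverse order, stitch the map/reduce operator
     * trees of every MapRedTask into one Hive operator DAG, collect the
     * alias-to-PartitionDesc mappings, and remove the consumed MapRedTasks
     * from the root task list. Returns the root operators of the combined DAG.
     */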
    private List<Operator> generateRootOperatorDAG(List<Task<? extends Serializable>> rootTasks,
            HashMap<String, PartitionDesc> aliasToPath) {

        List<Operator> rootOps = new ArrayList<Operator>();
        List<Task<? extends Serializable>> toDelete = new ArrayList<Task<? extends Serializable>>();
        tasksVisited.clear();

        for (int i = rootTasks.size() - 1; i >= 0; i--) {
            /**
             * list of map-reduce tasks
             */
            Task<? extends Serializable> task = rootTasks.get(i);

            if (task instanceof MapRedTask) {
                List<Operator> mapRootOps = articulateMapReduceOperators(task, rootOps, aliasToPath, rootTasks);
                if (i == 0)
                    rootOps.addAll(mapRootOps);
                else {
                    List<Operator> leaves = new ArrayList<Operator>();
                    getLeaves(rootOps, leaves);

                    List<Operator> mapChildren = new ArrayList<Operator>();
                    for (Operator childMap : mapRootOps) {
                        if (childMap instanceof TableScanOperator) {
                            TableScanDesc topDesc = (TableScanDesc) childMap.getConf();
                            if (topDesc == null)
                                mapChildren.add(childMap);
                            else {
                                rootOps.add(childMap);
                            }
                        } else
                            mapChildren.add(childMap);
                    }

                    if (mapChildren.size() > 0) {
                        for (Operator leaf : leaves)
                            leaf.setChildOperators(mapChildren);
                        for (Operator child : mapChildren)
                            child.setParentOperators(leaves);
                    }
                }

                MapredWork mr = (MapredWork) task.getWork();
                HashMap<String, PartitionDesc> map = mr.getAliasToPartnInfo();

                addAliasToPartition(aliasToPath, map);
                toDelete.add(task);
            }
        }

        for (Task<? extends Serializable> task : toDelete)
            rootTasks.remove(task);

        return rootOps;
    }
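
    /**
     * Copy each alias-to-PartitionDesc entry into aliasToPath, splitting
     * colon-separated alias keys into individual alias names.
     */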
    private void addAliasToPartition(HashMap<String, PartitionDesc> aliasToPath, HashMap<String, PartitionDesc> map) {
        Iterator<String> keys = map.keySet().iterator();
        while (keys.hasNext()) {
            String key = keys.next();
            PartitionDesc part = map.get(key);
            String[] names = key.split(":");
            for (String name : names) {
                aliasToPath.put(name, part);
            }
        }
    }
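
    /**
     * Recursively flatten a (possibly conditional) map-reduce task into one
     * operator tree: collect the map-side root operators, wire the map leaves
     * to the reducer, recurse into child tasks and attach their root operators
     * to this task's leaves, and record alias-to-PartitionDesc mappings along
     * the way. Returns the map-side root operators, or null if the task
     * contains no map-reduce work.
     */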
    private List<Operator> articulateMapReduceOperators(Task task, List<Operator> rootOps,
            HashMap<String, PartitionDesc> aliasToPath, List<Task<? extends Serializable>> rootTasks) {
        // System.out.println("!"+task.getName());
        if (!(task instanceof MapRedTask)) {
            if (!(task instanceof ConditionalTask)) {
                rootTasks.add(task);
                return null;
            } else {
                // remove map-reduce branches in the conditional task
                ConditionalTask condition = (ConditionalTask) task;
                List<Task<? extends Serializable>> branches = condition.getListTasks();
                for (int i = branches.size() - 1; i >= 0; i--) {
                    Task branch = branches.get(i);
                    if (branch instanceof MapRedTask) {
                        return articulateMapReduceOperators(branch, rootOps, aliasToPath, rootTasks);
                    }
                }
                rootTasks.add(task);
                return null;
            }
        }

        MapredWork mr = (MapredWork) task.getWork();
        HashMap<String, PartitionDesc> map = mr.getAliasToPartnInfo();

        // put all alias-to-PartitionDesc mappings into the map
        addAliasToPartition(aliasToPath, map);

        MapRedTask mrtask = (MapRedTask) task;
        MapredWork work = (MapredWork) mrtask.getWork();
        HashMap<String, Operator<? extends Serializable>> operators = work.getAliasToWork();

        Set entries = operators.entrySet();
        Iterator<Entry<String, Operator>> iterator = entries.iterator();
        List<Operator> mapRootOps = new ArrayList<Operator>();

        // get map root operators
        while (iterator.hasNext()) {
            Operator next = iterator.next().getValue();
            if (!mapRootOps.contains(next)) {
                // deduplicate; only needed for the union case
                mapRootOps.add(next);
            }
        }

        // get map local work
        MapredLocalWork localWork = work.getMapLocalWork();
        if (localWork != null) {
            HashMap<String, Operator<? extends Serializable>> localOperators = localWork.getAliasToWork();

            Set localEntries = localOperators.entrySet();
            Iterator<Entry<String, Operator>> localIterator = localEntries.iterator();
            while (localIterator.hasNext()) {
                mapRootOps.add(localIterator.next().getValue());
            }

            HashMap<String, FetchWork> localFetch = localWork.getAliasToFetchWork();
            Set localFetchEntries = localFetch.entrySet();
            Iterator<Entry<String, FetchWork>> localFetchIterator = localFetchEntries.iterator();
            while (localFetchIterator.hasNext()) {
                Entry<String, FetchWork> fetchMap = localFetchIterator.next();
                FetchWork fetch = fetchMap.getValue();
                String alias = fetchMap.getKey();
                List<PartitionDesc> dirPart = fetch.getPartDesc();

                // temporary hack: put the first partitionDesc into the map
                aliasToPath.put(alias, dirPart.get(0));
            }
        }

        Boolean visited = tasksVisited.get(task);
        if (visited != null && visited.booleanValue() == true) {
            return mapRootOps;
        }

        // clear parents; only needed for the union operator
        for (Operator op : mapRootOps)
            if (op.getParentOperators() != null)
                op.getParentOperators().clear();

        List<Operator> mapLeaves = new ArrayList<Operator>();
        downToLeaves(mapRootOps, mapLeaves);
        List<Operator> reduceOps = new ArrayList<Operator>();

        if (work.getReducer() != null)
            reduceOps.add(work.getReducer());

        for (Operator mapLeaf : mapLeaves) {
            mapLeaf.setChildOperators(reduceOps);
        }

        for (Operator reduceOp : reduceOps) {
            if (reduceOp != null)
                reduceOp.setParentOperators(mapLeaves);
        }

        List<Operator> leafs = new ArrayList<Operator>();
        if (reduceOps.size() > 0) {
            downToLeaves(reduceOps, leafs);
        } else {
            leafs = mapLeaves;
        }

        List<Operator> mapChildren = new ArrayList<Operator>();
        if (task.getChildTasks() != null && task.getChildTasks().size() > 0) {
            for (Object child : task.getChildTasks()) {
                List<Operator> childMapOps = articulateMapReduceOperators((Task) child, rootOps, aliasToPath, rootTasks);
                if (childMapOps == null)
                    continue;

                for (Operator childMap : childMapOps) {
                    if (childMap instanceof TableScanOperator) {
                        TableScanDesc topDesc = (TableScanDesc) childMap.getConf();
                        if (topDesc == null)
                            mapChildren.add(childMap);
                        else {
                            rootOps.add(childMap);
                        }
                    } else {
                        // if not a table scan, add the child
                        mapChildren.add(childMap);
                    }
                }
            }

            if (mapChildren.size() > 0) {
                int i = 0;
                for (Operator leaf : leafs) {
                    if (leaf.getChildOperators() == null || leaf.getChildOperators().size() == 0)
                        leaf.setChildOperators(new ArrayList<Operator>());
                    leaf.getChildOperators().add(mapChildren.get(i));
                    i++;
                }
                i = 0;
                for (Operator child : mapChildren) {
                    if (child.getParentOperators() == null || child.getParentOperators().size() == 0)
                        child.setParentOperators(new ArrayList<Operator>());
                    child.getParentOperators().add(leafs.get(i));
                    i++;
                }
            }
        }

        // mark this task as visited
        this.tasksVisited.put(task, true);
        return mapRootOps;
    }

    /**
     * Descend from the given operators and collect all leaf operators
     * (operators without children).
     *
     * @param ops
     *            the operators to start from
     * @param leaves
     *            the output list of leaf operators
     */
    private void downToLeaves(List<Operator> ops, List<Operator> leaves) {
        // Operator currentOp;
        for (Operator op : ops) {
            if (op != null && op.getChildOperators() != null && op.getChildOperators().size() > 0) {
                downToLeaves(op.getChildOperators(), leaves);
            } else {
                if (op != null && leaves.indexOf(op) < 0)
                    leaves.add(op);
            }
        }
    }
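
    /**
     * Collect the leaf operators reachable from the given root operators.
     */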
    private void getLeaves(List<Operator> roots, List<Operator> currentLeaves) {
        for (Operator op : roots) {
            List<Operator> children = op.getChildOperators();
            if (children == null || children.size() <= 0) {
                currentLeaves.add(op);
            } else {
                getLeaves(children, currentLeaves);
            }
        }
    }
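
    /**
     * Submit the job to the Hyracks cluster controller and wait for it to
     * finish: load conf/cluster.properties and conf/master on first use to
     * locate the cluster controller, start the job, block until completion,
     * and then close every leaf FileSinkOperator so its output is moved to the
     * final location.
     */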
    private void executeHyracksJob(JobSpecification job) throws Exception {

        /**
         * load the cluster properties file if it has not been loaded yet
         */
        if (clusterProps == null) {
            clusterProps = new Properties();
            InputStream confIn = new FileInputStream(clusterPropertiesPath);
            clusterProps.load(confIn);
            confIn.close();
        }

        if (hcc == null) {
            BufferedReader ipReader = new BufferedReader(new InputStreamReader(new FileInputStream(masterFilePath)));
            String masterNode = ipReader.readLine();
            ipReader.close();

            InetAddress[] ips = InetAddress.getAllByName(masterNode);
            int port = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));
            for (InetAddress ip : ips) {
                if (ip.getAddress().length <= 4) {
                    try {
                        hcc = new HyracksConnection(ip.getHostAddress(), port);
                        break;
                    } catch (Exception e) {
                        continue;
                    }
                }
            }
        }

        long start = System.currentTimeMillis();
        JobId jobId = hcc.startJob(job);
        hcc.waitForCompletion(jobId);

        // System.out.println("job finished: " + jobId.toString());
        // close all leaf operators
        for (Operator leaf : leaveOps) {
            jobClose(leaf);
        }

        long end = System.currentTimeMillis();
        System.err.println(start + " " + end + " " + (end - start));
    }

    /**
     * Move the output of a file sink to its final directory on HDFS (not the
     * true final location).
     *
     * @param leaf
     *            the leaf FileSinkOperator to close
     * @throws Exception
     */
    private void jobClose(Operator leaf) throws Exception {
        FileSinkOperator fsOp = (FileSinkOperator) leaf;
        FileSinkDesc desc = fsOp.getConf();
        boolean isNativeTable = !desc.getTableInfo().isNonNative();
        if ((conf != null) && isNativeTable) {
            String specPath = desc.getDirName();
            DynamicPartitionCtx dpCtx = desc.getDynPartCtx();
            // for Hive 0.7.0
            fsOp.mvFileToFinalPath(specPath, conf, true, LOG, dpCtx);
            // for Hive 0.8.0
            // Utilities.mvFileToFinalPath(specPath, conf, true, LOG, dpCtx,
            // desc);
        }
    }
}