/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql;
import java.io.DataInput;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.ExecDriver;
import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.MapRedTask;
import org.apache.hadoop.hive.ql.exec.MoveTask;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.StatsTask;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.TaskResult;
import org.apache.hadoop.hive.ql.exec.TaskRunner;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.Hook;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.hooks.PostExecute;
import org.apache.hadoop.hive.ql.hooks.PreExecute;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockManagerCtx;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.AuthorizationException;
import org.apache.hadoop.hive.ql.metadata.DummyPartition;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.AbstractSemanticAnalyzerHook;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl;
import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.ParseDriver;
import org.apache.hadoop.hive.ql.parse.ParseUtils;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.VariableSubstitution;
import org.apache.hadoop.hive.ql.plan.ConditionalResolver;
import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.processors.CommandProcessor;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.serde2.ByteStream;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
import edu.uci.ics.hivesterix.runtime.exec.HyracksExecutionEngine;
import edu.uci.ics.hivesterix.runtime.exec.IExecutionEngine;
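/**
 * Compiles and executes Hive commands: parses and analyzes a query, builds a
 * {@link QueryPlan}, acquires any required locks, and runs the resulting tasks.
 * This hivesterix variant additionally tries to compile non-"create" queries into
 * a Hyracks job via {@link HyracksExecutionEngine}; when that succeeds, the query
 * is executed through that engine and the corresponding MapReduce tasks are
 * skipped.
 */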
@SuppressWarnings({ "deprecation", "unchecked", "rawtypes" })
public class Driver implements CommandProcessor {
// hivesterix
private IExecutionEngine engine;
private boolean hivesterix = false;
private Set<Task> executedConditionalTsks = new HashSet<Task>();
static final private Log LOG = LogFactory.getLog(Driver.class.getName());
static final private LogHelper console = new LogHelper(LOG);
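    // serializes compile() so that only one query is compiled at a time across Driver instances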
private static final Object compileMonitor = new Object();
private int maxRows = 100;
ByteStream.Output bos = new ByteStream.Output();
private HiveConf conf;
private DataInput resStream;
private Context ctx;
private QueryPlan plan;
private Schema schema;
private HiveLockManager hiveLockMgr;
private String errorMessage;
private String SQLState;
// A limit on the number of threads that can be launched
private int maxthreads;
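    // poll interval, in milliseconds, used by pollTasks while waiting for running tasks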
private static final int SLEEP_TIME = 2000;
protected int tryCount = Integer.MAX_VALUE;
/**
* for backwards compatibility with current tests
*/
public Driver(HiveConf conf) {
this.conf = conf;
}
public Driver() {
if (SessionState.get() != null) {
conf = SessionState.get().getConf();
}
// hivesterix
engine = new HyracksExecutionEngine(conf);
}
// hivesterix: plan printer
public Driver(HiveConf conf, PrintWriter planPrinter) {
this.conf = conf;
engine = new HyracksExecutionEngine(conf, planPrinter);
}
public void clear() {
this.hivesterix = false;
this.executedConditionalTsks.clear();
}
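    /**
     * Ensure a lock manager exists when concurrency support is enabled and attach
     * it to the current query context.
     *
     * @return true if concurrency support is enabled and a lock manager is available
     */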
private boolean checkLockManager() {
boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
if (!supportConcurrency) {
return false;
}
if ((hiveLockMgr == null)) {
try {
setLockManager();
} catch (SemanticException e) {
errorMessage = "FAILED: Error in semantic analysis: " + e.getMessage();
SQLState = ErrorMsg.findSQLState(e.getMessage());
console.printError(errorMessage, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
return false;
}
}
        // The lock manager is set on the ctx here because each query has its own
        // ctx object, while the hiveLockMgr is shared across the same Driver
        // instance, which can run multiple queries.
ctx.setHiveLockMgr(hiveLockMgr);
return hiveLockMgr != null;
}
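    /**
     * Instantiate the lock manager class named by hive.lock.manager via reflection
     * and initialize it with a {@link HiveLockManagerCtx}.
     */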
private void setLockManager() throws SemanticException {
boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
if (supportConcurrency) {
String lockMgr = conf.getVar(HiveConf.ConfVars.HIVE_LOCK_MANAGER);
if ((lockMgr == null) || (lockMgr.isEmpty())) {
throw new SemanticException(ErrorMsg.LOCKMGR_NOT_SPECIFIED.getMsg());
}
try {
hiveLockMgr = (HiveLockManager) ReflectionUtils.newInstance(conf.getClassByName(lockMgr), conf);
hiveLockMgr.setContext(new HiveLockManagerCtx(conf));
} catch (Exception e) {
                // Reset hiveLockMgr to null so that this invalid manager cannot be
                // handed to the next query's ctx.
if (hiveLockMgr != null) {
try {
hiveLockMgr.close();
} catch (LockException e1) {
                        // nothing we can do here
}
hiveLockMgr = null;
}
throw new SemanticException(ErrorMsg.LOCKMGR_NOT_INITIALIZED.getMsg() + e.getMessage());
}
}
}
public void init() {
Operator.resetId();
}
/**
* Return the status information about the Map-Reduce cluster
*/
public ClusterStatus getClusterStatus() throws Exception {
ClusterStatus cs;
try {
JobConf job = new JobConf(conf, ExecDriver.class);
JobClient jc = new JobClient(job);
cs = jc.getClusterStatus();
} catch (Exception e) {
e.printStackTrace();
throw e;
}
LOG.info("Returning cluster status: " + cs.toString());
return cs;
}
public Schema getSchema() {
return schema;
}
/**
* Get a Schema with fields represented with native Hive types
*/
public static Schema getSchema(BaseSemanticAnalyzer sem, HiveConf conf) {
Schema schema = null;
        // If we have a semantic analyzer, prefer its result schema if it's
        // available; otherwise, try digging a table desc out of its fetch task;
        // failing that, give up.
if (sem == null) {
// can't get any info without a plan
} else if (sem.getResultSchema() != null) {
List<FieldSchema> lst = sem.getResultSchema();
schema = new Schema(lst, null);
} else if (sem.getFetchTask() != null) {
FetchTask ft = sem.getFetchTask();
TableDesc td = ft.getTblDesc();
            // Partitioned tables don't have a tableDesc set on the FetchTask.
            // Instead they have a list of PartitionDesc objects, each with a table
            // desc. Try to fetch the desc for the first partition and use its
            // deserializer.
if (td == null && ft.getWork() != null && ft.getWork().getPartDesc() != null) {
if (ft.getWork().getPartDesc().size() > 0) {
td = ft.getWork().getPartDesc().get(0).getTableDesc();
}
}
if (td == null) {
LOG.info("No returning schema.");
} else {
String tableName = "result";
List<FieldSchema> lst = null;
try {
lst = MetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer());
} catch (Exception e) {
LOG.warn("Error getting schema: " + org.apache.hadoop.util.StringUtils.stringifyException(e));
}
if (lst != null) {
schema = new Schema(lst, null);
}
}
}
if (schema == null) {
schema = new Schema();
}
LOG.info("Returning Hive schema: " + schema);
return schema;
}
/**
* Get a Schema with fields represented with Thrift DDL types
*/
public Schema getThriftSchema() throws Exception {
Schema schema;
try {
schema = getSchema();
if (schema != null) {
List<FieldSchema> lst = schema.getFieldSchemas();
// Go over the schema and convert type to thrift type
if (lst != null) {
for (FieldSchema f : lst) {
f.setType(MetaStoreUtils.typeToThriftType(f.getType()));
}
}
}
} catch (Exception e) {
e.printStackTrace();
throw e;
}
LOG.info("Returning Thrift schema: " + schema);
return schema;
}
/**
* Return the maximum number of rows returned by getResults
*/
public int getMaxRows() {
return maxRows;
}
/**
* Set the maximum number of rows returned by getResults
*/
public void setMaxRows(int maxRows) {
this.maxRows = maxRows;
}
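    /**
     * @return true if any of the given tasks, or any of their child tasks, contains
     *         a reduce phase
     */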
public boolean hasReduceTasks(List<Task<? extends Serializable>> tasks) {
if (tasks == null) {
return false;
}
boolean hasReduce = false;
for (Task<? extends Serializable> task : tasks) {
if (task.hasReduce()) {
return true;
}
hasReduce = (hasReduce || hasReduceTasks(task.getChildTasks()));
}
return hasReduce;
}
/**
* Compile a new query. Any currently-planned query associated with this Driver is discarded.
*
* @param command
* The SQL query to compile.
*/
public int compile(String command) {
return compile(command, true);
}
/**
     * Holds state variables specific to each query being executed, which may
     * differ from the values kept in the overall SessionState.
*/
private static class QueryState {
private HiveOperation op;
private String cmd;
private boolean init = false;
/**
* Initialize the queryState with the query state variables
*/
public void init(HiveOperation op, String cmd) {
this.op = op;
this.cmd = cmd;
this.init = true;
}
public boolean isInitialized() {
return this.init;
}
public HiveOperation getOp() {
return this.op;
}
public String getCmd() {
return this.cmd;
}
}
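    /**
     * Capture the current session's command string and operation type so they can
     * be restored after a reentrant query has been compiled.
     */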
public void saveSession(QueryState qs) {
SessionState oldss = SessionState.get();
if (oldss != null && oldss.getHiveOperation() != null) {
qs.init(oldss.getHiveOperation(), oldss.getCmd());
}
}
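    /**
     * Restore the command string and operation type captured by {@link #saveSession}.
     */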
public void restoreSession(QueryState qs) {
SessionState ss = SessionState.get();
if (ss != null && qs != null && qs.isInitialized()) {
ss.setCmd(qs.getCmd());
ss.setCommandType(qs.getOp());
}
}
/**
     * Compile a new query, optionally resetting the taskID counter. Not resetting
     * the task counter is useful for generating re-entrant QL queries.
*
* @param command
* The HiveQL query to compile
* @param resetTaskIds
* Resets taskID counter if true.
* @return 0 for ok
*/
public int compile(String command, boolean resetTaskIds) {
PerfLogger perfLogger = PerfLogger.getPerfLogger();
perfLogger.PerfLogBegin(LOG, PerfLogger.COMPILE);
        // holder for the parent command type/string when executing reentrant queries
QueryState queryState = new QueryState();
if (plan != null) {
close();
plan = null;
}
if (resetTaskIds) {
TaskFactory.resetId();
}
saveSession(queryState);
try {
command = new VariableSubstitution().substitute(conf, command);
ctx = new Context(conf);
ctx.setTryCount(getTryCount());
ctx.setCmd(command);
ctx.setHDFSCleanup(true);
ParseDriver pd = new ParseDriver();
ASTNode tree = pd.parse(command, ctx);
tree = ParseUtils.findRootNonNullToken(tree);
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree);
List<AbstractSemanticAnalyzerHook> saHooks = getHooks(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK,
AbstractSemanticAnalyzerHook.class);
// Do semantic analysis and plan generation
if (saHooks != null) {
HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
hookCtx.setConf(conf);
for (AbstractSemanticAnalyzerHook hook : saHooks) {
tree = hook.preAnalyze(hookCtx, tree);
}
sem.analyze(tree, ctx);
hookCtx.update(sem);
for (AbstractSemanticAnalyzerHook hook : saHooks) {
hook.postAnalyze(hookCtx, sem.getRootTasks());
}
} else {
sem.analyze(tree, ctx);
}
LOG.info("Semantic Analysis Completed");
// validate the plan
sem.validate();
plan = new QueryPlan(command, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN));
            // test only: serialize the query plan, then deserialize it and use the copy
if ("true".equalsIgnoreCase(System.getProperty("test.serialize.qplan"))) {
String queryPlanFileName = ctx.getLocalScratchDir(true) + Path.SEPARATOR_CHAR + "queryplan.xml";
LOG.info("query plan = " + queryPlanFileName);
queryPlanFileName = new Path(queryPlanFileName).toUri().getPath();
// serialize the queryPlan
FileOutputStream fos = new FileOutputStream(queryPlanFileName);
Utilities.serializeQueryPlan(plan, fos);
fos.close();
// deserialize the queryPlan
FileInputStream fis = new FileInputStream(queryPlanFileName);
QueryPlan newPlan = Utilities.deserializeQueryPlan(fis, conf);
fis.close();
// Use the deserialized plan
plan = newPlan;
}
// initialize FetchTask right here
if (plan.getFetchTask() != null) {
plan.getFetchTask().initialize(conf, plan, null);
}
// get the output schema
schema = getSchema(sem, conf);
//do the authorization check
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) {
try {
perfLogger.PerfLogBegin(LOG, PerfLogger.DO_AUTHORIZATION);
doAuthorization(sem);
} catch (AuthorizationException authExp) {
console.printError("Authorization failed:" + authExp.getMessage()
+ ". Use show grant to get more details.");
return 403;
} finally {
perfLogger.PerfLogEnd(LOG, PerfLogger.DO_AUTHORIZATION);
}
}
            // session state is restored in the finally block below
            // hivesterix: try to compile the plan into a Hyracks job (skipped for
            // commands containing "create")
if (sem instanceof SemanticAnalyzer && command.toLowerCase().indexOf("create") < 0) {
int engineRet = engine.compileJob(sem.getRootTasks());
if (engineRet == 0) {
hivesterix = true;
}
}
return 0;
} catch (Exception e) {
ErrorMsg error = ErrorMsg.getErrorMsg(e.getMessage());
errorMessage = "FAILED: " + e.getClass().getSimpleName();
if (error != ErrorMsg.GENERIC_ERROR) {
errorMessage += " [Error " + error.getErrorCode() + "]:";
}
errorMessage += " " + e.getMessage();
SQLState = error.getSQLState();
console.printError(errorMessage, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
return error.getErrorCode();
} finally {
perfLogger.PerfLogEnd(LOG, PerfLogger.COMPILE);
restoreSession(queryState);
}
}
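    /**
     * Check that the current user holds the privileges required by the statement's
     * outputs (tables and partitions being written) and inputs (tables and
     * partitions being read), using partition-level and column-level checks where
     * the plan provides that information.
     *
     * @throws AuthorizationException
     *           if a required privilege is missing
     */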
private void doAuthorization(BaseSemanticAnalyzer sem) throws HiveException, AuthorizationException {
HashSet<ReadEntity> inputs = sem.getInputs();
HashSet<WriteEntity> outputs = sem.getOutputs();
SessionState ss = SessionState.get();
HiveOperation op = ss.getHiveOperation();
Hive db = sem.getDb();
if (op != null) {
if (op.equals(HiveOperation.CREATETABLE_AS_SELECT) || op.equals(HiveOperation.CREATETABLE)) {
ss.getAuthorizer().authorize(db.getDatabase(db.getCurrentDatabase()), null,
HiveOperation.CREATETABLE_AS_SELECT.getOutputRequiredPrivileges());
} else {
if (op.equals(HiveOperation.IMPORT)) {
ImportSemanticAnalyzer isa = (ImportSemanticAnalyzer) sem;
if (!isa.existsTable()) {
ss.getAuthorizer().authorize(db.getDatabase(db.getCurrentDatabase()), null,
HiveOperation.CREATETABLE_AS_SELECT.getOutputRequiredPrivileges());
}
}
}
if (outputs != null && outputs.size() > 0) {
for (WriteEntity write : outputs) {
if (write.getType() == WriteEntity.Type.PARTITION) {
Partition part = db.getPartition(write.getTable(), write.getPartition().getSpec(), false);
if (part != null) {
ss.getAuthorizer().authorize(write.getPartition(), null, op.getOutputRequiredPrivileges());
continue;
}
}
if (write.getTable() != null) {
ss.getAuthorizer().authorize(write.getTable(), null, op.getOutputRequiredPrivileges());
}
}
}
}
if (inputs != null && inputs.size() > 0) {
Map<Table, List<String>> tab2Cols = new HashMap<Table, List<String>>();
Map<Partition, List<String>> part2Cols = new HashMap<Partition, List<String>>();
Map<String, Boolean> tableUsePartLevelAuth = new HashMap<String, Boolean>();
for (ReadEntity read : inputs) {
Table tbl = read.getTable();
if ((read.getPartition() != null) || (tbl.isPartitioned())) {
String tblName = tbl.getTableName();
if (tableUsePartLevelAuth.get(tblName) == null) {
boolean usePartLevelPriv = (tbl.getParameters().get("PARTITION_LEVEL_PRIVILEGE") != null && ("TRUE"
.equalsIgnoreCase(tbl.getParameters().get("PARTITION_LEVEL_PRIVILEGE"))));
if (usePartLevelPriv) {
tableUsePartLevelAuth.put(tblName, Boolean.TRUE);
} else {
tableUsePartLevelAuth.put(tblName, Boolean.FALSE);
}
}
}
}
if (op.equals(HiveOperation.CREATETABLE_AS_SELECT) || op.equals(HiveOperation.QUERY)) {
SemanticAnalyzer querySem = (SemanticAnalyzer) sem;
ParseContext parseCtx = querySem.getParseContext();
Map<TableScanOperator, Table> tsoTopMap = parseCtx.getTopToTable();
for (Map.Entry<String, Operator<? extends OperatorDesc>> topOpMap : querySem.getParseContext()
.getTopOps().entrySet()) {
Operator<? extends OperatorDesc> topOp = topOpMap.getValue();
if (topOp instanceof TableScanOperator && tsoTopMap.containsKey(topOp)) {
TableScanOperator tableScanOp = (TableScanOperator) topOp;
Table tbl = tsoTopMap.get(tableScanOp);
List<Integer> neededColumnIds = tableScanOp.getNeededColumnIDs();
List<FieldSchema> columns = tbl.getCols();
List<String> cols = new ArrayList<String>();
if (neededColumnIds != null && neededColumnIds.size() > 0) {
for (int i = 0; i < neededColumnIds.size(); i++) {
cols.add(columns.get(neededColumnIds.get(i)).getName());
}
} else {
for (int i = 0; i < columns.size(); i++) {
cols.add(columns.get(i).getName());
}
}
                        // The map may not contain all sources: an input may have been
                        // optimized out or may not exist, even though the
                        // TableScanOperator still references it. If the lookup is
                        // null, the partition probably doesn't exist, so fall back to
                        // table-level permissions.
if (tbl.isPartitioned() && tableUsePartLevelAuth.get(tbl.getTableName()) == Boolean.TRUE) {
String alias_id = topOpMap.getKey();
PrunedPartitionList partsList = PartitionPruner.prune(parseCtx.getTopToTable().get(topOp),
parseCtx.getOpToPartPruner().get(topOp), parseCtx.getConf(), alias_id,
parseCtx.getPrunedPartitions());
Set<Partition> parts = new HashSet<Partition>();
parts.addAll(partsList.getConfirmedPartns());
parts.addAll(partsList.getUnknownPartns());
for (Partition part : parts) {
List<String> existingCols = part2Cols.get(part);
if (existingCols == null) {
existingCols = new ArrayList<String>();
}
existingCols.addAll(cols);
part2Cols.put(part, existingCols);
}
} else {
List<String> existingCols = tab2Cols.get(tbl);
if (existingCols == null) {
existingCols = new ArrayList<String>();
}
existingCols.addAll(cols);
tab2Cols.put(tbl, existingCols);
}
}
}
}
// cache the results for table authorization
Set<String> tableAuthChecked = new HashSet<String>();
for (ReadEntity read : inputs) {
Table tbl = read.getTable();
if (read.getPartition() != null) {
Partition partition = read.getPartition();
tbl = partition.getTable();
// use partition level authorization
if (tableUsePartLevelAuth.get(tbl.getTableName()) == Boolean.TRUE) {
List<String> cols = part2Cols.get(partition);
if (cols != null && cols.size() > 0) {
ss.getAuthorizer().authorize(partition.getTable(), partition, cols,
op.getInputRequiredPrivileges(), null);
} else {
ss.getAuthorizer().authorize(partition, op.getInputRequiredPrivileges(), null);
}
continue;
}
}
                // If we reach here, a table-level authorization check is needed; it
                // may already have been done while processing other partitions of
                // the same table.
if (tbl != null && !tableAuthChecked.contains(tbl.getTableName())
&& !(tableUsePartLevelAuth.get(tbl.getTableName()) == Boolean.TRUE)) {
List<String> cols = tab2Cols.get(tbl);
if (cols != null && cols.size() > 0) {
ss.getAuthorizer().authorize(tbl, null, cols, op.getInputRequiredPrivileges(), null);
} else {
ss.getAuthorizer().authorize(tbl, op.getInputRequiredPrivileges(), null);
}
tableAuthChecked.add(tbl.getTableName());
}
}
}
}
/**
* @return The current query plan associated with this Driver, if any.
*/
public QueryPlan getPlan() {
return plan;
}
    /**
     * Get the list of objects to be locked. If a partition needs to be locked (in
     * any mode), all its parents should also be locked in SHARED mode.
     *
     * @param t
     *          The table to be locked
     * @param p
     *          The partition to be locked
     * @param mode
     *          The mode of the lock (SHARED/EXCLUSIVE)
     **/
private List<HiveLockObj> getLockObjects(Table t, Partition p, HiveLockMode mode) throws SemanticException {
List<HiveLockObj> locks = new LinkedList<HiveLockObj>();
HiveLockObjectData lockData = new HiveLockObjectData(plan.getQueryId(), String.valueOf(System
.currentTimeMillis()), "IMPLICIT", plan.getQueryStr());
if (t != null) {
locks.add(new HiveLockObj(new HiveLockObject(t, lockData), mode));
mode = HiveLockMode.SHARED;
locks.add(new HiveLockObj(new HiveLockObject(t.getDbName(), lockData), mode));
return locks;
}
if (p != null) {
if (!(p instanceof DummyPartition)) {
locks.add(new HiveLockObj(new HiveLockObject(p, lockData), mode));
}
// All the parents are locked in shared mode
mode = HiveLockMode.SHARED;
// For dummy partitions, only partition name is needed
String name = p.getName();
if (p instanceof DummyPartition) {
name = p.getName().split("@")[2];
}
String partialName = "";
String[] partns = name.split("/");
int len = p instanceof DummyPartition ? partns.length : partns.length - 1;
Map<String, String> partialSpec = new LinkedHashMap<String, String>();
for (int idx = 0; idx < len; idx++) {
String partn = partns[idx];
partialName += partn;
String[] nameValue = partn.split("=");
assert (nameValue.length == 2);
partialSpec.put(nameValue[0], nameValue[1]);
try {
locks.add(new HiveLockObj(new HiveLockObject(new DummyPartition(p.getTable(), p.getTable()
.getDbName() + "/" + p.getTable().getTableName() + "/" + partialName, partialSpec),
lockData), mode));
partialName += "/";
} catch (HiveException e) {
throw new SemanticException(e.getMessage());
}
}
locks.add(new HiveLockObj(new HiveLockObject(p.getTable(), lockData), mode));
locks.add(new HiveLockObj(new HiveLockObject(p.getTable().getDbName(), lockData), mode));
}
return locks;
}
/**
     * Acquire the read and write locks needed by the statement. The list of objects
     * to be locked is obtained from the inputs and outputs populated by the
     * compiler. The lock acquisition scheme is simple: if all the locks cannot be
     * obtained, error out. Deadlock is avoided by keeping the locks
     * lexicographically sorted.
**/
public int acquireReadWriteLocks() {
PerfLogger perfLogger = PerfLogger.getPerfLogger();
perfLogger.PerfLogBegin(LOG, PerfLogger.ACQUIRE_READ_WRITE_LOCKS);
try {
boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
if (!supportConcurrency) {
return 0;
}
List<HiveLockObj> lockObjects = new ArrayList<HiveLockObj>();
// Sort all the inputs, outputs.
// If a lock needs to be acquired on any partition, a read lock needs to be acquired on all
// its parents also
for (ReadEntity input : plan.getInputs()) {
if (input.getType() == ReadEntity.Type.TABLE) {
lockObjects.addAll(getLockObjects(input.getTable(), null, HiveLockMode.SHARED));
} else {
lockObjects.addAll(getLockObjects(null, input.getPartition(), HiveLockMode.SHARED));
}
}
for (WriteEntity output : plan.getOutputs()) {
List<HiveLockObj> lockObj = null;
if (output.getTyp() == WriteEntity.Type.TABLE) {
lockObj = getLockObjects(output.getTable(), null, output.isComplete() ? HiveLockMode.EXCLUSIVE
: HiveLockMode.SHARED);
} else if (output.getTyp() == WriteEntity.Type.PARTITION) {
lockObj = getLockObjects(null, output.getPartition(), HiveLockMode.EXCLUSIVE);
}
// In case of dynamic queries, it is possible to have incomplete dummy partitions
else if (output.getTyp() == WriteEntity.Type.DUMMYPARTITION) {
lockObj = getLockObjects(null, output.getPartition(), HiveLockMode.SHARED);
}
if (lockObj != null) {
lockObjects.addAll(lockObj);
ctx.getOutputLockObjects().put(output, lockObj);
}
}
if (lockObjects.isEmpty() && !ctx.isNeedLockMgr()) {
return 0;
}
HiveLockObjectData lockData = new HiveLockObjectData(plan.getQueryId(), String.valueOf(System
.currentTimeMillis()), "IMPLICIT", plan.getQueryStr());
// Lock the database also
try {
Hive db = Hive.get(conf);
lockObjects.add(new HiveLockObj(new HiveLockObject(db.getCurrentDatabase(), lockData),
HiveLockMode.SHARED));
} catch (HiveException e) {
throw new SemanticException(e.getMessage());
}
List<HiveLock> hiveLocks = ctx.getHiveLockMgr().lock(lockObjects, false);
if (hiveLocks == null) {
throw new SemanticException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg());
} else {
ctx.setHiveLocks(hiveLocks);
}
return (0);
} catch (SemanticException e) {
errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage();
SQLState = ErrorMsg.findSQLState(e.getMessage());
console.printError(errorMessage, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
return (10);
} catch (LockException e) {
errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage();
SQLState = ErrorMsg.findSQLState(e.getMessage());
console.printError(errorMessage, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
return (10);
} finally {
perfLogger.PerfLogEnd(LOG, PerfLogger.ACQUIRE_READ_WRITE_LOCKS);
}
}
    /**
     * Release all the locks specified. If some of the locks have already been
     * released, ignore them.
     *
     * @param hiveLocks
     *          list of hive locks to be released
     **/
private void releaseLocks(List<HiveLock> hiveLocks) {
PerfLogger perfLogger = PerfLogger.getPerfLogger();
perfLogger.PerfLogBegin(LOG, PerfLogger.RELEASE_LOCKS);
if (hiveLocks != null) {
ctx.getHiveLockMgr().releaseLocks(hiveLocks);
}
ctx.setHiveLocks(null);
perfLogger.PerfLogEnd(LOG, PerfLogger.RELEASE_LOCKS);
}
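    /**
     * Compile and execute the given command: run the configured driver run hooks,
     * compile the query, acquire any required read/write locks, execute the plan,
     * and release the locks. Typical usage (illustrative; assumes a table named
     * {@code src} exists):
     *
     * <pre>
     * Driver driver = new Driver(new HiveConf());
     * CommandProcessorResponse resp = driver.run("select count(1) from src");
     * if (resp.getResponseCode() != 0) {
     *     // inspect resp.getErrorMessage() and resp.getSQLState()
     * }
     * </pre>
     */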
public CommandProcessorResponse run(String command) throws CommandNeedRetryException {
errorMessage = null;
SQLState = null;
if (!validateConfVariables()) {
return new CommandProcessorResponse(12, errorMessage, SQLState);
}
HiveDriverRunHookContext hookContext = new HiveDriverRunHookContextImpl(conf, command);
// Get all the driver run hooks and pre-execute them.
List<HiveDriverRunHook> driverRunHooks;
try {
driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS, HiveDriverRunHook.class);
for (HiveDriverRunHook driverRunHook : driverRunHooks) {
driverRunHook.preDriverRun(hookContext);
}
} catch (Exception e) {
errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
SQLState = ErrorMsg.findSQLState(e.getMessage());
console.printError(errorMessage + "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
return new CommandProcessorResponse(12, errorMessage, SQLState);
}
// Reset the perf logger
PerfLogger perfLogger = PerfLogger.getPerfLogger(true);
perfLogger.PerfLogBegin(LOG, PerfLogger.DRIVER_RUN);
perfLogger.PerfLogBegin(LOG, PerfLogger.TIME_TO_SUBMIT);
int ret;
synchronized (compileMonitor) {
ret = compile(command);
}
if (ret != 0) {
releaseLocks(ctx.getHiveLocks());
return new CommandProcessorResponse(ret, errorMessage, SQLState);
}
boolean requireLock = false;
boolean ckLock = checkLockManager();
if (ckLock) {
boolean lockOnlyMapred = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_LOCK_MAPRED_ONLY);
if (lockOnlyMapred) {
Queue<Task<? extends Serializable>> taskQueue = new LinkedList<Task<? extends Serializable>>();
taskQueue.addAll(plan.getRootTasks());
while (taskQueue.peek() != null) {
Task<? extends Serializable> tsk = taskQueue.remove();
requireLock = requireLock || tsk.requireLock();
if (requireLock) {
break;
}
if (tsk instanceof ConditionalTask) {
taskQueue.addAll(((ConditionalTask) tsk).getListTasks());
}
if (tsk.getChildTasks() != null) {
taskQueue.addAll(tsk.getChildTasks());
}
                    // Backup tasks are not added here, because a backup task should
                    // be of the same type as the original task.
}
} else {
requireLock = true;
}
}
if (requireLock) {
ret = acquireReadWriteLocks();
if (ret != 0) {
releaseLocks(ctx.getHiveLocks());
return new CommandProcessorResponse(ret, errorMessage, SQLState);
}
}
ret = execute();
if (ret != 0) {
            // if requireLock is false, the release here is a no-op because no locks were acquired
releaseLocks(ctx.getHiveLocks());
return new CommandProcessorResponse(ret, errorMessage, SQLState);
}
        // if requireLock is false, the release here is a no-op because no locks were acquired
releaseLocks(ctx.getHiveLocks());
perfLogger.PerfLogEnd(LOG, PerfLogger.DRIVER_RUN);
perfLogger.close(LOG, plan);
// Take all the driver run hooks and post-execute them.
try {
for (HiveDriverRunHook driverRunHook : driverRunHooks) {
driverRunHook.postDriverRun(hookContext);
}
} catch (Exception e) {
errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
SQLState = ErrorMsg.findSQLState(e.getMessage());
console.printError(errorMessage + "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
return new CommandProcessorResponse(12, errorMessage, SQLState);
}
return new CommandProcessorResponse(ret);
}
/**
* Validate configuration variables.
*
     * @return true if the configuration is valid, false otherwise
*/
private boolean validateConfVariables() {
boolean valid = true;
if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES))
&& ((conf.getBoolVar(HiveConf.ConfVars.HADOOPMAPREDINPUTDIRRECURSIVE))
|| (conf.getBoolVar(HiveConf.ConfVars.HIVEOPTLISTBUCKETING)) || ((conf
.getBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_UNION_REMOVE))))) {
errorMessage = "FAILED: Hive Internal Error: " + ErrorMsg.SUPPORT_DIR_MUST_TRUE_FOR_LIST_BUCKETING.getMsg();
SQLState = ErrorMsg.findSQLState(errorMessage);
console.printError(errorMessage + "\n");
valid = false;
}
return valid;
}
/**
     * Returns the hooks specified in a configuration variable, typed as plain
     * {@link Hook}s. See {@link #getHooks(HiveConf.ConfVars, Class)}.
     *
     * @param hookConfVar
     *          The configuration variable specifying a comma separated list of hook class names.
     * @return A list of the hooks, in the order they are listed in the configuration value.
* @throws Exception
*/
private List<Hook> getHooks(HiveConf.ConfVars hookConfVar) throws Exception {
return getHooks(hookConfVar, Hook.class);
}
/**
* Returns the hooks specified in a configuration variable. The hooks are returned in a list in
* the order they were specified in the configuration variable.
*
* @param hookConfVar
* The configuration variable specifying a comma separated list of the hook
* class names.
* @param clazz
* The super type of the hooks.
* @return A list of the hooks cast as the type specified in clazz, in the order
* they are listed in the value of hookConfVar
* @throws Exception
*/
private <T extends Hook> List<T> getHooks(HiveConf.ConfVars hookConfVar, Class<T> clazz) throws Exception {
List<T> hooks = new ArrayList<T>();
String csHooks = conf.getVar(hookConfVar);
if (csHooks == null) {
return hooks;
}
csHooks = csHooks.trim();
if (csHooks.equals("")) {
return hooks;
}
String[] hookClasses = csHooks.split(",");
for (String hookClass : hookClasses) {
try {
T hook = (T) Class.forName(hookClass.trim(), true, JavaUtils.getClassLoader()).newInstance();
hooks.add(hook);
} catch (ClassNotFoundException e) {
console.printError(hookConfVar.varname + " Class not found:" + e.getMessage());
throw e;
}
}
return hooks;
}
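    /**
     * Run the compiled plan: if a Hyracks job was produced at compile time, execute
     * it first (the MapReduce/stats tasks it replaces are later marked as finished
     * by {@link #launchTask}); then fire the pre-execution hooks, launch runnable
     * tasks up to maxthreads at a time, poll for completion, fall back to backup
     * tasks or retry on failure, and finally fire the post-execution hooks.
     *
     * @return 0 on success, a non-zero error code otherwise
     */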
public int execute() throws CommandNeedRetryException {
// execute hivesterix plan
if (hivesterix) {
hivesterix = false;
int ret = engine.executeJob();
            if (ret != 0) {
                return ret;
            }
}
PerfLogger perfLogger = PerfLogger.getPerfLogger();
perfLogger.PerfLogBegin(LOG, PerfLogger.DRIVER_EXECUTE);
boolean noName = StringUtils.isEmpty(conf.getVar(HiveConf.ConfVars.HADOOPJOBNAME));
int maxlen = conf.getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH);
String queryId = plan.getQueryId();
String queryStr = plan.getQueryStr();
conf.setVar(HiveConf.ConfVars.HIVEQUERYID, queryId);
conf.setVar(HiveConf.ConfVars.HIVEQUERYSTRING, queryStr);
conf.set("mapreduce.workflow.id", "hive_" + queryId);
conf.set("mapreduce.workflow.name", queryStr);
maxthreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.EXECPARALLETHREADNUMBER);
try {
LOG.info("Starting command: " + queryStr);
plan.setStarted();
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().startQuery(queryStr, conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
SessionState.get().getHiveHistory().logPlanProgress(plan);
}
resStream = null;
HookContext hookContext = new HookContext(plan, conf, ctx.getPathToCS());
hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK);
for (Hook peh : getHooks(HiveConf.ConfVars.PREEXECHOOKS)) {
if (peh instanceof ExecuteWithHookContext) {
perfLogger.PerfLogBegin(LOG, PerfLogger.PRE_HOOK + peh.getClass().getName());
((ExecuteWithHookContext) peh).run(hookContext);
perfLogger.PerfLogEnd(LOG, PerfLogger.PRE_HOOK + peh.getClass().getName());
} else if (peh instanceof PreExecute) {
perfLogger.PerfLogBegin(LOG, PerfLogger.PRE_HOOK + peh.getClass().getName());
((PreExecute) peh).run(SessionState.get(), plan.getInputs(), plan.getOutputs(), ShimLoader
.getHadoopShims().getUGIForConf(conf));
perfLogger.PerfLogEnd(LOG, PerfLogger.PRE_HOOK + peh.getClass().getName());
}
}
int jobs = Utilities.getMRTasks(plan.getRootTasks()).size();
if (jobs > 0) {
console.printInfo("Total MapReduce jobs = " + jobs);
}
if (SessionState.get() != null) {
SessionState.get().getHiveHistory()
.setQueryProperty(queryId, Keys.QUERY_NUM_TASKS, String.valueOf(jobs));
SessionState.get().getHiveHistory().setIdToTableMap(plan.getIdToTableNameMap());
}
String jobname = Utilities.abbreviate(queryStr, maxlen - 6);
            // A runtime that launches runnable tasks as separate threads through
            // TaskRunners. As soon as a task becomes runnable, it is put in a queue;
            // at any time, at most maxthreads tasks can be running. The main thread
            // polls the TaskRunners to check whether they have finished.
Queue<Task<? extends Serializable>> runnable = new ConcurrentLinkedQueue<Task<? extends Serializable>>();
Map<TaskResult, TaskRunner> running = new HashMap<TaskResult, TaskRunner>();
DriverContext driverCxt = new DriverContext(runnable, ctx);
ctx.setHDFSCleanup(true);
SessionState.get().setLastMapRedStatsList(new ArrayList<MapRedStats>());
SessionState.get().setStackTraces(new HashMap<String, List<List<String>>>());
SessionState.get().setLocalMapRedErrors(new HashMap<String, List<String>>());
// Add root Tasks to runnable
for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
                // This should never happen; if it does, it's a bug with the potential
                // to produce incorrect results.
assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty();
driverCxt.addToRunnable(tsk);
}
perfLogger.PerfLogEnd(LOG, PerfLogger.TIME_TO_SUBMIT);
// Loop while you either have tasks running, or tasks queued up
while (running.size() != 0 || runnable.peek() != null) {
                // Launch up to maxthreads tasks
while (runnable.peek() != null && running.size() < maxthreads) {
Task<? extends Serializable> tsk = runnable.remove();
launchTask(tsk, queryId, noName, running, jobname, jobs, driverCxt);
}
// poll the Tasks to see which one completed
TaskResult tskRes = pollTasks(running.keySet());
TaskRunner tskRun = running.remove(tskRes);
Task<? extends Serializable> tsk = tskRun.getTask();
hookContext.addCompleteTask(tskRun);
int exitVal = tskRes.getExitVal();
if (exitVal != 0) {
if (tsk.ifRetryCmdWhenFail()) {
if (!running.isEmpty()) {
taskCleanup(running);
}
                        // in case we decided to run everything in local mode, restore
                        // the jobtracker setting to its initial value
ctx.restoreOriginalTracker();
throw new CommandNeedRetryException();
}
Task<? extends Serializable> backupTask = tsk.getAndInitBackupTask();
if (backupTask != null) {
errorMessage = "FAILED: Execution Error, return code " + exitVal + " from "
+ tsk.getClass().getName();
ErrorMsg em = ErrorMsg.getErrorMsg(exitVal);
if (em != null) {
errorMessage += ". " + em.getMsg();
}
console.printError(errorMessage);
errorMessage = "ATTEMPT: Execute BackupTask: " + backupTask.getClass().getName();
console.printError(errorMessage);
// add backup task to runnable
if (DriverContext.isLaunchable(backupTask)) {
driverCxt.addToRunnable(backupTask);
}
continue;
} else {
hookContext.setHookType(HookContext.HookType.ON_FAILURE_HOOK);
// Get all the failure execution hooks and execute them.
for (Hook ofh : getHooks(HiveConf.ConfVars.ONFAILUREHOOKS)) {
perfLogger.PerfLogBegin(LOG, PerfLogger.FAILURE_HOOK + ofh.getClass().getName());
((ExecuteWithHookContext) ofh).run(hookContext);
perfLogger.PerfLogEnd(LOG, PerfLogger.FAILURE_HOOK + ofh.getClass().getName());
}
errorMessage = "FAILED: Execution Error, return code " + exitVal + " from "
+ tsk.getClass().getName();
ErrorMsg em = ErrorMsg.getErrorMsg(exitVal);
if (em != null) {
errorMessage += ". " + em.getMsg();
}
SQLState = "08S01";
console.printError(errorMessage);
if (!running.isEmpty()) {
taskCleanup(running);
}
                        // in case we decided to run everything in local mode, restore
                        // the jobtracker setting to its initial value
ctx.restoreOriginalTracker();
return exitVal;
}
}
if (SessionState.get() != null) {
SessionState.get().getHiveHistory()
.setTaskProperty(queryId, tsk.getId(), Keys.TASK_RET_CODE, String.valueOf(exitVal));
SessionState.get().getHiveHistory().endTask(queryId, tsk);
}
if (tsk.getChildTasks() != null) {
for (Task<? extends Serializable> child : tsk.getChildTasks()) {
// hivesterix: don't check launchable condition
//if(DriverContext.isLaunchable(tsk)){
driverCxt.addToRunnable(child);
//}
}
}
}
            // in case we decided to run everything in local mode, restore
            // the jobtracker setting to its initial value
ctx.restoreOriginalTracker();
            // Remove incomplete outputs. Some incomplete outputs may have been added
            // at the beginning, e.g. for dynamic partitions; remove them now.
HashSet<WriteEntity> remOutputs = new HashSet<WriteEntity>();
for (WriteEntity output : plan.getOutputs()) {
if (!output.isComplete()) {
remOutputs.add(output);
}
}
for (WriteEntity output : remOutputs) {
plan.getOutputs().remove(output);
}
hookContext.setHookType(HookContext.HookType.POST_EXEC_HOOK);
// Get all the post execution hooks and execute them.
for (Hook peh : getHooks(HiveConf.ConfVars.POSTEXECHOOKS)) {
if (peh instanceof ExecuteWithHookContext) {
perfLogger.PerfLogBegin(LOG, PerfLogger.POST_HOOK + peh.getClass().getName());
((ExecuteWithHookContext) peh).run(hookContext);
perfLogger.PerfLogEnd(LOG, PerfLogger.POST_HOOK + peh.getClass().getName());
} else if (peh instanceof PostExecute) {
perfLogger.PerfLogBegin(LOG, PerfLogger.POST_HOOK + peh.getClass().getName());
((PostExecute) peh)
.run(SessionState.get(), plan.getInputs(), plan.getOutputs(),
(SessionState.get() != null ? SessionState.get().getLineageState().getLineageInfo()
: null), ShimLoader.getHadoopShims().getUGIForConf(conf));
perfLogger.PerfLogEnd(LOG, PerfLogger.POST_HOOK + peh.getClass().getName());
}
}
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_RET_CODE, String.valueOf(0));
SessionState.get().getHiveHistory().printRowCount(queryId);
}
} catch (CommandNeedRetryException e) {
throw e;
} catch (Exception e) {
ctx.restoreOriginalTracker();
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_RET_CODE, String.valueOf(12));
}
// TODO: do better with handling types of Exception here
errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
SQLState = "08S01";
console.printError(errorMessage + "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
return (12);
} finally {
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().endQuery(queryId);
}
if (noName) {
conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, "");
}
perfLogger.PerfLogEnd(LOG, PerfLogger.DRIVER_EXECUTE);
if (SessionState.get().getLastMapRedStatsList() != null
&& SessionState.get().getLastMapRedStatsList().size() > 0) {
long totalCpu = 0;
console.printInfo("MapReduce Jobs Launched: ");
for (int i = 0; i < SessionState.get().getLastMapRedStatsList().size(); i++) {
console.printInfo("Job " + i + ": " + SessionState.get().getLastMapRedStatsList().get(i));
totalCpu += SessionState.get().getLastMapRedStatsList().get(i).getCpuMSec();
}
console.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu));
}
}
plan.setDone();
if (SessionState.get() != null) {
try {
SessionState.get().getHiveHistory().logPlanProgress(plan);
} catch (Exception e) {
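                // ignore: logging plan progress is best-effort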
}
}
console.printInfo("OK");
return (0);
}
/**
* Launches a new task
*
* @param tsk
* task being launched
* @param queryId
* Id of the query containing the task
* @param noName
     *          whether the job name has not been set yet (if true, one is generated per map-reduce task)
* @param running
* map from taskresults to taskrunners
* @param jobname
* name of the task, if it is a map-reduce job
* @param jobs
* number of map-reduce jobs
* @param cxt
* the driver context
*/
public void launchTask(Task<? extends Serializable> tsk, String queryId, boolean noName,
Map<TaskResult, TaskRunner> running, String jobname, int jobs, DriverContext cxt) {
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().startTask(queryId, tsk, tsk.getClass().getName());
}
if (tsk.isMapRedTask() && !(tsk instanceof ConditionalTask)) {
if (noName) {
conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, jobname + "(" + tsk.getId() + ")");
}
conf.set("mapreduce.workflow.node.name", tsk.getId());
Utilities.setWorkflowAdjacencies(conf, plan);
cxt.incCurJobNo(1);
console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs);
}
tsk.initialize(conf, plan, cxt);
TaskResult tskRes = new TaskResult();
TaskRunner tskRun = new TaskRunner(tsk, tskRes);
// Launch Task
//if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.isMapRedTask()) {
// Launch it in the parallel mode, as a separate thread only for MR tasks
// tskRun.start();
//} else {
// tskRun.runSequential();
//}
// Launch Task: hivesterix tweak
if (tsk instanceof MapRedTask || tsk instanceof StatsTask) {
            // hivesterix: MR and stats tasks are not launched here; they are marked
            // as already finished (exit value 0), since their work is handled by the
            // Hyracks execution path
tskRes.setRunning(false);
tskRes.setExitVal(0);
} else if (tsk instanceof ConditionalTask) {
ConditionalTask condTask = (ConditionalTask) tsk;
ConditionalResolver crs = condTask.getResolver();
if (crs instanceof ConditionalResolverMergeFiles) {
tskRes.setRunning(false);
tskRes.setExitVal(0);
if (!executedConditionalTsks.contains(tsk)) {
List<Task<? extends Serializable>> children = condTask.getListTasks();
Task<? extends Serializable> selectedBranch = null;
for (Task<? extends Serializable> branch : children) {
if (branch instanceof MoveTask) {
selectedBranch = branch;
break;
}
}
if (selectedBranch == null) {
for (int i = children.size() - 1; i >= 0; i--) {
Task<? extends Serializable> child = children.get(i);
if (child instanceof MapRedTask) {
selectedBranch = child;
break;
}
}
}
executedConditionalTsks.add(tsk);
cxt.addToRunnable(selectedBranch);
}
}
} else {
tskRun.runSequential();
}
running.put(tskRes, tskRun);
return;
}
/**
* Cleans up remaining tasks in case of failure
*/
public void taskCleanup(Map<TaskResult, TaskRunner> running) {
for (Map.Entry<TaskResult, TaskRunner> entry : running.entrySet()) {
if (entry.getKey().isRunning()) {
Task<?> task = entry.getValue().getTask();
try {
task.shutdown();
} catch (Exception e) {
console.printError("Exception on shutting down task " + task.getId() + ": " + e);
}
}
}
running.clear();
}
/**
* Polls running tasks to see if a task has ended.
*
* @param results
* Set of result objects for running tasks
* @return The result object for any completed/failed task
*/
public TaskResult pollTasks(Set<TaskResult> results) {
Iterator<TaskResult> resultIterator = results.iterator();
while (true) {
while (resultIterator.hasNext()) {
TaskResult tskRes = resultIterator.next();
                if (!tskRes.isRunning()) {
return tskRes;
}
}
            // Nothing was found in this pass; sleep for SLEEP_TIME milliseconds and
            // poll again.
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException ie) {
                // do nothing
}
resultIterator = results.iterator();
}
}
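    /**
     * Fetch up to maxRows result rows into the supplied list, either through the
     * plan's FetchTask or by reading the context's result stream.
     *
     * @return false if there are no (more) results or an I/O error occurred, true otherwise
     */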
public boolean getResults(ArrayList<String> res) throws IOException, CommandNeedRetryException {
if (plan != null && plan.getFetchTask() != null) {
FetchTask ft = plan.getFetchTask();
ft.setMaxRows(maxRows);
return ft.fetch(res);
}
if (resStream == null) {
resStream = ctx.getStream();
}
if (resStream == null) {
return false;
}
int numRows = 0;
String row = null;
while (numRows < maxRows) {
if (resStream == null) {
if (numRows > 0) {
return true;
} else {
return false;
}
}
bos.reset();
Utilities.StreamStatus ss;
try {
ss = Utilities.readColumn(resStream, bos);
if (bos.getCount() > 0) {
row = new String(bos.getData(), 0, bos.getCount(), "UTF-8");
} else if (ss == Utilities.StreamStatus.TERMINATED) {
row = new String();
}
if (row != null) {
numRows++;
res.add(row);
}
} catch (IOException e) {
console.printError("FAILED: Unexpected IO exception : " + e.getMessage());
res = null;
return false;
}
if (ss == Utilities.StreamStatus.EOF) {
resStream = ctx.getStream();
}
}
return true;
}
public int getTryCount() {
return tryCount;
}
public void setTryCount(int tryCount) {
this.tryCount = tryCount;
}
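    /**
     * Release per-query resources: the plan's FetchTask, the query context, and the
     * result stream.
     *
     * @return 0 on success, 13 if cleanup fails
     */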
public int close() {
try {
if (plan != null) {
FetchTask fetchTask = plan.getFetchTask();
if (null != fetchTask) {
try {
fetchTask.clearFetch();
} catch (Exception e) {
LOG.debug(" Exception while clearing the Fetch task ", e);
}
}
}
if (ctx != null) {
ctx.clear();
}
if (null != resStream) {
try {
((FSDataInputStream) resStream).close();
} catch (Exception e) {
LOG.debug(" Exception while closing the resStream ", e);
}
}
} catch (Exception e) {
console.printError("FAILED: Hive Internal Error: " + Utilities.getNameMessage(e) + "\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
return 13;
}
return 0;
}
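    /**
     * Release any locks still held by the current context and close the shared lock
     * manager.
     */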
public void destroy() {
if (ctx != null) {
releaseLocks(ctx.getHiveLocks());
}
if (hiveLockMgr != null) {
try {
hiveLockMgr.close();
} catch (LockException e) {
LOG.warn("Exception in closing hive lock manager. "
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
}
}
}
public org.apache.hadoop.hive.ql.plan.api.Query getQueryPlan() throws IOException {
return plan.getQueryPlan();
}
}