blob: f43992c85d4695f6278eaa36420ef5ba331f5200 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.NodeUtils;
import org.apache.hadoop.hive.ql.exec.NodeUtils.Function;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.StatsTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskRunner;
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* DriverContext.
*
*/
public class DriverContext {
private static final Logger LOG = LoggerFactory.getLogger(Driver.class.getName());
private static final SessionState.LogHelper console = new SessionState.LogHelper(LOG);
private static final int SLEEP_TIME = 2000;
private Queue<Task<? extends Serializable>> runnable;
private Queue<TaskRunner> running;
// how many jobs have been started
private int curJobNo;
private Context ctx;
private boolean shutdown;
final Map<String, StatsTask> statsTasks = new HashMap<String, StatsTask>(1);
public DriverContext() {
}
public DriverContext(Context ctx) {
this.runnable = new ConcurrentLinkedQueue<Task<? extends Serializable>>();
this.running = new LinkedBlockingQueue<TaskRunner>();
this.ctx = ctx;
}
public synchronized boolean isShutdown() {
return shutdown;
}
public synchronized boolean isRunning() {
return !shutdown && (!running.isEmpty() || !runnable.isEmpty());
}
public synchronized void remove(Task<? extends Serializable> task) {
runnable.remove(task);
}
public synchronized void launching(TaskRunner runner) throws HiveException {
checkShutdown();
running.add(runner);
}
public synchronized Task<? extends Serializable> getRunnable(int maxthreads) throws HiveException {
checkShutdown();
if (runnable.peek() != null && running.size() < maxthreads) {
return runnable.remove();
}
return null;
}
/**
* Polls running tasks to see if a task has ended.
*
* @return The result object for any completed/failed task
*/
public synchronized TaskRunner pollFinished() throws InterruptedException {
while (!shutdown) {
Iterator<TaskRunner> it = running.iterator();
while (it.hasNext()) {
TaskRunner runner = it.next();
if (runner != null && !runner.isRunning()) {
it.remove();
return runner;
}
}
wait(SLEEP_TIME);
}
return null;
}
private void checkShutdown() throws HiveException {
if (shutdown) {
throw new HiveException("FAILED: Operation cancelled");
}
}
/**
* Cleans up remaining tasks in case of failure
*/
public synchronized void shutdown() {
LOG.debug("Shutting down query " + ctx.getCmd());
shutdown = true;
for (TaskRunner runner : running) {
if (runner.isRunning()) {
Task<?> task = runner.getTask();
LOG.warn("Shutting down task : " + task);
try {
task.shutdown();
} catch (Exception e) {
console.printError("Exception on shutting down task " + task.getId() + ": " + e);
}
Thread thread = runner.getRunner();
if (thread != null) {
thread.interrupt();
}
}
}
running.clear();
}
/**
* Checks if a task can be launched.
*
* @param tsk
* the task to be checked
* @return true if the task is launchable, false otherwise
*/
public static boolean isLaunchable(Task<? extends Serializable> tsk) {
// A launchable task is one that hasn't been queued, hasn't been
// initialized, and is runnable.
return !tsk.getQueued() && !tsk.getInitialized() && tsk.isRunnable();
}
public synchronized boolean addToRunnable(Task<? extends Serializable> tsk) throws HiveException {
if (runnable.contains(tsk)) {
return false;
}
checkShutdown();
runnable.add(tsk);
tsk.setQueued();
return true;
}
public int getCurJobNo() {
return curJobNo;
}
public Context getCtx() {
return ctx;
}
public void incCurJobNo(int amount) {
this.curJobNo = this.curJobNo + amount;
}
public void prepare(QueryPlan plan) {
// extract stats keys from StatsTask
List<Task<?>> rootTasks = plan.getRootTasks();
NodeUtils.iterateTask(rootTasks, StatsTask.class, new Function<StatsTask>() {
@Override
public void apply(StatsTask statsTask) {
statsTasks.put(statsTask.getWork().getAggKey(), statsTask);
}
});
}
public void prepare(TaskRunner runner) {
}
public void finished(TaskRunner runner) {
if (statsTasks.isEmpty() || !(runner.getTask() instanceof MapRedTask)) {
return;
}
MapRedTask mapredTask = (MapRedTask) runner.getTask();
MapWork mapWork = mapredTask.getWork().getMapWork();
ReduceWork reduceWork = mapredTask.getWork().getReduceWork();
List<Operator> operators = new ArrayList<Operator>(mapWork.getAliasToWork().values());
if (reduceWork != null) {
operators.add(reduceWork.getReducer());
}
final List<String> statKeys = new ArrayList<String>(1);
NodeUtils.iterate(operators, FileSinkOperator.class, new Function<FileSinkOperator>() {
@Override
public void apply(FileSinkOperator fsOp) {
if (fsOp.getConf().isGatherStats()) {
statKeys.add(fsOp.getConf().getStatsAggPrefix());
}
}
});
for (String statKey : statKeys) {
statsTasks.get(statKey).getWork().setSourceTask(mapredTask);
}
}
}