/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.worker;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TaskId;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.QueryMasterProtocol;
import org.apache.tajo.plan.serder.PlanProto;
import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.rpc.*;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.util.Pair;
import org.apache.tajo.worker.event.ExecutionBlockErrorEvent;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.tajo.ResourceProtos.*;
import static org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService.Interface;
public class ExecutionBlockContext {
/** class logger */
private static final Log LOG = LogFactory.getLog(ExecutionBlockContext.class);
protected AtomicInteger runningTasksNum = new AtomicInteger();
protected AtomicInteger completedTasksNum = new AtomicInteger();
protected AtomicInteger succeededTasksNum = new AtomicInteger();
protected AtomicInteger killedTasksNum = new AtomicInteger();
protected AtomicInteger failedTasksNum = new AtomicInteger();
private FileSystem localFS;
// for input files
private FileSystem defaultFS;
private ExecutionBlockId executionBlockId;
private QueryContext queryContext;
private TajoWorker.WorkerContext workerContext;
private String plan;
private final ExecutionBlockSharedResource resource;
private TajoQueryEngine queryEngine;
private RpcClientManager connManager;
private AsyncRpcClient queryMasterClient;
private QueryMasterProtocol.QueryMasterProtocolService.Interface stub;
private TajoConf systemConf;
// for the doAs block
private UserGroupInformation taskOwner;
private Reporter reporter;
private AtomicBoolean stop = new AtomicBoolean();
private PlanProto.ShuffleType shuffleType;
// Keeps all task attempts of this execution block while the TaskRunner is running.
private final ConcurrentMap<TaskAttemptId, Task> tasks = Maps.newConcurrentMap();
private final Map<TaskId, TaskHistory> taskHistories = Maps.newConcurrentMap();
public ExecutionBlockContext(TajoWorker.WorkerContext workerContext, ExecutionBlockContextResponse request,
AsyncRpcClient queryMasterClient)
throws IOException {
this.executionBlockId = new ExecutionBlockId(request.getExecutionBlockId());
this.connManager = RpcClientManager.getInstance();
this.systemConf = workerContext.getConf();
this.reporter = new Reporter();
this.defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
this.localFS = FileSystem.getLocal(systemConf);
// Set up the QueryEngine according to the query plan.
// Here we could set up either a row-based or a columnar query engine.
this.queryEngine = new TajoQueryEngine(systemConf);
this.queryContext = new QueryContext(workerContext.getConf(), request.getQueryContext());
this.plan = request.getPlanJson();
this.resource = new ExecutionBlockSharedResource();
this.workerContext = workerContext;
this.shuffleType = request.getShuffleType();
this.queryMasterClient = queryMasterClient;
}
public void init() throws Throwable {
LOG.info("Tajo Root Dir: " + systemConf.getVar(TajoConf.ConfVars.ROOT_DIR));
LOG.info("Worker Local Dir: " + systemConf.getVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR));
UserGroupInformation.setConfiguration(systemConf);
// TODO - 'load credential' should be implemented
// Getting the task owner
UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(systemConf.getVar(TajoConf.ConfVars.USERNAME));
this.taskOwner = taskOwner;
this.stub = queryMasterClient.getStub();
this.reporter.startReporter();
// resource initialization
try {
this.resource.initialize(queryContext, plan);
} catch (Throwable e) {
try {
getStub().killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get());
} catch (Throwable t) {
LOG.error(t);
}
throw e;
}
}
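// A minimal usage sketch (hypothetical caller, not part of this class): the
// worker-side code that owns this context is expected to drive it roughly as
//
//   ExecutionBlockContext context =
//       new ExecutionBlockContext(workerContext, request, queryMasterClient);
//   context.init();               // starts the reporter and shared resources
//   // ... tasks are registered into getTasks() and executed ...
//   context.sendShuffleReport();  // for hash-based shuffles, on completion
//   context.stop();               // aborts remaining tasks, releases resources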
public ExecutionBlockSharedResource getSharedResource() {
return resource;
}
private AsyncRpcClient getRpcClient() {
return queryMasterClient;
}
public Interface getStub() {
return stub;
}
public boolean isStopped() {
return stop.get();
}
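// Note: stop() below is idempotent; stop.getAndSet(true) makes the shutdown
// sequence run at most once even if several threads race to call it.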
public void stop(){
if(stop.getAndSet(true)){
return;
}
try {
reporter.stop();
} catch (InterruptedException e) {
LOG.error(e);
}
// If ExecutionBlock is stopped, all running or pending tasks will be marked as failed.
for (Task task : tasks.values()) {
if (task.getTaskContext().getState() == TajoProtos.TaskAttemptState.TA_PENDING ||
task.getTaskContext().getState() == TajoProtos.TaskAttemptState.TA_RUNNING) {
try{
task.abort();
} catch (Throwable e){
LOG.error(e, e);
}
}
}
tasks.clear();
taskHistories.clear();
resource.release();
RpcClientManager.cleanup(queryMasterClient);
}
public TajoConf getConf() {
return systemConf;
}
public FileSystem getLocalFS() {
return localFS;
}
public LocalDirAllocator getLocalDirAllocator() {
return workerContext.getLocalDirAllocator();
}
public TajoQueryEngine getTQueryEngine() {
return queryEngine;
}
// for the local temporal dir
public Path createBaseDir() throws IOException {
// the base directory for this execution block's output files
String baseDir = getBaseOutputDir(executionBlockId).toString();
Path baseDirPath = localFS.makeQualified(getLocalDirAllocator().getLocalPathForWrite(baseDir, systemConf));
return baseDirPath;
}
public static Path getBaseOutputDir(ExecutionBlockId executionBlockId) {
return TajoPullServerService.getBaseOutputDir(
executionBlockId.getQueryId().toString(), String.valueOf(executionBlockId.getId()));
}
public static Path getBaseInputDir(ExecutionBlockId executionBlockId) {
return TajoPullServerService.getBaseInputDir(executionBlockId.getQueryId().toString(), executionBlockId.toString());
}
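// For illustration only (the exact layout is owned by TajoPullServerService):
// both helpers return paths relative to a worker-local temporal directory,
// which are later qualified against a concrete disk by the LocalDirAllocator,
// as in createBaseDir() above. The query id and execution block id are part of
// the path so that concurrent queries and blocks do not collide.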
public ExecutionBlockId getExecutionBlockId() {
return executionBlockId;
}
public Map<TaskAttemptId, Task> getTasks() {
return tasks;
}
public Task getTask(TaskAttemptId taskAttemptId){
return tasks.get(taskAttemptId);
}
public void addTaskHistory(TaskId taskId, TaskHistory taskHistory) {
taskHistories.put(taskId, taskHistory);
}
public Map<TaskId, TaskHistory> getTaskHistories() {
return taskHistories;
}
public void fatalError(TaskAttemptId taskAttemptId, String message) {
if (message == null) {
message = "No error message";
}
TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder()
.setId(taskAttemptId.getProto())
.setErrorMessage(message);
try {
// If the QueryMaster does not respond, the current execution block should be stopped
CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>();
getStub().fatalError(callFuture.getController(), builder.build(), callFuture);
callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
} catch (Exception e) {
getWorkerContext().getTaskManager().getDispatcher().getEventHandler()
.handle(new ExecutionBlockErrorEvent(taskAttemptId.getTaskId().getExecutionBlockId(), e));
}
}
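// Hypothetical call site (for illustration): a task hitting an unrecoverable
// error reports it here; if the RPC to the QueryMaster itself fails, the catch
// block above escalates to an ExecutionBlockErrorEvent that stops the block:
//
//   context.fatalError(taskAttemptId, "cannot write intermediate data: " + ioe);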
public TajoWorker.WorkerContext getWorkerContext(){
return workerContext;
}
/**
* HASH_SHUFFLE and SCATTERED_HASH_SHUFFLE should send a report when this execution block is stopping.
*/
protected void sendShuffleReport() throws Exception {
switch (shuffleType) {
case HASH_SHUFFLE:
case SCATTERED_HASH_SHUFFLE:
sendHashShuffleReport(executionBlockId);
break;
case NONE_SHUFFLE:
case RANGE_SHUFFLE:
default:
break;
}
}
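// Only hash-based shuffles report here: their intermediate data is pooled per
// execution block by the HashShuffleAppenderManager, so the block describes the
// produced partitions once, when it stops. Range and no-shuffle outputs are
// assumed to be reported per task, outside this class.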
private void sendHashShuffleReport(ExecutionBlockId ebId) throws Exception {
/* This case means the worker did not run any tasks */
if (completedTasksNum.get() == 0) return;
Interface stub = getStub();
ExecutionBlockReport.Builder reporterBuilder = ExecutionBlockReport.newBuilder();
reporterBuilder.setEbId(ebId.getProto());
reporterBuilder.setReportSuccess(true);
reporterBuilder.setSucceededTasks(succeededTasksNum.get());
try {
List<IntermediateEntryProto> intermediateEntries = Lists.newArrayList();
List<HashShuffleAppenderManager.HashShuffleIntermediate> shuffles =
getWorkerContext().getHashShuffleAppenderManager().close(ebId);
if (shuffles == null) {
reporterBuilder.addAllIntermediateEntries(intermediateEntries);
CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>();
stub.doneExecutionBlock(callFuture.getController(), reporterBuilder.build(), callFuture);
callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
return;
}
IntermediateEntryProto.Builder intermediateBuilder = IntermediateEntryProto.newBuilder();
IntermediateEntryProto.PageProto.Builder pageBuilder = IntermediateEntryProto.PageProto.newBuilder();
FailureIntermediateProto.Builder failureBuilder = FailureIntermediateProto.newBuilder();
for (HashShuffleAppenderManager.HashShuffleIntermediate eachShuffle: shuffles) {
List<IntermediateEntryProto.PageProto> pages = Lists.newArrayList();
List<FailureIntermediateProto> failureIntermediateItems = Lists.newArrayList();
for (Pair<Long, Integer> eachPage: eachShuffle.getPages()) {
pageBuilder.clear();
pageBuilder.setPos(eachPage.getFirst());
pageBuilder.setLength(eachPage.getSecond());
pages.add(pageBuilder.build());
}
for(Pair<Long, Pair<Integer, Integer>> eachFailure: eachShuffle.getFailureTskTupleIndexes()) {
failureBuilder.clear();
failureBuilder.setPagePos(eachFailure.getFirst());
failureBuilder.setStartRowNum(eachFailure.getSecond().getFirst());
failureBuilder.setEndRowNum(eachFailure.getSecond().getSecond());
failureIntermediateItems.add(failureBuilder.build());
}
intermediateBuilder.clear();
intermediateBuilder.setEbId(ebId.getProto())
.setHost(getWorkerContext().getConnectionInfo().getHost() + ":" +
getWorkerContext().getConnectionInfo().getPullServerPort())
.setTaskId(-1)
.setAttemptId(-1)
.setPartId(eachShuffle.getPartId())
.setVolume(eachShuffle.getVolume())
.addAllPages(pages)
.addAllFailures(failureIntermediateItems);
intermediateEntries.add(intermediateBuilder.build());
}
// send intermediateEntries to QueryMaster
reporterBuilder.addAllIntermediateEntries(intermediateEntries);
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
reporterBuilder.setReportSuccess(false);
if (e.getMessage() == null) {
reporterBuilder.setReportErrorMessage(e.getClass().getSimpleName());
} else {
reporterBuilder.setReportErrorMessage(e.getMessage());
}
}
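// The report is still sent when closing the shuffle fails: reportSuccess=false
// plus the error message above let the QueryMaster handle the failure explicitly.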
try {
CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>();
stub.doneExecutionBlock(callFuture.getController(), reporterBuilder.build(), callFuture);
callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
} catch (Throwable e) {
// can't send report to query master
LOG.fatal(e.getMessage(), e);
throw new RuntimeException(e);
}
}
protected class Reporter {
private Thread reporterThread;
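// PROGRESS_INTERVAL is the wait timeout in milliseconds between report cycles;
// after MAX_RETRIES consecutive reporting failures the reporter gives up.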
private static final int PROGRESS_INTERVAL = 1000;
private static final int MAX_RETRIES = 10;
public Reporter() {
this.reporterThread = new Thread(createReporterThread());
this.reporterThread.setName("Task reporter");
}
public void startReporter(){
this.reporterThread.start();
}
Runnable createReporterThread() {
return new Runnable() {
int remainingRetries = MAX_RETRIES;
@Override
public void run() {
while (!isStopped() && !Thread.interrupted()) {
try {
Interface masterStub = getStub();
if(tasks.size() == 0){
masterStub.ping(null, getExecutionBlockId().getProto(), NullCallback.get());
} else {
for (Task task : new ArrayList<Task>(tasks.values())){
if (task.getTaskContext().getState() ==
TajoProtos.TaskAttemptState.TA_RUNNING && task.isProgressChanged()) {
masterStub.statusUpdate(null, task.getReport(), NullCallback.get());
}
task.updateProgress();
}
}
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
remainingRetries -= 1;
if (remainingRetries == 0) {
ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
LOG.warn("Last retry, exiting");
throw new RuntimeException(t);
}
} finally {
if (remainingRetries > 0 && !isStopped()) {
synchronized (reporterThread) {
try {
reporterThread.wait(PROGRESS_INTERVAL);
} catch (InterruptedException e) {
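// Interrupted while waiting; fall through and re-check isStopped().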
}
}
}
}
}
}
};
}
public void stop() throws InterruptedException {
if (reporterThread != null) {
// The intent of the lock is to avoid interrupting the reporter thread in the
// middle of a ping or statusUpdate RPC to the QueryMaster.
synchronized (reporterThread) {
// Wake the reporter if it is waiting; otherwise let the in-flight RPC finish.
reporterThread.notifyAll();
}
}
}
}
}