/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.querymaster;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.*;
import org.apache.tajo.*;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto;
import org.apache.tajo.catalog.statistics.ColumnStats;
import org.apache.tajo.catalog.statistics.StatisticsUtil;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.global.MasterPlan.ShuffleContext;
import org.apache.tajo.error.Errors.SerializedException;
import org.apache.tajo.exception.ErrorUtil;
import org.apache.tajo.exception.TajoException;
import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.TaskState;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage;
import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.querymaster.Task.IntermediateEntry;
import org.apache.tajo.rpc.AsyncRpcClient;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcClientManager;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.RpcParameterFactory;
import org.apache.tajo.util.SplitUtil;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.util.history.StageHistory;
import org.apache.tajo.util.history.TaskHistory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.tajo.ResourceProtos.*;
import static org.apache.tajo.conf.TajoConf.ConfVars;
import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
/**
 * A Stage controls the execution of a single ExecutionBlock and is modeled as a finite state machine.
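 *
 * <p>A sketch of the typical lifecycle, following the transitions registered in
 * {@code stateMachineFactory} below:
 * NEW -SQ_INIT-> INITED -SQ_START-> RUNNING -SQ_SHUFFLE_REPORT-> FINALIZING
 * -SQ_STAGE_COMPLETED-> SUCCEEDED, with KILLED, FAILED, and ERROR as abnormal exits.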
*/
public class Stage implements EventHandler<StageEvent> {
private static final Log LOG = LogFactory.getLog(Stage.class);
private final Properties rpcParams;
private MasterPlan masterPlan;
private ExecutionBlock block;
private int priority;
private Schema outSchema;
private TableMeta meta;
private TableStats resultStatistics;
private TableStats inputStatistics;
private EventHandler<Event> eventHandler;
private AbstractTaskScheduler taskScheduler;
private QueryMasterTask.QueryMasterTaskContext context;
private final List<String> diagnostics = new ArrayList<>();
private StageState stageState;
private long startTime;
private long finishTime;
private volatile long lastContactTime;
private Thread timeoutChecker;
private final Map<TaskId, Task> tasks = Maps.newConcurrentMap();
private final Map<Integer, InetSocketAddress> workerMap = Maps.newConcurrentMap();
private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
private static final TaskCompletedTransition TASK_COMPLETED_TRANSITION = new TaskCompletedTransition();
private static final StageCompleteTransition STAGE_COMPLETED_TRANSITION = new StageCompleteTransition();
private static final StageFinalizeTransition STAGE_FINALIZE_TRANSITION = new StageFinalizeTransition();
private StateMachine<StageState, StageEventType, StageEvent> stateMachine;
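  // The transition table below is shared by every Stage instance; each entry maps a
  // (current state, event type) pair to the next state(s) and an optional transition action.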
protected static final StateMachineFactory<Stage, StageState,
StageEventType, StageEvent> stateMachineFactory =
new StateMachineFactory<Stage, StageState,
StageEventType, StageEvent>(StageState.NEW)
// Transitions from NEW state
.addTransition(StageState.NEW,
EnumSet.of(StageState.INITED, StageState.ERROR, StageState.SUCCEEDED),
StageEventType.SQ_INIT,
new InitAndRequestContainer())
.addTransition(StageState.NEW, StageState.NEW,
StageEventType.SQ_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(StageState.NEW, StageState.KILLED,
StageEventType.SQ_KILL)
.addTransition(StageState.NEW, StageState.ERROR,
StageEventType.SQ_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Transitions from INITED state
.addTransition(StageState.INITED, StageState.RUNNING,
StageEventType.SQ_START)
.addTransition(StageState.INITED, StageState.INITED,
StageEventType.SQ_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(StageState.INITED,
EnumSet.of(StageState.SUCCEEDED, StageState.FAILED),
StageEventType.SQ_STAGE_COMPLETED,
STAGE_COMPLETED_TRANSITION)
.addTransition(StageState.INITED, StageState.KILL_WAIT,
StageEventType.SQ_KILL, new KillTasksTransition())
.addTransition(StageState.INITED, StageState.ERROR,
StageEventType.SQ_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Transitions from RUNNING state
.addTransition(StageState.RUNNING, StageState.RUNNING,
StageEventType.SQ_TASK_COMPLETED,
TASK_COMPLETED_TRANSITION)
.addTransition(StageState.RUNNING, StageState.FINALIZING,
StageEventType.SQ_SHUFFLE_REPORT,
STAGE_FINALIZE_TRANSITION)
.addTransition(StageState.RUNNING,
EnumSet.of(StageState.SUCCEEDED, StageState.FAILED),
StageEventType.SQ_STAGE_COMPLETED,
STAGE_COMPLETED_TRANSITION)
.addTransition(StageState.RUNNING, StageState.RUNNING,
StageEventType.SQ_FAILED,
TASK_COMPLETED_TRANSITION)
.addTransition(StageState.RUNNING, StageState.RUNNING,
StageEventType.SQ_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(StageState.RUNNING, StageState.KILL_WAIT,
StageEventType.SQ_KILL,
new KillTasksTransition())
.addTransition(StageState.RUNNING, StageState.ERROR,
StageEventType.SQ_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able Transition
.addTransition(StageState.RUNNING, StageState.RUNNING,
StageEventType.SQ_START)
// Transitions from KILL_WAIT state
.addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
EnumSet.of(StageEventType.SQ_KILL),
new KillTasksTransition())
.addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
StageEventType.SQ_TASK_COMPLETED,
TASK_COMPLETED_TRANSITION)
.addTransition(StageState.KILL_WAIT,
EnumSet.of(StageState.SUCCEEDED, StageState.FAILED, StageState.KILLED),
StageEventType.SQ_STAGE_COMPLETED,
STAGE_COMPLETED_TRANSITION)
.addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
StageEventType.SQ_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
StageEventType.SQ_FAILED,
TASK_COMPLETED_TRANSITION)
.addTransition(StageState.KILL_WAIT, StageState.ERROR,
StageEventType.SQ_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
.addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
EnumSet.of(StageEventType.SQ_START))
// Transitions from FINALIZING state
.addTransition(StageState.FINALIZING, StageState.FINALIZING,
StageEventType.SQ_SHUFFLE_REPORT,
STAGE_FINALIZE_TRANSITION)
.addTransition(StageState.FINALIZING,
EnumSet.of(StageState.SUCCEEDED, StageState.FAILED),
StageEventType.SQ_STAGE_COMPLETED,
STAGE_COMPLETED_TRANSITION)
.addTransition(StageState.FINALIZING, StageState.FINALIZING,
StageEventType.SQ_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(StageState.FINALIZING, StageState.ERROR,
StageEventType.SQ_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able Transition
.addTransition(StageState.FINALIZING, StageState.KILLED,
StageEventType.SQ_KILL)
// Transitions from SUCCEEDED state
.addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED,
StageEventType.SQ_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(StageState.SUCCEEDED, StageState.ERROR,
StageEventType.SQ_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
.addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED,
EnumSet.of(
StageEventType.SQ_START,
StageEventType.SQ_KILL,
StageEventType.SQ_SHUFFLE_REPORT))
// Transitions from KILLED state
.addTransition(StageState.KILLED, StageState.KILLED,
StageEventType.SQ_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(StageState.KILLED, StageState.ERROR,
StageEventType.SQ_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able transitions
.addTransition(StageState.KILLED, StageState.KILLED,
EnumSet.of(
StageEventType.SQ_START,
StageEventType.SQ_KILL,
StageEventType.SQ_SHUFFLE_REPORT,
StageEventType.SQ_STAGE_COMPLETED,
StageEventType.SQ_FAILED))
// Transitions from FAILED state
.addTransition(StageState.FAILED, StageState.FAILED,
StageEventType.SQ_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(StageState.FAILED, StageState.ERROR,
StageEventType.SQ_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able transitions
.addTransition(StageState.FAILED, StageState.FAILED,
EnumSet.of(
StageEventType.SQ_START,
StageEventType.SQ_KILL,
StageEventType.SQ_FAILED))
// Transitions from ERROR state
.addTransition(StageState.ERROR, StageState.ERROR,
StageEventType.SQ_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
// Ignore-able transitions
.addTransition(StageState.ERROR, StageState.ERROR,
EnumSet.of(
StageEventType.SQ_START,
StageEventType.SQ_KILL,
StageEventType.SQ_FAILED,
StageEventType.SQ_INTERNAL_ERROR,
StageEventType.SQ_STAGE_COMPLETED,
StageEventType.SQ_SHUFFLE_REPORT))
.installTopology();
private final Lock readLock;
private final Lock writeLock;
private volatile int totalScheduledObjectsCount;
private volatile int completedTaskCount = 0;
private volatile int succeededObjectCount = 0;
private volatile int killedObjectCount = 0;
private volatile int failedObjectCount = 0;
private volatile SerializedException failureReason;
private TaskSchedulerContext schedulerContext;
private List<IntermediateEntry> hashShuffleIntermediateEntries = Lists.newArrayList();
private AtomicInteger completedShuffleTasks = new AtomicInteger(0);
private AtomicBoolean stopShuffleReceiver = new AtomicBoolean();
private StageHistory finalStageHistory;
public Stage(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock block) {
this.context = context;
this.masterPlan = masterPlan;
this.block = block;
this.eventHandler = context.getEventHandler();
this.rpcParams = RpcParameterFactory.get(context.getConf());
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
stateMachine = stateMachineFactory.make(this);
stageState = stateMachine.getCurrentState();
}
public static boolean isRunningState(StageState state) {
return state == StageState.INITED || state == StageState.NEW || state == StageState.RUNNING;
}
public QueryMasterTask.QueryMasterTaskContext getContext() {
return context;
}
public MasterPlan getMasterPlan() {
return masterPlan;
}
public DataChannel getDataChannel() {
return masterPlan.getOutgoingChannels(getId()).iterator().next();
}
public EventHandler<Event> getEventHandler() {
return eventHandler;
}
public AbstractTaskScheduler getTaskScheduler() {
return taskScheduler;
}
public void setStartTime() {
startTime = context.getClock().getTime();
}
@SuppressWarnings("UnusedDeclaration")
public long getStartTime() {
return this.startTime;
}
public void setFinishTime() {
finishTime = context.getClock().getTime();
}
@SuppressWarnings("UnusedDeclaration")
public long getFinishTime() {
return this.finishTime;
}
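  /**
   * Returns the fraction of scheduled tasks that have succeeded,
   * or 0 while the stage is still in the NEW state.
   */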
public float getTaskProgress() {
readLock.lock();
try {
if (getState() == StageState.NEW) {
return 0;
} else {
return (float)(succeededObjectCount) / (float)totalScheduledObjectsCount;
}
} finally {
readLock.unlock();
}
}
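  /**
   * Returns the average progress of the last attempt of every task,
   * truncated to three decimal places.
   */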
public float getProgress() {
List<Task> tempTasks = null;
readLock.lock();
try {
if (getState() == StageState.NEW) {
return 0.0f;
} else {
tempTasks = new ArrayList<>(tasks.values());
}
} finally {
readLock.unlock();
}
float totalProgress = 0.0f;
for (Task eachTask : tempTasks) {
if (eachTask.getLastAttempt() != null) {
totalProgress += eachTask.getLastAttempt().getProgress();
}
}
if (totalProgress > 0.0f) {
return (float) Math.floor((totalProgress / (float) Math.max(tempTasks.size(), 1)) * 1000.0f) / 1000.0f;
} else {
return 0.0f;
}
}
public int getSucceededObjectCount() {
return succeededObjectCount;
}
public int getTotalScheduledObjectsCount() {
return totalScheduledObjectsCount;
}
public int getKilledObjectCount() {
return killedObjectCount;
}
public int getFailedObjectCount() {
return failedObjectCount;
}
public int getCompletedTaskCount() {
return completedTaskCount;
}
public SerializedException getFailureReason() {
return failureReason;
}
public ExecutionBlock getBlock() {
return block;
}
public void addTask(Task task) {
tasks.put(task.getId(), task);
}
public StageHistory getStageHistory() {
if (finalStageHistory != null) {
if (finalStageHistory.getFinishTime() == 0) {
finalStageHistory = makeStageHistory();
finalStageHistory.setTasks(makeTaskHistories());
}
return finalStageHistory;
} else {
return makeStageHistory();
}
}
private List<TaskHistory> makeTaskHistories() {
List<TaskHistory> taskHistories = new ArrayList<>();
for(Task eachTask : getTasks()) {
taskHistories.add(eachTask.getTaskHistory());
}
return taskHistories;
}
private StageHistory makeStageHistory() {
StageHistory stageHistory = new StageHistory();
stageHistory.setExecutionBlockId(getId().toString());
stageHistory.setPlan(PlannerUtil.buildExplainString(block.getPlan()));
stageHistory.setState(getState().toString());
stageHistory.setStartTime(startTime);
stageHistory.setFinishTime(finishTime);
stageHistory.setSucceededObjectCount(succeededObjectCount);
stageHistory.setKilledObjectCount(killedObjectCount);
stageHistory.setFailedObjectCount(failedObjectCount);
stageHistory.setTotalScheduledObjectsCount(totalScheduledObjectsCount);
AbstractTaskScheduler scheduler = getTaskScheduler();
if (scheduler != null) {
stageHistory.setHostLocalAssigned(scheduler.getHostLocalAssigned());
stageHistory.setRackLocalAssigned(scheduler.getRackLocalAssigned());
}
long totalInputBytes = 0;
long totalReadBytes = 0;
long totalReadRows = 0;
long totalWriteBytes = 0;
long totalWriteRows = 0;
for(Task eachTask : getTasks()) {
if (eachTask.getLastAttempt() != null) {
TableStats inputStats = eachTask.getLastAttempt().getInputStats();
if (inputStats != null) {
totalInputBytes += inputStats.getNumBytes();
totalReadBytes += inputStats.getReadBytes();
totalReadRows += inputStats.getNumRows();
}
TableStats outputStats = eachTask.getLastAttempt().getResultStats();
if (outputStats != null) {
totalWriteBytes += outputStats.getNumBytes();
totalWriteRows += outputStats.getNumRows();
}
}
}
Set<Integer> partitions = Sets.newHashSet();
for (IntermediateEntry entry : getHashShuffleIntermediateEntries()) {
partitions.add(entry.getPartId());
}
stageHistory.setTotalInputBytes(totalInputBytes);
stageHistory.setTotalReadBytes(totalReadBytes);
stageHistory.setTotalReadRows(totalReadRows);
stageHistory.setTotalWriteBytes(totalWriteBytes);
stageHistory.setTotalWriteRows(totalWriteRows);
stageHistory.setNumShuffles(partitions.size());
stageHistory.setProgress(getProgress());
return stageHistory;
}
public Set<PartitionDescProto> getPartitions() {
Set<PartitionDescProto> partitions = new HashSet<>();
for(Task eachTask : getTasks()) {
if (eachTask.getLastAttempt() != null && !eachTask.getLastAttempt().getPartitions().isEmpty()) {
partitions.addAll(eachTask.getLastAttempt().getPartitions());
}
}
return partitions;
}
public void clearPartitions() {
for(Task eachTask : getTasks()) {
if (eachTask.getLastAttempt() != null && !eachTask.getLastAttempt().getPartitions().isEmpty()) {
eachTask.getLastAttempt().getPartitions().clear();
}
}
}
/**
 * Finalizes this stage. It is invoked only while the stage is finalizing.
*/
public void finalizeStage() {
cleanup();
}
/**
 * Completes this stage. It is invoked only when the stage has succeeded.
*/
public void complete() {
finalizeStats();
setFinishTime();
eventHandler.handle(new StageCompletedEvent(getId(), StageState.SUCCEEDED));
}
public void abort(StageState finalState) {
abort(finalState, null);
}
/**
* It finalizes this stage. Unlike {@link Stage#complete()},
* it is invoked when a stage is abnormally finished.
*
* @param finalState The final stage state
 * @param reason The failure reason, if any
*/
public void abort(StageState finalState, Throwable reason) {
// TODO -
// - committer.abortStage(...)
// - record Stage Finish Time
// - CleanUp Tasks
// - Record History
if(reason != null)
failureReason = ErrorUtil.convertException(reason);
cleanup();
setFinishTime();
eventHandler.handle(new StageCompletedEvent(getId(), finalState));
}
public StateMachine<StageState, StageEventType, StageEvent> getStateMachine() {
return this.stateMachine;
}
public void setPriority(int priority) {
this.priority = priority;
}
public int getPriority() {
return this.priority;
}
public ExecutionBlockId getId() {
return block.getId();
}
public Task[] getTasks() {
return tasks.values().toArray(new Task[tasks.size()]);
}
public Task getTask(TaskId qid) {
return tasks.get(qid);
}
public Schema getOutSchema() {
return outSchema;
}
public TableMeta getTableMeta() {
return meta;
}
public TableStats getResultStats() {
return resultStatistics;
}
public TableStats getInputStats() {
return inputStatistics;
}
public List<String> getDiagnostics() {
readLock.lock();
try {
return diagnostics;
} finally {
readLock.unlock();
}
}
protected void addDiagnostic(String diag) {
diagnostics.add(diag);
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.getId());
return sb.toString();
}
@Override
public boolean equals(Object o) {
if (o instanceof Stage) {
Stage other = (Stage)o;
return getId().equals(other.getId());
}
return false;
}
@Override
public int hashCode() {
return getId().hashCode();
}
public int compareTo(Stage other) {
return getId().compareTo(other.getId());
}
public StageState getSynchronizedState() {
readLock.lock();
try {
return stateMachine.getCurrentState();
} finally {
readLock.unlock();
}
}
/* non-blocking call for client API */
public StageState getState() {
return stageState;
}
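  /**
   * Aggregates statistics from the child stages of a union-only block.
   *
   * @return a two-element array: index 0 holds the aggregated input stats,
   *         index 1 holds the aggregated result stats
   */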
public static TableStats[] computeStatFromUnionBlock(Stage stage) {
TableStats[] stat = new TableStats[]{new TableStats(), new TableStats()};
long[] avgRows = new long[]{0, 0};
long[] numBytes = new long[]{0, 0};
long[] readBytes = new long[]{0, 0};
long[] numRows = new long[]{0, 0};
int[] numBlocks = new int[]{0, 0};
int[] numOutputs = new int[]{0, 0};
List<ColumnStats> columnStatses = StatisticsUtil.emptyColumnStats(stage.getDataChannel().getSchema());
MasterPlan masterPlan = stage.getMasterPlan();
for (ExecutionBlock block : masterPlan.getChilds(stage.getBlock())) {
Stage childStage = stage.context.getStage(block.getId());
TableStats[] childStatArray = new TableStats[]{
childStage.getInputStats(), childStage.getResultStats()
};
for (int i = 0; i < 2; i++) {
if (childStatArray[i] == null) {
continue;
}
avgRows[i] += childStatArray[i].getAvgRows();
numBlocks[i] += childStatArray[i].getNumBlocks();
numBytes[i] += childStatArray[i].getNumBytes();
readBytes[i] += childStatArray[i].getReadBytes();
numOutputs[i] += childStatArray[i].getNumShuffleOutputs();
numRows[i] += childStatArray[i].getNumRows();
}
      if (childStatArray[1] != null && childStatArray[1].getColumnStats() != null
          && childStatArray[1].getColumnStats().size() > 0) {
columnStatses = StatisticsUtil.aggregateColumnStats(columnStatses, childStatArray[1].getColumnStats());
}
}
for (int i = 0; i < 2; i++) {
stat[i].setNumBlocks(numBlocks[i]);
stat[i].setNumBytes(numBytes[i]);
stat[i].setReadBytes(readBytes[i]);
stat[i].setNumShuffleOutputs(numOutputs[i]);
stat[i].setNumRows(numRows[i]);
stat[i].setAvgRows(avgRows[i]);
}
stat[1].setColumnStats(columnStatses);
return stat;
}
private TableStats[] computeStatFromTasks() {
List<TableStats> inputStatsList = Lists.newArrayList();
List<TableStats> resultStatsList = Lists.newArrayList();
for (Task unit : getTasks()) {
resultStatsList.add(unit.getStats());
if (unit.getLastAttempt().getInputStats() != null) {
inputStatsList.add(unit.getLastAttempt().getInputStats());
}
}
TableStats inputStats = StatisticsUtil.aggregateTableStat(inputStatsList);
TableStats resultStats = StatisticsUtil.aggregateTableStat(resultStatsList);
return new TableStats[]{inputStats, resultStats};
}
private void stopScheduler() {
if (taskScheduler != null) {
taskScheduler.stop();
}
}
/**
 * Gets the addresses of the workers launched for this stage.
*/
protected Map<Integer, InetSocketAddress> getAssignedWorkerMap() {
return workerMap;
}
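  /**
   * Asynchronously sends the given stop request to every assigned worker over RPC.
   */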
private void sendStopExecutionBlockEvent(final StopExecutionBlockRequest requestProto) {
for (final InetSocketAddress worker : getAssignedWorkerMap().values()) {
getContext().getQueryMasterContext().getEventExecutor().submit(new Runnable() {
@Override
public void run() {
try {
AsyncRpcClient tajoWorkerRpc =
RpcClientManager.getInstance().getClient(worker, TajoWorkerProtocol.class, true, rpcParams);
TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
tajoWorkerRpcClient.stopExecutionBlock(null,
requestProto, NullCallback.get(PrimitiveProtos.BoolProto.class));
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
}
}
});
}
}
/**
 * Sends a stop request to all workers.
*/
protected void stopExecutionBlock() {
    // If there are still live tasks, try to kill them and send the shuffle report request.
List<TajoIdProtos.ExecutionBlockIdProto> ebIds = Lists.newArrayList();
if (!getContext().getQueryContext().getBool(SessionVars.DEBUG_ENABLED)) {
List<ExecutionBlock> childs = getMasterPlan().getChilds(getId());
for (ExecutionBlock executionBlock : childs) {
ebIds.add(executionBlock.getId().getProto());
}
}
StopExecutionBlockRequest.Builder stopRequest = StopExecutionBlockRequest.newBuilder();
ExecutionBlockListProto.Builder cleanupList = ExecutionBlockListProto.newBuilder();
cleanupList.addAllExecutionBlockId(Lists.newArrayList(ebIds));
stopRequest.setCleanupList(cleanupList.build());
stopRequest.setExecutionBlockId(getId().getProto());
sendStopExecutionBlockEvent(stopRequest.build());
}
/**
* It computes all stats and sets the intermediate result.
*/
private void finalizeStats() {
TableStats[] statsArray;
if (block.isUnionOnly()) {
statsArray = computeStatFromUnionBlock(this);
} else {
statsArray = computeStatFromTasks();
}
DataChannel channel = masterPlan.getOutgoingChannels(getId()).get(0);
    // if this is a store plan (i.e., CREATE or INSERT OVERWRITE)
String dataFormat = PlannerUtil.getDataFormat(masterPlan.getLogicalPlan());
if (dataFormat == null) {
// get final output store type (i.e., SELECT)
dataFormat = channel.getDataFormat();
}
outSchema = channel.getSchema();
meta = CatalogUtil.newTableMeta(dataFormat, new KeyValueSet());
inputStatistics = statsArray[0];
resultStatistics = statsArray[1];
}
@Override
public void handle(StageEvent event) {
lastContactTime = System.currentTimeMillis();
if (LOG.isDebugEnabled()) {
LOG.debug("Processing " + event.getStageId() + " of type " + event.getType() + ", preState="
+ getSynchronizedState());
}
try {
writeLock.lock();
StageState oldState = getSynchronizedState();
try {
getStateMachine().doTransition(event.getType(), event);
stageState = getSynchronizedState();
} catch (InvalidStateTransitonException e) {
LOG.error("Can't handle this event at current state"
+ ", eventType:" + event.getType().name()
+ ", oldState:" + oldState.name()
+ ", nextState:" + getSynchronizedState().name()
, e);
eventHandler.handle(new StageEvent(getId(),
StageEventType.SQ_INTERNAL_ERROR));
}
      // notify the event handler of the state change
if (LOG.isDebugEnabled()) {
if (oldState != getSynchronizedState()) {
LOG.debug(getId() + " Stage Transitioned from " + oldState + " to "
+ getSynchronizedState());
}
}
} finally {
writeLock.unlock();
}
}
private static class InitAndRequestContainer implements MultipleArcTransition<Stage,
StageEvent, StageState> {
@Override
public StageState transition(final Stage stage, StageEvent stageEvent) {
stage.setStartTime();
ExecutionBlock execBlock = stage.getBlock();
StageState state;
try {
// Union operator does not require actual query processing. It is performed logically.
if (execBlock.isUnionOnly()) {
          // Although the union operator is not actually processed, it still must handle the completion event.
stage.complete();
state = StageState.SUCCEEDED;
} else {
ExecutionBlock parent = stage.getMasterPlan().getParent(stage.getBlock());
DataChannel channel = stage.getMasterPlan().getChannel(stage.getId(), parent.getId());
setShuffleIfNecessary(stage, channel);
// TODO: verify changed shuffle plan
initTaskScheduler(stage);
          // execute pre-processing asynchronously
stage.getContext().getQueryMasterContext().getSingleEventExecutor()
.submit(new Runnable() {
@Override
public void run() {
try {
schedule(stage);
stage.totalScheduledObjectsCount = stage.getTaskScheduler().remainingScheduledObjectNum();
LOG.info(stage.totalScheduledObjectsCount + " objects are scheduled");
                  if (stage.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there are no tasks
stage.eventHandler.handle(
new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
} else {
if(stage.getSynchronizedState() == StageState.INITED) {
stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_START));
stage.taskScheduler.start();
} else {
                    /* all tasks were killed before the stage was inited */
if (stage.getTotalScheduledObjectsCount() == stage.getCompletedTaskCount()) {
stage.eventHandler.handle(
new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
} else {
stage.eventHandler.handle(
new StageEvent(stage.getId(), StageEventType.SQ_KILL));
}
}
}
} catch (Throwable e) {
LOG.error("Stage (" + stage.getId() + ") ERROR: ", e);
stage.setFinishTime();
stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), e.getMessage()));
stage.eventHandler.handle(new StageCompletedEvent(stage.getId(), StageState.ERROR));
}
}
}
);
state = StageState.INITED;
}
} catch (Throwable e) {
LOG.error("Stage (" + stage.getId() + ") ERROR: ", e);
stage.setFinishTime();
stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), e.getMessage()));
stage.eventHandler.handle(new StageCompletedEvent(stage.getId(), StageState.ERROR));
return StageState.ERROR;
}
return state;
}
private void initTaskScheduler(Stage stage) throws IOException {
TajoConf conf = stage.context.getConf();
stage.schedulerContext = new TaskSchedulerContext(stage.context,
stage.getMasterPlan().isLeaf(stage.getId()), stage.getId());
stage.taskScheduler = TaskSchedulerFactory.get(conf, stage.schedulerContext, stage);
stage.taskScheduler.init(conf);
LOG.info(stage.taskScheduler.getName() + " is chosen for the task scheduling for " + stage.getId());
}
/**
     * If the parent block requires a repartition operation, this method sets the proper
     * repartition method and the number of partitions on the given Stage.
*/
private static void setShuffleIfNecessary(Stage stage, DataChannel channel) {
if (channel.isHashShuffle()) {
int numTasks = calculateShuffleOutputNum(stage);
Repartitioner.setShuffleOutputNumForTwoPhase(stage, numTasks, channel);
}
}
/**
     * Gets the desired number of partitions according to the volume of input data.
     * This method is used only to determine the number of partition keys for hash join or aggregation.
     *
     * @param stage the stage whose shuffle output number is calculated
     * @return the desired number of partitions
*/
public static int calculateShuffleOutputNum(Stage stage) {
MasterPlan masterPlan = stage.getMasterPlan();
// For test
if (masterPlan.getContext().containsKey(SessionVars.TEST_MIN_TASK_NUM)) {
int partitionNum = masterPlan.getContext().getInt(SessionVars.TEST_MIN_TASK_NUM);
LOG.info(stage.getId() + ", The determined number of partitions is " + partitionNum + " for test");
return partitionNum;
}
Optional<ShuffleContext> optional = masterPlan.getShuffleInfo(stage.getId());
if (optional.isPresent()) {
LOG.info("# of partitions is determined as " + optional.get().getPartitionNum() +
"to match with sibling eb's partition number");
return optional.get().getPartitionNum();
} else {
ExecutionBlock parent = masterPlan.getParent(stage.getBlock());
int partitionNum;
if (parent != null) {
          // We assume this execution block is the first stage of a join if two or more tables are included in it.
if (parent.hasJoin()) {
if (parent.getNonBroadcastRelNum() > 1) {
// repartition join
partitionNum = calculatePartitionNumForRepartitionJoin(parent, stage);
LOG.info(stage.getId() + ", The determined number of partitions for repartition join is " + partitionNum);
} else {
// broadcast join
// partition number is calculated using the volume of the large table
partitionNum = calculatePartitionNumDefault(parent, stage);
LOG.info(stage.getId() + ", The determined number of partitions for broadcast join is " + partitionNum);
}
} else {
// Is this stage the first step of group-by?
if (parent.hasAgg()) {
LogicalNode grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.GROUP_BY,
NodeType.DISTINCT_GROUP_BY, NodeType.WINDOW_AGG);
if (grpNode == null) {
throw new TajoInternalError("Cannot find aggregation plan for " + stage.getId());
}
if (!hasGroupKeys(stage, grpNode)) {
LOG.info(stage.getId() + ", No Grouping Column - determinedTaskNum is set to 1");
partitionNum = 1;
} else {
partitionNum = calculatePartitionNumForAgg(parent, stage);
LOG.info(stage.getId() + ", The determined number of aggregation partitions is " + partitionNum);
}
} else {
            // NOTE: the code below might be executed for sort, but the partition number is no longer used for sort.
LOG.info("============>>>>> Unexpected Case! <<<<<================");
partitionNum = calculatePartitionNumDefault(parent, stage);
LOG.info(stage.getId() + ", The determined number of partitions is " + partitionNum);
}
}
} else {
// This case means that the parent eb does not exist even though data shuffle is required after the current eb.
throw new TajoInternalError("Cannot find parent execution block of " + stage.block.getId());
}
// Record the partition number for sibling execution blocks
masterPlan.addShuffleInfo(stage.getId(), partitionNum);
return partitionNum;
}
}
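    /**
     * Determines the partition number for a repartition join as
     * ceil(max(outerVolumeMB, innerVolumeMB) / JOIN_PER_SHUFFLE_SIZE).
     * For example (illustrative figures), a 1024 MB bigger side with
     * JOIN_PER_SHUFFLE_SIZE set to 128 yields 8 partitions.
     */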
private static int calculatePartitionNumForRepartitionJoin(ExecutionBlock parent, Stage currentStage) {
List<ExecutionBlock> childs = currentStage.masterPlan.getChilds(parent);
// for outer
ExecutionBlock outer = childs.get(0);
long outerVolume = getInputVolume(currentStage.masterPlan, currentStage.context, outer);
// for inner
ExecutionBlock inner = childs.get(1);
long innerVolume = getInputVolume(currentStage.masterPlan, currentStage.context, inner);
LOG.info(currentStage.getId() + ", Outer volume: " + Math.ceil((double) outerVolume / 1048576) + "MB, "
+ "Inner volume: " + Math.ceil((double) innerVolume / 1048576) + "MB");
long bigger = Math.max(outerVolume, innerVolume);
int mb = (int) Math.ceil((double) bigger / 1048576);
LOG.info(currentStage.getId() + ", Bigger Table's volume is approximately " + mb + " MB");
return (int) Math.ceil((double) mb /
currentStage.masterPlan.getContext().getInt(SessionVars.JOIN_PER_SHUFFLE_SIZE));
}
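    /**
     * Determines the partition number for aggregation as
     * ceil(inputVolumeMB / GROUPBY_PER_SHUFFLE_SIZE).
     */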
private static int calculatePartitionNumForAgg(ExecutionBlock parent, Stage stage) {
int volumeByMB = getInputVolumeMB(parent, stage);
LOG.info(stage.getId() + ", Table's volume is approximately " + volumeByMB + " MB");
      // determine the number of tasks
return (int) Math.ceil((double) volumeByMB /
stage.masterPlan.getContext().getInt(SessionVars.GROUPBY_PER_SHUFFLE_SIZE));
}
private static boolean hasGroupKeys(Stage currentStage, LogicalNode aggNode) {
if (aggNode.getType() == NodeType.GROUP_BY) {
return ((GroupbyNode)aggNode).getGroupingColumns().length > 0;
} else if (aggNode.getType() == NodeType.DISTINCT_GROUP_BY) {
// Find current distinct stage node.
DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(currentStage.getBlock().getPlan(),
NodeType.DISTINCT_GROUP_BY);
if (distinctNode == null) {
LOG.warn(currentStage.getId() + ", Can't find current DistinctGroupbyNode");
distinctNode = (DistinctGroupbyNode)aggNode;
}
boolean hasGroupColumns = distinctNode.getGroupingColumns().length > 0;
Enforcer enforcer = currentStage.getBlock().getEnforcer();
if (enforcer == null) {
LOG.warn(currentStage.getId() + ", DistinctGroupbyNode's enforcer is null.");
} else {
EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode);
if (property != null) {
if (property.getDistinct().getIsMultipleAggregation()) {
MultipleAggregationStage multiAggStage = property.getDistinct().getMultipleAggregationStage();
hasGroupColumns = multiAggStage != MultipleAggregationStage.THRID_STAGE;
}
}
}
return hasGroupColumns;
} else {
return ((WindowAggNode) aggNode).hasPartitionKeys();
}
}
private static int calculatePartitionNumDefault(ExecutionBlock parent, Stage currentStage) {
int mb = getInputVolumeMB(parent, currentStage);
LOG.info(currentStage.getId() + ", Table's volume is approximately " + mb + " MB");
      // determine the number of tasks, one per 128 MB
return (int) Math.ceil((double)mb / 128);
}
private static int getInputVolumeMB(ExecutionBlock parent, Stage currentStage) {
// NOTE: Get input volume from the parent EB.
// If the parent EB contains an UNION query, the volume of the whole input for the UNION is returned.
// Otherwise, only the input volume of the current EB is returned.
long volume = getInputVolume(currentStage.masterPlan, currentStage.context, parent);
return (int) Math.ceil((double)volume / StorageUnit.MB);
}
private static void schedule(Stage stage) throws IOException, TajoException {
MasterPlan masterPlan = stage.getMasterPlan();
ExecutionBlock execBlock = stage.getBlock();
if (stage.getMasterPlan().isLeaf(execBlock.getId()) && execBlock.getScanNodes().length == 1) { // Case 1: Just Scan
// Some execution blocks can have broadcast table even though they don't have any join nodes
scheduleFragmentsForLeafQuery(stage);
} else if (execBlock.getScanNodes().length > 1) { // Case 2: Join
Repartitioner.scheduleFragmentsForJoinQuery(stage.schedulerContext, stage);
} else { // Case 3: Others (Sort or Aggregation)
int numTasks = getNonLeafTaskNum(stage);
Repartitioner.scheduleFragmentsForNonLeafTasks(stage.schedulerContext, masterPlan, stage, numTasks);
}
}
/**
     * Gets the desired number of tasks according to the volume of input data.
     *
     * @param stage the stage whose task number is calculated
     * @return the desired number of non-leaf tasks
*/
public static int getNonLeafTaskNum(Stage stage) {
// This method is assumed to be called only for aggregation or sort.
LogicalNode plan = stage.getBlock().getPlan();
LogicalNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT);
LogicalNode groupbyNode = PlannerUtil.findTopNode(plan, NodeType.GROUP_BY);
// Task volume is assumed to be 64 MB by default.
long taskVolume = 64;
if (groupbyNode != null && sortNode == null) {
// aggregation plan
taskVolume = stage.getContext().getQueryContext().getLong(SessionVars.GROUPBY_TASK_INPUT_SIZE);
} else if (sortNode != null && groupbyNode == null) {
// sort plan
taskVolume = stage.getContext().getQueryContext().getLong(SessionVars.SORT_TASK_INPUT_SIZE);
} else if (sortNode != null /* && groupbyNode != null */) {
// NOTE: when the plan includes both aggregation and sort, usually aggregation is executed first.
      // If not, we need to check that the query plan is valid.
LogicalNode aggChildOfSort = PlannerUtil.findTopNode(sortNode, NodeType.GROUP_BY);
boolean aggFirst = aggChildOfSort != null && aggChildOfSort.equals(groupbyNode);
// Set task volume according to the operator which will be executed first.
if (aggFirst) {
// choose aggregation task volume
taskVolume = stage.getContext().getQueryContext().getLong(SessionVars.GROUPBY_TASK_INPUT_SIZE);
} else {
// choose sort task volume
LOG.warn("Sort is executed before aggregation.");
taskVolume = stage.getContext().getQueryContext().getLong(SessionVars.SORT_TASK_INPUT_SIZE);
}
} else {
LOG.warn("Task volume is chosen as " + taskVolume + " in unexpected case.");
}
// Getting intermediate data size
long volume = getInputVolume(stage.getMasterPlan(), stage.context, stage.getBlock());
int mb = (int) Math.ceil((double)volume / (double)StorageUnit.MB);
LOG.info(stage.getId() + ", Table's volume is approximately " + mb + " MB");
      // determine the number of tasks
int minTaskNum = Math.max(1, stage.getContext().getQueryMasterContext().getConf().
getInt(ConfVars.$TEST_MIN_TASK_NUM.varname, 1));
int maxTaskNum = Math.max(minTaskNum, (int) Math.ceil((double)mb / taskVolume));
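      // e.g., with mb = 640 and taskVolume = 64, this yields max(minTaskNum, ceil(640 / 64)) = 10 tasks (illustrative figures)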
LOG.info(stage.getId() + ", The determined number of non-leaf tasks is " + maxTaskNum);
return maxTaskNum;
}
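    /**
     * Returns the input volume in bytes of the given execution block: for a leaf block,
     * the size of its largest input table; otherwise, the sum of the result sizes of its
     * child stages, recursing into children that have not yet succeeded.
     */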
public static long getInputVolume(MasterPlan masterPlan, QueryMasterTask.QueryMasterTaskContext context,
ExecutionBlock execBlock) {
if (masterPlan.isLeaf(execBlock)) {
ScanNode[] outerScans = execBlock.getScanNodes();
long maxVolume = 0;
for (ScanNode eachScanNode: outerScans) {
TableStats stat = context.getTableDesc(eachScanNode).getStats();
if (stat.getNumBytes() > maxVolume) {
maxVolume = stat.getNumBytes();
}
}
return maxVolume;
} else {
long aggregatedVolume = 0;
for (ExecutionBlock childBlock : masterPlan.getChilds(execBlock)) {
Stage stage = context.getStage(childBlock.getId());
if (stage == null || stage.getSynchronizedState() != StageState.SUCCEEDED) {
aggregatedVolume += getInputVolume(masterPlan, context, childBlock);
} else {
aggregatedVolume += stage.getResultStats().getNumBytes();
}
}
return aggregatedVolume;
}
}
private static void scheduleFragmentsForLeafQuery(Stage stage) throws IOException, TajoException {
ExecutionBlock execBlock = stage.getBlock();
ScanNode[] scans = execBlock.getScanNodes();
Preconditions.checkArgument(scans.length == 1, "Must be Scan Query");
ScanNode scan = scans[0];
TableDesc table = stage.context.getTableDesc(scan);
Collection<Fragment> fragments = SplitUtil.getSplits(
TablespaceManager.get(scan.getTableDesc().getUri()), scan, table, false);
SplitUtil.preparePartitionScanPlanForSchedule(scan);
Stage.scheduleFragments(stage, fragments);
// The number of leaf tasks should be the number of fragments.
stage.schedulerContext.setEstimatedTaskNum(fragments.size());
}
}
public static void scheduleFragment(Stage stage, Fragment fragment) {
stage.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
stage.getId(), fragment));
}
public static void scheduleFragments(Stage stage, Collection<Fragment> fragments) {
for (Fragment eachFragment : fragments) {
scheduleFragment(stage, eachFragment);
}
}
public static void scheduleFragments(Stage stage, Collection<Fragment> leftFragments,
Collection<Fragment> broadcastFragments) {
for (Fragment eachLeafFragment : leftFragments) {
scheduleFragment(stage, eachLeafFragment, broadcastFragments);
}
}
public static void scheduleFragment(Stage stage,
Fragment leftFragment, Collection<Fragment> rightFragments) {
stage.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
stage.getId(), leftFragment, rightFragments));
}
public static void scheduleFetches(Stage stage, Map<String, List<FetchProto>> fetches) {
stage.taskScheduler.handle(new FetchScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
stage.getId(), fetches));
}
public static Task newEmptyTask(TaskSchedulerContext schedulerContext,
TaskAttemptScheduleContext taskContext,
Stage stage, int taskId) {
ExecutionBlock execBlock = stage.getBlock();
Task unit = new Task(schedulerContext.getMasterContext().getConf(),
taskContext,
QueryIdFactory.newTaskId(schedulerContext.getBlockId(), taskId),
schedulerContext.isLeafQuery(), stage.eventHandler);
unit.setLogicalPlan(execBlock.getPlan());
stage.addTask(unit);
return unit;
}
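  // Tallies each completed task as succeeded, killed, or failed, and decides whether the stage
  // can move on to shuffle finalization or completion once all scheduled tasks have completed.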
private static class TaskCompletedTransition implements SingleArcTransition<Stage, StageEvent> {
@Override
public void transition(Stage stage,
StageEvent event) {
if (!(event instanceof StageTaskEvent)) {
throw new IllegalArgumentException("event should be a StageTaskEvent type.");
}
StageTaskEvent taskEvent = (StageTaskEvent) event;
Task task = stage.getTask(taskEvent.getTaskId());
      if (task == null) { // the completed task is unknown to this stage; treat it as a failure
LOG.error(String.format("Task %s is absent", taskEvent.getTaskId()));
stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED));
} else {
stage.completedTaskCount++;
stage.getTaskScheduler().releaseTaskAttempt(task.getLastAttempt());
if (taskEvent.getState() == TaskState.SUCCEEDED) {
stage.succeededObjectCount++;
} else if (task.getState() == TaskState.KILLED) {
stage.killedObjectCount++;
} else if (task.getState() == TaskState.FAILED) {
StageTaskFailedEvent failedEvent = TUtil.checkTypeAndGet(event, StageTaskFailedEvent.class);
stage.failedObjectCount++;
stage.failureReason = failedEvent.getException();
          // if at least one task has failed, try to kill all tasks.
stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL));
}
if (stage.totalScheduledObjectsCount == stage.completedTaskCount) {
if (stage.succeededObjectCount == stage.completedTaskCount) {
stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_SHUFFLE_REPORT));
} else {
stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
}
} else {
LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d)",
stage.getId(),
stage.totalScheduledObjectsCount,
stage.succeededObjectCount,
stage.killedObjectCount,
stage.failedObjectCount));
}
}
}
}
private static class KillTasksTransition implements SingleArcTransition<Stage, StageEvent> {
@Override
public void transition(Stage stage, StageEvent stageEvent) {
if(stage.getTaskScheduler() != null){
stage.getTaskScheduler().stop();
}
for (Task task : stage.getTasks()) {
stage.eventHandler.handle(new TaskEvent(task.getId(), TaskEventType.T_KILL));
}
}
}
private void cleanup() {
stopScheduler();
stopExecutionBlock();
this.finalStageHistory = makeStageHistory();
this.finalStageHistory.setTasks(makeTaskHistories());
}
public List<IntermediateEntry> getHashShuffleIntermediateEntries() {
return hashShuffleIntermediateEntries;
}
protected void stopFinalization() {
stopShuffleReceiver.set(true);
}
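  /**
   * Aggregates a shuffle report from a worker: accumulates the number of succeeded shuffle tasks
   * and any intermediate entries, and fires SQ_STAGE_COMPLETED once the reports cover all
   * succeeded tasks. A failed report stops finalization and fails the stage.
   */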
private void finalizeShuffleReport(StageShuffleReportEvent event, ShuffleType type) {
    if (!checkIfNeedFinalizing(type)) return;
ExecutionBlockReport report = event.getReport();
if (!report.getReportSuccess()) {
stopFinalization();
LOG.error(getId() + ", " + type + " report are failed. Caused by:" + report.getReportErrorMessage());
getEventHandler().handle(new StageEvent(getId(), StageEventType.SQ_FAILED));
}
completedShuffleTasks.addAndGet(report.getSucceededTasks());
if (report.getIntermediateEntriesCount() > 0) {
for (IntermediateEntryProto eachInterm : report.getIntermediateEntriesList()) {
hashShuffleIntermediateEntries.add(new IntermediateEntry(eachInterm));
}
}
if (completedShuffleTasks.get() >= succeededObjectCount) {
LOG.info(getId() + ", Finalized " + type + " reports: " + completedShuffleTasks.get());
getEventHandler().handle(new StageEvent(getId(), StageEventType.SQ_STAGE_COMPLETED));
if (timeoutChecker != null) {
stopFinalization();
synchronized (timeoutChecker){
timeoutChecker.notifyAll();
}
}
} else {
LOG.info(getId() + ", Received " + type + " reports " +
completedShuffleTasks.get() + "/" + succeededObjectCount);
}
}
/**
 * For HASH_SHUFFLE and SCATTERED_HASH_SHUFFLE, reports should be collected from worker nodes
 * when the ExecutionBlock is stopping.
 * A RANGE_SHUFFLE report is sent by the task reporter when a task finishes on a worker node.
*/
public static boolean checkIfNeedFinalizing(ShuffleType type) {
switch (type) {
case HASH_SHUFFLE:
case SCATTERED_HASH_SHUFFLE:
return true;
default:
return false;
}
}
private static class StageFinalizeTransition implements SingleArcTransition<Stage, StageEvent> {
@Override
public void transition(final Stage stage, StageEvent event) {
      // If a shuffle report has failed, the remaining reports are ignored.
if (stage.stopShuffleReceiver.get()) {
return;
}
stage.lastContactTime = System.currentTimeMillis();
ShuffleType shuffleType = stage.getDataChannel().getShuffleType();
try {
if (event instanceof StageShuffleReportEvent) {
stage.finalizeShuffleReport((StageShuffleReportEvent) event, shuffleType);
} else {
LOG.info(String.format("Stage - %s finalize %s (total=%d, success=%d, killed=%d)",
stage.getId().toString(),
shuffleType,
stage.totalScheduledObjectsCount,
stage.succeededObjectCount,
stage.killedObjectCount));
stage.finalizeStage();
if (checkIfNeedFinalizing(shuffleType)) {
/* wait for StageShuffleReportEvent from worker nodes */
LOG.info(stage.getId() + ", wait for " + shuffleType + " reports. expected Tasks:"
+ stage.succeededObjectCount);
/* FIXME implement timeout handler of stage and task */
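            // start a background checker, if one is not already running, that aborts finalization after 120s without a report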
            if (stage.timeoutChecker == null) {
stage.timeoutChecker = new Thread(new Runnable() {
@Override
public void run() {
while (stage.getSynchronizedState() == StageState.FINALIZING && !Thread.interrupted()) {
long elapsedTime = System.currentTimeMillis() - stage.lastContactTime;
if (elapsedTime > 120 * 1000) {
stage.stopFinalization();
LOG.error(stage.getId() + ": Timed out while receiving intermediate reports: " + elapsedTime
+ " ms, report:" + stage.completedShuffleTasks.get() + "/" + stage.succeededObjectCount);
stage.getEventHandler().handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED));
}
synchronized (this) {
try {
this.wait(1 * 1000);
                  } catch (InterruptedException e) {
                    // restore the interrupt status so the loop condition can observe it and exit
                    Thread.currentThread().interrupt();
                  }
}
}
}
});
stage.timeoutChecker.start();
}
} else {
stage.getEventHandler().handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
}
}
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
stage.stopFinalization();
stage.getEventHandler().handle(new StageDiagnosticsUpdateEvent(stage.getId(), t.getMessage()));
stage.getEventHandler().handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR));
}
}
}
private static class StageCompleteTransition implements MultipleArcTransition<Stage, StageEvent, StageState> {
@Override
public StageState transition(Stage stage, StageEvent stageEvent) {
// TODO - Commit Stage
// TODO - records succeeded, failed, killed completed task
// TODO - records metrics
try {
LOG.info(String.format("Stage completed - %s (total=%d, success=%d, killed=%d)",
stage.getId().toString(),
stage.getTotalScheduledObjectsCount(),
stage.getSucceededObjectCount(),
stage.killedObjectCount));
        // If the current stage has failed, the next stages receive an SQ_KILL event.
if (stage.killedObjectCount + stage.failedObjectCount > 0) {
if (stage.failedObjectCount > 0) {
stage.abort(StageState.FAILED);
return StageState.FAILED;
} else {
stage.abort(StageState.KILLED);
return StageState.KILLED;
}
} else {
stage.complete();
return StageState.SUCCEEDED;
}
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
stage.abort(StageState.ERROR, t);
return StageState.ERROR;
}
}
}
private static class DiagnosticsUpdateTransition implements SingleArcTransition<Stage, StageEvent> {
@Override
public void transition(Stage stage, StageEvent event) {
if (!(event instanceof StageDiagnosticsUpdateEvent)) {
throw new IllegalArgumentException("event should be a StageDiagnosticsUpdateEvent type.");
}
stage.addDiagnostic(((StageDiagnosticsUpdateEvent) event).getDiagnosticUpdate());
}
}
private static class InternalErrorTransition implements SingleArcTransition<Stage, StageEvent> {
@Override
public void transition(Stage stage, StageEvent stageEvent) {
stage.abort(StageState.ERROR);
}
}
}