TEZ-3719. DAGImpl.computeProgress slows down dispatcher and ipc threads (Gopal V via jeagles)
(cherry picked from commit de21f990a06fcb304328df7a601789b348873739)
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 6bb14d5..04074af 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -476,11 +476,18 @@
public float getProgress() {
readLock.lock();
try {
- TaskAttempt bestAttempt = selectBestAttempt();
- if (bestAttempt == null) {
- return 0f;
+ final TaskStateInternal state = getInternalState();
+ if (state == TaskStateInternal.RUNNING) {
+ TaskAttempt bestAttempt = selectBestAttempt();
+ if (bestAttempt == null) {
+ return 0f;
+ }
+ return bestAttempt.getProgress();
+ } else if (state == TaskStateInternal.SUCCEEDED) {
+ return 1.0f;
+ } else {
+ return 0.0f;
}
- return bestAttempt.getProgress();
} finally {
readLock.unlock();
}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index daef85d..cc0eda0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1337,7 +1337,30 @@
public float getProgress() {
this.readLock.lock();
try {
- computeProgress();
+ final VertexState state = this.getState();
+ switch (state) {
+ case NEW:
+ case INITED:
+ case INITIALIZING:
+ progress = 0.0f;
+ break;
+ case RUNNING:
+ computeProgress();
+ break;
+ case KILLED:
+ case ERROR:
+ case FAILED:
+ case TERMINATING:
+ progress = 0.0f;
+ break;
+ case COMMITTING:
+ case SUCCEEDED:
+ progress = 1.0f;
+ break;
+ default:
+ // unknown, do not change progress
+ break;
+ }
return progress;
} finally {
this.readLock.unlock();
@@ -1374,7 +1397,11 @@
ProgressBuilder progress = new ProgressBuilder();
progress.setTotalTaskCount(numTasks);
progress.setSucceededTaskCount(succeededTaskCount);
- progress.setRunningTaskCount(getRunningTasks());
+ if (inTerminalState()) {
+ progress.setRunningTaskCount(0);
+ } else {
+ progress.setRunningTaskCount(getRunningTasks());
+ }
progress.setFailedTaskCount(failedTaskCount);
progress.setKilledTaskCount(killedTaskCount);
progress.setFailedTaskAttemptCount(failedTaskAttemptCount.get());
@@ -1427,7 +1454,7 @@
try {
float progress = 0f;
for (Task task : this.tasks.values()) {
- progress += (task.isFinished() ? 1f : task.getProgress());
+ progress += (task.getProgress());
}
if (this.numTasks != 0) {
progress /= this.numTasks;