TEZ-2781. Fallback to send only TaskAttemptFailedEvent if taskFailed heartbeat fails (zjffdu)
(cherry picked from commit f9d15c8695de7975817631b051450336bc5eadee)
Conflicts:
CHANGES.txt
tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
(cherry picked from commit 8d49fd5285016fb64ebccdc9cf31c408c79ebaaf)
Conflicts:
CHANGES.txt
tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
(cherry picked from commit 3a8bf0e0afb0a30d5c4cbf82d1e2ce9e0dd22b4a)
Conflicts:
CHANGES.txt
diff --git a/CHANGES.txt b/CHANGES.txt
index 3a46867..1055fa0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@
TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
ALL CHANGES:
+ TEZ-2781. Fallback to send only TaskAttemptFailedEvent if taskFailed heartbeat fails
TEZ-2398. Flaky test: TestFaultTolerance
TEZ-2808. Race condition between preemption and container assignment
TEZ-1929. pre-empted tasks should be marked as killed instead of failed
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index defc6bd..4def43f 100644
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -332,8 +332,15 @@
*/
private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
EventMetaData srcMeta) throws IOException, TezException {
- TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(),
- task.getProgress()), updateEventMetadata);
+ List<TezEvent> tezEvents = new ArrayList<TezEvent>();
+ try {
+ TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(),
+ task.getProgress()), updateEventMetadata);
+ tezEvents.add(statusUpdateEvent);
+ } catch (Exception e) {
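+ // Reading the task counters can throw (e.g. when the counter limit is exceeded); skip the status update and still send the failure event.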
+ LOG.warn("Error when get constructing TaskStatusUpdateEvent");
+ }
if (diagnostics == null) {
diagnostics = ExceptionUtils.getStackTrace(t);
} else {
@@ -341,7 +348,8 @@
}
TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics),
srcMeta == null ? updateEventMetadata : srcMeta);
- return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent)).shouldDie;
+ tezEvents.add(taskAttemptFailedEvent);
+ return !heartbeat(tezEvents).shouldDie;
}
private void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
index bb9888a..d30d73f 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
@@ -223,7 +223,11 @@
DAGStatus dagStatus = dagClient.waitForCompletion();
String diagnostics = StringUtils.join(dagStatus.getDiagnostics(), ",");
LOG.info("Diagnostics:" + diagnostics);
- assertTrue(diagnostics.contains(exLocation.name()));
+ if (exLocation == ExceptionLocation.PROCESSOR_COUNTER_EXCEEDED) {
+ assertTrue(diagnostics.contains("Too many counters"));
+ } else {
+ assertTrue(diagnostics.contains(exLocation.name()));
+ }
}
} finally {
stopSessionClient();
@@ -300,6 +304,7 @@
// PROCESSOR_HANDLE_EVENTS
PROCESSOR_RUN_ERROR, PROCESSOR_CLOSE_ERROR, PROCESSOR_INITIALIZE_ERROR,
PROCESSOR_RUN_EXCEPTION, PROCESSOR_CLOSE_EXCEPTION, PROCESSOR_INITIALIZE_EXCEPTION,
+ PROCESSOR_COUNTER_EXCEEDED,
// VM
VM_INITIALIZE, VM_ON_ROOTVERTEX_INITIALIZE,VM_ON_SOURCETASK_COMPLETED, VM_ON_VERTEX_STARTED,
@@ -624,6 +629,11 @@
throw new Error(this.exLocation.name());
} else if (this.exLocation == ExceptionLocation.PROCESSOR_RUN_EXCEPTION) {
throw new Exception(this.exLocation.name());
+ } else if (this.exLocation == ExceptionLocation.PROCESSOR_COUNTER_EXCEEDED) {
+ // Simulate exceeding the counter limit by creating one more counter than the default maximum.
+ for (int i=0;i< TezConfiguration.TEZ_COUNTERS_MAX_DEFAULT+1; ++i) {
+ getContext().getCounters().findCounter("mycounter", "counter_"+i).increment(1);
+ }
}
}