Add test dbg output for testMultiTopologyPollWithError
diff --git a/api/topology/src/test/java/org/apache/edgent/test/topology/TStreamTest.java b/api/topology/src/test/java/org/apache/edgent/test/topology/TStreamTest.java
index 441d43f..464f8e2 100644
--- a/api/topology/src/test/java/org/apache/edgent/test/topology/TStreamTest.java
+++ b/api/topology/src/test/java/org/apache/edgent/test/topology/TStreamTest.java
@@ -745,14 +745,26 @@
Topology t = newTopology();
TStream<String> s = t.strings("a", "b", "c", "d", "e", "f", "g", "h");
// Throw on the 8th tuple
- s.sink((tuple) -> { if ("h".equals(tuple)) throw new RuntimeException("Expected Test Exception");});
+ s.sink((tuple) -> { if ("h".equals(tuple)) throw new RuntimeException("MTWE Expected Test Exception");});
// Expect 7 tuples out of 8
Condition<Long> tc = t.getTester().tupleCount(s, 7);
- complete(t, tc);
+// complete(t, tc);
+ try {
+ complete(t, tc);
+ } catch (Exception e) {
+ System.err.println("MTWE complete() threw e:"+e);
+ throw e;
+ }
return true;
});
}
- waitForCompletion(completer, executions);
+// waitForCompletion(completer, executions);
+ try {
+ waitForCompletion(completer, executions);
+ } catch (Exception e) {
+ System.err.println("MTWE waitForCompletion() threw e:"+e);
+ throw e;
+ }
}
/**
@@ -762,6 +774,24 @@
@Test
public void testMultiTopologyPollWithError() throws Exception {
+ /*
+ * It's unclear exactly what this test is supposed to achieve
+ * (hence unclear how to ensure its achieving it).
+ * Is it just trying to verify that a failure in one topology/job
+ * doesn't affect the execution of another?
+ *
+ * The way the test is written I'm not sure there's any guarantee
+ * that the "Expected Exception" will be generated the appropriate
+ * number of times.
+ * Is it possible the completion condition could get evaluated
+ * true (having seen the 7th tuple) before the 8th tuple is generated
+ * and processed by the sink fn raising the exception, resulting in
+ * the job being closed... before the 8th is generated and processed?
+ * I'm also seeing more "Expected Test Exception" traces than I expected.
+ *
+ * Annotate this and the MPWE test a bit to help understand what we're seeing
+ * in the output.
+ */
int executions = 4;
ExecutorCompletionService<Boolean> completer = new ExecutorCompletionService<>(
Executors.newFixedThreadPool(executions));
@@ -771,15 +801,27 @@
AtomicLong n = new AtomicLong(0);
TStream<Long> s = t.poll(() -> n.incrementAndGet(), 10, TimeUnit.MILLISECONDS);
// Throw on the 8th tuple
- s.sink((tuple) -> { if (8 == n.get()) throw new RuntimeException("Expected Test Exception");});
+ s.sink((tuple) -> { if (8 == n.get()) throw new RuntimeException("MTPWE Expected Test Exception");});
// Expect 7 tuples out of 8
Condition<Long> tc = t.getTester().tupleCount(s, 7);
- complete(t, tc);
+// complete(t, tc);
+ try {
+ complete(t, tc);
+ } catch (Exception e) {
+ System.err.println("MTPWE complete() threw e:"+e);
+ throw e;
+ }
return true;
});
}
- waitForCompletion(completer, executions);
- }
+// waitForCompletion(completer, executions);
+ try {
+ waitForCompletion(completer, executions);
+ } catch (Exception e) {
+ System.err.println("MTPWE waitForCompletion() threw e:"+e);
+ throw e;
+ }
+ }
@Test
public void testJoinWithWindow() throws Exception{
diff --git a/spi/topology/src/main/java/org/apache/edgent/topology/spi/tester/AbstractTester.java b/spi/topology/src/main/java/org/apache/edgent/topology/spi/tester/AbstractTester.java
index 0c10a08..3dee72a 100644
--- a/spi/topology/src/main/java/org/apache/edgent/topology/spi/tester/AbstractTester.java
+++ b/spi/topology/src/main/java/org/apache/edgent/topology/spi/tester/AbstractTester.java
@@ -73,8 +73,14 @@
Thread.sleep(100);
}
+ if (!endCondition.valid() && getJob().getCurrentState() != State.CLOSED) {
+ System.err.println("complete(): timed out after " + tmoMsec + "msec");
+ }
+
if (getJob().getCurrentState() != State.CLOSED)
getJob().stateChange(Job.Action.CLOSE);
+ else
+ System.out.println("complete(): Job already closed");
return endCondition.valid();
}