APEXCORE-732 Handling serialization and other exceptions while recording tuples, preventing the container from failing
diff --git a/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorder.java b/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorder.java
index fc6c2e6..4f6bbfa 100644
--- a/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorder.java
+++ b/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorder.java
@@ -32,6 +32,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
+import org.codehaus.jackson.JsonProcessingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,7 +60,7 @@
public class TupleRecorder
{
public static final String VERSION = "1.2";
- private int totalTupleCount = 0;
+ private long totalTupleCount = 0;
private final HashMap<String, PortInfo> portMap = new HashMap<>(); // used for output portInfo <name, id> map
private final HashMap<String, PortCount> portCountMap = new HashMap<>(); // used for tupleCount of each port <name, count> map
private transient long currentWindowId = WindowGenerator.MIN_WINDOW_ID - 1;
@@ -76,6 +77,33 @@
private String recordingNameTopic;
private long numWindows = Long.MAX_VALUE; // number of windows to record
private Runnable stopProcedure; // stop procedure to execute
+
+ private static final Logger logger = LoggerFactory.getLogger(TupleRecorder.class);
+
+ // If there are errors processing tuples, don't log an error for every tuple as it could overwhelm the logs.
+ // The property specifies the minimum number of tuples between two consecutive error log statements. Set it to zero to
+ // log every tuple error
+ private static long ERROR_LOG_GAP;
+ long lastLog = -1;
+
+ static {
+ ERROR_LOG_GAP = 10000L;
+ String property = System.getProperty("org.apache.apex.stram.tupleRecorder.errorLogGap");
+ if (property != null) {
+ try {
+ long value = Long.decode(property);
+ if (value < 0 ) {
+ logger.warn("Log gap should be greater than or equal to 0, setting to default");
+ } else {
+ ERROR_LOG_GAP = value;
+ }
+ } catch (Exception ex) {
+ logger.warn("Unable to parse the log gap property, setting to default", ex);
+ }
+ }
+ logger.debug("Log gap is {}", ERROR_LOG_GAP);
+ }
+
private final FSPartFileCollection storage = new FSPartFileCollection()
{
@Override
@@ -154,7 +182,7 @@
return Collections.unmodifiableMap(portMap);
}
- public int getTotalTupleCount()
+ public long getTotalTupleCount()
{
return totalTupleCount;
}
@@ -392,12 +420,19 @@
public void writeTuple(Object obj, String port)
{
+ ++totalTupleCount;
if (windowIdRanges.isEmpty()) {
throw new RuntimeException("Data tuples received from tuple recorder before any BEGIN_WINDOW");
}
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ Slice f = null;
try {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- Slice f = streamCodec.toByteArray(obj);
+ f = streamCodec.toByteArray(obj);
+ } catch (RuntimeException ex) {
+ checkLogTuple(ex, "save", obj);
+ return;
+ }
+ try {
PortInfo pi = portMap.get(port);
String str = "T:" + System.currentTimeMillis() + ":" + pi.id + ":" + f.length + ":";
bos.write(str.getBytes());
@@ -410,13 +445,12 @@
storage.writeDataItem(bos.toByteArray(), true);
//logger.debug("Writing tuple for port id {}", pi.id);
//fsOutput.hflush();
- ++totalTupleCount;
if (numSubscribers > 0) {
// this is not asynchronous. we need to fix this
publishTupleData(pi.id, obj);
}
- } catch (IOException ex) {
- logger.error(ex.toString());
+ } catch (Exception ex) {
+ logger.warn("Error saving tuple", ex);
}
}
@@ -463,7 +497,19 @@
wsClient.publish(recordingNameTopic, map);
}
} catch (Exception ex) {
- logger.warn("Error publishing tuple data", ex);
+ if (ex instanceof JsonProcessingException) {
+ checkLogTuple(ex, "publish", obj);
+ } else {
+ logger.warn("Error publishing tuple", ex);
+ }
+ }
+ }
+
+ private void checkLogTuple(Exception ex, String context, Object tuple)
+ {
+ if ((lastLog == -1) || (totalTupleCount - lastLog) >= ERROR_LOG_GAP) {
+ lastLog = totalTupleCount;
+ logger.warn("Error serializing during {} for tuple {} ", context, tuple, ex);
}
}
@@ -519,5 +565,4 @@
}
- private static final Logger logger = LoggerFactory.getLogger(TupleRecorder.class);
}
diff --git a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
index 9d88bdf..447326f 100644
--- a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
@@ -207,7 +207,7 @@
}
private static final File testWorkDir = new File("target", TupleRecorderTest.class.getName());
- private static final int testTupleCount = 10;
+ private static final long testTupleCount = 10;
@Test
public void testRecordingFlow() throws Exception