APEXCORE-732 Handling serialization and other exceptions while recording tuples, preventing the container from failing
diff --git a/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorder.java b/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorder.java
index fc6c2e6..4f6bbfa 100644
--- a/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorder.java
+++ b/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorder.java
@@ -32,6 +32,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
+import org.codehaus.jackson.JsonProcessingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,7 +60,7 @@
 public class TupleRecorder
 {
   public static final String VERSION = "1.2";
-  private int totalTupleCount = 0;
+  private long totalTupleCount = 0;
   private final HashMap<String, PortInfo> portMap = new HashMap<>(); // used for output portInfo <name, id> map
   private final HashMap<String, PortCount> portCountMap = new HashMap<>(); // used for tupleCount of each port <name, count> map
   private transient long currentWindowId = WindowGenerator.MIN_WINDOW_ID - 1;
@@ -76,6 +77,33 @@
   private String recordingNameTopic;
   private long numWindows = Long.MAX_VALUE; // number of windows to record
   private Runnable stopProcedure; // stop procedure to execute
+
+  private static final Logger logger = LoggerFactory.getLogger(TupleRecorder.class);
+
+  // If there are errors processing tuples, don't log an error for every tuple as it could overwhelm the logs.
+  // The property specifies the minumum number of tuples between two consecutive error log statements. Set it to zero to
+  // log every tuple error
+  private static long ERROR_LOG_GAP;
+  long lastLog = -1;
+
+  static {
+    ERROR_LOG_GAP = 10000L;
+    String property = System.getProperty("org.apache.apex.stram.tupleRecorder.errorLogGap");
+    if (property != null) {
+      try {
+        long value = Long.decode(property);
+        if (value < 0 ) {
+          logger.warn("Log gap should be greater than or equal to 0, setting to default");
+        } else {
+          ERROR_LOG_GAP = value;
+        }
+      } catch (Exception ex) {
+        logger.warn("Unable to parse the log gap property, setting to default", ex);
+      }
+    }
+    logger.debug("Log gap is {}", ERROR_LOG_GAP);
+  }
+
   private final FSPartFileCollection storage = new FSPartFileCollection()
   {
     @Override
@@ -154,7 +182,7 @@
     return Collections.unmodifiableMap(portMap);
   }
 
-  public int getTotalTupleCount()
+  public long getTotalTupleCount()
   {
     return totalTupleCount;
   }
@@ -392,12 +420,19 @@
 
   public void writeTuple(Object obj, String port)
   {
+    ++totalTupleCount;
     if (windowIdRanges.isEmpty()) {
       throw new RuntimeException("Data tuples received from tuple recorder before any BEGIN_WINDOW");
     }
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    Slice f = null;
     try {
-      ByteArrayOutputStream bos = new ByteArrayOutputStream();
-      Slice f = streamCodec.toByteArray(obj);
+      f = streamCodec.toByteArray(obj);
+    } catch (RuntimeException ex) {
+      checkLogTuple(ex, "save", obj);
+      return;
+    }
+    try {
       PortInfo pi = portMap.get(port);
       String str = "T:" + System.currentTimeMillis() + ":" + pi.id + ":" + f.length + ":";
       bos.write(str.getBytes());
@@ -410,13 +445,12 @@
       storage.writeDataItem(bos.toByteArray(), true);
       //logger.debug("Writing tuple for port id {}", pi.id);
       //fsOutput.hflush();
-      ++totalTupleCount;
       if (numSubscribers > 0) {
         // this is not asynchronous.  we need to fix this
         publishTupleData(pi.id, obj);
       }
-    } catch (IOException ex) {
-      logger.error(ex.toString());
+    } catch (Exception ex) {
+      logger.warn("Error saving tuple", ex);
     }
   }
 
@@ -463,7 +497,19 @@
         wsClient.publish(recordingNameTopic, map);
       }
     } catch (Exception ex) {
-      logger.warn("Error publishing tuple data", ex);
+      if (ex instanceof JsonProcessingException) {
+        checkLogTuple(ex, "publish", obj);
+      } else {
+        logger.warn("Error publishing tuple", ex);
+      }
+    }
+  }
+
+  private void checkLogTuple(Exception ex, String context, Object tuple)
+  {
+    if ((lastLog == -1) || (totalTupleCount - lastLog) >= ERROR_LOG_GAP) {
+      lastLog = totalTupleCount;
+      logger.warn("Error serializing during {} for tuple {} ", context, tuple, ex);
     }
   }
 
@@ -519,5 +565,4 @@
 
   }
 
-  private static final Logger logger = LoggerFactory.getLogger(TupleRecorder.class);
 }
diff --git a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
index 9d88bdf..447326f 100644
--- a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
@@ -207,7 +207,7 @@
   }
 
   private static final File testWorkDir = new File("target", TupleRecorderTest.class.getName());
-  private static final int testTupleCount = 10;
+  private static final long testTupleCount = 10;
 
   @Test
   public void testRecordingFlow() throws Exception