APEXMALHAR-2404 Fixed Kryo serialization and at-least-once recovery semantics.
Made AvroToPojo's fieldInfos and columnFieldSetters transient so they are no longer part of the Kryo-checkpointed state, and added the missing super.beginWindow() call in AvroFileInputOperator so windows are replayed correctly on recovery.
Added a unit test that verifies at-least-once semantics on recovery, built on a reusable idempotency test driver in AbstractFileInputOperatorTest.
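
For context, a minimal sketch (not part of this patch; the class and method names are illustrative) of the Kryo round-trip that the checkpoint()/restoreCheckPoint() test helpers below perform. Every non-transient field of the operator must be Kryo-serializable for this round-trip to succeed, which is what the transient changes in AvroToPojo address:

import java.io.ByteArrayOutputStream;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

// Illustrative helper only: serializes an operator into an in-memory "checkpoint"
// and deserializes it again, mirroring the checkpoint()/restoreCheckPoint()
// helpers in AbstractFileInputOperatorTest.
public class KryoCheckpointSketch
{
  @SuppressWarnings("unchecked")
  public static <T> T roundTrip(T operator)
  {
    Kryo kryo = new Kryo();

    // write the operator's non-transient state to an in-memory buffer
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    Output output = new Output(bos);
    kryo.writeObject(output, operator);
    output.close();

    // read it back, simulating recovery from the saved checkpoint
    Input input = new Input(bos.toByteArray());
    T restored = kryo.readObject(input, (Class<T>)operator.getClass());
    input.close();
    return restored;
  }
}
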
diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java
index 01e99d3..f863d41 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java
@@ -161,6 +161,7 @@
   @Override
   public void beginWindow(long windowId)
   {
+    super.beginWindow(windowId);
     errorCount = 0;
     recordCount = 0;
   }
diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java
index 1951c1e..2acf98c 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java
@@ -72,9 +72,9 @@
 
   private String genericRecordToPOJOFieldsMapping = null;
 
-  private List<FieldInfo> fieldInfos;
+  private transient List<FieldInfo> fieldInfos;
 
-  private List<ActiveFieldInfo> columnFieldSetters;
+  private transient List<ActiveFieldInfo> columnFieldSetters;
 
   @AutoMetric
   @VisibleForTesting
@@ -87,7 +87,7 @@
   @AutoMetric
   @VisibleForTesting
   int fieldErrorCount = 0;
-
+  
   public final transient DefaultOutputPort<GenericRecord> errorPort = new DefaultOutputPort<GenericRecord>();
 
   /**
diff --git a/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileInputOperatorTest.java
index 09507e6..17b1e2c 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileInputOperatorTest.java
@@ -53,10 +53,13 @@
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Sink;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.lib.helper.TestPortContext;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.io.fs.AbstractFileInputOperatorTest;
 import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
 
 import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
 
@@ -149,7 +152,51 @@
     avroFileInput.teardown();
 
   }
+
+  @Test
+  public void testIdempotencyWithCheckPoint() throws Exception
+  {
+    AbstractFileInputOperatorTest.testIdempotencyWithCheckPoint(new AvroFileInputOperator(), new CollectorTestSink<String>(), new AbstractFileInputOperatorTest.IdempotencyTestDriver<AvroFileInputOperator>()
+    {
+      @Override
+      public void writeFile(int count, String fileName) throws IOException
+      {
+        recordList = Lists.newArrayList();
+
+        while (count > 0) {
+          GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(AVRO_SCHEMA));
+          rec.put("orderId", count * 1L);
+          rec.put("customerId", count * 2);
+          rec.put("total", count * 1.5);
+          rec.put("customerName", "*" + count + "*");
+          count--;
+          recordList.add(rec);
+        }
+
+        writeAvroFile(new File(fileName));
+      }
 
+      @Override
+      public void setSink(AvroFileInputOperator operator, Sink<?> sink)
+      {
+        TestUtils.setSink(operator.output, sink);
+      }
+
+      @Override
+      public String getDirectory()
+      {
+        return testMeta.dir;
+      }
+
+      @Override
+      public OperatorContext getContext()
+      {
+        return testMeta.context;
+      }
+    });
+  }
+
+
   @Test
   public void testMultipleFileAvroReads() throws Exception
   {
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
index 8acd16a..1e69a89 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
@@ -20,6 +20,7 @@
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -56,7 +57,9 @@
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Operator;
 import com.datatorrent.api.Partitioner.Partition;
+import com.datatorrent.api.Sink;
 import com.datatorrent.api.StatsListener;
 import com.datatorrent.lib.io.fs.AbstractFileInputOperator.DirectoryScanner;
 import com.datatorrent.lib.partitioner.StatelessPartitionerTest.PartitioningContextImpl;
@@ -907,38 +910,72 @@
   @Test
   public void testIdempotencyWithCheckPoint() throws Exception
   {
-    FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
+    testIdempotencyWithCheckPoint(new LineByLineFileInputOperator(), new CollectorTestSink<String>(), new IdempotencyTestDriver<LineByLineFileInputOperator>()
+    {
+      @Override
+      public void writeFile(int count, String fileName) throws IOException
+      {
+        List<String> lines = Lists.newArrayList();
+        for (int line = 0; line < count; line++) {
+          lines.add(fileName + "l" + line);
+        }
+        FileUtils.write(new File(testMeta.dir, fileName), StringUtils.join(lines, '\n'));
+      }
 
-    List<String> lines = Lists.newArrayList();
+      @Override
+      public void setSink(LineByLineFileInputOperator operator, Sink<?> sink)
+      {
+        TestUtils.setSink(operator.output, sink);
+      }
+
+      @Override
+      public String getDirectory()
+      {
+        return testMeta.dir;
+      }
+
+      @Override
+      public Context.OperatorContext getContext()
+      {
+        return testMeta.context;
+      }
+    });
+  }
+
+  public interface IdempotencyTestDriver<T extends Operator>
+  {
+    void writeFile(int count, String fileName) throws IOException;
+
+    void setSink(T operator, Sink<?> sink);
+
+    String getDirectory();
+
+    Context.OperatorContext getContext();
+  }
+
+  public static <S extends AbstractFileInputOperator, T> void testIdempotencyWithCheckPoint(S oper, CollectorTestSink<T> queryResults, IdempotencyTestDriver<S> driver) throws Exception
+  {
+    FileContext.getLocalFSFileContext().delete(new Path(new File(driver.getDirectory()).getAbsolutePath()), true);
+
     int file = 0;
-    for (int line = 0; line < 5; line++) {
-      lines.add("f" + file + "l" + line);
-    }
-    FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
+    driver.writeFile(5, "file" + file);
 
     file = 1;
-    lines = Lists.newArrayList();
-    for (int line = 0; line < 6; line++) {
-      lines.add("f" + file + "l" + line);
-    }
-    FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
+    driver.writeFile(6, "file" + file);
 
     // empty file
     file = 2;
-    lines = Lists.newArrayList();
-    FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
+    driver.writeFile(0, "file" + file);
 
-
-    LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
     FSWindowDataManager manager = new FSWindowDataManager();
-    manager.setStatePath(testMeta.dir + "/recovery");
+    manager.setStatePath(driver.getDirectory() + "/recovery");
 
     oper.setWindowDataManager(manager);
 
-    oper.setDirectory(testMeta.dir);
+    oper.setDirectory(driver.getDirectory());
     oper.getScanner().setFilePatternRegexp(".*file[\\d]");
 
-    oper.setup(testMeta.context);
+    oper.setup(driver.getContext());
 
     oper.setEmitBatchSize(3);
 
@@ -958,41 +995,40 @@
 
     //checkpoint the operator
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    LineByLineFileInputOperator checkPointOper = checkpoint(oper, bos);
+    S checkPointOper = checkpoint(oper, bos);
 
     // start saving output
-    CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
-    TestUtils.setSink(oper.output, queryResults);
+    driver.setSink(oper, queryResults);
 
     // emit f0l3, f0l4, and closeFile(f0) in the same window
     oper.beginWindow(2);
     oper.emitTuples();
     oper.endWindow();
-    List<String> beforeRecovery2 = Lists.newArrayList(queryResults.collectedTuples);
+    List<T> beforeRecovery2 = Lists.newArrayList(queryResults.collectedTuples);
 
     // emit f1l0, f1l1, f1l2
     oper.beginWindow(3);
     oper.emitTuples();
     oper.endWindow();
-    List<String> beforeRecovery3 = Lists.newArrayList(queryResults.collectedTuples);
+    List<T> beforeRecovery3 = Lists.newArrayList(queryResults.collectedTuples);
 
     // emit f1l3, f1l4, f1l5
     oper.beginWindow(4);
     oper.emitTuples();
     oper.endWindow();
-    List<String> beforeRecovery4 = Lists.newArrayList(queryResults.collectedTuples);
+    List<T> beforeRecovery4 = Lists.newArrayList(queryResults.collectedTuples);
 
     // closeFile(f1) in a new window
     oper.beginWindow(5);
     oper.emitTuples();
     oper.endWindow();
-    List<String> beforeRecovery5 = Lists.newArrayList(queryResults.collectedTuples);
+    List<T> beforeRecovery5 = Lists.newArrayList(queryResults.collectedTuples);
 
     // empty file ops, closeFile(f2) in emitTuples() only
     oper.beginWindow(6);
     oper.emitTuples();
     oper.endWindow();
-    List<String> beforeRecovery6 = Lists.newArrayList(queryResults.collectedTuples);
+    List<T> beforeRecovery6 = Lists.newArrayList(queryResults.collectedTuples);
 
     oper.teardown();
 
@@ -1001,11 +1037,11 @@
     //idempotency  part
 
     oper = restoreCheckPoint(checkPointOper, bos);
-    testMeta.context.getAttributes().put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1L);
-    oper.setup(testMeta.context);
-    TestUtils.setSink(oper.output, queryResults);
+    driver.getContext().getAttributes().put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1L);
+    oper.setup(driver.getContext());
+    driver.setSink(oper, queryResults);
 
-    long startwid = testMeta.context.getAttributes().get(Context.OperatorContext.ACTIVATION_WINDOW_ID) + 1;
+    long startwid = driver.getContext().getAttributes().get(Context.OperatorContext.ACTIVATION_WINDOW_ID) + 1;
 
     oper.beginWindow(startwid);
     Assert.assertTrue(oper.currentFile == null);
@@ -1046,7 +1082,7 @@
    * @param bos The ByteArrayOutputStream which saves the checkpoint data temporarily.
    * @return new operator.
    */
-  public static LineByLineFileInputOperator checkpoint(LineByLineFileInputOperator oper, ByteArrayOutputStream bos) throws Exception
+  public static <T> T checkpoint(T oper, ByteArrayOutputStream bos) throws Exception
   {
     Kryo kryo = new Kryo();
 
@@ -1056,7 +1092,7 @@
 
     Input lInput = new Input(bos.toByteArray());
     @SuppressWarnings("unchecked")
-    LineByLineFileInputOperator checkPointedOper = kryo.readObject(lInput, oper.getClass());
+    T checkPointedOper = kryo.readObject(lInput, (Class<T>)oper.getClass());
     lInput.close();
 
     return checkPointedOper;
@@ -1068,12 +1104,12 @@
    * @param bos The ByteArrayOutputStream which saves the checkpoint data temporarily.
    */
   @SuppressWarnings({"unchecked", "rawtypes"})
-  public static LineByLineFileInputOperator restoreCheckPoint(LineByLineFileInputOperator checkPointOper, ByteArrayOutputStream bos) throws Exception
+  public static <T> T restoreCheckPoint(T checkPointOper, ByteArrayOutputStream bos) throws Exception
   {
     Kryo kryo = new Kryo();
 
     Input lInput = new Input(bos.toByteArray());
-    LineByLineFileInputOperator oper = kryo.readObject(lInput, checkPointOper.getClass());
+    T oper = kryo.readObject(lInput, (Class<T>)checkPointOper.getClass());
     lInput.close();
 
     return oper;