APEXMALHAR-2404 Provided fixes for Kryo serialization & at-least-once semantics for recovery.
Added a unit test case to verify at-least-once semantics for recovery.
diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java
index 01e99d3..f863d41 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java
@@ -161,6 +161,7 @@
@Override
public void beginWindow(long windowId)
{
+ super.beginWindow(windowId);
errorCount = 0;
recordCount = 0;
}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java
index 1951c1e..2acf98c 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java
@@ -72,9 +72,9 @@
private String genericRecordToPOJOFieldsMapping = null;
- private List<FieldInfo> fieldInfos;
+ private transient List<FieldInfo> fieldInfos;
- private List<ActiveFieldInfo> columnFieldSetters;
+ private transient List<ActiveFieldInfo> columnFieldSetters;
@AutoMetric
@VisibleForTesting
@@ -87,7 +87,7 @@
@AutoMetric
@VisibleForTesting
int fieldErrorCount = 0;
-
+
public final transient DefaultOutputPort<GenericRecord> errorPort = new DefaultOutputPort<GenericRecord>();
/**
diff --git a/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileInputOperatorTest.java
index 09507e6..17b1e2c 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileInputOperatorTest.java
@@ -53,10 +53,13 @@
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Sink;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.helper.TestPortContext;
import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.io.fs.AbstractFileInputOperatorTest;
import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
@@ -149,7 +152,51 @@
avroFileInput.teardown();
}
+
+ @Test
+ public void testIdempotencyWithCheckPoint() throws Exception
+ {
+ AbstractFileInputOperatorTest.testIdempotencyWithCheckPoint(new AvroFileInputOperator(), new CollectorTestSink<String>(), new AbstractFileInputOperatorTest.IdempotencyTestDriver<AvroFileInputOperator>()
+ {
+ @Override
+ public void writeFile(int count, String fileName) throws IOException
+ {
+ recordList = Lists.newArrayList();
+
+ while (count > 0) {
+ GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(AVRO_SCHEMA));
+ rec.put("orderId", count * 1L);
+ rec.put("customerId", count * 2);
+ rec.put("total", count * 1.5);
+ rec.put("customerName", "*" + count + "*");
+ count--;
+ recordList.add(rec);
+ }
+
+ writeAvroFile(new File(fileName));
+ }
+ @Override
+ public void setSink(AvroFileInputOperator operator, Sink<?> sink)
+ {
+ TestUtils.setSink(operator.output, sink);
+ }
+
+ @Override
+ public String getDirectory()
+ {
+ return testMeta.dir;
+ }
+
+ @Override
+ public OperatorContext getContext()
+ {
+ return testMeta.context;
+ }
+ });
+ }
+
+
@Test
public void testMultipleFileAvroReads() throws Exception
{
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
index 8acd16a..1e69a89 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
@@ -20,6 +20,7 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
@@ -56,7 +57,9 @@
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner.Partition;
+import com.datatorrent.api.Sink;
import com.datatorrent.api.StatsListener;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator.DirectoryScanner;
import com.datatorrent.lib.partitioner.StatelessPartitionerTest.PartitioningContextImpl;
@@ -907,38 +910,72 @@
@Test
public void testIdempotencyWithCheckPoint() throws Exception
{
- FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
+ testIdempotencyWithCheckPoint(new LineByLineFileInputOperator(), new CollectorTestSink<String>(), new IdempotencyTestDriver<LineByLineFileInputOperator>()
+ {
+ @Override
+ public void writeFile(int count, String fileName) throws IOException
+ {
+ List<String> lines = Lists.newArrayList();
+ for (int line = 0; line < count; line++) {
+ lines.add(fileName + "l" + line);
+ }
+ FileUtils.write(new File(testMeta.dir, fileName), StringUtils.join(lines, '\n'));
+ }
- List<String> lines = Lists.newArrayList();
+ @Override
+ public void setSink(LineByLineFileInputOperator operator, Sink<?> sink)
+ {
+ TestUtils.setSink(operator.output, sink);
+ }
+
+ @Override
+ public String getDirectory()
+ {
+ return testMeta.dir;
+ }
+
+ @Override
+ public Context.OperatorContext getContext()
+ {
+ return testMeta.context;
+ }
+ });
+ }
+
+ public interface IdempotencyTestDriver<T extends Operator>
+ {
+ void writeFile(int count, String fileName) throws IOException;
+
+ void setSink(T operator, Sink<?> sink);
+
+ String getDirectory();
+
+ Context.OperatorContext getContext();
+ }
+
+ public static <S extends AbstractFileInputOperator, T> void testIdempotencyWithCheckPoint(S oper, CollectorTestSink<T> queryResults, IdempotencyTestDriver<S> driver) throws Exception
+ {
+ FileContext.getLocalFSFileContext().delete(new Path(new File(driver.getDirectory()).getAbsolutePath()), true);
+
int file = 0;
- for (int line = 0; line < 5; line++) {
- lines.add("f" + file + "l" + line);
- }
- FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
+ driver.writeFile(5, "file" + file);
file = 1;
- lines = Lists.newArrayList();
- for (int line = 0; line < 6; line++) {
- lines.add("f" + file + "l" + line);
- }
- FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
+ driver.writeFile(6, "file" + file);
// empty file
file = 2;
- lines = Lists.newArrayList();
- FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
+ driver.writeFile(0, "file" + file);
-
- LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
FSWindowDataManager manager = new FSWindowDataManager();
- manager.setStatePath(testMeta.dir + "/recovery");
+ manager.setStatePath(driver.getDirectory() + "/recovery");
oper.setWindowDataManager(manager);
- oper.setDirectory(testMeta.dir);
+ oper.setDirectory(driver.getDirectory());
oper.getScanner().setFilePatternRegexp(".*file[\\d]");
- oper.setup(testMeta.context);
+ oper.setup(driver.getContext());
oper.setEmitBatchSize(3);
@@ -958,41 +995,40 @@
//checkpoint the operator
ByteArrayOutputStream bos = new ByteArrayOutputStream();
- LineByLineFileInputOperator checkPointOper = checkpoint(oper, bos);
+ S checkPointOper = checkpoint(oper, bos);
// start saving output
- CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
- TestUtils.setSink(oper.output, queryResults);
+ driver.setSink(oper, queryResults);
// emit f0l3, f0l4, and closeFile(f0) in the same window
oper.beginWindow(2);
oper.emitTuples();
oper.endWindow();
- List<String> beforeRecovery2 = Lists.newArrayList(queryResults.collectedTuples);
+ List<T> beforeRecovery2 = Lists.newArrayList(queryResults.collectedTuples);
// emit f1l0, f1l1, f1l2
oper.beginWindow(3);
oper.emitTuples();
oper.endWindow();
- List<String> beforeRecovery3 = Lists.newArrayList(queryResults.collectedTuples);
+ List<T> beforeRecovery3 = Lists.newArrayList(queryResults.collectedTuples);
// emit f1l3, f1l4, f1l5
oper.beginWindow(4);
oper.emitTuples();
oper.endWindow();
- List<String> beforeRecovery4 = Lists.newArrayList(queryResults.collectedTuples);
+ List<T> beforeRecovery4 = Lists.newArrayList(queryResults.collectedTuples);
// closeFile(f1) in a new window
oper.beginWindow(5);
oper.emitTuples();
oper.endWindow();
- List<String> beforeRecovery5 = Lists.newArrayList(queryResults.collectedTuples);
+ List<T> beforeRecovery5 = Lists.newArrayList(queryResults.collectedTuples);
// empty file ops, closeFile(f2) in emitTuples() only
oper.beginWindow(6);
oper.emitTuples();
oper.endWindow();
- List<String> beforeRecovery6 = Lists.newArrayList(queryResults.collectedTuples);
+ List<T> beforeRecovery6 = Lists.newArrayList(queryResults.collectedTuples);
oper.teardown();
@@ -1001,11 +1037,11 @@
//idempotency part
oper = restoreCheckPoint(checkPointOper, bos);
- testMeta.context.getAttributes().put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1L);
- oper.setup(testMeta.context);
- TestUtils.setSink(oper.output, queryResults);
+ driver.getContext().getAttributes().put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1L);
+ oper.setup(driver.getContext());
+ driver.setSink(oper, queryResults);
- long startwid = testMeta.context.getAttributes().get(Context.OperatorContext.ACTIVATION_WINDOW_ID) + 1;
+ long startwid = driver.getContext().getAttributes().get(Context.OperatorContext.ACTIVATION_WINDOW_ID) + 1;
oper.beginWindow(startwid);
Assert.assertTrue(oper.currentFile == null);
@@ -1046,7 +1082,7 @@
* @param bos The ByteArrayOutputStream which saves the checkpoint data temporarily.
* @return new operator.
*/
- public static LineByLineFileInputOperator checkpoint(LineByLineFileInputOperator oper, ByteArrayOutputStream bos) throws Exception
+ public static <T> T checkpoint(T oper, ByteArrayOutputStream bos) throws Exception
{
Kryo kryo = new Kryo();
@@ -1056,7 +1092,7 @@
Input lInput = new Input(bos.toByteArray());
@SuppressWarnings("unchecked")
- LineByLineFileInputOperator checkPointedOper = kryo.readObject(lInput, oper.getClass());
+ T checkPointedOper = kryo.readObject(lInput, (Class<T>)oper.getClass());
lInput.close();
return checkPointedOper;
@@ -1068,12 +1104,12 @@
* @param bos The ByteArrayOutputStream which saves the checkpoint data temporarily.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
- public static LineByLineFileInputOperator restoreCheckPoint(LineByLineFileInputOperator checkPointOper, ByteArrayOutputStream bos) throws Exception
+ public static <T> T restoreCheckPoint(T checkPointOper, ByteArrayOutputStream bos) throws Exception
{
Kryo kryo = new Kryo();
Input lInput = new Input(bos.toByteArray());
- LineByLineFileInputOperator oper = kryo.readObject(lInput, checkPointOper.getClass());
+ T oper = kryo.readObject(lInput, (Class<T>)checkPointOper.getClass());
lInput.close();
return oper;