APEXMALHAR-2476-Fix-tupleSeperator-override.
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperator.java
index 2ff405d..111ea7b 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperator.java
@@ -29,6 +29,7 @@
import org.slf4j.LoggerFactory;
import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.lib.converter.Converter;
import com.datatorrent.lib.io.fs.AbstractSingleFileOutputOperator;
@@ -122,6 +123,13 @@
setRotationWindows(DEFAULT_ROTATION_WINDOWS);
}
+ @Override
+ public void setup(OperatorContext context)
+ {
+ super.setup(context);
+ this.tupleSeparatorBytes = tupleSeparator.getBytes();
+ }
+
/**
* {@inheritDoc}
*
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java
index f0d2915..f248266 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java
@@ -29,7 +29,11 @@
import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator.BytesFileOutputOperator;
import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator.StringFileOutputOperator;
import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest;
import com.datatorrent.netlet.util.DTThrowable;
@@ -127,6 +131,50 @@
}
}
+ public static class TestApplication implements StreamingApplication
+ {
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ LineByLineFileInputOperator input = dag.addOperator("input", new LineByLineFileInputOperator());
+ StringFileOutputOperator output = dag.addOperator("output", new StringFileOutputOperator());
+ dag.addStream("data", input.output, output.input);
+ }
+ }
+
+ @Test
+ public void runTestApplication() throws Exception
+ {
+ FileUtils.write(new File(testMeta.getDir(), "input.txt"), "a\nb\nc\nd\n");
+
+ Configuration conf = new Configuration(false);
+ conf.set("dt.operator.input.prop.directory", testMeta.getDir() + "/input.txt");
+ conf.set("dt.operator.output.prop.filePath", testMeta.getDir());
+ conf.set("dt.operator.output.prop.outputFileName", "output.txt");
+ conf.set("dt.operator.output.prop.tupleSeparator", "-");
+ conf.set("dt.operator.output.prop.maxIdleWindows", "2");
+ conf.set("dt.attr.CHECKPOINT_WINDOW_COUNT", "2");
+
+ LocalMode lma = LocalMode.newInstance();
+ lma.prepareDAG(new TestApplication(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.runAsync();
+
+ File outputFile = new File(testMeta.getDir(), "output.txt_2.0");
+ final int MAX = 60;
+ for (int i = 0; i < MAX && (!outputFile.exists()); ++i) {
+ Thread.sleep(1000);
+ }
+ if (!outputFile.exists()) {
+ String msg = String.format("Error: output file not found after %d seconds%n", MAX);
+ throw new RuntimeException(msg);
+ }
+
+ String output = FileUtils.readFileToString(outputFile);
+ Assert.assertEquals("a-b-c-d-", output);
+ }
+
@Test
public void testRotationWithNoData() throws InterruptedException
{