blob: f2482669c7387754c1f03d323e9f0119b8be34aa [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.fs;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import org.junit.Assert;
import org.junit.Test;
import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator.BytesFileOutputOperator;
import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator.StringFileOutputOperator;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.DAG;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest;
import com.datatorrent.netlet.util.DTThrowable;
/**
 * Tests for {@link GenericFileOutputOperator} and its {@link StringFileOutputOperator} and
 * {@link BytesFileOutputOperator} variants.
 * <p>
 * Covers:
 * <ul>
 * <li>part-file rollover after idle windows;</li>
 * <li>part-file rollover on tuple count;</li>
 * <li>an end-to-end local-mode application;</li>
 * <li>rotation when no data arrives.</li>
 * </ul>
 * {@code testMeta} and {@code checkpoint(...)} are inherited from {@link AbstractFileOutputOperatorTest}.
 */
public class GenericFileOutputOperatorTest extends AbstractFileOutputOperatorTest
{
  /**
   * Test file rollover in case of idle windows.
   * <p>
   * With {@code maxIdleWindows = 5}, the runs of empty windows between data should finalize the current
   * part file. The tuples are therefore expected in three part files: {@code output.txt_0.0},
   * {@code .1} and {@code .2}.
   *
   * @throws IOException
   */
  @Test
  public void testIdleWindowsFinalize() throws IOException
  {
    StringFileOutputOperator writer = new StringFileOutputOperator();
    writer.setOutputFileName("output.txt");
    writer.setFilePath(testMeta.getDir());
    writer.setAlwaysWriteToTmp(true);
    writer.setMaxIdleWindows(5);
    writer.setup(testMeta.testOperatorContext);

    // tuples[i] holds the tuples emitted in window i; empty arrays are idle windows.
    String[][] tuples = {{"0a", "0b" }, {"1a", "1b" }, {}, {}, {}, {}, {"6a", "6b" }, {"7a", "7b" }, {}, {}, {},
        {}, {}, {"13a", "13b" }, {"14a", "14b" }, {}, {}, {}, {"18a", "18b" }, {"19a", "19b" }, {}, {}, {}, {}, {},
        {}, {"26a", "26b"} };

    for (int i = 0; i <= 12; i++) {
      writer.beginWindow(i);
      for (String t : tuples[i]) {
        writer.input.put(t);
      }
      writer.endWindow();
    }
    checkpoint(writer, 10);
    writer.committed(10);

    for (int i = 13; i < tuples.length; i++) {
      writer.beginWindow(i);
      for (String t : tuples[i]) {
        writer.input.put(t);
      }
      writer.endWindow();
    }
    checkpoint(writer, 20);
    writer.committed(20);
    checkpoint(writer, 26);
    writer.committed(26);

    String[] expected = {"0a\n0b\n1a\n1b\n6a\n6b\n7a\n7b\n", "13a\n13b\n14a\n14b\n18a\n18b\n19a\n19b\n",
        "26a\n26b\n" };
    for (int i = 0; i < expected.length; i++) {
      checkOutput(i, testMeta.getDir() + "/output.txt_0", expected[i], true);
    }
  }

  /**
   * Test file rollover for tuple count.
   * <p>
   * With {@code maxTupleCount = 10}, a part file should roll over once it holds ten tuples. The expected
   * contents of the three resulting part files are listed at the end of the test.
   *
   * @throws IOException
   */
  @Test
  public void testTupleCountFinalize() throws IOException
  {
    BytesFileOutputOperator writer = new BytesFileOutputOperator();
    writer.setOutputFileName("output.txt");
    writer.setFilePath(testMeta.getDir());
    writer.setAlwaysWriteToTmp(true);
    writer.setMaxTupleCount(10);
    writer.setup(testMeta.testOperatorContext);

    // tuples[i] holds the tuples emitted in window i. Labels are tuple names, not window ids.
    String[][] tuples = {{"0a", "0b" }, {"1a", "1b" }, {}, {"3a", "3b" }, {"4a", "4b" }, {}, {"6a", "6b" },
        {"7a", "7b" }, {}, {}, {"9a" }, {"10a", "10b" }, {}, {"12a" }, {"13a", "13b"}, {"14a", "14b" }, {}, {},
        {}, {"18a", "18b" }, {"19a", "19b" }, {"20a" }, {"21a" }, {"22a" }};

    for (int i = 0; i < tuples.length; i++) {
      writer.beginWindow(i);
      for (String t : tuples[i]) {
        writer.input.put(t.getBytes());
      }
      writer.endWindow();
      // NOTE(review): the checkpoint/commit window ids (10 and 24) are fixed and do not track i.
      // The expected output below was produced with this exact sequence, so it is kept unchanged.
      if (i % 10 == 0) {
        checkpoint(writer, 10);
        writer.committed(10);
      }
      checkpoint(writer, 24);
    }
    writer.committed(tuples.length);

    String[] expected = {"0a\n0b\n1a\n1b\n3a\n3b\n4a\n4b\n6a\n6b\n", "7a\n7b\n9a\n10a\n10b\n12a\n13a\n13b\n14a\n14b\n",
        "18a\n18b\n19a\n19b\n20a\n21a\n22a\n" };
    for (int i = 0; i < expected.length; i++) {
      checkOutput(i, testMeta.getDir() + "/output.txt_0", expected[i], true);
    }
  }

  /**
   * Minimal application: a {@link LineByLineFileInputOperator} named "input" feeding a
   * {@link StringFileOutputOperator} named "output" over the stream "data". Operator properties are
   * supplied through the {@link Configuration} passed to {@code LocalMode.prepareDAG}.
   */
  public static class TestApplication implements StreamingApplication
  {
    @Override
    public void populateDAG(DAG dag, Configuration conf)
    {
      LineByLineFileInputOperator input = dag.addOperator("input", new LineByLineFileInputOperator());
      StringFileOutputOperator output = dag.addOperator("output", new StringFileOutputOperator());
      dag.addStream("data", input.output, output.input);
    }
  }

  /**
   * Runs {@link TestApplication} in local mode. It reads four lines from {@code input.txt} and checks that
   * the finalized output file contains them joined by the configured tuple separator {@code "-"}.
   * <p>
   * Polls for up to 60 seconds for the finalized file. The application is always shut down afterwards,
   * whether the test passes or fails.
   *
   * @throws Exception on I/O failure or if the output file never appears
   */
  @Test
  public void runTestApplication() throws Exception
  {
    FileUtils.write(new File(testMeta.getDir(), "input.txt"), "a\nb\nc\nd\n");

    Configuration conf = new Configuration(false);
    conf.set("dt.operator.input.prop.directory", testMeta.getDir() + "/input.txt");
    conf.set("dt.operator.output.prop.filePath", testMeta.getDir());
    conf.set("dt.operator.output.prop.outputFileName", "output.txt");
    conf.set("dt.operator.output.prop.tupleSeparator", "-");
    conf.set("dt.operator.output.prop.maxIdleWindows", "2");
    conf.set("dt.attr.CHECKPOINT_WINDOW_COUNT", "2");

    LocalMode lma = LocalMode.newInstance();
    lma.prepareDAG(new TestApplication(), conf);
    LocalMode.Controller lc = lma.getController();
    lc.runAsync();
    try {
      // "_2" is presumably the id assigned to the "output" operator in local mode; ".0" is the first part.
      File outputFile = new File(testMeta.getDir(), "output.txt_2.0");
      final int maxWaitSeconds = 60;
      for (int i = 0; i < maxWaitSeconds && !outputFile.exists(); ++i) {
        Thread.sleep(1000);
      }
      if (!outputFile.exists()) {
        String msg = String.format("Error: output file not found after %d seconds%n", maxWaitSeconds);
        throw new RuntimeException(msg);
      }
      String output = FileUtils.readFileToString(outputFile);
      Assert.assertEquals("a-b-c-d-", output);
    } finally {
      // The application was started asynchronously; stop it so it does not outlive this test.
      lc.shutdown();
    }
  }

  /**
   * Verifies that idle-window rotation over 30 empty windows leaves no files in the output directory,
   * even with {@code alwaysWriteToTmp} enabled.
   */
  @Test
  public void testRotationWithNoData()
  {
    GenericFileOutputOperator writer = new GenericFileOutputOperator();
    File dir = new File(testMeta.getDir());
    writer.setFilePath(testMeta.getDir());
    writer.setOutputFileName("output.txt");
    writer.setMaxIdleWindows(5);
    writer.setAlwaysWriteToTmp(true);
    writer.setup(testMeta.testOperatorContext);

    for (int i = 0; i < 30; ++i) {
      writer.beginWindow(i);
      writer.endWindow();
    }
    writer.committed(29);

    // null extensions = all files; false = do not recurse into subdirectories.
    Collection<File> files = FileUtils.listFiles(dir, null, false);
    Assert.assertEquals("Number of part files", 0, files.size());
  }

  /**
   * Asserts that an output part file has the expected contents.
   * <p>
   * The part file is {@code baseFilePath + "." + fileCount}; the suffix is omitted when {@code fileCount}
   * is negative. If no file exists at that path, the method falls back to a {@code .tmp} file in the same
   * directory whose name starts with the part file's name followed by {@code '.'}.
   *
   * @param fileCount part-file index appended to {@code baseFilePath}; negative to use the path as given
   * @param baseFilePath path of the output file without the part-file index
   * @param expectedOutput the exact expected file contents
   * @param checkTmp not consulted; the {@code .tmp} fallback is always applied. Kept for signature
   *          compatibility.
   */
  public static void checkOutput(int fileCount, String baseFilePath, String expectedOutput, boolean checkTmp)
  {
    if (fileCount >= 0) {
      baseFilePath += "." + fileCount;
    }
    File file = new File(baseFilePath);

    if (!file.exists()) {
      // The part file is not finalized yet; look for its temporary file instead. Match on the candidate's
      // own name (not the base file), and require a '.' after the part name so that part ".1" cannot
      // match a temp file belonging to part ".10".
      String tmpPrefix = file.getName() + ".";
      String[] extensions = {"tmp"};
      Collection<File> tmpFiles = FileUtils.listFiles(file.getParentFile(), extensions, false);
      for (File tmpFile : tmpFiles) {
        if (tmpFile.getName().startsWith(tmpPrefix)) {
          file = tmpFile;
          break;
        }
      }
    }
    Assert.assertTrue("Output file not found: " + baseFilePath, file.exists());

    String fileContents = null;
    try {
      fileContents = FileUtils.readFileToString(file);
    } catch (IOException ex) {
      DTThrowable.rethrow(ex);
    }
    Assert.assertEquals("Single file " + fileCount + " output contents", expectedOutput, fileContents);
  }
}