blob: 576e677d757ce9221f87aec792331de78afe23c1 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.test.connectors.file;
import static org.apache.edgent.test.connectors.common.FileUtil.createTempFile;
//import static org.junit.Assume.assumeFalse;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.edgent.connectors.file.CompressedFileWriterPolicy;
import org.apache.edgent.connectors.file.FileStreams;
import org.apache.edgent.connectors.file.FileWriterCycleConfig;
import org.apache.edgent.connectors.file.FileWriterFlushConfig;
import org.apache.edgent.connectors.file.FileWriterPolicy;
import org.apache.edgent.connectors.file.FileWriterRetentionConfig;
import org.apache.edgent.connectors.file.runtime.IFileWriterPolicy;
import org.apache.edgent.function.Predicate;
import org.apache.edgent.test.providers.direct.DirectTopologyTestBase;
import org.apache.edgent.topology.TSink;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.Topology;
import org.apache.edgent.topology.plumbing.PlumbingStreams;
import org.apache.edgent.topology.tester.Condition;
import org.junit.Test;
public class FileStreamsTextFileWriterTest extends DirectTopologyTestBase {
String str = "123456789";
String[] stdLines = new String[] {
"1-"+str,
"2-"+str,
"3-"+str,
"4-"+str
};
private int TMO_SEC = 2;
public String getStr() {
return str;
}
public String[] getLines() {
return stdLines;
}
@Test
public void testFlushConfig() throws Exception {
FileWriterFlushConfig<String> cfg;
String trueTuple = "true";
String falseTuple = "false";
Predicate<String> p = tuple -> tuple.equals("true");
cfg = FileWriterFlushConfig.newImplicitConfig();
checkFileWriterConfig(cfg, 0, 0, null, trueTuple, falseTuple);
cfg = FileWriterFlushConfig.newCountBasedConfig(3);
checkFileWriterConfig(cfg, 3, 0, null, trueTuple, falseTuple);
expectIAE(() -> FileWriterFlushConfig.newCountBasedConfig(0));
cfg = FileWriterFlushConfig.newTimeBasedConfig(10);
checkFileWriterConfig(cfg, 0, 10, null, trueTuple, falseTuple);
expectIAE(() -> FileWriterFlushConfig.newTimeBasedConfig(0));
cfg = FileWriterFlushConfig.newPredicateBasedConfig(p);
checkFileWriterConfig(cfg, 0, 0, p, trueTuple, falseTuple);
expectIAE(() -> FileWriterFlushConfig.newPredicateBasedConfig(null));
cfg = FileWriterFlushConfig.newConfig(1, 2, p);
checkFileWriterConfig(cfg, 1, 2, p, trueTuple, falseTuple);
cfg = FileWriterFlushConfig.newConfig(0, 0, null);
checkFileWriterConfig(cfg, 0, 0, null, trueTuple, falseTuple);
expectIAE(() -> FileWriterFlushConfig.newConfig(-1, 0, null));
expectIAE(() -> FileWriterFlushConfig.newConfig(0, -1, null));
}
private static <T> void checkFileWriterConfig(FileWriterFlushConfig<T> cfg,
int cntTuples, long periodMsec, Predicate<T> tuplePredicate,
T trueTuple, T falseTuple) {
assertEquals(cntTuples, cfg.getCntTuples());
assertEquals(periodMsec, cfg.getPeriodMsec());
assertEquals(tuplePredicate, cfg.getTuplePredicate());
cfg.toString();
int falseNTuples = cntTuples==1 ? 0 : cntTuples+1;
int trueNTuples = 3*cntTuples;
assertFalse("cntTuples:"+cntTuples+" pred:"+tuplePredicate,
cfg.evaluate(falseNTuples, falseTuple));
if (cntTuples!=0)
assertTrue("cntTuples:"+cntTuples+" pred:"+tuplePredicate,
cfg.evaluate(trueNTuples, falseTuple));
if (tuplePredicate!=null)
assertTrue("cntTuples:"+cntTuples+" pred:"+tuplePredicate,
cfg.evaluate(falseNTuples, trueTuple));
}
@Test
public void testCycleConfig() throws Exception {
FileWriterCycleConfig<String> cfg;
String trueTuple = "true";
String falseTuple = "false";
Predicate<String> p = tuple -> tuple.equals("true");
cfg = FileWriterCycleConfig.newFileSizeBasedConfig(2);
checkFileWriterConfig(cfg, 2, 0, 0, null, trueTuple, falseTuple);
expectIAE(() -> FileWriterCycleConfig.newFileSizeBasedConfig(0));
cfg = FileWriterCycleConfig.newCountBasedConfig(3);
checkFileWriterConfig(cfg, 0, 3, 0, null, trueTuple, falseTuple);
expectIAE(() -> FileWriterCycleConfig.newCountBasedConfig(0));
cfg = FileWriterCycleConfig.newTimeBasedConfig(10);
checkFileWriterConfig(cfg, 0, 0, 10, null, trueTuple, falseTuple);
expectIAE(() -> FileWriterCycleConfig.newTimeBasedConfig(0));
cfg = FileWriterCycleConfig.newPredicateBasedConfig(p);
checkFileWriterConfig(cfg, 0, 0, 0, p, trueTuple, falseTuple);
expectIAE(() -> FileWriterCycleConfig.newPredicateBasedConfig(null));
cfg = FileWriterCycleConfig.newConfig(1, 2, 3, p);
checkFileWriterConfig(cfg, 1, 2, 3, p, trueTuple, falseTuple);
expectIAE(() -> FileWriterCycleConfig.newConfig(0, 0, 0, null));
expectIAE(() -> FileWriterCycleConfig.newConfig(-1, 0, 0, null));
expectIAE(() -> FileWriterCycleConfig.newConfig(0, -1, 0, null));
expectIAE(() -> FileWriterCycleConfig.newConfig(0, 0, -1, null));
}
private static <T> void checkFileWriterConfig(FileWriterCycleConfig<T> cfg,
long fileSize, int cntTuples, long periodMsec, Predicate<T> tuplePredicate,
T trueTuple, T falseTuple) {
assertEquals(fileSize, cfg.getFileSize());
assertEquals(cntTuples, cfg.getCntTuples());
assertEquals(periodMsec, cfg.getPeriodMsec());
assertEquals(tuplePredicate, cfg.getTuplePredicate());
cfg.toString();
long falseFileSize = fileSize-1;
long trueFileSize = fileSize+1;
int falseNTuples = cntTuples==1 ? 0 : cntTuples+1;
int trueNTuples = 3*cntTuples;
assertFalse("fileSize:"+fileSize+" cntTuples:"+cntTuples+" pred:"+tuplePredicate,
cfg.evaluate(falseFileSize, falseNTuples, falseTuple));
if (fileSize!=0)
assertTrue("fileSize:"+fileSize+" cntTuples:"+cntTuples+" pred:"+tuplePredicate,
cfg.evaluate(trueFileSize, trueNTuples, falseTuple));
if (cntTuples!=0)
assertTrue("fileSize:"+fileSize+" cntTuples:"+cntTuples+" pred:"+tuplePredicate,
cfg.evaluate(falseFileSize, trueNTuples, falseTuple));
if (tuplePredicate!=null)
assertTrue("fileSize:"+fileSize+" cntTuples:"+cntTuples+" pred:"+tuplePredicate,
cfg.evaluate(falseFileSize, falseNTuples, trueTuple));
}
@Test
public void testRetentionConfig() throws Exception {
FileWriterRetentionConfig cfg;
cfg = FileWriterRetentionConfig.newFileCountBasedConfig(2);
checkFileWriterConfig(cfg, 2, 0, 0, 0);
expectIAE(() -> FileWriterRetentionConfig.newFileCountBasedConfig(0));
cfg = FileWriterRetentionConfig.newAggregateFileSizeBasedConfig(3);
checkFileWriterConfig(cfg, 0, 3, 0, 0);
expectIAE(() -> FileWriterRetentionConfig.newAggregateFileSizeBasedConfig(0));
cfg = FileWriterRetentionConfig.newAgeBasedConfig(10,11);
checkFileWriterConfig(cfg, 0, 0, 10, 11);
expectIAE(() -> FileWriterRetentionConfig.newAgeBasedConfig(0,1));
expectIAE(() -> FileWriterRetentionConfig.newAgeBasedConfig(1,0));
cfg = FileWriterRetentionConfig.newConfig(1, 2, 3, 0);
checkFileWriterConfig(cfg, 1, 2, 3, 0);
expectIAE(() -> FileWriterRetentionConfig.newConfig(0, 0, 0, 0));
expectIAE(() -> FileWriterRetentionConfig.newConfig(0, 0, 1, 0));
expectIAE(() -> FileWriterRetentionConfig.newConfig(0, 0, 0, 1));
expectIAE(() -> FileWriterRetentionConfig.newConfig(-1, 0, 0, 0));
expectIAE(() -> FileWriterRetentionConfig.newConfig(0, -1, 0, 0));
expectIAE(() -> FileWriterRetentionConfig.newConfig(0, 0, -1, 0));
expectIAE(() -> FileWriterRetentionConfig.newConfig(0, 0, 0, -1));
}
private void expectIAE(Runnable fn) {
try {
fn.run();
fail("expected IAE");
} catch (IllegalArgumentException e) { /* expected */ }
}
private static <T> void checkFileWriterConfig(FileWriterRetentionConfig cfg,
int fileCnt, long aggSize, long ageSec, long periodMsec) {
assertEquals(fileCnt, cfg.getFileCount());
assertEquals(aggSize, cfg.getAggregateFileSize());
assertEquals(ageSec, cfg.getAgeSec());
assertEquals(periodMsec, cfg.getPeriodMsec());
cfg.toString();
int falseFileCnt = fileCnt-1;
int trueFileCnt = fileCnt+1;
long falseAggSize = aggSize-1;
long trueAggSize = aggSize+1;
assertFalse("fileCnt:"+fileCnt+" aggSize:"+aggSize,
cfg.evaluate(falseFileCnt, falseAggSize));
if (fileCnt!=0)
assertTrue("fileCnt:"+fileCnt+" aggSize:"+aggSize,
cfg.evaluate(trueFileCnt, falseAggSize));
if (aggSize!=0)
assertTrue("fileCnt:"+fileCnt+" aggSize:"+aggSize,
cfg.evaluate(falseFileCnt, trueAggSize));
}
@Test
public void testDefaultConfig() throws Exception {
FileWriterPolicy<String> policy = new FileWriterPolicy<>();
checkFileWriterConfig(policy.getFlushConfig(), 0, TimeUnit.SECONDS.toMillis(10), null, null, null);
checkFileWriterConfig(policy.getCycleConfig(), 1*1024*1024, 0, 0, null, null, null);
checkFileWriterConfig(policy.getRetentionConfig(), 10, 0, 0, 0);
policy.toString();
policy.close();
}
@Test
public void testNoFilesCreated() throws Exception {
// complete before any files are generated
Topology t = newTopology("testNoFilesCreated");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
// build expected results
List<List<String>> expResults = Collections.emptyList();
TStream<String> s = t.events(eventSetup -> { /* no tuples generated */ });
FileStreams.textFileWriter(s, () -> basePath.toString());
completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
}
@Test
public void testOneFileCreated() throws Exception {
// all lines into a single (the first) file
Topology t = newTopology("testOneFileCreated");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
String[] lines = getLines();
// build expected results
// net all in one, the first, file
List<List<String>> expResults = buildExpResults(lines, tuple -> false);
assertEquals(1, expResults.size());
TStream<String> s = t.strings(lines);
// default writer policy
TSink<String> sink = FileStreams.textFileWriter(s, () -> basePath.toString());
completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
assertNotNull(sink);
}
@Test
public void testManyFiles() throws Exception {
Topology t = newTopology("testManyFiles");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
String[] lines = getLines();
// build expected results
// net one tuples per file
List<List<String>> expResults = buildExpResults(lines, tuple -> true);
// in this config files are create very fast hence they end
// up exercising the _<n> suffix to basePath_YYYYMMDD_HHMMSS
TStream<String> s = t.strings(lines);
IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
FileWriterFlushConfig.newImplicitConfig(), // no extra flush
FileWriterCycleConfig.newCountBasedConfig(1), // yield one line per file
FileWriterRetentionConfig.newFileCountBasedConfig(10)
);
FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
}
@Test
public void testManyFilesSlow() throws Exception {
Topology t = newTopology("testManyFilesSlow");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
String[] lines = getLines();
// build expected results
// net one tuples per file
List<List<String>> expResults = buildExpResults(lines, tuple -> true);
// add delay so we get different files w/o a _<n> suffix
int throttleSec = 2;
TStream<String> s = PlumbingStreams.blockingThrottle(
t.strings(lines), throttleSec, TimeUnit.SECONDS);
IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
FileWriterFlushConfig.newImplicitConfig(), // no extra flush
FileWriterCycleConfig.newCountBasedConfig(1), // yield one line per file
FileWriterRetentionConfig.newFileCountBasedConfig(10)
);
FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
completeAndValidateWriter(t, (lines.length*throttleSec)+TMO_SEC,
basePath, expResults);
}
@Test
public void testRetainCntBased() throws Exception {
// more lines than configured retained numFiles; only keep the last numFiles
Topology t = newTopology("testRetainCntBased");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
String[] lines = getLines();
// build expected results
// net one tuples per file
List<List<String>> expResults = buildExpResults(lines, tuple -> true);
int keepCnt = 2; // only keep the last n files
for (int i = 0; i < keepCnt; i++)
expResults.remove(0);
assertEquals(keepCnt, expResults.size());
TStream<String> s = t.strings(lines);
IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
FileWriterFlushConfig.newImplicitConfig(),
FileWriterCycleConfig.newCountBasedConfig(1),
FileWriterRetentionConfig.newFileCountBasedConfig(keepCnt)
);
FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
}
@Test
public void testRetainAggSizeBased() throws Exception {
// more aggsize than configured; only keep aggsize worth
Topology t = newTopology("testRetainAggSizeBased");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
String[] lines = getLines();
// build expected results
// net one tuple per file
List<List<String>> expResults = buildExpResults(lines, tuple -> true);
// agg size only enough for last two lines
long aggregateFileSize = 2 * (("1-"+getStr()).getBytes(StandardCharsets.UTF_8).length + 1/*eol*/);
expResults.remove(0);
expResults.remove(0);
assertEquals(2, expResults.size());
TStream<String> s = t.strings(lines);
IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
FileWriterFlushConfig.newImplicitConfig(),
FileWriterCycleConfig.newCountBasedConfig(1),
FileWriterRetentionConfig.newAggregateFileSizeBasedConfig(aggregateFileSize)
);
FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
}
@Test
public void testRetainAgeBased() throws Exception {
Topology t = newTopology("testRetainAgeBased");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
String[] lines = getLines();
// build expected results
int keepCnt = 2; // only keep the last n files with throttling, age,
// and TMO_SEC
int ageSec = 5;
long periodMsec = TimeUnit.SECONDS.toMillis(1);
// net one tuple per file
List<List<String>> expResults = buildExpResults(lines, tuple -> true);
for (int i = 0; i < keepCnt; i++)
expResults.remove(0);
assertEquals(keepCnt, expResults.size());
// add delay so we can age out things
//
// After several runs this test seems reliable but
// I suspect it may be fragile wrt timing hence the results.
//
// With 4 tuples, throttleDelay=2sec, and ageSec=5
// t0=add-f1, t1, t2=add-f2, t3, t4=add-f3, t5-rm-f1, t6=add-f4, t7=rm-f2, t8, t9=rm-f3, ...
//
// So we want to check somewhere around t8 (after t7 and definitely before t9)
// so all 4 files were created and the first 2 have been aged out.
// with complete delay = #files-1*throttle + TMO_SEC, should be 6+2 == t8.
int throttleSec = 2;
TStream<String> s = PlumbingStreams.blockingThrottle(
t.strings(lines), throttleSec, TimeUnit.SECONDS);
IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
FileWriterFlushConfig.newImplicitConfig(),
FileWriterCycleConfig.newCountBasedConfig(1),
FileWriterRetentionConfig.newAgeBasedConfig(ageSec, periodMsec)
);
FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
completeAndValidateWriter(t, ((lines.length-1)*throttleSec)+TMO_SEC,
basePath, expResults);
}
@Test
public void testFlushImplicit() throws Exception {
Topology t = newTopology("testFlushImplicit");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
String[] lines = getLines();
// build expected results
// net all in one, the first, file
List<List<String>> expResults = buildExpResults(lines, tuple -> false);
TStream<String> s = t.strings(lines);
IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
FileWriterFlushConfig.newImplicitConfig(),
FileWriterCycleConfig.newCountBasedConfig(1000),
FileWriterRetentionConfig.newFileCountBasedConfig(10)
);
FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
}
@Test
public void testFlushCntBased() throws Exception {
Topology t = newTopology("testFlushCntBased");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
String[] lines = getLines();
// build expected results
// net all in one, the first, file
List<List<String>> expResults = buildExpResults(lines, tuple -> false);
TStream<String> s = t.strings(lines);
IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
FileWriterFlushConfig.newCountBasedConfig(1), // every tuple
FileWriterCycleConfig.newCountBasedConfig(1000), // all in 1 file
FileWriterRetentionConfig.newFileCountBasedConfig(10)
);
FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
}
@Test
public void testFlushTimeBased() throws Exception {
Topology t = newTopology("testFlushTimeBased");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
String[] lines = getLines();
// build expected results
// net all in one, the first, file
List<List<String>> expResults = buildExpResults(lines, tuple -> false);
// add delay so time flush happens
int throttleSec = 1;
TStream<String> s = PlumbingStreams.blockingThrottle(
t.strings(lines), throttleSec, TimeUnit.SECONDS);
IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
FileWriterFlushConfig.newTimeBasedConfig(TimeUnit.MILLISECONDS.toMillis(250)),
FileWriterCycleConfig.newCountBasedConfig(1000), // all in 1 file
FileWriterRetentionConfig.newFileCountBasedConfig(10)
);
FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
completeAndValidateWriter(t, (lines.length*throttleSec)+TMO_SEC,
basePath, expResults);
}
@Test
public void testFlushTupleBased() throws Exception {
Topology t = newTopology("testFlushTupleBased");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
String[] lines = getLines();
// build expected results
// net all in one, the first, file
List<List<String>> expResults = buildExpResults(lines, tuple -> false);
TStream<String> s = t.strings(lines);
IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
FileWriterFlushConfig.newPredicateBasedConfig(
tuple -> tuple.startsWith("1-") || tuple.startsWith("3-")),
FileWriterCycleConfig.newCountBasedConfig(1000), // all in 1 file
FileWriterRetentionConfig.newFileCountBasedConfig(10)
);
FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
}
@Test
public void testCycleCntBased() throws Exception {
Topology t = newTopology("testCycleCntBased");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
String[] lines = getLines();
// build expected results
// net two tuples per file
int cntTuples = 2;
AtomicInteger cnt = new AtomicInteger();
Predicate<String> cycleIt = tuple -> cnt.incrementAndGet() % cntTuples == 0;
List<List<String>> expResults = buildExpResults(lines, cycleIt);
assertEquals(lines.length / cntTuples, expResults.size());
TStream<String> s = t.strings(lines);
IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
FileWriterFlushConfig.newImplicitConfig(),
FileWriterCycleConfig.newCountBasedConfig(cntTuples),
FileWriterRetentionConfig.newFileCountBasedConfig(10)
);
FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
}
@Test
public void testCycleSizeBased() throws Exception {
Topology t = newTopology("testCycleSizeBased");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
String[] lines = getLines();
// build expected results
// net one tuple per file
List<List<String>> expResults = buildExpResults(lines, tuple -> true);
int fileSize = 2;
TStream<String> s = t.strings(lines);
IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
FileWriterFlushConfig.newImplicitConfig(),
FileWriterCycleConfig.newFileSizeBasedConfig(fileSize),
FileWriterRetentionConfig.newFileCountBasedConfig(10)
);
FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
}
@Test
public void testCycleTimeBased() throws Exception {
Topology t = newTopology("testCycleTimeBased");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
String[] lines = getLines();
// build expected results
// net one tuple per file with 250msec cycle config and 1 throttle
List<List<String>> expResults = buildExpResults(lines, tuple -> true);
// add delay so time cycle happens
// also verifies only cycle if there's something to cycle
// (i.e., these cycles happen faster than tuples are written)
int throttleSec = 1;
TStream<String> s = PlumbingStreams.blockingThrottle(
t.strings(lines), throttleSec, TimeUnit.SECONDS);
IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
FileWriterFlushConfig.newImplicitConfig(),
FileWriterCycleConfig.newTimeBasedConfig(TimeUnit.MILLISECONDS.toMillis(250)),
FileWriterRetentionConfig.newFileCountBasedConfig(10)
);
FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
completeAndValidateWriter(t, (lines.length*throttleSec)+TMO_SEC,
basePath, expResults);
}
@Test
public void testCycleTupleBased() throws Exception {
Topology t = newTopology("testCycleTupleBased");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
String[] lines = getLines();
// build expected results
// a tuple based config / predicate. in this case should end up with 3 files.
Predicate<String> cycleIt = tuple -> tuple.startsWith("1-") || tuple.startsWith("3-");
List<List<String>> expResults = buildExpResults(lines, cycleIt);
assertEquals(3, expResults.size());
TStream<String> s = t.strings(lines);
IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
FileWriterFlushConfig.newImplicitConfig(),
FileWriterCycleConfig.newPredicateBasedConfig(cycleIt),
FileWriterRetentionConfig.newFileCountBasedConfig(10)
);
FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
}
@Test
public void testAllTimeBased() throws Exception {
// exercise case with multiple timer based policies
Topology t = newTopology("testAllTimeBased");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
String[] lines = getLines();
// build expected results
// keep all given age and TMO_SEC
int ageSec = 10;
long periodMsec = TimeUnit.SECONDS.toMillis(1);
// net one tuple per file
List<List<String>> expResults = buildExpResults(lines, tuple -> true);
TStream<String> s = t.strings(lines);
IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
FileWriterFlushConfig.newTimeBasedConfig(TimeUnit.MILLISECONDS.toMillis(250)),
FileWriterCycleConfig.newConfig(1, 2000, TimeUnit.SECONDS.toMillis(10), null),
FileWriterRetentionConfig.newAgeBasedConfig(ageSec, periodMsec)
);
FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
}
@Test
public void testWriterWatcherReader() throws Exception {
// verify all the pieces work together
Topology t = newTopology("testWriterWatcherReader");
String testDirPrefix = "testWriterWatcherReader";
Path dir = Files.createTempDirectory(testDirPrefix);
Path basePath = dir.resolve("writerCreated");
String[] lines = getLines();
System.out.println("########## "+t.getName());
// Write the files
// add delay so watcher starts first and gets to "see" file additions
int throttleSec = 2;
TStream<String> contents = PlumbingStreams.blockingOneShotDelay(
t.strings(lines), 2, TimeUnit.SECONDS);
contents = PlumbingStreams.blockingThrottle(
contents, throttleSec, TimeUnit.SECONDS);
IFileWriterPolicy<String> policy = new FileWriterPolicy<String>(
FileWriterFlushConfig.newImplicitConfig(),
FileWriterCycleConfig.newCountBasedConfig(1), // one per file
FileWriterRetentionConfig.newFileCountBasedConfig(10)
);
FileStreams.textFileWriter(contents, () -> basePath.toString(), () -> policy);
// Watch and read contents
TStream<String> pathnames = FileStreams.directoryWatcher(t,
() -> dir.toAbsolutePath().toString())
.peek(tuple -> System.out.println(new Date() + " watcher added "+tuple))
.peek(tuple -> { if (new File(tuple).getName().startsWith("."))
throw new RuntimeException("Not filtering active/hidden files "+tuple); });
TStream<String> readContents = FileStreams.textFileReader(pathnames);
boolean dump = true;
try {
completeAndValidate("", t, readContents,
(lines.length*throttleSec)+TMO_SEC+3/*on-the-edge*/, lines);
dump = false;
}
finally {
deleteDirAndFiles(dir, testDirPrefix, dump);
}
}
@Test
public void testCompressedFileWriterPolicy() throws Exception {
Topology t = newTopology("testCompressedFileWriterPolicy");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
String[] lines = getLines();
// build expected results
// net 2 tuples per file
int cntTuples = 2;
AtomicInteger cnt = new AtomicInteger();
Predicate<String> cycleIt = tuple -> cnt.incrementAndGet() % cntTuples == 0;
List<List<String>> expResults = buildExpResults(lines, cycleIt);
assertEquals(lines.length / cntTuples, expResults.size());
TStream<String> s = t.strings(lines);
IFileWriterPolicy<String> policy = new CompressedFileWriterPolicy<String>(
FileWriterFlushConfig.newImplicitConfig(),
FileWriterCycleConfig.newCountBasedConfig(cntTuples),
FileWriterRetentionConfig.newFileCountBasedConfig(10)
);
FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy);
completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
}
private void deleteDirAndFiles(Path dir, String dirPrefix, boolean dump) {
// exercise caution before removing all files in dir
if (!dirPrefix.startsWith("test"))
throw new IllegalStateException("Yikes. dir:"+dir+" dirPrefix:"+dirPrefix);
String leaf = dir.getFileName().toString();
if (!leaf.startsWith(dirPrefix))
throw new IllegalStateException("Yikes. dir:"+dir+" dirPrefix:"+dirPrefix);
// Ok, delete all the files in the dir and then the dir
for (File file : dir.toFile().listFiles()) {
if (dump)
dumpFile(file);
file.delete();
}
dir.toFile().delete();
}
private void dumpFile(File f) {
System.out.println("<<<<< Dumping "+f);
try {
Path path = f.toPath();
try (BufferedReader br = Files.newBufferedReader(path)) {
br.lines().forEach(line -> System.out.println(line));
}
}
catch (Exception e) {
System.out.println("##### exception: " + e.getLocalizedMessage());
}
System.out.println(">>>>> DONE "+f);
}
private <T> List<List<T>> buildExpResults(T[] tuples, Predicate<T> cycleIt) {
List<List<T>> expResults = new ArrayList<>();
List<T> oneFile = null;
for (T tuple : tuples) {
if (oneFile==null) {
oneFile = new ArrayList<>();
expResults.add(oneFile);
}
oneFile.add(tuple);
if (cycleIt.test(tuple))
oneFile = null;
}
return expResults;
}
private <T> void completeAndValidateWriter(Topology t, int tmoSec,
Path basePath, List<List<T>> expResults) throws Exception {
try {
// just let it run to the tmo before we check the file contents
Condition<Object> tc = new Condition<Object>() {
public boolean valid() { return false; }
public Object getResult() { return null; }
};
complete(t, tc, tmoSec, TimeUnit.SECONDS);
System.out.println("########## "+t.getName());
// right number of files?
List<Path> actFiles = getActFiles(basePath);
System.out.println("actFiles: "+actFiles);
assertEquals(actFiles.toString(), expResults.size(), actFiles.size());
// do the file(s) have the right contents?
System.out.println("expResults: "+expResults);
int i = 0;
for (List<T> expFile : expResults) {
Path path = actFiles.get(i++);
checkContents(path, expFile.toArray(new String[0]));
}
}
finally {
deleteAll(basePath);
}
}
private void deleteAll(Path basePath) {
Path parent = basePath.getParent();
String baseLeaf = basePath.getFileName().toString();
String[] actLeafs = parent.toFile().list(
(dir,leaf) -> leaf.startsWith(baseLeaf));
for (String leaf : actLeafs) {
parent.resolve(leaf).toFile().delete();
}
}
private List<Path> getActFiles(Path basePath) {
List<Path> paths = new ArrayList<>();
Path parent = basePath.getParent();
String baseLeaf = basePath.getFileName().toString();
String[] actLeafs = parent.toFile().list(
(dir,leaf) -> leaf.startsWith(baseLeaf+"_"));
Arrays.sort(actLeafs, (o1,o2) -> o1.compareTo(o2));
for (String leaf : actLeafs) {
paths.add(parent.resolve(leaf));
}
return paths;
}
private void checkContents(Path path, String[] lines) {
if (path.getFileName().toString().endsWith(".zip")) {
checkZipContents(path, lines);
return;
}
System.out.println("checking file "+path);
int lineCnt = 0;
try (BufferedReader br = Files.newBufferedReader(path)) {
for (String line : lines) {
++lineCnt;
String actLine = br.readLine();
assertEquals("path:"+path+" line "+lineCnt, line, actLine);
}
assertNull("path:"+path+" line "+lineCnt+" expected EOF", br.readLine());
}
catch (IOException e) {
assertNull("path:"+path+" line "+lineCnt+" unexpected IOException "+e, e);
}
}
private void checkZipContents(Path path, String[] lines) {
System.out.println("checking file "+path);
String fileName = path.getFileName().toString();
String entryName = fileName.substring(0, fileName.length() - ".zip".length());
int lineCnt = 0;
try (
FileInputStream fis = new FileInputStream(path.toFile());
ZipInputStream zin = new ZipInputStream((new BufferedInputStream(fis)));
)
{
ZipEntry entry = zin.getNextEntry();
assertEquals(entryName, entry.getName());
BufferedReader br = new BufferedReader(new InputStreamReader(zin, StandardCharsets.UTF_8));
for (String line : lines) {
++lineCnt;
String actLine = br.readLine();
assertEquals("path:"+path+" line "+lineCnt, line, actLine);
}
assertNull("path:"+path+" line "+lineCnt+" expected EOF", br.readLine());
}
catch (IOException e) {
assertNull("path:"+path+" line "+lineCnt+" unexpected IOException "+e, e);
}
}
}