| /* |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| */ |
| package org.apache.edgent.test.connectors.file; |
| |
| import static org.apache.edgent.test.connectors.common.FileUtil.createTempFile; |
| |
| //import static org.junit.Assume.assumeFalse; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.BufferedInputStream; |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.IOException; |
| import java.io.InputStreamReader; |
| import java.nio.charset.StandardCharsets; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.List; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.zip.ZipEntry; |
| import java.util.zip.ZipInputStream; |
| |
| import org.apache.edgent.connectors.file.CompressedFileWriterPolicy; |
| import org.apache.edgent.connectors.file.FileStreams; |
| import org.apache.edgent.connectors.file.FileWriterCycleConfig; |
| import org.apache.edgent.connectors.file.FileWriterFlushConfig; |
| import org.apache.edgent.connectors.file.FileWriterPolicy; |
| import org.apache.edgent.connectors.file.FileWriterRetentionConfig; |
| import org.apache.edgent.connectors.file.runtime.IFileWriterPolicy; |
| import org.apache.edgent.function.Predicate; |
| import org.apache.edgent.test.providers.direct.DirectTopologyTestBase; |
| import org.apache.edgent.topology.TSink; |
| import org.apache.edgent.topology.TStream; |
| import org.apache.edgent.topology.Topology; |
| import org.apache.edgent.topology.plumbing.PlumbingStreams; |
| import org.apache.edgent.topology.tester.Condition; |
| import org.junit.Test; |
| |
| public class FileStreamsTextFileWriterTest extends DirectTopologyTestBase { |
| |
| String str = "123456789"; |
| String[] stdLines = new String[] { |
| "1-"+str, |
| "2-"+str, |
| "3-"+str, |
| "4-"+str |
| }; |
| |
| private int TMO_SEC = 2; |
| |
| public String getStr() { |
| return str; |
| } |
| |
| public String[] getLines() { |
| return stdLines; |
| } |
| |
| @Test |
| public void testFlushConfig() throws Exception { |
| FileWriterFlushConfig<String> cfg; |
| |
| String trueTuple = "true"; |
| String falseTuple = "false"; |
| Predicate<String> p = tuple -> tuple.equals("true"); |
| |
| cfg = FileWriterFlushConfig.newImplicitConfig(); |
| checkFileWriterConfig(cfg, 0, 0, null, trueTuple, falseTuple); |
| |
| cfg = FileWriterFlushConfig.newCountBasedConfig(3); |
| checkFileWriterConfig(cfg, 3, 0, null, trueTuple, falseTuple); |
| expectIAE(() -> FileWriterFlushConfig.newCountBasedConfig(0)); |
| |
| cfg = FileWriterFlushConfig.newTimeBasedConfig(10); |
| checkFileWriterConfig(cfg, 0, 10, null, trueTuple, falseTuple); |
| expectIAE(() -> FileWriterFlushConfig.newTimeBasedConfig(0)); |
| |
| cfg = FileWriterFlushConfig.newPredicateBasedConfig(p); |
| checkFileWriterConfig(cfg, 0, 0, p, trueTuple, falseTuple); |
| expectIAE(() -> FileWriterFlushConfig.newPredicateBasedConfig(null)); |
| |
| cfg = FileWriterFlushConfig.newConfig(1, 2, p); |
| checkFileWriterConfig(cfg, 1, 2, p, trueTuple, falseTuple); |
| cfg = FileWriterFlushConfig.newConfig(0, 0, null); |
| checkFileWriterConfig(cfg, 0, 0, null, trueTuple, falseTuple); |
| expectIAE(() -> FileWriterFlushConfig.newConfig(-1, 0, null)); |
| expectIAE(() -> FileWriterFlushConfig.newConfig(0, -1, null)); |
| } |
| |
| private static <T> void checkFileWriterConfig(FileWriterFlushConfig<T> cfg, |
| int cntTuples, long periodMsec, Predicate<T> tuplePredicate, |
| T trueTuple, T falseTuple) { |
| assertEquals(cntTuples, cfg.getCntTuples()); |
| assertEquals(periodMsec, cfg.getPeriodMsec()); |
| assertEquals(tuplePredicate, cfg.getTuplePredicate()); |
| cfg.toString(); |
| |
| int falseNTuples = cntTuples==1 ? 0 : cntTuples+1; |
| int trueNTuples = 3*cntTuples; |
| |
| assertFalse("cntTuples:"+cntTuples+" pred:"+tuplePredicate, |
| cfg.evaluate(falseNTuples, falseTuple)); |
| if (cntTuples!=0) |
| assertTrue("cntTuples:"+cntTuples+" pred:"+tuplePredicate, |
| cfg.evaluate(trueNTuples, falseTuple)); |
| if (tuplePredicate!=null) |
| assertTrue("cntTuples:"+cntTuples+" pred:"+tuplePredicate, |
| cfg.evaluate(falseNTuples, trueTuple)); |
| } |
| |
| @Test |
| public void testCycleConfig() throws Exception { |
| FileWriterCycleConfig<String> cfg; |
| |
| String trueTuple = "true"; |
| String falseTuple = "false"; |
| Predicate<String> p = tuple -> tuple.equals("true"); |
| |
| cfg = FileWriterCycleConfig.newFileSizeBasedConfig(2); |
| checkFileWriterConfig(cfg, 2, 0, 0, null, trueTuple, falseTuple); |
| expectIAE(() -> FileWriterCycleConfig.newFileSizeBasedConfig(0)); |
| |
| cfg = FileWriterCycleConfig.newCountBasedConfig(3); |
| checkFileWriterConfig(cfg, 0, 3, 0, null, trueTuple, falseTuple); |
| expectIAE(() -> FileWriterCycleConfig.newCountBasedConfig(0)); |
| |
| cfg = FileWriterCycleConfig.newTimeBasedConfig(10); |
| checkFileWriterConfig(cfg, 0, 0, 10, null, trueTuple, falseTuple); |
| expectIAE(() -> FileWriterCycleConfig.newTimeBasedConfig(0)); |
| |
| cfg = FileWriterCycleConfig.newPredicateBasedConfig(p); |
| checkFileWriterConfig(cfg, 0, 0, 0, p, trueTuple, falseTuple); |
| expectIAE(() -> FileWriterCycleConfig.newPredicateBasedConfig(null)); |
| |
| cfg = FileWriterCycleConfig.newConfig(1, 2, 3, p); |
| checkFileWriterConfig(cfg, 1, 2, 3, p, trueTuple, falseTuple); |
| expectIAE(() -> FileWriterCycleConfig.newConfig(0, 0, 0, null)); |
| expectIAE(() -> FileWriterCycleConfig.newConfig(-1, 0, 0, null)); |
| expectIAE(() -> FileWriterCycleConfig.newConfig(0, -1, 0, null)); |
| expectIAE(() -> FileWriterCycleConfig.newConfig(0, 0, -1, null)); |
| } |
| |
| private static <T> void checkFileWriterConfig(FileWriterCycleConfig<T> cfg, |
| long fileSize, int cntTuples, long periodMsec, Predicate<T> tuplePredicate, |
| T trueTuple, T falseTuple) { |
| assertEquals(fileSize, cfg.getFileSize()); |
| assertEquals(cntTuples, cfg.getCntTuples()); |
| assertEquals(periodMsec, cfg.getPeriodMsec()); |
| assertEquals(tuplePredicate, cfg.getTuplePredicate()); |
| cfg.toString(); |
| |
| long falseFileSize = fileSize-1; |
| long trueFileSize = fileSize+1; |
| int falseNTuples = cntTuples==1 ? 0 : cntTuples+1; |
| int trueNTuples = 3*cntTuples; |
| |
| assertFalse("fileSize:"+fileSize+" cntTuples:"+cntTuples+" pred:"+tuplePredicate, |
| cfg.evaluate(falseFileSize, falseNTuples, falseTuple)); |
| if (fileSize!=0) |
| assertTrue("fileSize:"+fileSize+" cntTuples:"+cntTuples+" pred:"+tuplePredicate, |
| cfg.evaluate(trueFileSize, trueNTuples, falseTuple)); |
| if (cntTuples!=0) |
| assertTrue("fileSize:"+fileSize+" cntTuples:"+cntTuples+" pred:"+tuplePredicate, |
| cfg.evaluate(falseFileSize, trueNTuples, falseTuple)); |
| if (tuplePredicate!=null) |
| assertTrue("fileSize:"+fileSize+" cntTuples:"+cntTuples+" pred:"+tuplePredicate, |
| cfg.evaluate(falseFileSize, falseNTuples, trueTuple)); |
| } |
| |
| @Test |
| public void testRetentionConfig() throws Exception { |
| FileWriterRetentionConfig cfg; |
| |
| cfg = FileWriterRetentionConfig.newFileCountBasedConfig(2); |
| checkFileWriterConfig(cfg, 2, 0, 0, 0); |
| expectIAE(() -> FileWriterRetentionConfig.newFileCountBasedConfig(0)); |
| |
| cfg = FileWriterRetentionConfig.newAggregateFileSizeBasedConfig(3); |
| checkFileWriterConfig(cfg, 0, 3, 0, 0); |
| expectIAE(() -> FileWriterRetentionConfig.newAggregateFileSizeBasedConfig(0)); |
| |
| cfg = FileWriterRetentionConfig.newAgeBasedConfig(10,11); |
| checkFileWriterConfig(cfg, 0, 0, 10, 11); |
| expectIAE(() -> FileWriterRetentionConfig.newAgeBasedConfig(0,1)); |
| expectIAE(() -> FileWriterRetentionConfig.newAgeBasedConfig(1,0)); |
| |
| cfg = FileWriterRetentionConfig.newConfig(1, 2, 3, 0); |
| checkFileWriterConfig(cfg, 1, 2, 3, 0); |
| expectIAE(() -> FileWriterRetentionConfig.newConfig(0, 0, 0, 0)); |
| expectIAE(() -> FileWriterRetentionConfig.newConfig(0, 0, 1, 0)); |
| expectIAE(() -> FileWriterRetentionConfig.newConfig(0, 0, 0, 1)); |
| expectIAE(() -> FileWriterRetentionConfig.newConfig(-1, 0, 0, 0)); |
| expectIAE(() -> FileWriterRetentionConfig.newConfig(0, -1, 0, 0)); |
| expectIAE(() -> FileWriterRetentionConfig.newConfig(0, 0, -1, 0)); |
| expectIAE(() -> FileWriterRetentionConfig.newConfig(0, 0, 0, -1)); |
| } |
| |
| private void expectIAE(Runnable fn) { |
| try { |
| fn.run(); |
| fail("expected IAE"); |
| } catch (IllegalArgumentException e) { /* expected */ } |
| } |
| |
| private static <T> void checkFileWriterConfig(FileWriterRetentionConfig cfg, |
| int fileCnt, long aggSize, long ageSec, long periodMsec) { |
| assertEquals(fileCnt, cfg.getFileCount()); |
| assertEquals(aggSize, cfg.getAggregateFileSize()); |
| assertEquals(ageSec, cfg.getAgeSec()); |
| assertEquals(periodMsec, cfg.getPeriodMsec()); |
| cfg.toString(); |
| |
| int falseFileCnt = fileCnt-1; |
| int trueFileCnt = fileCnt+1; |
| long falseAggSize = aggSize-1; |
| long trueAggSize = aggSize+1; |
| |
| assertFalse("fileCnt:"+fileCnt+" aggSize:"+aggSize, |
| cfg.evaluate(falseFileCnt, falseAggSize)); |
| if (fileCnt!=0) |
| assertTrue("fileCnt:"+fileCnt+" aggSize:"+aggSize, |
| cfg.evaluate(trueFileCnt, falseAggSize)); |
| if (aggSize!=0) |
| assertTrue("fileCnt:"+fileCnt+" aggSize:"+aggSize, |
| cfg.evaluate(falseFileCnt, trueAggSize)); |
| } |
| |
| @Test |
| public void testDefaultConfig() throws Exception { |
| FileWriterPolicy<String> policy = new FileWriterPolicy<>(); |
| checkFileWriterConfig(policy.getFlushConfig(), 0, TimeUnit.SECONDS.toMillis(10), null, null, null); |
| checkFileWriterConfig(policy.getCycleConfig(), 1*1024*1024, 0, 0, null, null, null); |
| checkFileWriterConfig(policy.getRetentionConfig(), 10, 0, 0, 0); |
| policy.toString(); |
| policy.close(); |
| } |
| |
| @Test |
| public void testNoFilesCreated() throws Exception { |
| // complete before any files are generated |
| Topology t = newTopology("testNoFilesCreated"); |
| |
| // establish a base path |
| Path basePath = createTempFile("test1", "txt", new String[0]); |
| |
| // build expected results |
| List<List<String>> expResults = Collections.emptyList(); |
| |
| TStream<String> s = t.events(eventSetup -> { /* no tuples generated */ }); |
| |
| FileStreams.textFileWriter(s, () -> basePath.toString()); |
| |
| completeAndValidateWriter(t, TMO_SEC, basePath, expResults); |
| } |
| |
| @Test |
| public void testOneFileCreated() throws Exception { |
| // all lines into a single (the first) file |
| Topology t = newTopology("testOneFileCreated"); |
| |
| // establish a base path |
| Path basePath = createTempFile("test1", "txt", new String[0]); |
| |
| String[] lines = getLines(); |
| |
| // build expected results |
| // net all in one, the first, file |
| List<List<String>> expResults = buildExpResults(lines, tuple -> false); |
| assertEquals(1, expResults.size()); |
| |
| TStream<String> s = t.strings(lines); |
| |
| // default writer policy |
| TSink<String> sink = FileStreams.textFileWriter(s, () -> basePath.toString()); |
| |
| completeAndValidateWriter(t, TMO_SEC, basePath, expResults); |
| |
| assertNotNull(sink); |
| } |
| |
| @Test |
| public void testManyFiles() throws Exception { |
| Topology t = newTopology("testManyFiles"); |
| |
| // establish a base path |
| Path basePath = createTempFile("test1", "txt", new String[0]); |
| |
| String[] lines = getLines(); |
| |
| // build expected results |
| // net one tuples per file |
| List<List<String>> expResults = buildExpResults(lines, tuple -> true); |
| |
| // in this config files are create very fast hence they end |
| // up exercising the _<n> suffix to basePath_YYYYMMDD_HHMMSS |
| |
| TStream<String> s = t.strings(lines); |
| |
| IFileWriterPolicy<String> policy = new FileWriterPolicy<String>( |
| FileWriterFlushConfig.newImplicitConfig(), // no extra flush |
| FileWriterCycleConfig.newCountBasedConfig(1), // yield one line per file |
| FileWriterRetentionConfig.newFileCountBasedConfig(10) |
| ); |
| FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy); |
| |
| completeAndValidateWriter(t, TMO_SEC, basePath, expResults); |
| } |
| |
| @Test |
| public void testManyFilesSlow() throws Exception { |
| Topology t = newTopology("testManyFilesSlow"); |
| |
| // establish a base path |
| Path basePath = createTempFile("test1", "txt", new String[0]); |
| |
| String[] lines = getLines(); |
| |
| // build expected results |
| // net one tuples per file |
| List<List<String>> expResults = buildExpResults(lines, tuple -> true); |
| |
| // add delay so we get different files w/o a _<n> suffix |
| |
| int throttleSec = 2; |
| TStream<String> s = PlumbingStreams.blockingThrottle( |
| t.strings(lines), throttleSec, TimeUnit.SECONDS); |
| |
| IFileWriterPolicy<String> policy = new FileWriterPolicy<String>( |
| FileWriterFlushConfig.newImplicitConfig(), // no extra flush |
| FileWriterCycleConfig.newCountBasedConfig(1), // yield one line per file |
| FileWriterRetentionConfig.newFileCountBasedConfig(10) |
| ); |
| FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy); |
| |
| completeAndValidateWriter(t, (lines.length*throttleSec)+TMO_SEC, |
| basePath, expResults); |
| } |
| |
| @Test |
| public void testRetainCntBased() throws Exception { |
| // more lines than configured retained numFiles; only keep the last numFiles |
| Topology t = newTopology("testRetainCntBased"); |
| |
| // establish a base path |
| Path basePath = createTempFile("test1", "txt", new String[0]); |
| |
| String[] lines = getLines(); |
| |
| // build expected results |
| // net one tuples per file |
| List<List<String>> expResults = buildExpResults(lines, tuple -> true); |
| int keepCnt = 2; // only keep the last n files |
| for (int i = 0; i < keepCnt; i++) |
| expResults.remove(0); |
| assertEquals(keepCnt, expResults.size()); |
| |
| TStream<String> s = t.strings(lines); |
| |
| IFileWriterPolicy<String> policy = new FileWriterPolicy<String>( |
| FileWriterFlushConfig.newImplicitConfig(), |
| FileWriterCycleConfig.newCountBasedConfig(1), |
| FileWriterRetentionConfig.newFileCountBasedConfig(keepCnt) |
| ); |
| FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy); |
| |
| completeAndValidateWriter(t, TMO_SEC, basePath, expResults); |
| } |
| |
| @Test |
| public void testRetainAggSizeBased() throws Exception { |
| // more aggsize than configured; only keep aggsize worth |
| Topology t = newTopology("testRetainAggSizeBased"); |
| |
| // establish a base path |
| Path basePath = createTempFile("test1", "txt", new String[0]); |
| |
| String[] lines = getLines(); |
| |
| // build expected results |
| // net one tuple per file |
| List<List<String>> expResults = buildExpResults(lines, tuple -> true); |
| // agg size only enough for last two lines |
| long aggregateFileSize = 2 * (("1-"+getStr()).getBytes(StandardCharsets.UTF_8).length + 1/*eol*/); |
| expResults.remove(0); |
| expResults.remove(0); |
| assertEquals(2, expResults.size()); |
| |
| TStream<String> s = t.strings(lines); |
| |
| IFileWriterPolicy<String> policy = new FileWriterPolicy<String>( |
| FileWriterFlushConfig.newImplicitConfig(), |
| FileWriterCycleConfig.newCountBasedConfig(1), |
| FileWriterRetentionConfig.newAggregateFileSizeBasedConfig(aggregateFileSize) |
| ); |
| FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy); |
| |
| completeAndValidateWriter(t, TMO_SEC, basePath, expResults); |
| } |
| |
| @Test |
| public void testRetainAgeBased() throws Exception { |
| Topology t = newTopology("testRetainAgeBased"); |
| |
| // establish a base path |
| Path basePath = createTempFile("test1", "txt", new String[0]); |
| |
| String[] lines = getLines(); |
| |
| // build expected results |
| int keepCnt = 2; // only keep the last n files with throttling, age, |
| // and TMO_SEC |
| int ageSec = 5; |
| long periodMsec = TimeUnit.SECONDS.toMillis(1); |
| // net one tuple per file |
| List<List<String>> expResults = buildExpResults(lines, tuple -> true); |
| for (int i = 0; i < keepCnt; i++) |
| expResults.remove(0); |
| assertEquals(keepCnt, expResults.size()); |
| |
| // add delay so we can age out things |
| // |
| // After several runs this test seems reliable but |
| // I suspect it may be fragile wrt timing hence the results. |
| // |
| // With 4 tuples, throttleDelay=2sec, and ageSec=5 |
| // t0=add-f1, t1, t2=add-f2, t3, t4=add-f3, t5-rm-f1, t6=add-f4, t7=rm-f2, t8, t9=rm-f3, ... |
| // |
| // So we want to check somewhere around t8 (after t7 and definitely before t9) |
| // so all 4 files were created and the first 2 have been aged out. |
| // with complete delay = #files-1*throttle + TMO_SEC, should be 6+2 == t8. |
| |
| int throttleSec = 2; |
| TStream<String> s = PlumbingStreams.blockingThrottle( |
| t.strings(lines), throttleSec, TimeUnit.SECONDS); |
| |
| IFileWriterPolicy<String> policy = new FileWriterPolicy<String>( |
| FileWriterFlushConfig.newImplicitConfig(), |
| FileWriterCycleConfig.newCountBasedConfig(1), |
| FileWriterRetentionConfig.newAgeBasedConfig(ageSec, periodMsec) |
| ); |
| FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy); |
| |
| completeAndValidateWriter(t, ((lines.length-1)*throttleSec)+TMO_SEC, |
| basePath, expResults); |
| } |
| |
| @Test |
| public void testFlushImplicit() throws Exception { |
| Topology t = newTopology("testFlushImplicit"); |
| |
| // establish a base path |
| Path basePath = createTempFile("test1", "txt", new String[0]); |
| |
| String[] lines = getLines(); |
| |
| // build expected results |
| // net all in one, the first, file |
| List<List<String>> expResults = buildExpResults(lines, tuple -> false); |
| |
| TStream<String> s = t.strings(lines); |
| |
| IFileWriterPolicy<String> policy = new FileWriterPolicy<String>( |
| FileWriterFlushConfig.newImplicitConfig(), |
| FileWriterCycleConfig.newCountBasedConfig(1000), |
| FileWriterRetentionConfig.newFileCountBasedConfig(10) |
| ); |
| FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy); |
| |
| completeAndValidateWriter(t, TMO_SEC, basePath, expResults); |
| } |
| |
| @Test |
| public void testFlushCntBased() throws Exception { |
| Topology t = newTopology("testFlushCntBased"); |
| |
| // establish a base path |
| Path basePath = createTempFile("test1", "txt", new String[0]); |
| |
| String[] lines = getLines(); |
| |
| // build expected results |
| // net all in one, the first, file |
| List<List<String>> expResults = buildExpResults(lines, tuple -> false); |
| |
| TStream<String> s = t.strings(lines); |
| |
| IFileWriterPolicy<String> policy = new FileWriterPolicy<String>( |
| FileWriterFlushConfig.newCountBasedConfig(1), // every tuple |
| FileWriterCycleConfig.newCountBasedConfig(1000), // all in 1 file |
| FileWriterRetentionConfig.newFileCountBasedConfig(10) |
| ); |
| FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy); |
| |
| completeAndValidateWriter(t, TMO_SEC, basePath, expResults); |
| } |
| |
| @Test |
| public void testFlushTimeBased() throws Exception { |
| Topology t = newTopology("testFlushTimeBased"); |
| |
| // establish a base path |
| Path basePath = createTempFile("test1", "txt", new String[0]); |
| |
| String[] lines = getLines(); |
| |
| // build expected results |
| // net all in one, the first, file |
| List<List<String>> expResults = buildExpResults(lines, tuple -> false); |
| |
| // add delay so time flush happens |
| |
| int throttleSec = 1; |
| TStream<String> s = PlumbingStreams.blockingThrottle( |
| t.strings(lines), throttleSec, TimeUnit.SECONDS); |
| |
| IFileWriterPolicy<String> policy = new FileWriterPolicy<String>( |
| FileWriterFlushConfig.newTimeBasedConfig(TimeUnit.MILLISECONDS.toMillis(250)), |
| FileWriterCycleConfig.newCountBasedConfig(1000), // all in 1 file |
| FileWriterRetentionConfig.newFileCountBasedConfig(10) |
| ); |
| FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy); |
| |
| completeAndValidateWriter(t, (lines.length*throttleSec)+TMO_SEC, |
| basePath, expResults); |
| } |
| |
| @Test |
| public void testFlushTupleBased() throws Exception { |
| Topology t = newTopology("testFlushTupleBased"); |
| |
| // establish a base path |
| Path basePath = createTempFile("test1", "txt", new String[0]); |
| |
| String[] lines = getLines(); |
| |
| // build expected results |
| // net all in one, the first, file |
| List<List<String>> expResults = buildExpResults(lines, tuple -> false); |
| |
| TStream<String> s = t.strings(lines); |
| |
| IFileWriterPolicy<String> policy = new FileWriterPolicy<String>( |
| FileWriterFlushConfig.newPredicateBasedConfig( |
| tuple -> tuple.startsWith("1-") || tuple.startsWith("3-")), |
| FileWriterCycleConfig.newCountBasedConfig(1000), // all in 1 file |
| FileWriterRetentionConfig.newFileCountBasedConfig(10) |
| ); |
| FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy); |
| |
| completeAndValidateWriter(t, TMO_SEC, basePath, expResults); |
| } |
| |
| @Test |
| public void testCycleCntBased() throws Exception { |
| Topology t = newTopology("testCycleCntBased"); |
| |
| // establish a base path |
| Path basePath = createTempFile("test1", "txt", new String[0]); |
| |
| String[] lines = getLines(); |
| |
| // build expected results |
| // net two tuples per file |
| int cntTuples = 2; |
| AtomicInteger cnt = new AtomicInteger(); |
| Predicate<String> cycleIt = tuple -> cnt.incrementAndGet() % cntTuples == 0; |
| List<List<String>> expResults = buildExpResults(lines, cycleIt); |
| assertEquals(lines.length / cntTuples, expResults.size()); |
| |
| TStream<String> s = t.strings(lines); |
| |
| IFileWriterPolicy<String> policy = new FileWriterPolicy<String>( |
| FileWriterFlushConfig.newImplicitConfig(), |
| FileWriterCycleConfig.newCountBasedConfig(cntTuples), |
| FileWriterRetentionConfig.newFileCountBasedConfig(10) |
| ); |
| FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy); |
| |
| completeAndValidateWriter(t, TMO_SEC, basePath, expResults); |
| } |
| |
| @Test |
| public void testCycleSizeBased() throws Exception { |
| Topology t = newTopology("testCycleSizeBased"); |
| |
| // establish a base path |
| Path basePath = createTempFile("test1", "txt", new String[0]); |
| |
| String[] lines = getLines(); |
| |
| // build expected results |
| // net one tuple per file |
| List<List<String>> expResults = buildExpResults(lines, tuple -> true); |
| int fileSize = 2; |
| |
| TStream<String> s = t.strings(lines); |
| |
| IFileWriterPolicy<String> policy = new FileWriterPolicy<String>( |
| FileWriterFlushConfig.newImplicitConfig(), |
| FileWriterCycleConfig.newFileSizeBasedConfig(fileSize), |
| FileWriterRetentionConfig.newFileCountBasedConfig(10) |
| ); |
| FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy); |
| |
| completeAndValidateWriter(t, TMO_SEC, basePath, expResults); |
| } |
| |
| @Test |
| public void testCycleTimeBased() throws Exception { |
| Topology t = newTopology("testCycleTimeBased"); |
| |
| // establish a base path |
| Path basePath = createTempFile("test1", "txt", new String[0]); |
| |
| String[] lines = getLines(); |
| |
| // build expected results |
| // net one tuple per file with 250msec cycle config and 1 throttle |
| List<List<String>> expResults = buildExpResults(lines, tuple -> true); |
| |
| // add delay so time cycle happens |
| // also verifies only cycle if there's something to cycle |
| // (i.e., these cycles happen faster than tuples are written) |
| |
| int throttleSec = 1; |
| TStream<String> s = PlumbingStreams.blockingThrottle( |
| t.strings(lines), throttleSec, TimeUnit.SECONDS); |
| |
| IFileWriterPolicy<String> policy = new FileWriterPolicy<String>( |
| FileWriterFlushConfig.newImplicitConfig(), |
| FileWriterCycleConfig.newTimeBasedConfig(TimeUnit.MILLISECONDS.toMillis(250)), |
| FileWriterRetentionConfig.newFileCountBasedConfig(10) |
| ); |
| FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy); |
| |
| completeAndValidateWriter(t, (lines.length*throttleSec)+TMO_SEC, |
| basePath, expResults); |
| } |
| |
| @Test |
| public void testCycleTupleBased() throws Exception { |
| Topology t = newTopology("testCycleTupleBased"); |
| |
| // establish a base path |
| Path basePath = createTempFile("test1", "txt", new String[0]); |
| |
| String[] lines = getLines(); |
| |
| // build expected results |
| // a tuple based config / predicate. in this case should end up with 3 files. |
| Predicate<String> cycleIt = tuple -> tuple.startsWith("1-") || tuple.startsWith("3-"); |
| List<List<String>> expResults = buildExpResults(lines, cycleIt); |
| assertEquals(3, expResults.size()); |
| |
| TStream<String> s = t.strings(lines); |
| |
| IFileWriterPolicy<String> policy = new FileWriterPolicy<String>( |
| FileWriterFlushConfig.newImplicitConfig(), |
| FileWriterCycleConfig.newPredicateBasedConfig(cycleIt), |
| FileWriterRetentionConfig.newFileCountBasedConfig(10) |
| ); |
| FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy); |
| |
| completeAndValidateWriter(t, TMO_SEC, basePath, expResults); |
| } |
| |
| @Test |
| public void testAllTimeBased() throws Exception { |
| // exercise case with multiple timer based policies |
| Topology t = newTopology("testAllTimeBased"); |
| |
| // establish a base path |
| Path basePath = createTempFile("test1", "txt", new String[0]); |
| |
| String[] lines = getLines(); |
| |
| // build expected results |
| // keep all given age and TMO_SEC |
| int ageSec = 10; |
| long periodMsec = TimeUnit.SECONDS.toMillis(1); |
| // net one tuple per file |
| List<List<String>> expResults = buildExpResults(lines, tuple -> true); |
| |
| TStream<String> s = t.strings(lines); |
| |
| IFileWriterPolicy<String> policy = new FileWriterPolicy<String>( |
| FileWriterFlushConfig.newTimeBasedConfig(TimeUnit.MILLISECONDS.toMillis(250)), |
| FileWriterCycleConfig.newConfig(1, 2000, TimeUnit.SECONDS.toMillis(10), null), |
| FileWriterRetentionConfig.newAgeBasedConfig(ageSec, periodMsec) |
| ); |
| FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy); |
| |
| completeAndValidateWriter(t, TMO_SEC, basePath, expResults); |
| } |
| |
| @Test |
| public void testWriterWatcherReader() throws Exception { |
| // verify all the pieces work together |
| Topology t = newTopology("testWriterWatcherReader"); |
| |
| String testDirPrefix = "testWriterWatcherReader"; |
| Path dir = Files.createTempDirectory(testDirPrefix); |
| Path basePath = dir.resolve("writerCreated"); |
| |
| String[] lines = getLines(); |
| |
| System.out.println("########## "+t.getName()); |
| |
| // Write the files |
| // add delay so watcher starts first and gets to "see" file additions |
| int throttleSec = 2; |
| TStream<String> contents = PlumbingStreams.blockingOneShotDelay( |
| t.strings(lines), 2, TimeUnit.SECONDS); |
| contents = PlumbingStreams.blockingThrottle( |
| contents, throttleSec, TimeUnit.SECONDS); |
| |
| IFileWriterPolicy<String> policy = new FileWriterPolicy<String>( |
| FileWriterFlushConfig.newImplicitConfig(), |
| FileWriterCycleConfig.newCountBasedConfig(1), // one per file |
| FileWriterRetentionConfig.newFileCountBasedConfig(10) |
| ); |
| FileStreams.textFileWriter(contents, () -> basePath.toString(), () -> policy); |
| |
| // Watch and read contents |
| TStream<String> pathnames = FileStreams.directoryWatcher(t, |
| () -> dir.toAbsolutePath().toString()) |
| .peek(tuple -> System.out.println(new Date() + " watcher added "+tuple)) |
| .peek(tuple -> { if (new File(tuple).getName().startsWith(".")) |
| throw new RuntimeException("Not filtering active/hidden files "+tuple); }); |
| TStream<String> readContents = FileStreams.textFileReader(pathnames); |
| |
| boolean dump = true; |
| try { |
| completeAndValidate("", t, readContents, |
| (lines.length*throttleSec)+TMO_SEC+3/*on-the-edge*/, lines); |
| dump = false; |
| } |
| finally { |
| deleteDirAndFiles(dir, testDirPrefix, dump); |
| } |
| } |
| |
| @Test |
| public void testCompressedFileWriterPolicy() throws Exception { |
| Topology t = newTopology("testCompressedFileWriterPolicy"); |
| |
| // establish a base path |
| Path basePath = createTempFile("test1", "txt", new String[0]); |
| |
| String[] lines = getLines(); |
| |
| // build expected results |
| // net 2 tuples per file |
| int cntTuples = 2; |
| AtomicInteger cnt = new AtomicInteger(); |
| Predicate<String> cycleIt = tuple -> cnt.incrementAndGet() % cntTuples == 0; |
| List<List<String>> expResults = buildExpResults(lines, cycleIt); |
| assertEquals(lines.length / cntTuples, expResults.size()); |
| |
| TStream<String> s = t.strings(lines); |
| |
| IFileWriterPolicy<String> policy = new CompressedFileWriterPolicy<String>( |
| FileWriterFlushConfig.newImplicitConfig(), |
| FileWriterCycleConfig.newCountBasedConfig(cntTuples), |
| FileWriterRetentionConfig.newFileCountBasedConfig(10) |
| ); |
| FileStreams.textFileWriter(s, () -> basePath.toString(), () -> policy); |
| |
| completeAndValidateWriter(t, TMO_SEC, basePath, expResults); |
| } |
| |
| private void deleteDirAndFiles(Path dir, String dirPrefix, boolean dump) { |
| // exercise caution before removing all files in dir |
| if (!dirPrefix.startsWith("test")) |
| throw new IllegalStateException("Yikes. dir:"+dir+" dirPrefix:"+dirPrefix); |
| String leaf = dir.getFileName().toString(); |
| if (!leaf.startsWith(dirPrefix)) |
| throw new IllegalStateException("Yikes. dir:"+dir+" dirPrefix:"+dirPrefix); |
| |
| // Ok, delete all the files in the dir and then the dir |
| for (File file : dir.toFile().listFiles()) { |
| if (dump) |
| dumpFile(file); |
| file.delete(); |
| } |
| dir.toFile().delete(); |
| } |
| |
| private void dumpFile(File f) { |
| System.out.println("<<<<< Dumping "+f); |
| try { |
| Path path = f.toPath(); |
| try (BufferedReader br = Files.newBufferedReader(path)) { |
| br.lines().forEach(line -> System.out.println(line)); |
| } |
| } |
| catch (Exception e) { |
| System.out.println("##### exception: " + e.getLocalizedMessage()); |
| } |
| System.out.println(">>>>> DONE "+f); |
| } |
| |
| private <T> List<List<T>> buildExpResults(T[] tuples, Predicate<T> cycleIt) { |
| List<List<T>> expResults = new ArrayList<>(); |
| List<T> oneFile = null; |
| for (T tuple : tuples) { |
| if (oneFile==null) { |
| oneFile = new ArrayList<>(); |
| expResults.add(oneFile); |
| } |
| oneFile.add(tuple); |
| if (cycleIt.test(tuple)) |
| oneFile = null; |
| } |
| return expResults; |
| } |
| |
| private <T> void completeAndValidateWriter(Topology t, int tmoSec, |
| Path basePath, List<List<T>> expResults) throws Exception { |
| |
| try { |
| // just let it run to the tmo before we check the file contents |
| Condition<Object> tc = new Condition<Object>() { |
| public boolean valid() { return false; } |
| public Object getResult() { return null; } |
| }; |
| |
| complete(t, tc, tmoSec, TimeUnit.SECONDS); |
| |
| System.out.println("########## "+t.getName()); |
| |
| // right number of files? |
| List<Path> actFiles = getActFiles(basePath); |
| System.out.println("actFiles: "+actFiles); |
| assertEquals(actFiles.toString(), expResults.size(), actFiles.size()); |
| |
| // do the file(s) have the right contents? |
| System.out.println("expResults: "+expResults); |
| int i = 0; |
| for (List<T> expFile : expResults) { |
| Path path = actFiles.get(i++); |
| checkContents(path, expFile.toArray(new String[0])); |
| } |
| } |
| finally { |
| deleteAll(basePath); |
| } |
| } |
| |
| private void deleteAll(Path basePath) { |
| Path parent = basePath.getParent(); |
| String baseLeaf = basePath.getFileName().toString(); |
| String[] actLeafs = parent.toFile().list( |
| (dir,leaf) -> leaf.startsWith(baseLeaf)); |
| for (String leaf : actLeafs) { |
| parent.resolve(leaf).toFile().delete(); |
| } |
| } |
| |
| private List<Path> getActFiles(Path basePath) { |
| List<Path> paths = new ArrayList<>(); |
| Path parent = basePath.getParent(); |
| String baseLeaf = basePath.getFileName().toString(); |
| String[] actLeafs = parent.toFile().list( |
| (dir,leaf) -> leaf.startsWith(baseLeaf+"_")); |
| Arrays.sort(actLeafs, (o1,o2) -> o1.compareTo(o2)); |
| for (String leaf : actLeafs) { |
| paths.add(parent.resolve(leaf)); |
| } |
| return paths; |
| } |
| |
| private void checkContents(Path path, String[] lines) { |
| if (path.getFileName().toString().endsWith(".zip")) { |
| checkZipContents(path, lines); |
| return; |
| } |
| System.out.println("checking file "+path); |
| int lineCnt = 0; |
| try (BufferedReader br = Files.newBufferedReader(path)) { |
| for (String line : lines) { |
| ++lineCnt; |
| String actLine = br.readLine(); |
| assertEquals("path:"+path+" line "+lineCnt, line, actLine); |
| } |
| assertNull("path:"+path+" line "+lineCnt+" expected EOF", br.readLine()); |
| } |
| catch (IOException e) { |
| assertNull("path:"+path+" line "+lineCnt+" unexpected IOException "+e, e); |
| } |
| } |
| |
| private void checkZipContents(Path path, String[] lines) { |
| System.out.println("checking file "+path); |
| String fileName = path.getFileName().toString(); |
| String entryName = fileName.substring(0, fileName.length() - ".zip".length()); |
| int lineCnt = 0; |
| try ( |
| FileInputStream fis = new FileInputStream(path.toFile()); |
| ZipInputStream zin = new ZipInputStream((new BufferedInputStream(fis))); |
| ) |
| { |
| ZipEntry entry = zin.getNextEntry(); |
| |
| assertEquals(entryName, entry.getName()); |
| |
| BufferedReader br = new BufferedReader(new InputStreamReader(zin, StandardCharsets.UTF_8)); |
| for (String line : lines) { |
| ++lineCnt; |
| String actLine = br.readLine(); |
| assertEquals("path:"+path+" line "+lineCnt, line, actLine); |
| } |
| assertNull("path:"+path+" line "+lineCnt+" expected EOF", br.readLine()); |
| } |
| catch (IOException e) { |
| assertNull("path:"+path+" line "+lineCnt+" unexpected IOException "+e, e); |
| } |
| } |
| } |