blob: 2a48605080de2e60b48527276cd7637e4cf4a78c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.lib.io.fs;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import com.google.common.collect.Sets;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.lib.io.block.BlockMetadata;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.KryoCloneUtils;
import com.datatorrent.lib.util.TestUtils;
import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
/**
* Tests for {@link FileSplitterInput}
*/
public class FileSplitterInputTest
{
static Set<String> createData(String dataDirectory) throws IOException
{
Set<String> filePaths = Sets.newHashSet();
FileContext.getLocalFSFileContext().delete(new Path(new File(dataDirectory).getAbsolutePath()), true);
HashSet<String> allLines = Sets.newHashSet();
for (int file = 0; file < 12; file++) {
HashSet<String> lines = Sets.newHashSet();
for (int line = 0; line < 2; line++) {
//padding 0 to file number so every file has 6 blocks.
lines.add("f" + String.format("%02d", file) + "l" + line);
}
allLines.addAll(lines);
File created = new File(dataDirectory, "file" + file + ".txt");
filePaths.add(created.getAbsolutePath());
FileUtils.write(created, StringUtils.join(lines, '\n'));
}
return filePaths;
}
public static class TestMeta extends TestWatcher
{
String dataDirectory;
FileSplitterInput fileSplitterInput;
CollectorTestSink<FileSplitterInput.FileMetadata> fileMetadataSink;
CollectorTestSink<BlockMetadata.FileBlockMetadata> blockMetadataSink;
Set<String> filePaths;
Context.OperatorContext context;
MockScanner scanner;
@Override
protected void starting(org.junit.runner.Description description)
{
TestUtils.deleteTargetTestClassFolder(description);
String methodName = description.getMethodName();
String className = description.getClassName();
this.dataDirectory = "target/" + className + "/" + methodName + "/data";
try {
filePaths = createData(this.dataDirectory);
} catch (IOException e) {
throw new RuntimeException(e);
}
fileSplitterInput = new FileSplitterInput();
fileSplitterInput.setBlocksThreshold(100);
scanner = new MockScanner();
scanner.setScanIntervalMillis(100);
scanner.setFilePatternRegularExp(".*[.]txt");
scanner.setFiles(dataDirectory);
fileSplitterInput.setScanner(scanner);
Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
attributes.put(Context.DAGContext.APPLICATION_PATH,
"target/" + className + "/" + methodName + "/" + Long.toHexString(System.currentTimeMillis()));
context = mockOperatorContext(0, attributes);
fileMetadataSink = new CollectorTestSink<>();
blockMetadataSink = new CollectorTestSink<>();
resetSinks();
}
@Override
protected void finished(Description description)
{
filePaths.clear();
TestUtils.deleteTargetTestClassFolder(description);
}
private void resetSinks()
{
TestUtils.setSink(fileSplitterInput.filesMetadataOutput, fileMetadataSink);
TestUtils.setSink(fileSplitterInput.blocksMetadataOutput, blockMetadataSink);
}
private void updateConfig(FSWindowDataManager fsWindowDataManager,
long scanInterval, long blockSize, int blocksThreshold)
{
fileSplitterInput.setWindowDataManager(fsWindowDataManager);
fileSplitterInput.getScanner().setScanIntervalMillis(scanInterval);
fileSplitterInput.setBlockSize(blockSize);
fileSplitterInput.setBlocksThreshold(blocksThreshold);
}
}
@Rule
public TestMeta testMeta = new TestMeta();
private void validateFileMetadataInWindow1() throws InterruptedException
{
testMeta.fileSplitterInput.beginWindow(1);
((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire(12);
testMeta.fileSplitterInput.emitTuples();
testMeta.fileSplitterInput.endWindow();
Assert.assertEquals("File metadata", 12, testMeta.fileMetadataSink.collectedTuples.size());
for (Object fileMetadata : testMeta.fileMetadataSink.collectedTuples) {
FileSplitterInput.FileMetadata metadata = (FileSplitterInput.FileMetadata)fileMetadata;
Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath()));
Assert.assertNotNull("name: ", metadata.getFileName());
}
testMeta.fileMetadataSink.collectedTuples.clear();
}
@Test
public void testFileMetadata() throws InterruptedException
{
testMeta.fileSplitterInput.setup(testMeta.context);
validateFileMetadataInWindow1();
testMeta.fileSplitterInput.teardown();
}
@Test
public void testScannerFilterForDuplicates() throws InterruptedException
{
String filePath = testMeta.dataDirectory + Path.SEPARATOR + "file0.txt";
testMeta.scanner = new MockScanner();
testMeta.fileSplitterInput.setScanner(testMeta.scanner);
testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500);
testMeta.fileSplitterInput.getScanner().setFilePatternRegularExp(".*[.]txt");
testMeta.fileSplitterInput.getScanner().setFiles(filePath);
testMeta.fileSplitterInput.setup(testMeta.context);
testMeta.fileSplitterInput.beginWindow(1);
testMeta.scanner.semaphore.acquire();
testMeta.fileSplitterInput.emitTuples();
testMeta.fileSplitterInput.endWindow();
testMeta.fileSplitterInput.beginWindow(2);
testMeta.fileSplitterInput.emitTuples();
testMeta.fileSplitterInput.endWindow();
Assert.assertEquals("File metadata", 1, testMeta.fileMetadataSink.collectedTuples.size());
for (Object fileMetadata : testMeta.fileMetadataSink.collectedTuples) {
FileSplitterInput.FileMetadata metadata = (FileSplitterInput.FileMetadata)fileMetadata;
Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath()));
Assert.assertNotNull("name: ", metadata.getFileName());
}
testMeta.fileMetadataSink.collectedTuples.clear();
testMeta.fileSplitterInput.teardown();
}
@Test
public void testBlockMetadataNoSplit() throws InterruptedException
{
testMeta.fileSplitterInput.setup(testMeta.context);
testMeta.fileSplitterInput.beginWindow(1);
testMeta.scanner.semaphore.acquire(12);
testMeta.fileSplitterInput.emitTuples();
Assert.assertEquals("Blocks", 12, testMeta.blockMetadataSink.collectedTuples.size());
for (Object blockMetadata : testMeta.blockMetadataSink.collectedTuples) {
BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata)blockMetadata;
Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath()));
}
testMeta.fileSplitterInput.teardown();
}
@Test
public void testBlockMetadataWithSplit() throws InterruptedException
{
testMeta.fileSplitterInput.setBlockSize(2L);
testMeta.fileSplitterInput.setup(testMeta.context);
testMeta.fileSplitterInput.beginWindow(1);
testMeta.scanner.semaphore.acquire(12);
testMeta.fileSplitterInput.emitTuples();
Assert.assertEquals("Files", 12, testMeta.fileMetadataSink.collectedTuples.size());
int noOfBlocks = 0;
for (int i = 0; i < 12; i++) {
FileSplitterInput.FileMetadata fm = testMeta.fileMetadataSink.collectedTuples.get(i);
File testFile = new File(testMeta.dataDirectory, fm.getFileName());
noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0));
}
Assert.assertEquals("Blocks", noOfBlocks, testMeta.blockMetadataSink.collectedTuples.size());
testMeta.fileSplitterInput.teardown();
}
@Test
public void testIdempotency() throws InterruptedException
{
FSWindowDataManager fsIdempotentStorageManager = new FSWindowDataManager();
testMeta.fileSplitterInput.setWindowDataManager(fsIdempotentStorageManager);
testMeta.fileSplitterInput.setup(testMeta.context);
//will emit window 1 from data directory
validateFileMetadataInWindow1();
testMeta.fileMetadataSink.clear();
testMeta.blockMetadataSink.clear();
testMeta.fileSplitterInput.teardown();
testMeta.fileSplitterInput = KryoCloneUtils.cloneObject(testMeta.fileSplitterInput);
testMeta.resetSinks();
testMeta.fileSplitterInput.setup(testMeta.context);
testMeta.fileSplitterInput.beginWindow(1);
Assert.assertEquals("Blocks", 12, testMeta.blockMetadataSink.collectedTuples.size());
for (Object blockMetadata : testMeta.blockMetadataSink.collectedTuples) {
BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata)blockMetadata;
Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath()));
}
testMeta.fileSplitterInput.teardown();
}
@Test
public void testTimeScan() throws InterruptedException, IOException, TimeoutException
{
testMeta.fileSplitterInput.setup(testMeta.context);
validateFileMetadataInWindow1();
testMeta.fileMetadataSink.clear();
testMeta.blockMetadataSink.clear();
Thread.sleep(1000);
//added a new relativeFilePath
File f13 = new File(testMeta.dataDirectory, "file13" + ".txt");
HashSet<String> lines = Sets.newHashSet();
for (int line = 0; line < 2; line++) {
lines.add("f13" + "l" + line);
}
FileUtils.write(f13, StringUtils.join(lines, '\n'));
//window 2
testMeta.fileSplitterInput.beginWindow(2);
testMeta.scanner.semaphore.acquire();
testMeta.fileSplitterInput.emitTuples();
testMeta.fileSplitterInput.endWindow();
Assert.assertEquals("window 2: files " + testMeta.fileMetadataSink.collectedTuples, 1,
testMeta.fileMetadataSink.collectedTuples.size());
Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size());
testMeta.fileSplitterInput.teardown();
}
@Test
public void testTrigger() throws InterruptedException, IOException, TimeoutException
{
testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(60 * 1000);
testMeta.fileSplitterInput.setup(testMeta.context);
validateFileMetadataInWindow1();
testMeta.fileMetadataSink.clear();
testMeta.blockMetadataSink.clear();
Thread.sleep(1000);
//added a new relativeFilePath
File f13 = new File(testMeta.dataDirectory, "file13" + ".txt");
HashSet<String> lines = Sets.newHashSet();
for (int line = 0; line < 2; line++) {
lines.add("f13" + "l" + line);
}
FileUtils.write(f13, StringUtils.join(lines, '\n'));
testMeta.fileSplitterInput.getScanner().setTrigger(true);
//window 2
testMeta.fileSplitterInput.beginWindow(2);
testMeta.scanner.semaphore.acquire();
testMeta.fileSplitterInput.emitTuples();
testMeta.fileSplitterInput.endWindow();
Assert.assertEquals("window 2: files", 1, testMeta.fileMetadataSink.collectedTuples.size());
Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size());
testMeta.fileSplitterInput.teardown();
}
private int getTotalNumOfBlocks(int numFiles, long blockLength)
{
int noOfBlocks = 0;
for (int i = 0; i < numFiles; i++) {
File testFile = new File(testMeta.dataDirectory, "file" + i + ".txt");
noOfBlocks += (int)Math.ceil(testFile.length() / (blockLength * 1.0));
}
return noOfBlocks;
}
private void validateBlocks(long targetWindow, long blockLength) throws InterruptedException
{
testMeta.fileSplitterInput.beginWindow(1);
((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire(12);
testMeta.fileSplitterInput.emitTuples();
testMeta.fileSplitterInput.endWindow();
Assert.assertEquals("Blocks", 10, testMeta.blockMetadataSink.collectedTuples.size());
for (int window = 2; window <= targetWindow; window++) {
testMeta.fileSplitterInput.beginWindow(window);
testMeta.fileSplitterInput.emitTuples();
testMeta.fileSplitterInput.endWindow();
}
Assert.assertEquals("Files", 12, testMeta.fileMetadataSink.collectedTuples.size());
Assert.assertEquals("Blocks", getTotalNumOfBlocks(12, blockLength), testMeta.blockMetadataSink.collectedTuples.size());
}
@Test
public void testBlocksThreshold() throws InterruptedException
{
testMeta.fileSplitterInput.setBlockSize(2L);
testMeta.fileSplitterInput.setBlocksThreshold(10);
testMeta.fileSplitterInput.setup(testMeta.context);
validateBlocks(8, 2);
testMeta.fileSplitterInput.teardown();
}
private void validateRecovery(long targetWindow, long blockLength) throws InterruptedException
{
validateBlocks(targetWindow, blockLength);
testMeta.fileMetadataSink.clear();
testMeta.blockMetadataSink.clear();
testMeta.fileSplitterInput.teardown();
testMeta.fileSplitterInput = KryoCloneUtils.cloneObject(testMeta.fileSplitterInput);
testMeta.resetSinks();
testMeta.fileSplitterInput.setup(testMeta.context);
for (int i = 1; i <= targetWindow; i++) {
testMeta.fileSplitterInput.beginWindow(i);
}
Assert.assertEquals("Files", 12, testMeta.fileMetadataSink.collectedTuples.size());
Assert.assertEquals("Blocks", getTotalNumOfBlocks(12, blockLength),
testMeta.blockMetadataSink.collectedTuples.size());
}
@Test
public void testIdempotencyWithBlocksThreshold() throws InterruptedException
{
FSWindowDataManager fsWindowDataManager = new FSWindowDataManager();
testMeta.updateConfig(fsWindowDataManager, 500, 2L, 10);
testMeta.fileSplitterInput.setup(testMeta.context);
validateRecovery(8, 2);
testMeta.fileSplitterInput.teardown();
}
@Test
public void testFirstWindowAfterRecovery() throws IOException, InterruptedException
{
FSWindowDataManager fsWindowDataManager = new FSWindowDataManager();
testMeta.updateConfig(fsWindowDataManager, 500, 2L, 10);
testMeta.fileSplitterInput.setup(testMeta.context);
validateRecovery(8, 2);
Thread.sleep(1000);
HashSet<String> lines = Sets.newHashSet();
for (int line = 2; line < 4; line++) {
lines.add("f13" + "l" + line);
}
File f13 = new File(testMeta.dataDirectory, "file13" + ".txt");
FileUtils.writeLines(f13, lines, true);
testMeta.fileMetadataSink.clear();
testMeta.blockMetadataSink.clear();
testMeta.fileSplitterInput.beginWindow(9);
((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire();
testMeta.fileSplitterInput.emitTuples();
testMeta.fileSplitterInput.endWindow();
Assert.assertEquals("Files " + testMeta.fileMetadataSink.collectedTuples, 1,
testMeta.fileMetadataSink.collectedTuples.size());
Assert.assertEquals("Blocks", 6, testMeta.blockMetadataSink.collectedTuples.size());
testMeta.fileSplitterInput.teardown();
}
@Test
public void testRecoveryOfPartialFile() throws InterruptedException
{
FSWindowDataManager fsIdempotentStorageManager = new FSWindowDataManager();
testMeta.updateConfig(fsIdempotentStorageManager, 500L, 2L, 2);
FileSplitterInput checkpointedInput = KryoCloneUtils.cloneObject(testMeta.fileSplitterInput);
testMeta.fileSplitterInput.setup(testMeta.context);
testMeta.fileSplitterInput.beginWindow(1);
testMeta.scanner.semaphore.acquire(12);
testMeta.fileSplitterInput.emitTuples();
testMeta.fileSplitterInput.endWindow();
//fileX.txt has just 6 blocks. Since blocks threshold is 2, only 2 are emitted.
Assert.assertEquals("Files", 1, testMeta.fileMetadataSink.collectedTuples.size());
Assert.assertEquals("Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size());
AbstractFileSplitter.FileMetadata fileX = testMeta.fileMetadataSink.collectedTuples.get(0);
testMeta.fileMetadataSink.clear();
testMeta.blockMetadataSink.clear();
testMeta.fileSplitterInput.teardown();
//there was a failure and the operator was re-deployed
testMeta.fileSplitterInput = checkpointedInput;
testMeta.resetSinks();
testMeta.fileSplitterInput.setup(testMeta.context);
testMeta.fileSplitterInput.beginWindow(1);
//fileX is recovered and first two blocks are repeated.
Assert.assertEquals("Recovered Files", 1, testMeta.fileMetadataSink.collectedTuples.size());
AbstractFileSplitter.FileMetadata fileXRecovered = testMeta.fileMetadataSink.collectedTuples.get(0);
Assert.assertEquals("recovered file-metadata", fileX.getFileName(), fileXRecovered.getFileName());
Assert.assertEquals("Recovered Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size());
testMeta.fileSplitterInput.endWindow();
testMeta.fileMetadataSink.clear();
testMeta.blockMetadataSink.clear();
testMeta.fileSplitterInput.beginWindow(2);
testMeta.fileSplitterInput.emitTuples(); //next 2 blocks of fileX
testMeta.fileSplitterInput.endWindow();
testMeta.fileSplitterInput.beginWindow(3);
testMeta.fileSplitterInput.emitTuples(); //next 2 blocks of fileX
testMeta.fileSplitterInput.endWindow();
//Next 2 blocks of fileX
Assert.assertEquals("File", 0, testMeta.fileMetadataSink.collectedTuples.size());
Assert.assertEquals("Blocks", 4, testMeta.blockMetadataSink.collectedTuples.size());
testMeta.fileMetadataSink.clear();
testMeta.blockMetadataSink.clear();
testMeta.fileSplitterInput.beginWindow(4);
((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire(11);
testMeta.fileSplitterInput.emitTuples();
testMeta.fileSplitterInput.endWindow();
//2 blocks of a different file
Assert.assertEquals("New file", 1, testMeta.fileMetadataSink.collectedTuples.size());
Assert.assertEquals("Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size());
AbstractFileSplitter.FileMetadata fileY = testMeta.fileMetadataSink.collectedTuples.get(0);
for (BlockMetadata.FileBlockMetadata blockMetadata : testMeta.blockMetadataSink.collectedTuples) {
Assert.assertTrue("Block file name", blockMetadata.getFilePath().endsWith(fileY.getFileName()));
testMeta.fileSplitterInput.teardown();
}
}
@Test
public void testRecursive() throws InterruptedException, IOException
{
testMeta.fileSplitterInput.setup(testMeta.context);
validateFileMetadataInWindow1();
testMeta.fileMetadataSink.clear();
testMeta.blockMetadataSink.clear();
//added a new relativeFilePath
File f13 = new File(testMeta.dataDirectory + "/child", "file13" + ".txt");
HashSet<String> lines = Sets.newHashSet();
for (int line = 0; line < 2; line++) {
lines.add("f13" + "l" + line);
}
FileUtils.write(f13, StringUtils.join(lines, '\n'));
//window 2
testMeta.fileSplitterInput.beginWindow(2);
testMeta.scanner.semaphore.acquire(2);
testMeta.fileSplitterInput.emitTuples();
testMeta.fileSplitterInput.endWindow();
//one for the folder "child" and one for "file13"
Assert.assertEquals("window 2: files " + testMeta.fileMetadataSink.collectedTuples, 2,
testMeta.fileMetadataSink.collectedTuples.size());
Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size());
testMeta.fileSplitterInput.teardown();
}
@Test
public void testSingleFile() throws InterruptedException, IOException
{
testMeta.fileSplitterInput.setScanner(new MockScanner());
testMeta.fileSplitterInput.getScanner().setFiles(testMeta.dataDirectory + "/file1.txt");
testMeta.fileSplitterInput.setup(testMeta.context);
testMeta.fileSplitterInput.beginWindow(1);
((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire();
testMeta.fileSplitterInput.emitTuples();
testMeta.fileSplitterInput.endWindow();
Assert.assertEquals("File metadata count", 1, testMeta.fileMetadataSink.collectedTuples.size());
Assert.assertEquals("File metadata", new File(testMeta.dataDirectory + "/file1.txt").getAbsolutePath(),
testMeta.fileMetadataSink.collectedTuples.get(0).getFilePath());
testMeta.fileSplitterInput.teardown();
}
@Test
public void testRecoveryOfBlockMetadataIterator() throws InterruptedException
{
FSWindowDataManager fsWindowDataManager = new FSWindowDataManager();
testMeta.updateConfig(fsWindowDataManager, 500L, 2L, 2);
testMeta.fileSplitterInput.setup(testMeta.context);
testMeta.fileSplitterInput.beginWindow(1);
testMeta.scanner.semaphore.acquire(12);
testMeta.fileSplitterInput.emitTuples();
testMeta.fileSplitterInput.endWindow();
//file0.txt has just 5 blocks. Since blocks threshold is 2, only 2 are emitted.
Assert.assertEquals("Files", 1, testMeta.fileMetadataSink.collectedTuples.size());
Assert.assertEquals("Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size());
testMeta.fileMetadataSink.clear();
testMeta.blockMetadataSink.clear();
//At this point the operator was check-pointed and then there was a failure.
testMeta.fileSplitterInput.teardown();
//The operator was restored from persisted state and re-deployed.
testMeta.fileSplitterInput = KryoCloneUtils.cloneObject(testMeta.fileSplitterInput);
TestUtils.setSink(testMeta.fileSplitterInput.blocksMetadataOutput, testMeta.blockMetadataSink);
TestUtils.setSink(testMeta.fileSplitterInput.filesMetadataOutput, testMeta.fileMetadataSink);
testMeta.fileSplitterInput.setup(testMeta.context);
testMeta.fileSplitterInput.beginWindow(1);
Assert.assertEquals("Recovered Files", 1, testMeta.fileMetadataSink.collectedTuples.size());
Assert.assertEquals("Recovered Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size());
testMeta.fileSplitterInput.teardown();
}
@Test
public void testFileModificationTest() throws InterruptedException, IOException, TimeoutException
{
File file11 = new File(testMeta.dataDirectory, "file11.txt");
long lastModifiedTime = file11.lastModified();
LOG.debug("file 11 modified time {} ", lastModifiedTime);
testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(60 * 1000);
testMeta.fileSplitterInput.setup(testMeta.context);
validateFileMetadataInWindow1();
testMeta.fileMetadataSink.clear();
testMeta.blockMetadataSink.clear();
Thread.sleep(1000);
//create more lines to append to the file
HashSet<String> lines = Sets.newHashSet();
for (int line = 0; line < 2; line++) {
lines.add("f11" + "l" + line);
}
/* Need to use FileWriter, FileUtils changes the directory timestamp when
file is changed. */
FileWriter fout = new FileWriter(file11, true);
fout.write(StringUtils.join(lines, '\n').toCharArray());
fout.close();
LOG.debug("file 11 modified time after append {} ", file11.lastModified());
testMeta.fileSplitterInput.getScanner().setTrigger(true);
//window 2
testMeta.fileSplitterInput.beginWindow(2);
testMeta.scanner.semaphore.acquire();
testMeta.fileSplitterInput.emitTuples();
testMeta.fileSplitterInput.endWindow();
Assert.assertEquals("window 2: files", 1, testMeta.fileMetadataSink.collectedTuples.size());
Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size());
//window 3
testMeta.fileMetadataSink.clear();
testMeta.blockMetadataSink.clear();
testMeta.scanner.setTrigger(true);
testMeta.fileSplitterInput.beginWindow(3);
testMeta.fileSplitterInput.emitTuples();
testMeta.fileSplitterInput.endWindow();
Assert.assertEquals("window 2: files", 0, testMeta.fileMetadataSink.collectedTuples.size());
Assert.assertEquals("window 2: blocks", 0, testMeta.blockMetadataSink.collectedTuples.size());
testMeta.fileSplitterInput.teardown();
}
@Test
public void testMultipleNestedInput() throws IOException, InterruptedException
{
File subDir = new File(testMeta.dataDirectory, "subDir");
subDir.mkdir();
File file = new File(subDir, "file.txt");
FileWriter fout = new FileWriter(file, true);
fout.write(StringUtils.join("testData", '\n').toCharArray());
fout.close();
String files = testMeta.scanner.getFiles();
files = files.concat("," + subDir.getAbsolutePath());
List<String> expectedFiles = new ArrayList<String>();
expectedFiles.addAll(testMeta.filePaths);
expectedFiles.add(subDir.getAbsolutePath());
expectedFiles.add(file.getAbsolutePath());
expectedFiles.add(file.getAbsolutePath()); // file will be discovered twice w.t.r. two input dirs
testMeta.fileSplitterInput.setScanner(new MockScanner());
testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(60 * 1000);
testMeta.fileSplitterInput.getScanner().setFiles(files);
testMeta.fileSplitterInput.setup(testMeta.context);
testMeta.fileSplitterInput.beginWindow(1);
((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire(15);
testMeta.fileSplitterInput.emitTuples();
testMeta.fileSplitterInput.endWindow();
Assert.assertEquals("File metadata", 15, testMeta.fileMetadataSink.collectedTuples.size());
for (Object fileMetadata : testMeta.fileMetadataSink.collectedTuples) {
FileSplitterInput.FileMetadata metadata = (FileSplitterInput.FileMetadata)fileMetadata;
Assert.assertTrue("path: " + metadata.getFilePath(), expectedFiles.contains(metadata.getFilePath()));
Assert.assertNotNull("name: ", metadata.getFileName());
}
testMeta.fileMetadataSink.collectedTuples.clear();
testMeta.fileSplitterInput.teardown();
}
@Test
public void testEmptyDirCopy() throws InterruptedException
{
File emptyDir = new File(testMeta.dataDirectory, "emptyDir");
emptyDir.mkdirs();
testMeta.fileSplitterInput.setScanner(new MockScanner());
testMeta.fileSplitterInput.getScanner().regex = null;
testMeta.fileSplitterInput.getScanner().setFiles(testMeta.dataDirectory + "/emptyDir");
testMeta.fileSplitterInput.setup(testMeta.context);
testMeta.fileSplitterInput.beginWindow(1);
((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire();
testMeta.fileSplitterInput.emitTuples();
testMeta.fileSplitterInput.endWindow();
Assert.assertEquals("File metadata count", 1, testMeta.fileMetadataSink.collectedTuples.size());
Assert.assertEquals("Empty directory not copied.", emptyDir.getName(),
testMeta.fileMetadataSink.collectedTuples.get(0).getFileName());
testMeta.fileSplitterInput.teardown();
}
private static class MockScanner extends FileSplitterInput.TimeBasedDirectoryScanner
{
private final transient Semaphore semaphore = new Semaphore(0);
@Override
protected void processDiscoveredFile(FileSplitterInput.ScannedFileInfo info)
{
super.processDiscoveredFile(info);
semaphore.release();
}
}
private static final Logger LOG = LoggerFactory.getLogger(FileSplitterInputTest.class);
}