blob: 9ec709e9c769fd19764c64fbe8cd1fb9c5ffcbe2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.lib.io.fs;
import java.io.File;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.io.block.BlockMetadata;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.TestUtils;
import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
/**
* Tests for {@link FileSplitterBase}
*/
public class FileSplitterBaseTest
{
/**
 * JUnit {@link TestWatcher} that builds a fresh fixture for every test method:
 * test data created through {@code FileSplitterInputTest.createData}, a configured
 * {@link FileSplitterBase}, a mocked operator context, and collector sinks attached
 * to both splitter output ports.
 */
class BastTestMeta extends TestWatcher
{
  String dataDirectory;
  FileSplitterBase fileSplitter;
  CollectorTestSink<FileSplitterInput.FileMetadata> fileMetadataSink;
  CollectorTestSink<BlockMetadata.FileBlockMetadata> blockMetadataSink;
  Set<String> filePaths;
  Context.OperatorContext context;

  /**
   * Prepares the per-test data directory, the splitter under test and its sinks.
   *
   * @param description the test about to run; its class and method names form the data directory
   */
  @Override
  protected void starting(Description description)
  {
    TestUtils.deleteTargetTestClassFolder(description);

    // Each test method gets its own directory: target/<class name>/<method name>
    final String testMethod = description.getMethodName();
    final String testClass = description.getClassName();
    dataDirectory = "target/" + testClass + "/" + testMethod;
    try {
      filePaths = FileSplitterInputTest.createData(dataDirectory);
    } catch (IOException ioe) {
      throw new RuntimeException(ioe);
    }

    // Splitter under test, pointed at the directory that was just populated.
    fileSplitter = new FileSplitterBase();
    fileSplitter.setBlocksThreshold(100);
    fileSplitter.setFile(dataDirectory);

    // Mocked operator context (operator id 0) carrying only SPIN_MILLIS.
    Attribute.AttributeMap.DefaultAttributeMap operatorAttributes =
        new Attribute.AttributeMap.DefaultAttributeMap();
    operatorAttributes.put(Context.OperatorContext.SPIN_MILLIS, 500);
    context = mockOperatorContext(0, operatorAttributes);

    // Capture everything the splitter emits on its two output ports.
    fileMetadataSink = new CollectorTestSink<>();
    TestUtils.setSink(fileSplitter.filesMetadataOutput, fileMetadataSink);
    blockMetadataSink = new CollectorTestSink<>();
    TestUtils.setSink(fileSplitter.blocksMetadataOutput, blockMetadataSink);
  }

  /**
   * Cleans up after the test through {@code TestUtils.deleteTargetTestClassFolder}.
   *
   * @param description the test that just finished
   */
  @Override
  protected void finished(Description description)
  {
    TestUtils.deleteTargetTestClassFolder(description);
  }
}
/**
 * Per-test fixture; as a JUnit {@code @Rule} its {@code starting}/{@code finished}
 * hooks run around every test method of this class.
 */
@Rule
public BastTestMeta baseTestMeta = new BastTestMeta();
/**
 * Pushes every generated file path through the splitter's input port within a single
 * window and verifies that 12 file-metadata tuples are emitted, each carrying a known
 * path and a non-null file name.
 */
@Test
public void testFileMetadata() throws InterruptedException
{
  final FileSplitterBase splitter = baseTestMeta.fileSplitter;
  final Set<String> expectedPaths = baseTestMeta.filePaths;
  final CollectorTestSink<FileSplitterInput.FileMetadata> sink = baseTestMeta.fileMetadataSink;

  splitter.setup(baseTestMeta.context);
  splitter.beginWindow(1);
  for (String path : expectedPaths) {
    splitter.input.process(new FileSplitterInput.FileInfo(null, path));
  }
  splitter.endWindow();

  Assert.assertEquals("File metadata", 12, sink.collectedTuples.size());
  for (FileSplitterInput.FileMetadata metadata : sink.collectedTuples) {
    final String emittedPath = metadata.getFilePath();
    Assert.assertTrue("path: " + emittedPath, expectedPaths.contains(emittedPath));
    Assert.assertNotNull("name: ", metadata.getFileName());
  }

  sink.collectedTuples.clear();
  splitter.teardown();
}
/**
 * With no explicit block size set by the test, expects 12 block-metadata tuples for
 * the 12 input files, each referring to one of the generated file paths.
 */
@Test
public void testBlockMetadataNoSplit() throws InterruptedException
{
  final FileSplitterBase splitter = baseTestMeta.fileSplitter;
  final Set<String> expectedPaths = baseTestMeta.filePaths;
  final CollectorTestSink<BlockMetadata.FileBlockMetadata> sink = baseTestMeta.blockMetadataSink;

  splitter.setup(baseTestMeta.context);
  splitter.beginWindow(1);
  for (String path : expectedPaths) {
    splitter.input.process(new FileSplitterInput.FileInfo(null, path));
  }

  Assert.assertEquals("Blocks", 12, sink.collectedTuples.size());
  for (BlockMetadata.FileBlockMetadata block : sink.collectedTuples) {
    final String blockPath = block.getFilePath();
    Assert.assertTrue("path: " + blockPath, expectedPaths.contains(blockPath));
  }

  splitter.teardown();
}
/**
 * With a block size of 2 bytes, the number of emitted blocks must equal the sum over
 * all 12 files of ceil(file length / 2).
 */
@Test
public void testBlockMetadataWithSplit() throws InterruptedException
{
  final FileSplitterBase splitter = baseTestMeta.fileSplitter;

  splitter.setup(baseTestMeta.context);
  splitter.setBlockSize(2L);
  splitter.beginWindow(1);
  for (String path : baseTestMeta.filePaths) {
    splitter.input.process(new FileSplitterInput.FileInfo(null, path));
  }

  Assert.assertEquals("Files", 12, baseTestMeta.fileMetadataSink.collectedTuples.size());

  // Derive the expected block count from the on-disk length of every emitted file.
  int expectedBlocks = 0;
  for (FileSplitterInput.FileMetadata emitted : baseTestMeta.fileMetadataSink.collectedTuples) {
    final long length = new File(baseTestMeta.dataDirectory, emitted.getFileName()).length();
    expectedBlocks += (int)Math.ceil(length / 2.0);
  }

  Assert.assertEquals("Blocks", expectedBlocks, baseTestMeta.blockMetadataSink.collectedTuples.size());
  splitter.teardown();
}
/**
 * With a block size of 2 bytes and a blocks threshold of 10, expects exactly 10 blocks
 * after the first window, and all blocks (plus 12 file-metadata tuples) after windows
 * 2 through 8 have each called {@code handleIdleTime()}.
 */
@Test
public void testBlocksThreshold() throws InterruptedException
{
  final FileSplitterBase splitter = baseTestMeta.fileSplitter;
  splitter.setup(baseTestMeta.context);

  // Expected total: sum of ceil(length / 2) over file0.txt .. file11.txt.
  int expectedBlocks = 0;
  for (int fileIndex = 0; fileIndex < 12; fileIndex++) {
    final long length = new File(baseTestMeta.dataDirectory, "file" + fileIndex + ".txt").length();
    expectedBlocks += (int)Math.ceil(length / 2.0);
  }

  splitter.setBlockSize(2L);
  splitter.setBlocksThreshold(10);

  // First window receives all the files; the assertion expects only 10 blocks from it.
  splitter.beginWindow(1);
  for (String path : baseTestMeta.filePaths) {
    splitter.input.process(new FileSplitterInput.FileInfo(null, path));
  }
  splitter.endWindow();
  Assert.assertEquals("Blocks", 10, baseTestMeta.blockMetadataSink.collectedTuples.size());

  // Later windows carry no new input; only handleIdleTime() is invoked.
  int windowId = 2;
  while (windowId <= 8) {
    splitter.beginWindow(windowId);
    splitter.handleIdleTime();
    splitter.endWindow();
    windowId++;
  }

  Assert.assertEquals("Files", 12, baseTestMeta.fileMetadataSink.collectedTuples.size());
  Assert.assertEquals("Blocks", expectedBlocks, baseTestMeta.blockMetadataSink.collectedTuples.size());
  splitter.teardown();
}
/**
 * Runs the splitter inside a local-mode application with a blocks threshold of 4 and
 * waits until the receiver reports 12 file-metadata tuples.
 *
 * <p>The local cluster is shut down in a {@code finally} block so that a failed
 * assertion or an interrupted wait cannot leave it running behind later tests.
 *
 * @throws Exception if preparing or running the application fails, or the wait is interrupted
 */
@Test
public void testSplitterInApp() throws Exception
{
  LocalMode lma = LocalMode.newInstance();
  SplitterApp app = new SplitterApp();
  Configuration appConf = new Configuration();
  appConf.set("dt.operator.Splitter.prop.blocksThreshold", "4");
  lma.prepareDAG(app, appConf);
  lma.cloneDAG(); // check serialization
  LocalMode.Controller lc = lma.getController();
  lc.runAsync();
  try {
    // Released by MockReceiver.processStats once the reported count reaches 12.
    app.receiver.latch.await();
    Assert.assertEquals("no. of metadata", 12, app.receiver.count);
  } finally {
    lc.shutdown();
  }
}
/**
 * Test application wiring {@code Input -> Splitter -> Receiver}. The input operator and
 * the splitter are configured from the enclosing test's {@code baseTestMeta} fixture.
 */
@ApplicationAnnotation(name = "TestApp")
class SplitterApp implements StreamingApplication
{
  /** Receiver added to the DAG; kept so the test can reach its latch and count. */
  MockReceiver receiver;

  @Override
  public void populateDAG(DAG dag, Configuration configuration)
  {
    // Source: emits one FileInfo per generated file path.
    MockFileInput source = dag.addOperator("Input", new MockFileInput());
    source.filePaths = baseTestMeta.filePaths;

    // Operator under test, pointed at the test's data directory.
    FileSplitterBase fileSplitter = dag.addOperator("Splitter", new FileSplitterBase());
    fileSplitter.setFile(baseTestMeta.dataDirectory);

    receiver = dag.addOperator("Receiver", new MockReceiver());

    dag.addStream("files", source.files, fileSplitter.input);
    dag.addStream("file-metadata", fileSplitter.filesMetadataOutput, receiver.fileMetadata);
  }
}
/**
 * Receiving operator for the application test. It counts incoming file-metadata tuples,
 * exposes the count as an {@link AutoMetric}, and releases {@link #latch} once the
 * reported metric reaches 12.
 */
static class MockReceiver extends BaseOperator implements StatsListener
{
  @AutoMetric
  int count;
  transient CountDownLatch latch = new CountDownLatch(1);

  public final transient DefaultInputPort<FileSplitterInput.FileMetadata> fileMetadata = new
      DefaultInputPort<FileSplitterInput.FileMetadata>()
  {
    @Override
    public void process(FileSplitterInput.FileMetadata fileMetadata)
    {
      MockReceiver.this.count++;
      LOG.debug("count {}", MockReceiver.this.count);
    }
  };

  /**
   * Copies the "count" metric of the most recent windowed stats into {@link #count} and
   * counts the latch down when it equals 12.
   *
   * <p>Batches with no windowed stats, or whose latest entry carries no integer "count"
   * metric, are skipped instead of failing with an index or null-unboxing error.
   *
   * @param stats batched statistics delivered for this operator
   * @return always {@code null}
   */
  @Override
  public Response processStats(BatchedOperatorStats stats)
  {
    if (stats.getLastWindowedStats() == null || stats.getLastWindowedStats().isEmpty()) {
      return null;
    }
    Stats.OperatorStats operatorStats = stats.getLastWindowedStats().get(stats.getLastWindowedStats().size() - 1);
    Object reportedCount = operatorStats.metrics == null ? null : operatorStats.metrics.get("count");
    if (!(reportedCount instanceof Integer)) {
      // Nothing usable reported for this window; wait for the next batch.
      return null;
    }
    count = (Integer)reportedCount;
    if (count == 12) {
      latch.countDown();
    }
    return null;
  }
}
/**
 * Input operator that emits one {@code FileInfo} per configured file path, and does so
 * only on the first {@link #emitTuples()} call.
 */
static class MockFileInput extends BaseOperator implements InputOperator
{
  public final transient DefaultOutputPort<FileSplitterInput.FileInfo> files = new DefaultOutputPort<>();

  /** Paths to emit; assigned by the application before the DAG runs. */
  protected Set<String> filePaths;

  /** Set once the paths have been emitted so later calls emit nothing. */
  protected boolean done;

  @Override
  public void emitTuples()
  {
    if (done) {
      return;
    }
    done = true;
    for (String path : filePaths) {
      files.emit(new FileSplitterInput.FileInfo(null, path));
    }
  }
}
/** Logger used by this test class and its mock operators. */
private static final Logger LOG = LoggerFactory.getLogger(FileSplitterBaseTest.class);
}