blob: d138b53a1fcaad13918f18edc3439c318167554b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.fs.s3;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.apache.apex.malhar.lib.fs.FSRecordCompactionOperator;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.testbench.CollectorTestSink;
import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.when;
public class S3ReconcilerTest
{
private class TestMeta extends TestWatcher
{
S3Reconciler underTest;
Context.OperatorContext context;
CollectorTestSink<Object> sink;
@Mock
AmazonS3 s3clientMock;
String outputPath;
@Override
protected void starting(Description description)
{
super.starting(description);
outputPath = new File(
"target" + Path.SEPARATOR + description.getClassName() + Path.SEPARATOR + description.getMethodName())
.getPath();
Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
attributes.put(DAG.DAGContext.APPLICATION_ID, description.getClassName());
attributes.put(DAG.DAGContext.APPLICATION_PATH, outputPath);
context = mockOperatorContext(1, attributes);
underTest = new S3Reconciler();
underTest.setAccessKey("");
underTest.setSecretKey("");
underTest.setup(context);
MockitoAnnotations.initMocks(this);
PutObjectResult result = new PutObjectResult();
result.setETag(outputPath);
when(s3clientMock.putObject((PutObjectRequest)any())).thenReturn(result);
underTest.setS3client(s3clientMock);
}
@Override
protected void finished(Description description)
{
this.underTest.teardown();
try {
FileUtils.deleteDirectory(new File("target" + Path.SEPARATOR + description.getClassName()));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
@Rule
public TestMeta testMeta = new TestMeta();
@Test
public void verifyS3ReconclierOutputTuple() throws Exception
{
String fileName = "s3-compaction_1.0";
String path = testMeta.outputPath + Path.SEPARATOR + fileName;
long size = 80;
File file = new File(path);
File tmpFile = new File(path + "." + System.currentTimeMillis() + ".tmp");
StringBuffer sb = new StringBuffer();
for (int i = 0; i < 10; i++) {
sb.append("Record" + i + "\n");
if (i == 5) {
FileUtils.write(tmpFile, sb.toString());
}
}
FileUtils.write(file, sb.toString());
// Set test sink and later on collect the emitted tuples in this sink.
testMeta.sink = new CollectorTestSink<Object>();
testMeta.underTest.outputPort.setSink(testMeta.sink);
// Create meta information to be emitted as tuple.
FSRecordCompactionOperator.OutputMetaData outputMetaData = new FSRecordCompactionOperator.OutputMetaData(path, fileName, size);
testMeta.underTest.beginWindow(0);
testMeta.underTest.input.process(outputMetaData);
testMeta.underTest.endWindow();
for (int i = 1; i < 60; i++) {
testMeta.underTest.beginWindow(i);
testMeta.underTest.endWindow();
}
testMeta.underTest.committed(59);
// retrieve the result count from output port.
testMeta.sink.waitForResultCount(1, 12000);
for (int i = 60; i < 70; i++) {
testMeta.underTest.beginWindow(i);
Thread.sleep(10);
testMeta.underTest.endWindow();
}
// verify the number of tuples emitted.
Assert.assertEquals(1, testMeta.sink.getCount(false));
}
@Test
public void testFileClearing() throws Exception
{
String fileName = "s3-compaction_1.0";
String path = testMeta.outputPath + Path.SEPARATOR + fileName;
long size = 80;
File file = new File(path);
File tmpFile = new File(path + "." + System.currentTimeMillis() + ".tmp");
StringBuffer sb = new StringBuffer();
for (int i = 0; i < 10; i++) {
sb.append("Record" + i + "\n");
if (i == 5) {
FileUtils.write(tmpFile, sb.toString());
}
}
FileUtils.write(file, sb.toString());
FSRecordCompactionOperator.OutputMetaData outputMetaData = new FSRecordCompactionOperator.OutputMetaData(path, fileName, size);
testMeta.underTest.beginWindow(0);
testMeta.underTest.input.process(outputMetaData);
testMeta.underTest.endWindow();
for (int i = 1; i < 60; i++) {
testMeta.underTest.beginWindow(i);
testMeta.underTest.endWindow();
}
testMeta.underTest.committed(59);
for (int i = 60; i < 70; i++) {
testMeta.underTest.beginWindow(i);
Thread.sleep(10);
testMeta.underTest.endWindow();
}
Collection<File> files =
FileUtils.listFiles(new File(testMeta.outputPath), null, true);
Assert.assertEquals(0, files.size());
}
}