/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.lib.io.fs;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.zip.GZIPInputStream;
import javax.annotation.Nonnull;
import javax.crypto.Cipher;
import javax.crypto.CipherInputStream;
import javax.crypto.CipherOutputStream;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.IvParameterSpec;
import javax.validation.ConstraintViolationException;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.util.ReflectionUtils;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.io.LimitInputStream;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.lib.testbench.RandomWordGenerator;
import com.datatorrent.lib.util.TestUtils;
import com.datatorrent.lib.util.TestUtils.TestInfo;
import com.datatorrent.netlet.util.DTThrowable;
import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
import static org.junit.Assert.assertEquals;
public class AbstractFileOutputOperatorTest
{
private static final Logger LOG = LoggerFactory.getLogger(AbstractFileOutputOperatorTest.class);
private static final String SINGLE_FILE = "single.txt";
private static final String EVEN_FILE = "even.txt";
private static final String ODD_FILE = "odd.txt";
@Rule
public FSTestWatcher testMeta = new FSTestWatcher();
public static class FSTestWatcher extends TestInfo
{
public boolean writeToTmp = false;
public OperatorContext testOperatorContext;
@Override
protected void starting(Description description)
{
super.starting(description);
TestUtils.deleteTargetTestClassFolder(description);
try {
FileUtils.forceMkdir(new File(getDir()));
Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
attributeMap.put(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 60);
attributeMap.put(Context.OperatorContext.SPIN_MILLIS, 500);
testOperatorContext = mockOperatorContext(0, attributeMap);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
protected void finished(Description description)
{
super.finished(description);
TestUtils.deleteTargetTestClassFolder(description);
}
}
/**
* Simple writer which writes to two files.
*/
private static class EvenOddHDFSExactlyOnceWriter extends AbstractFileOutputOperator<Integer>
{
@Override
protected FileSystem getFSInstance() throws IOException
{
return FileSystem.getLocal(new Configuration()).getRaw();
}
@Override
protected String getFileName(Integer tuple)
{
if (tuple % 2 == 0) {
return EVEN_FILE;
} else {
return ODD_FILE;
}
}
@Override
protected byte[] getBytesForTuple(Integer tuple)
{
return (tuple.toString() + "\n").getBytes();
}
}
/**
* Simple writer which writes to one file.
*/
private static class SingleHDFSExactlyOnceWriter extends AbstractFileOutputOperator<Integer>
{
@Override
protected FileSystem getFSInstance() throws IOException
{
return FileSystem.getLocal(new Configuration()).getRaw();
}
@Override
protected String getFileName(Integer tuple)
{
return SINGLE_FILE;
}
@Override
protected byte[] getBytesForTuple(Integer tuple)
{
return (tuple.toString() + "\n").getBytes();
}
}
/**
* Simple writer which writes byte array tuples to one file.
*/
private static class SingleHDFSByteExactlyOnceWriter extends AbstractFileOutputOperator<byte[]>
{
SingleHDFSByteExactlyOnceWriter()
{
}
@Override
protected FileSystem getFSInstance() throws IOException
{
return FileSystem.getLocal(new Configuration()).getRaw();
}
@Override
protected String getFileName(byte[] tuple)
{
return SINGLE_FILE;
}
@Override
protected byte[] getBytesForTuple(byte[] tuple)
{
return tuple;
}
}
/**
* This is a test app to make sure that the operator validation works properly.
*/
private static class ValidationTestApp implements StreamingApplication
{
private final File testDir;
private final Long maxLength;
private final AbstractFileOutputOperator<byte[]> fsWriter;
ValidationTestApp(File testDir, Long maxLength, AbstractFileOutputOperator<byte[]> fsWriter)
{
this.testDir = testDir;
this.maxLength = maxLength;
this.fsWriter = fsWriter;
}
@Override
public void populateDAG(DAG dag, Configuration conf)
{
RandomWordGenerator randomWordGenerator = new RandomWordGenerator();
randomWordGenerator.setTuplesPerWindow(2);
dag.addOperator("random", randomWordGenerator);
if (maxLength != null) {
fsWriter.setMaxLength(maxLength);
}
fsWriter.setFilePath(testDir.getPath());
dag.addOperator("fswriter", fsWriter);
dag.addStream("fswriterstream", randomWordGenerator.output, fsWriter.input);
}
}
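  /**
   * Pre-populates a file in the test directory; the overwrite tests use this to
   * simulate output left behind by a previous run.
   */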
private void populateFile(String fileName, String contents) throws IOException
{
File testFile = new File(testMeta.getDir() + "/" + fileName);
testFile.createNewFile();
FileWriter fileWriter = new FileWriter(testFile);
fileWriter.write(contents);
fileWriter.close();
}
  /**
   * Checkpoints the given writer by serializing it with Kryo and deserializing a copy.
   * @param writer The writer to checkpoint.
   * @param windowId The window id passed to beforeCheckpoint before serialization.
   * @return A new writer instance restored from the serialized state.
   */
public static AbstractFileOutputOperator checkpoint(AbstractFileOutputOperator writer, long windowId)
{
if (windowId >= Stateless.WINDOW_ID) {
writer.beforeCheckpoint(windowId);
}
Kryo kryo = new Kryo();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
Output loutput = new Output(bos);
kryo.writeObject(loutput, writer);
loutput.close();
Input lInput = new Input(bos.toByteArray());
@SuppressWarnings("unchecked")
AbstractFileOutputOperator checkPointedWriter = kryo.readObject(lInput, writer.getClass());
lInput.close();
return checkPointedWriter;
}
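  /*
   * Typical fail/recover sequence exercised by the tests below:
   *   AbstractFileOutputOperator cp = checkpoint(writer, Stateless.WINDOW_ID);
   *   ... write part of a window, then simulate failure with writer.teardown() ...
   *   restoreCheckPoint(cp, writer);
   *   writer.setup(testMeta.testOperatorContext); // replay the failed window
   */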
/**
* Restores the checkpointed writer.
* @param checkPointWriter The checkpointed writer.
* @param writer The writer to restore state into.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public static void restoreCheckPoint(AbstractFileOutputOperator checkPointWriter, AbstractFileOutputOperator writer)
{
writer.counts = checkPointWriter.counts;
writer.endOffsets = checkPointWriter.endOffsets;
writer.openPart = checkPointWriter.openPart;
writer.filePath = checkPointWriter.filePath;
writer.maxOpenFiles = checkPointWriter.maxOpenFiles;
writer.replication = checkPointWriter.replication;
writer.totalBytesWritten = checkPointWriter.totalBytesWritten;
writer.maxLength = checkPointWriter.maxLength;
writer.rollingFile = checkPointWriter.rollingFile;
writer.getFileNameToTmpName().putAll(checkPointWriter.getFileNameToTmpName());
writer.getFinalizedFiles().putAll(checkPointWriter.getFinalizedFiles());
}
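  /**
   * Verifies the contents of an output file.
   * @param fileCount The part number for rolling files (appended as ".N"); pass -1 for a non-rolling file.
   * @param baseFilePath The base path of the output file.
   * @param expectedOutput The expected file contents.
   */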
public static void checkOutput(int fileCount, String baseFilePath, String expectedOutput)
{
if (fileCount >= 0) {
baseFilePath += "." + fileCount;
}
File file = new File(baseFilePath);
String fileContents = null;
try {
fileContents = FileUtils.readFileToString(file);
} catch (IOException ex) {
DTThrowable.rethrow(ex);
}
Assert.assertEquals("Single file " + fileCount + " output contents", expectedOutput, fileContents);
}
@Test
public void testSingleFileCompletedWrite()
{
SingleHDFSExactlyOnceWriter writer = new SingleHDFSExactlyOnceWriter();
testSingleFileCompletedWriteHelper(writer);
String singleFileName = testMeta.getDir() + File.separator + SINGLE_FILE;
String correctContents = "0\n" + "1\n" + "2\n" + "3\n";
checkOutput(-1, singleFileName, correctContents);
}
@Test
public void testSingleFileCompletedWriteTmp()
{
testMeta.writeToTmp = true;
testSingleFileCompletedWrite();
}
@Test
public void testSingleFileCompletedWriteOverwriteInitial() throws IOException
{
populateFile(SINGLE_FILE, "0\n" + "1\n" + "2\n");
SingleHDFSExactlyOnceWriter writer = new SingleHDFSExactlyOnceWriter();
testSingleFileCompletedWriteHelper(writer);
String singleFileName = testMeta.getDir() + File.separator + SINGLE_FILE;
String correctContents = "0\n" + "1\n" + "2\n" + "3\n";
checkOutput(-1, singleFileName, correctContents);
}
@Test
public void testSingleFileCompletedWriteOverwriteInitialTmp() throws IOException
{
testMeta.writeToTmp = true;
testSingleFileCompletedWriteOverwriteInitial();
}
private void testSingleFileCompletedWriteHelper(SingleHDFSExactlyOnceWriter writer)
{
writer.setFilePath(testMeta.getDir());
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
writer.input.put(1);
writer.endWindow();
writer.beginWindow(1);
writer.input.put(2);
writer.input.put(3);
writer.endWindow();
writer.requestFinalize(SINGLE_FILE);
writer.committed(1);
writer.teardown();
}
@Test
public void testSingleFileFailedWrite()
{
SingleHDFSExactlyOnceWriter writer = new SingleHDFSExactlyOnceWriter();
testSingleFileFailedWriteHelper(writer);
String singleFileName = testMeta.getDir() + File.separator + SINGLE_FILE;
String correctContents = "0\n" + "1\n" + "4\n" + "5\n" + "6\n" + "7\n";
checkOutput(-1, singleFileName, correctContents);
}
@Test
public void testSingleFileFailedWriteTmp()
{
testMeta.writeToTmp = true;
testSingleFileFailedWrite();
}
@Test
public void testSingleFileFailedWriteOverwriteInitial() throws IOException
{
SingleHDFSExactlyOnceWriter writer = new SingleHDFSExactlyOnceWriter();
populateFile(SINGLE_FILE, "0\n" + "1\n" + "2\n");
testSingleFileFailedWriteHelper(writer);
String singleFileName = testMeta.getDir() + File.separator + SINGLE_FILE;
String correctContents = "0\n" + "1\n" + "4\n" + "5\n" + "6\n" + "7\n";
checkOutput(-1, singleFileName, correctContents);
}
@Test
public void testSingleFileFailedWriteOverwriteInitialTmp() throws IOException
{
testMeta.writeToTmp = true;
testSingleFileFailedWriteOverwriteInitial();
}
private void testSingleFileFailedWriteHelper(SingleHDFSExactlyOnceWriter writer)
{
File meta = new File(testMeta.getDir());
writer.setFilePath(meta.getAbsolutePath());
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
writer.requestFinalize(SINGLE_FILE);
writer.input.put(1);
writer.endWindow();
AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID);
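    // Simulate a failure: window 1 writes tuple 2, the operator is torn down, and
    // state is restored from the checkpoint so window 1 replays from the start.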
writer.beginWindow(1);
writer.input.put(2);
writer.teardown();
restoreCheckPoint(checkPointWriter, writer);
writer.setup(testMeta.testOperatorContext);
writer.beginWindow(1);
writer.input.put(4);
writer.requestFinalize(SINGLE_FILE);
writer.input.put(5);
writer.endWindow();
writer.beginWindow(2);
writer.input.put(6);
writer.input.put(7);
writer.endWindow();
writer.committed(2);
writer.teardown();
}
@Test
public void testMultiFileCompletedWrite()
{
EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
testMultiFileCompletedWriteHelper(writer);
String evenFileName = testMeta.getDir() + File.separator + EVEN_FILE;
String correctContents = "0\n" + "2\n" + "4\n" + "6\n";
checkOutput(-1, evenFileName, correctContents);
String oddFileName = testMeta.getDir() + File.separator + ODD_FILE;
correctContents = "1\n" + "3\n" + "5\n" + "7\n";
checkOutput(-1, oddFileName, correctContents);
}
@Test
public void testMultiFileCompletedWriteTmp()
{
testMeta.writeToTmp = true;
testMultiFileCompletedWrite();
}
@Test
public void testMultiFileCompletedWriteCache1()
{
EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
writer.setMaxOpenFiles(1);
testMultiFileCompletedWriteHelper(writer);
String evenFileName = testMeta.getDir() + File.separator + EVEN_FILE;
String correctContents = "0\n" + "2\n" + "4\n" + "6\n";
checkOutput(-1, evenFileName, correctContents);
String oddFileName = testMeta.getDir() + File.separator + ODD_FILE;
correctContents = "1\n" + "3\n" + "5\n" + "7\n";
checkOutput(-1, oddFileName, correctContents);
}
@Test
public void testMultiFileCompletedWriteCache1Tmp()
{
testMeta.writeToTmp = true;
testMultiFileCompletedWriteCache1();
}
@Test
public void testMultiFileCompletedWriteOverwriteInitial() throws IOException
{
populateFile(EVEN_FILE, "0\n" + "2\n");
populateFile(ODD_FILE, "1\n" + "3\n");
EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
testMultiFileCompletedWriteHelper(writer);
String evenFileName = testMeta.getDir() + File.separator + EVEN_FILE;
String correctContents = "0\n" + "2\n" + "4\n" + "6\n";
checkOutput(-1, evenFileName, correctContents);
String oddFileName = testMeta.getDir() + File.separator + ODD_FILE;
correctContents = "1\n" + "3\n" + "5\n" + "7\n";
checkOutput(-1, oddFileName, correctContents);
}
@Test
public void testMultiFileCompletedWriteOverwriteInitialTmp() throws IOException
{
testMeta.writeToTmp = true;
testMultiFileCompletedWriteOverwriteInitial();
}
@Test
public void testMultiFileCompletedWriteOverwriteCache1Initial() throws IOException
{
populateFile(EVEN_FILE, "0\n" + "2\n");
populateFile(ODD_FILE, "1\n" + "3\n");
EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
writer.setMaxOpenFiles(1);
testMultiFileCompletedWriteHelperCache1(writer);
String evenFileName = testMeta.getDir() + File.separator + EVEN_FILE;
String correctContents = "0\n" + "2\n" + "4\n" + "6\n";
checkOutput(-1, evenFileName, correctContents);
String oddFileName = testMeta.getDir() + File.separator + ODD_FILE;
correctContents = "1\n" + "3\n" + "5\n" + "7\n";
checkOutput(-1, oddFileName, correctContents);
}
@Test
public void testMultiFileCompletedWriteOverwriteCache1InitialTmp() throws IOException
{
testMeta.writeToTmp = true;
testMultiFileCompletedWriteOverwriteCache1Initial();
}
private void testMultiFileCompletedWriteHelperCache1(EvenOddHDFSExactlyOnceWriter writer)
{
File meta = new File(testMeta.getDir());
writer.setFilePath(meta.getAbsolutePath());
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
writer.input.put(1);
writer.input.put(2);
writer.input.put(3);
writer.endWindow();
writer.beginWindow(1);
writer.input.put(4);
writer.input.put(5);
writer.input.put(6);
writer.input.put(7);
writer.endWindow();
writer.requestFinalize(EVEN_FILE);
writer.requestFinalize(ODD_FILE);
writer.beforeCheckpoint(1);
writer.committed(1);
}
private void testMultiFileCompletedWriteHelper(EvenOddHDFSExactlyOnceWriter writer)
{
File meta = new File(testMeta.getDir());
writer.setFilePath(meta.getAbsolutePath());
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
writer.input.put(1);
writer.input.put(2);
writer.input.put(3);
writer.endWindow();
writer.beginWindow(1);
writer.input.put(4);
writer.input.put(5);
writer.input.put(6);
writer.input.put(7);
writer.endWindow();
writer.requestFinalize(ODD_FILE);
writer.requestFinalize(EVEN_FILE);
writer.beforeCheckpoint(1);
writer.committed(1);
}
@Test
public void testMultiFileFailedWrite()
{
EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
testMultiFileFailedWriteHelper(writer);
String evenFileName = testMeta.getDir() + File.separator + EVEN_FILE;
String correctContents = "0\n" + "2\n" + "6\n" + "8\n";
checkOutput(-1, evenFileName, correctContents);
String oddFileName = testMeta.getDir() + File.separator + ODD_FILE;
correctContents = "1\n" + "3\n" + "7\n" + "9\n";
checkOutput(-1, oddFileName, correctContents);
}
@Test
public void testMultiFileFailedWriteTmp()
{
testMeta.writeToTmp = true;
testMultiFileFailedWrite();
}
@Test
public void testMultiFileFailedWriteCache1()
{
EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
writer.setMaxOpenFiles(1);
testMultiFileFailedWriteHelper(writer);
String evenFileName = testMeta.getDir() + File.separator + EVEN_FILE;
String correctContents = "0\n" + "2\n" + "6\n" + "8\n";
checkOutput(-1, evenFileName, correctContents);
String oddFileName = testMeta.getDir() + File.separator + ODD_FILE;
correctContents = "1\n" + "3\n" + "7\n" + "9\n";
checkOutput(-1, oddFileName, correctContents);
}
@Test
public void testMultiFileFailedWriteCache1Tmp()
{
testMeta.writeToTmp = true;
testMultiFileFailedWriteCache1();
}
private void testMultiFileFailedWriteHelper(EvenOddHDFSExactlyOnceWriter writer)
{
File meta = new File(testMeta.getDir());
writer.setFilePath(meta.getAbsolutePath());
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
writer.input.put(1);
writer.input.put(2);
writer.input.put(3);
writer.requestFinalize(EVEN_FILE);
writer.endWindow();
AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID);
writer.beginWindow(1);
writer.input.put(4);
writer.input.put(5);
writer.requestFinalize(ODD_FILE);
writer.endWindow();
writer.teardown();
restoreCheckPoint(checkPointWriter, writer);
writer.setup(testMeta.testOperatorContext);
writer.beginWindow(2);
writer.input.put(6);
writer.input.put(7);
writer.input.put(8);
writer.input.put(9);
writer.endWindow();
writer.beforeCheckpoint(2);
writer.committed(2);
}
@Test
public void testSingleRollingFileCompletedWrite()
{
SingleHDFSExactlyOnceWriter writer = new SingleHDFSExactlyOnceWriter();
testSingleRollingFileCompletedWriteHelper(writer);
//Rolling file 0
String singleFileName = testMeta.getDir() + File.separator + SINGLE_FILE;
String correctContents = "0\n" + "1\n" + "2\n";
checkOutput(0, singleFileName, correctContents);
//Rolling file 1
correctContents = "3\n" + "4\n" + "5\n";
checkOutput(1, singleFileName, correctContents);
}
@Test
public void testSingleRollingFileCompletedWriteTmp()
{
testMeta.writeToTmp = true;
testSingleRollingFileCompletedWrite();
}
@Test
public void testSingleRollingFileCompletedWriteOverwriteInitial() throws IOException
{
populateFile(SINGLE_FILE + ".0", "0\n" + "1\n" + "2\n");
populateFile(SINGLE_FILE + ".1", "0\n" + "1\n" + "2\n");
populateFile(SINGLE_FILE + ".2", "0\n" + "1\n" + "2\n");
SingleHDFSExactlyOnceWriter writer = new SingleHDFSExactlyOnceWriter();
testSingleRollingFileCompletedWriteHelper(writer);
//Rolling file 0
String singleFileName = testMeta.getDir() + File.separator + SINGLE_FILE;
String correctContents = "0\n" + "1\n" + "2\n";
checkOutput(0, singleFileName, correctContents);
//Rolling file 1
correctContents = "3\n" + "4\n" + "5\n";
checkOutput(1, singleFileName, correctContents);
}
@Test
public void testSingleRollingFileCompletedWriteOverwriteInitialTmp() throws IOException
{
testMeta.writeToTmp = true;
testSingleRollingFileCompletedWriteOverwriteInitial();
}
private void testSingleRollingFileCompletedWriteHelper(SingleHDFSExactlyOnceWriter writer)
{
writer.setFilePath(testMeta.getDir());
writer.setMaxLength(4);
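    // maxLength is in bytes; each tuple writes two bytes ("N\n"), so the part
    // file rolls after every third tuple (6 bytes > 4), as the expected part
    // contents in the callers show.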
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
writer.input.put(1);
writer.input.put(2);
writer.endWindow();
writer.beginWindow(1);
writer.input.put(3);
writer.input.put(4);
writer.input.put(5);
writer.input.put(6);
writer.endWindow();
writer.committed(1);
writer.teardown();
}
@Test
public void testSingleRollingFileEmptyWindowsWrite()
{
SingleHDFSExactlyOnceWriter writer = new SingleHDFSExactlyOnceWriter();
testSingleRollingFileEmptyWindowsWriteHelper(writer);
//Rolling file 0
String singleFileName = testMeta.getDir() + File.separator + SINGLE_FILE;
int numberOfFiles = new File(testMeta.getDir()).listFiles().length;
Assert.assertEquals("More than one File in Directory", 1, numberOfFiles);
String correctContents = "0\n" + "1\n" + "2\n";
checkOutput(0, singleFileName, correctContents);
}
private void testSingleRollingFileEmptyWindowsWriteHelper(SingleHDFSExactlyOnceWriter writer)
{
writer.setFilePath(testMeta.getDir());
writer.setMaxLength(4);
writer.setRotationWindows(1);
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
writer.input.put(1);
writer.input.put(2);
writer.endWindow();
writer.beginWindow(1);
writer.endWindow();
writer.beginWindow(2);
writer.endWindow();
writer.beforeCheckpoint(2);
writer.checkpointed(2);
writer.committed(2);
writer.teardown();
}
@Test
public void testSingleRollingFileFailedWrite()
{
SingleHDFSExactlyOnceWriter writer = new SingleHDFSExactlyOnceWriter();
testSingleRollingFileFailedWriteHelper(writer);
//Rolling file 0
String singleFileName = testMeta.getDir() + File.separator + SINGLE_FILE;
String correctContents = "0\n" + "1\n" + "2\n";
checkOutput(0, singleFileName, correctContents);
//Rolling file 1
correctContents = "3\n" + "4\n" + "5\n";
checkOutput(1, singleFileName, correctContents);
//Rolling file 2
correctContents = "6\n" + "7\n" + "8\n";
checkOutput(2, singleFileName, correctContents);
}
@Test
public void testSingleRollingFileFailedWriteTmp()
{
testMeta.writeToTmp = true;
testSingleRollingFileFailedWrite();
}
private void testSingleRollingFileFailedWriteHelper(SingleHDFSExactlyOnceWriter writer)
{
writer.setMaxLength(4);
writer.setFilePath(testMeta.getDir());
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
writer.input.put(1);
writer.input.put(2);
writer.endWindow();
AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID);
writer.beginWindow(1);
writer.input.put(3);
writer.input.put(4);
writer.teardown();
restoreCheckPoint(checkPointWriter, writer);
writer.setup(testMeta.testOperatorContext);
writer.beginWindow(1);
writer.input.put(3);
writer.input.put(4);
writer.input.put(5);
writer.endWindow();
writer.beginWindow(2);
writer.input.put(6);
writer.input.put(7);
writer.input.put(8);
writer.endWindow();
writer.committed(2);
writer.teardown();
}
@Test
public void testSingleRollingFileFailedWrite1()
{
SingleHDFSExactlyOnceWriter writer = new SingleHDFSExactlyOnceWriter();
writer.setFilePath(testMeta.getDir());
writer.setMaxLength(4);
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
writer.input.put(1);
writer.input.put(2);
writer.endWindow();
writer.beginWindow(1);
writer.input.put(3);
writer.input.put(4);
writer.endWindow();
AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID);
AbstractFileOutputOperator checkPointWriter1 = checkpoint(writer, Stateless.WINDOW_ID);
LOG.debug("Checkpoint endOffsets={}", checkPointWriter.endOffsets);
writer.beginWindow(2);
writer.input.put(5);
writer.teardown();
restoreCheckPoint(checkPointWriter, writer);
LOG.debug("Checkpoint endOffsets={}", checkPointWriter.endOffsets);
writer.setup(testMeta.testOperatorContext);
writer.beginWindow(2);
writer.input.put(5);
writer.endWindow();
writer.beginWindow(3);
writer.input.put(6);
writer.input.put(7);
writer.input.put(8);
writer.endWindow();
writer.teardown();
restoreCheckPoint(checkPointWriter1, writer);
writer.setup(testMeta.testOperatorContext);
writer.committed(2);
String singleFilePath = testMeta.getDir() + File.separator + SINGLE_FILE;
//Rolling file 0
String correctContents = "0\n" + "1\n" + "2\n";
checkOutput(0, singleFilePath, correctContents);
//Rolling file 1
correctContents = "3\n" + "4\n";
checkOutput(1, singleFilePath, correctContents);
}
@Test
public void testSingleRollingFileFailedWrite1Tmp()
{
testMeta.writeToTmp = true;
testSingleRollingFileFailedWrite1();
}
@Test
public void testMultiRollingFileCompletedWrite()
{
EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
testMultiRollingFileCompletedWriteHelper(writer);
}
@Test
public void testMultiRollingFileCompletedWriteTmp()
{
testMeta.writeToTmp = true;
testMultiRollingFileCompletedWrite();
}
@Test
public void testMultiRollingFileCompletedWriteCache1()
{
EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
writer.setMaxOpenFiles(1);
testMultiRollingFileCompletedWriteHelper(writer);
}
@Test
public void testMultiRollingFileCompletedWriteCache1Tmp()
{
testMeta.writeToTmp = true;
testMultiRollingFileCompletedWriteCache1();
}
@Test
public void testMultiRollingFileCompletedWriteOverwrite()
{
EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
testMultiRollingFileCompletedWriteHelper(writer);
}
@Test
public void testMultiRollingFileCompletedWriteOverwriteTmp()
{
testMeta.writeToTmp = true;
testMultiRollingFileCompletedWriteOverwrite();
}
@Test
public void testMultiRollingFileCompletedWriteOverwriteCache1()
{
EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
writer.setMaxOpenFiles(1);
testMultiRollingFileCompletedWriteHelperCache1(writer);
}
@Test
public void testMultiRollingFileCompletedWriteOverwriteCache1Tmp()
{
testMeta.writeToTmp = true;
testMultiRollingFileCompletedWriteOverwriteCache1();
}
private void testMultiRollingFileCompletedWriteHelperCache1(EvenOddHDFSExactlyOnceWriter writer)
{
writer.setMaxLength(4);
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
File meta = new File(testMeta.getDir());
writer.setFilePath(meta.getAbsolutePath());
writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
writer.input.put(1);
writer.input.put(2);
writer.input.put(3);
writer.input.put(4);
writer.input.put(5);
writer.endWindow();
writer.beginWindow(1);
writer.input.put(6);
writer.input.put(7);
writer.input.put(8);
writer.input.put(9);
writer.input.put(6);
writer.input.put(7);
writer.endWindow();
writer.committed(1);
//Even file
String evenFileName = testMeta.getDir() + File.separator + EVEN_FILE;
String correctContents = "0\n" + "2\n" + "4\n";
checkOutput(0, evenFileName, correctContents);
correctContents = "6\n" + "8\n" + "6\n";
checkOutput(1, evenFileName, correctContents);
//Odd file
String oddFileName = testMeta.getDir() + File.separator + ODD_FILE;
correctContents = "1\n" + "3\n" + "5\n";
checkOutput(0, oddFileName, correctContents);
correctContents = "7\n" + "9\n" + "7\n";
checkOutput(1, oddFileName, correctContents);
}
private void testMultiRollingFileCompletedWriteHelper(EvenOddHDFSExactlyOnceWriter writer)
{
writer.setMaxLength(4);
File meta = new File(testMeta.getDir());
writer.setFilePath(meta.getAbsolutePath());
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
writer.input.put(1);
writer.input.put(2);
writer.input.put(3);
writer.input.put(4);
writer.input.put(5);
writer.endWindow();
writer.beginWindow(1);
writer.input.put(6);
writer.input.put(7);
writer.input.put(8);
writer.input.put(9);
writer.input.put(6);
writer.input.put(7);
writer.endWindow();
writer.committed(1);
//Even file
String evenFileName = testMeta.getDir() + File.separator + EVEN_FILE;
String correctContents = "0\n" + "2\n" + "4\n";
checkOutput(0, evenFileName, correctContents);
correctContents = "6\n" + "8\n" + "6\n";
checkOutput(1, evenFileName, correctContents);
//Odd file
String oddFileName = testMeta.getDir() + File.separator + ODD_FILE;
correctContents = "1\n" + "3\n" + "5\n";
checkOutput(0, oddFileName, correctContents);
correctContents = "7\n" + "9\n" + "7\n";
checkOutput(1, oddFileName, correctContents);
}
@Test
public void testMultiRollingFileFailedWrite()
{
EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
testMultiRollingFileFailedWriteHelperHelper(writer);
}
@Test
public void testMultiRollingFileFailedWriteTmp()
{
testMeta.writeToTmp = true;
testMultiRollingFileFailedWrite();
}
@Test
public void testMultiRollingFileFailedWriteCache1()
{
EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
writer.setMaxOpenFiles(1);
testMultiRollingFileFailedWriteHelperHelper(writer);
}
@Test
public void testMultiRollingFileFailedWriteCache1Tmp()
{
testMeta.writeToTmp = true;
testMultiRollingFileFailedWriteCache1();
}
private void testMultiRollingFileFailedWriteHelperHelper(EvenOddHDFSExactlyOnceWriter writer)
{
testMultiRollingFileFailedWriteHelper(writer);
//Even file
String evenFileName = testMeta.getDir() + File.separator + EVEN_FILE;
String correctContents = "0\n" + "2\n" + "4\n";
checkOutput(0, evenFileName, correctContents);
correctContents = "6\n" + "8\n" + "6\n";
checkOutput(1, evenFileName, correctContents);
//Odd file
String oddFileName = testMeta.getDir() + File.separator + ODD_FILE;
correctContents = "1\n" + "3\n" + "5\n";
checkOutput(0, oddFileName, correctContents);
correctContents = "7\n" + "9\n" + "7\n";
checkOutput(1, oddFileName, correctContents);
}
private void testMultiRollingFileFailedWriteHelper(EvenOddHDFSExactlyOnceWriter writer)
{
File meta = new File(testMeta.getDir());
writer.setFilePath(meta.getAbsolutePath());
writer.setMaxLength(4);
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
writer.input.put(1);
writer.endWindow();
AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID);
writer.beginWindow(1);
writer.input.put(2);
writer.input.put(3);
writer.teardown();
restoreCheckPoint(checkPointWriter, writer);
writer.setup(testMeta.testOperatorContext);
writer.beginWindow(1);
writer.input.put(2);
writer.input.put(3);
writer.endWindow();
writer.beginWindow(2);
writer.input.put(4);
writer.input.put(5);
writer.endWindow();
writer.beginWindow(3);
writer.input.put(6);
writer.input.put(7);
writer.input.put(8);
writer.input.put(9);
writer.input.put(6);
writer.input.put(7);
writer.endWindow();
writer.committed(3);
}
@Test
public void testMultiRollingFileFailedWriteOverwrite() throws IOException
{
EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
testMultiRollingFileFailedWriteOverwriteHelper(writer);
}
@Test
public void testMultiRollingFileFailedWriteOverwriteTmp() throws IOException
{
testMeta.writeToTmp = true;
testMultiRollingFileFailedWriteOverwrite();
}
@Test
public void testMultiRollingFileFailedWriteOverwriteCache1() throws IOException
{
EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
writer.setMaxOpenFiles(1);
String evenFileName = testMeta.getDir() + File.separator + EVEN_FILE;
String oddFileName = testMeta.getDir() + File.separator + ODD_FILE;
populateFile(EVEN_FILE + ".0", "0\n" + "2\n" + "4\n");
populateFile(ODD_FILE + ".0", "1\n" + "3\n" + "5\n");
testMultiRollingFileFailedWriteOverwriteHelperCache1(writer);
//Even file
String correctContents = "0\n" + "4\n" + "6\n";
checkOutput(0, evenFileName, correctContents);
correctContents = "8\n" + "6\n" + "10\n";
checkOutput(1, evenFileName, correctContents);
//Odd file
correctContents = "1\n" + "5\n" + "7\n";
checkOutput(0, oddFileName, correctContents);
correctContents = "9\n" + "7\n" + "11\n";
checkOutput(1, oddFileName, correctContents);
}
@Test
public void testMultiRollingFileFailedWriteOverwriteCache1Tmp() throws IOException
{
testMeta.writeToTmp = true;
testMultiRollingFileFailedWriteOverwriteCache1();
}
private void testMultiRollingFileFailedWriteOverwriteHelperCache1(EvenOddHDFSExactlyOnceWriter writer)
{
File meta = new File(testMeta.getDir());
writer.setFilePath(meta.getAbsolutePath());
writer.setMaxLength(4);
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
writer.input.put(1);
writer.endWindow();
AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID);
writer.beginWindow(1);
writer.input.put(2);
writer.input.put(3);
writer.teardown();
restoreCheckPoint(checkPointWriter, writer);
writer.setup(testMeta.testOperatorContext);
writer.beginWindow(1);
writer.input.put(4);
writer.input.put(5);
writer.endWindow();
writer.beginWindow(2);
writer.input.put(6);
writer.input.put(7);
writer.input.put(8);
writer.input.put(9);
writer.input.put(6);
writer.input.put(7);
writer.input.put(10);
writer.input.put(11);
writer.endWindow();
writer.committed(2);
}
private void testMultiRollingFileFailedWriteOverwriteHelper(EvenOddHDFSExactlyOnceWriter writer) throws IOException
{
String evenFileName = testMeta.getDir() + File.separator + EVEN_FILE;
String oddFileName = testMeta.getDir() + File.separator + ODD_FILE;
populateFile(EVEN_FILE + ".0", "0\n" + "2\n" + "4\n");
populateFile(ODD_FILE + ".0", "1\n" + "3\n" + "5\n");
File meta = new File(testMeta.getDir());
writer.setFilePath(meta.getAbsolutePath());
writer.setMaxLength(4);
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.process(0);
writer.input.process(1);
writer.endWindow();
AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID);
writer.beginWindow(1);
writer.input.process(2);
writer.input.process(3);
writer.teardown();
restoreCheckPoint(checkPointWriter, writer);
writer.setup(testMeta.testOperatorContext);
writer.beginWindow(1);
writer.input.process(4);
writer.input.process(5);
writer.endWindow();
writer.beginWindow(2);
writer.input.process(6);
writer.input.process(7);
writer.input.process(8);
writer.input.process(9);
writer.input.process(6);
writer.input.process(7);
writer.input.process(10);
writer.input.process(11);
writer.endWindow();
writer.committed(2);
//Even file
String correctContents = "0\n" + "4\n" + "6\n";
checkOutput(0, evenFileName, correctContents);
correctContents = "8\n" + "6\n" + "10\n";
checkOutput(1, evenFileName, correctContents);
//Odd file
correctContents = "1\n" + "5\n" + "7\n";
checkOutput(0, oddFileName, correctContents);
correctContents = "9\n" + "7\n" + "11\n";
checkOutput(1, oddFileName, correctContents);
}
@Test
public void singleFileMultiRollingFailure()
{
SingleHDFSExactlyOnceWriter writer = new SingleHDFSExactlyOnceWriter();
File meta = new File(testMeta.getDir());
writer.setFilePath(meta.getAbsolutePath());
writer.setMaxLength(4);
singleFileMultiRollingFailureHelper(writer);
String singleFilePath = testMeta.getDir() + File.separator + SINGLE_FILE;
//Rolling file 0
String correctContents = "0\n" + "1\n" + "2\n";
checkOutput(0, singleFilePath, correctContents);
//Rolling file 1
correctContents = "3\n" + "4\n" + "0\n";
checkOutput(1, singleFilePath, correctContents);
//Rolling file 2
correctContents = "1\n" + "2\n" + "3\n";
checkOutput(2, singleFilePath, correctContents);
//Rolling file 3
correctContents = "4\n" + "5\n" + "6\n";
checkOutput(3, singleFilePath, correctContents);
}
@Test
public void singleFileMultiRollingFailureTmp()
{
testMeta.writeToTmp = true;
singleFileMultiRollingFailure();
}
private void singleFileMultiRollingFailureHelper(SingleHDFSExactlyOnceWriter writer)
{
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
writer.input.put(1);
writer.input.put(2);
writer.endWindow();
writer.beginWindow(1);
writer.input.put(3);
writer.input.put(4);
AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID);
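    // Checkpoint taken mid-window: tuples 3 and 4 of window 1 are part of the
    // checkpointed state. Everything written after this point is truncated on
    // restore, and the replayed window 1 appends from the saved offsets.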
writer.input.put(3);
writer.input.put(4);
writer.input.put(5);
writer.endWindow();
writer.beginWindow(2);
writer.input.put(6);
writer.input.put(7);
writer.input.put(8);
writer.endWindow();
writer.teardown();
restoreCheckPoint(checkPointWriter, writer);
writer.setup(testMeta.testOperatorContext);
writer.beginWindow(1);
writer.input.put(0);
writer.input.put(1);
writer.input.put(2);
writer.endWindow();
writer.beginWindow(2);
writer.input.put(3);
writer.input.put(4);
writer.input.put(5);
writer.input.put(6);
writer.endWindow();
writer.committed(2);
}
@Test
public void validateNothingWrongTest()
{
ValidationTestApp validationTestApp = new ValidationTestApp(new File(testMeta.getDir()), null,
new SingleHDFSByteExactlyOnceWriter());
LocalMode.runApp(validationTestApp, 1);
}
@Test
public void validateNegativeMaxLengthTest()
{
ValidationTestApp validationTestApp = new ValidationTestApp(new File(testMeta.getDir()), -1L,
new SingleHDFSByteExactlyOnceWriter());
boolean error = false;
try {
LocalMode.runApp(validationTestApp, 1);
} catch (RuntimeException e) {
if (e.getCause() instanceof ConstraintViolationException) {
error = true;
}
}
Assert.assertEquals("Max length validation not thrown with -1 max length", true, error);
}
@Test
public void testPeriodicRotation()
{
EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
File dir = new File(testMeta.getDir());
writer.setFilePath(testMeta.getDir());
writer.setRotationWindows(30);
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
writer.setup(testMeta.testOperatorContext);
// Check that rotation doesn't happen prematurely
for (int i = 0; i < 30; ++i) {
writer.beginWindow(i);
for (int j = 0; j < i; ++j) {
writer.input.put(2 * j + 1);
}
writer.endWindow();
}
writer.committed(29);
Set<String> fileNames = new TreeSet<String>();
fileNames.add(ODD_FILE + ".0");
Collection<File> files = FileUtils.listFiles(dir, null, false);
Assert.assertEquals("Number of part files", 1, files.size());
Assert.assertEquals("Part file names", fileNames, getFileNames(files));
// Check that rotation is happening consistently and for all files
for (int i = 30; i < 120; ++i) {
writer.beginWindow(i);
for (int j = 0; j < i; ++j) {
writer.input.put(j);
}
writer.endWindow();
}
writer.committed(119);
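    // Windows 30-119 span three rotation periods with both parities present, so
    // even.txt gains parts 0-2 and odd.txt (which already had part 0) gains
    // parts 1-3, for 7 part files in total.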
files = FileUtils.listFiles(dir, null, false);
Assert.assertEquals("Number of part files", 7, files.size());
for (int i = 0; i < 3; ++i) {
fileNames.add(EVEN_FILE + "." + i);
}
for (int i = 1; i < 4; ++i) {
fileNames.add(ODD_FILE + "." + i);
}
Assert.assertEquals("Part file names", fileNames, getFileNames(files));
// Check that rotation doesn't happen for files that don't have data during the rotation period
for (int i = 120; i < 180; ++i) {
writer.beginWindow(i);
for (int j = 0; j < i; ++j) {
writer.input.put(j * 2);
}
writer.endWindow();
}
writer.committed(179);
files = FileUtils.listFiles(dir, null, false);
Assert.assertEquals("Number of part files", 9, files.size());
for (int i = 3; i < 5; ++i) {
fileNames.add(EVEN_FILE + "." + i);
}
Assert.assertEquals("Part file names", fileNames, getFileNames(files));
writer.teardown();
}
@Test
public void testPeriodicRotationTmp()
{
testMeta.writeToTmp = true;
testPeriodicRotation();
}
@Test
public void testPeriodicRotationWithEviction() throws InterruptedException
{
EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
File dir = new File(testMeta.getDir());
writer.setFilePath(testMeta.getDir());
writer.setRotationWindows(30);
writer.setAlwaysWriteToTmp(true);
writer.setExpireStreamAfterAccessMillis(1L);
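    // Streams expire 1 ms after last access; the 100 ms sleep per window below
    // ensures the open stream is evicted, verifying that rotation still
    // finalizes the file afterwards.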
writer.setup(testMeta.testOperatorContext);
// Check that rotation for even.txt.0 happens
for (int i = 0; i < 30; ++i) {
writer.beginWindow(i);
if (i == 0) {
writer.input.put(i);
}
Thread.sleep(100L);
writer.endWindow();
}
writer.committed(29);
Set<String> fileNames = new TreeSet<>();
fileNames.add(EVEN_FILE + ".0");
Collection<File> files = FileUtils.listFiles(dir, null, false);
Assert.assertEquals("Number of part files", 1, files.size());
Assert.assertEquals("Part file names", fileNames, getFileNames(files));
// Check that rotation doesn't happen for files that don't have data during the rotation period
for (int i = 30; i < 120; ++i) {
writer.beginWindow(i);
writer.endWindow();
}
writer.committed(119);
files = FileUtils.listFiles(dir, null, false);
Assert.assertEquals("Number of part files", 1, files.size());
Assert.assertEquals("Part file names", fileNames, getFileNames(files));
}
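  /**
   * Writes 10 windows of 1000 tuples each and records the even and odd file
   * lengths at every checkpoint (every other window), so the verification
   * methods can decompress each flushed segment independently.
   */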
private void writeCompressedData(EvenOddHDFSExactlyOnceWriter writer, File evenFile,
File oddFile, List<Long> evenOffsets, List<Long> oddOffsets)
{
writer.setFilePath(testMeta.getDir());
writer.setAlwaysWriteToTmp(false);
writer.setup(testMeta.testOperatorContext);
for (int i = 0; i < 10; ++i) {
writer.beginWindow(i);
for (int j = 0; j < 1000; ++j) {
writer.input.put(i);
}
writer.endWindow();
if ((i % 2) == 1) {
writer.beforeCheckpoint(i);
evenOffsets.add(evenFile.length());
oddOffsets.add(oddFile.length());
}
}
writer.teardown();
}
@Test
public void testGzipCompression() throws IOException
{
EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
writer.setFilterStreamProvider(new FilterStreamCodec.GZipFilterStreamProvider());
File evenFile = new File(testMeta.getDir(), EVEN_FILE);
File oddFile = new File(testMeta.getDir(), ODD_FILE);
// To get around the multi member gzip issue with openjdk
// http://bugs.java.com/bugdatabase/view_bug.do?bug_id=4691425
List<Long> evenOffsets = new ArrayList<Long>();
List<Long> oddOffsets = new ArrayList<Long>();
writeCompressedData(writer, evenFile, oddFile, evenOffsets, oddOffsets);
checkCompressedFile(evenFile, evenOffsets, 0, 5, 1000, null, null);
checkCompressedFile(oddFile, oddOffsets, 1, 5, 1000, null, null);
}
@Test
public void testSnappyStreamProvider() throws IOException
{
if (checkNativeSnappy()) {
return;
}
EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
writer.setFilterStreamProvider(new FilterStreamCodec.SnappyFilterStreamProvider());
File evenFile = new File(testMeta.getDir(), EVEN_FILE);
File oddFile = new File(testMeta.getDir(), ODD_FILE);
    // Track checkpoint offsets so each flushed segment can be decompressed
    // independently (same approach as the gzip test above)
List<Long> evenOffsets = new ArrayList<Long>();
List<Long> oddOffsets = new ArrayList<Long>();
writeCompressedData(writer, evenFile, oddFile, evenOffsets, oddOffsets);
checkSnappyFile(evenFile, evenOffsets, 0, 5, 1000);
checkSnappyFile(oddFile, oddOffsets, 1, 5, 1000);
}
private boolean checkNativeSnappy()
{
try {
SnappyCodec.checkNativeCodeLoaded();
    } catch (UnsatisfiedLinkError | RuntimeException e) {
      LOG.warn("Skipping Snappy compression test since native libraries were not found.");
      return true;
    }
return false;
}
@Test
public void testSnappyCompressionSimple() throws IOException
{
if (checkNativeSnappy()) {
return;
}
File snappyFile = new File(testMeta.getDir(), "snappyTestFile.snappy");
BufferedOutputStream os = new BufferedOutputStream(new FileOutputStream(snappyFile));
Configuration conf = new Configuration();
CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(SnappyCodec.class, conf);
FilterStreamCodec.SnappyFilterStream filterStream = new FilterStreamCodec.SnappyFilterStream(
codec.createOutputStream(os));
int ONE_MB = 1024 * 1024;
String testStr = "TestSnap-16bytes";
for (int i = 0; i < ONE_MB; i++) { // write the 16-byte string 1M times, 16 MB in total
filterStream.write(testStr.getBytes());
}
filterStream.flush();
filterStream.close();
CompressionInputStream is = codec.createInputStream(new FileInputStream(snappyFile));
byte[] recovered = new byte[testStr.length()];
    int bytesRead = is.read(recovered);
    is.close();
    assertEquals(testStr.length(), bytesRead);
    assertEquals(testStr, new String(recovered));
}
@Test
public void testRecoveryOfOpenFiles()
{
EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
writer.setMaxLength(4);
File meta = new File(testMeta.getDir());
writer.setFilePath(meta.getAbsolutePath());
writer.setAlwaysWriteToTmp(true);
writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
writer.input.put(1);
writer.input.put(2);
writer.input.put(3);
writer.endWindow();
writer.beforeCheckpoint(0);
//failure and restored
writer.setup(testMeta.testOperatorContext);
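    // Note: processing resumes mid-window after the restore; tuples 4 and 5 are
    // appended to the tmp files recovered from the checkpointed stream state.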
writer.input.put(4);
writer.input.put(5);
writer.endWindow();
writer.beginWindow(1);
writer.input.put(6);
writer.input.put(7);
writer.input.put(8);
writer.input.put(9);
writer.input.put(6);
writer.input.put(7);
writer.endWindow();
writer.committed(1);
//Part 0 checks
String evenFileName = testMeta.getDir() + File.separator + EVEN_FILE;
String correctContents = "0\n" + "2\n" + "4\n";
checkOutput(0, evenFileName, correctContents);
String oddFileName = testMeta.getDir() + File.separator + ODD_FILE;
correctContents = "1\n" + "3\n" + "5\n";
checkOutput(0, oddFileName, correctContents);
//Part 1 checks
correctContents = "6\n" + "8\n" + "6\n";
checkOutput(1, evenFileName, correctContents);
correctContents = "7\n" + "9\n" + "7\n";
checkOutput(1, oddFileName, correctContents);
}
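  /**
   * Verifies a gzip file (optionally AES/CBC encrypted) segment by segment. Each
   * segment spans the bytes between consecutive checkpoint offsets and is wrapped
   * in its own GZIPInputStream, working around the JDK multi-member gzip issue
   * referenced by the callers.
   */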
private void checkCompressedFile(File file, List<Long> offsets, int startVal, int totalWindows, int totalRecords, SecretKey secretKey, byte[] iv) throws IOException
{
FileInputStream fis;
InputStream gss = null;
GZIPInputStream gis = null;
BufferedReader br = null;
Cipher cipher = null;
if (secretKey != null) {
try {
cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
IvParameterSpec ivps = new IvParameterSpec(iv);
cipher.init(Cipher.DECRYPT_MODE, secretKey, ivps);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
int numWindows = 0;
try {
fis = new FileInputStream(file);
gss = fis;
      if (secretKey != null) {
        gss = new CipherInputStream(fis, cipher);
      }
long startOffset = 0;
for (long offset : offsets) {
// Skip initial case in case file is not yet created
if (offset == 0) {
continue;
}
long limit = offset - startOffset;
LimitInputStream lis = new LimitInputStream(gss, limit);
gis = new GZIPInputStream(lis);
br = new BufferedReader(new InputStreamReader(gis));
String eline = "" + (startVal + numWindows * 2);
int count = 0;
String line;
while ((line = br.readLine()) != null) {
Assert.assertEquals("File line", eline, line);
++count;
if ((count % totalRecords) == 0) {
++numWindows;
eline = "" + (startVal + numWindows * 2);
}
}
startOffset = offset;
}
} catch (Exception e) {
      DTThrowable.rethrow(e);
} finally {
if (br != null) {
br.close();
} else {
if (gis != null) {
gis.close();
} else if (gss != null) {
gss.close();
}
}
}
Assert.assertEquals("Total", totalWindows, numWindows);
}
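  /**
   * Verifies a Snappy-compressed file segment by segment, mirroring
   * checkCompressedFile but using the Hadoop Snappy codec.
   */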
private void checkSnappyFile(File file, List<Long> offsets, int startVal, int totalWindows, int totalRecords) throws IOException
{
FileInputStream fis;
InputStream gss = null;
Configuration conf = new Configuration();
CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(SnappyCodec.class, conf);
CompressionInputStream snappyIs = null;
BufferedReader br = null;
int numWindows = 0;
try {
fis = new FileInputStream(file);
gss = fis;
long startOffset = 0;
for (long offset : offsets) {
// Skip initial case in case file is not yet created
if (offset == 0) {
continue;
}
long limit = offset - startOffset;
LimitInputStream lis = new LimitInputStream(gss, limit);
snappyIs = codec.createInputStream(lis);
br = new BufferedReader(new InputStreamReader(snappyIs));
String eline = "" + (startVal + numWindows * 2);
int count = 0;
String line;
while ((line = br.readLine()) != null) {
Assert.assertEquals("File line", eline, line);
++count;
if ((count % totalRecords) == 0) {
++numWindows;
eline = "" + (startVal + numWindows * 2);
}
}
startOffset = offset;
}
} catch (Exception e) {
      DTThrowable.rethrow(e);
} finally {
if (br != null) {
br.close();
} else {
if (snappyIs != null) {
snappyIs.close();
} else if (gss != null) {
gss.close();
}
}
}
Assert.assertEquals("Total", totalWindows, numWindows);
}
@Test
public void testChainFilters() throws NoSuchAlgorithmException, IOException
{
EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
KeyGenerator keygen = KeyGenerator.getInstance("AES");
keygen.init(128);
final SecretKey secretKey = keygen.generateKey();
byte[] iv = "TestParam16bytes".getBytes();
final IvParameterSpec ivps = new IvParameterSpec(iv);
FilterStreamProvider.FilterChainStreamProvider<FilterOutputStream, OutputStream> chainStreamProvider
= new FilterStreamProvider.FilterChainStreamProvider<FilterOutputStream, OutputStream>();
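    // Providers are applied in the order added below: tuples are gzip-compressed,
    // counted, then AES-encrypted. The counters therefore record pre-encryption
    // offsets, which checkCompressedFile needs because CBC padding changes the
    // on-disk length.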
chainStreamProvider.addStreamProvider(new FilterStreamCodec.GZipFilterStreamProvider());
// The filter is to keep track of the offsets to handle multi member gzip issue with openjdk
// http://bugs.java.com/bugdatabase/view_bug.do?bug_id=4691425
final CounterFilterStreamContext evenCounterContext = new CounterFilterStreamContext();
final CounterFilterStreamContext oddCounterContext = new CounterFilterStreamContext();
chainStreamProvider.addStreamProvider(new FilterStreamProvider.SimpleFilterReusableStreamProvider<CounterFilterOutputStream, OutputStream>()
{
@Override
protected FilterStreamContext<CounterFilterOutputStream> createFilterStreamContext(OutputStream outputStream) throws IOException
{
if (evenCounterContext.isDoInit()) {
evenCounterContext.init(outputStream);
return evenCounterContext;
} else {
oddCounterContext.init(outputStream);
return oddCounterContext;
}
}
});
chainStreamProvider.addStreamProvider(new FilterStreamProvider.SimpleFilterReusableStreamProvider<CipherOutputStream, OutputStream>()
{
@Override
protected FilterStreamContext<CipherOutputStream> createFilterStreamContext(OutputStream outputStream) throws IOException
{
try {
Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
cipher.init(Cipher.ENCRYPT_MODE, secretKey, ivps);
return new FilterStreamCodec.CipherFilterStreamContext(outputStream, cipher);
} catch (Exception e) {
throw new IOException(e);
}
}
});
writer.setFilterStreamProvider(chainStreamProvider);
File evenFile = new File(testMeta.getDir(), EVEN_FILE);
File oddFile = new File(testMeta.getDir(), ODD_FILE);
List<Long> evenOffsets = new ArrayList<Long>();
List<Long> oddOffsets = new ArrayList<Long>();
writer.setFilePath(testMeta.getDir());
writer.setAlwaysWriteToTmp(false);
writer.setup(testMeta.testOperatorContext);
for (int i = 0; i < 10; ++i) {
writer.beginWindow(i);
for (int j = 0; j < 1000; ++j) {
writer.input.put(i);
}
writer.endWindow();
if ((i % 2) == 1) {
writer.beforeCheckpoint(i);
evenOffsets.add(evenCounterContext.getCounter());
oddOffsets.add(oddCounterContext.getCounter());
}
}
writer.teardown();
checkCompressedFile(evenFile, evenOffsets, 0, 5, 1000, secretKey, iv);
checkCompressedFile(oddFile, oddOffsets, 1, 5, 1000, secretKey, iv);
}
private Set<String> getFileNames(Collection<File> files)
{
Set<String> filesNames = new TreeSet<String>();
for (File file : files) {
filesNames.add(file.getName());
}
return filesNames;
}
private static class CounterFilterStreamContext implements FilterStreamContext<CounterFilterOutputStream>
{
private CounterFilterOutputStream counterStream;
public void init(OutputStream outputStream)
{
counterStream = new CounterFilterOutputStream(outputStream);
}
public boolean isDoInit()
{
return (counterStream == null);
}
@Override
public CounterFilterOutputStream getFilterStream()
{
return counterStream;
}
@Override
public void finalizeContext() throws IOException
{
}
public long getCounter()
{
if (isDoInit()) {
return 0;
} else {
return counterStream.getCounter();
}
}
}
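  /**
   * A FilterOutputStream that counts the bytes written through it.
   * FilterOutputStream implements the array write methods by repeatedly calling
   * write(int), so refCount tracks the call nesting depth to ensure each byte is
   * counted exactly once, at the outermost call.
   */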
private static class CounterFilterOutputStream extends FilterOutputStream
{
long counter;
int refCount;
public CounterFilterOutputStream(OutputStream out)
{
super(out);
}
@Override
public void write(int b) throws IOException
{
++refCount;
super.write(b);
--refCount;
if (refCount == 0) {
counter += 1;
}
}
@Override
public void write(@Nonnull byte[] b) throws IOException
{
++refCount;
super.write(b);
--refCount;
if (refCount == 0) {
counter += b.length;
}
}
@Override
public void write(@Nonnull byte[] b, int off, int len) throws IOException
{
++refCount;
super.write(b, off, len);
--refCount;
if (refCount == 0) {
counter += len;
}
}
public long getCounter()
{
return counter;
}
}
}