blob: c59aa26adf82397b5a5d08e190102f3c24c7362c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.io;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.File;
import java.io.FileInputStream;
import java.util.zip.GZIPInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.sqoop.testutil.ImportJobTestCase;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
 * Test that the splittable buffered writer system works.
 *
 * <p>Each test writes a couple of lines through a
 * {@link SplittableBufferedWriter} layered on a {@link SplittingOutputStream}.
 * It then checks which numbered output files were produced
 * ({@code <prefix>00000}, {@code <prefix>00001}, ..., with a {@code .gz}
 * suffix when a {@link GzipCodec} is used) and what lines each one holds.</p>
 */
public class TestSplittableBufferedWriter {
  public static final Log LOG = LogFactory.getLog(
      TestSplittableBufferedWriter.class.getName());

  /** Subdirectory of the test temp dir that every test writes into. */
  private static final String WRITE_DIR_NAME = "bufferedWriterTest";

  /** @return the write directory as a local filesystem path string. */
  private String getWriteDir() {
    return new File(ImportJobTestCase.TEMP_BASE_DIR, WRITE_DIR_NAME).toString();
  }

  /** @return the write directory as a Hadoop {@link Path}. */
  private Path getWritePath() {
    return new Path(ImportJobTestCase.TEMP_BASE_DIR, WRITE_DIR_NAME);
  }

  /**
   * Creates the directory where we'll write our test files to, and makes
   * sure it has no files in it.
   *
   * <p>Fails the test in three cases: the directory cannot be created, it
   * contains subdirectories, or a residual file cannot be deleted.</p>
   */
  private void ensureEmptyWriteDir() throws IOException {
    FileSystem fs = FileSystem.getLocal(getConf());
    Path writeDir = getWritePath();
    fs.mkdirs(writeDir);
    // Verify the directory exists *before* listing it. A failed mkdirs() is
    // then reported explicitly rather than as a less informative failure
    // coming out of listStatus().
    if (!fs.exists(writeDir)) {
      fail("setUp: Could not create " + writeDir);
    }
    FileStatus [] stats = fs.listStatus(writeDir);
    for (FileStatus stat : stats) {
      if (stat.isDir()) {
        fail("setUp(): Write directory " + writeDir
            + " contains subdirectories");
      }
      LOG.debug("setUp(): Removing " + stat.getPath());
      if (!fs.delete(stat.getPath(), false)) {
        fail("setUp(): Could not delete residual file " + stat.getPath());
      }
    }
  }

  @Before
  public void setUp() throws IOException {
    ensureEmptyWriteDir();
  }

  /** @return a configuration whose default filesystem is {@code file:///}. */
  private Configuration getConf() {
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "file:///");
    return conf;
  }

  /**
   * Verifies contents of an InputStream line by line. Closes the InputStream
   * on its way out.
   *
   * @param is the stream to read; always closed by this method.
   * @param lines the exact lines the stream must contain, in order, and
   *     nothing after them.
   * @throws IOException if reading the stream fails.
   */
  private void verifyFileContents(InputStream is, String [] lines)
      throws IOException {
    BufferedReader r = new BufferedReader(new InputStreamReader(is));
    try {
      for (String expectedLine : lines) {
        String actualLine = r.readLine();
        assertNotNull("Stream ended before expected line: " + expectedLine,
            actualLine);
        assertEquals("Input line mismatch", expectedLine, actualLine);
      }
      assertNull("Stream had additional contents after expected line",
          r.readLine());
    } finally {
      r.close();
      try {
        is.close();
      } catch (IOException ioe) {
        // ignore IOE; may be closed by reader.
      }
    }
  }

  /**
   * Checks that an uncompressed file in the write directory holds exactly
   * the given lines.
   */
  private void verifyTextFile(String fileName, String [] lines)
      throws IOException {
    // verifyFileContents() closes the stream on every path.
    verifyFileContents(
        new FileInputStream(new File(getWriteDir(), fileName)), lines);
  }

  /**
   * Checks that a gzip-compressed file in the write directory decompresses
   * to exactly the given lines.
   */
  private void verifyGzipFile(String fileName, String [] lines)
      throws IOException {
    InputStream raw = new FileInputStream(new File(getWriteDir(), fileName));
    InputStream unzipped;
    try {
      // The GZIPInputStream constructor reads the gzip header eagerly. If
      // that fails, release the underlying file handle before propagating.
      unzipped = new GZIPInputStream(raw);
    } catch (IOException ioe) {
      try {
        raw.close();
      } catch (IOException ignored) {
        // Keep the original failure as the one reported.
      }
      throw ioe;
    }
    verifyFileContents(unzipped, lines);
  }

  private void verifyFileExists(Path p) throws IOException {
    FileSystem fs = FileSystem.getLocal(getConf());
    assertTrue("File not found: " + p, fs.exists(p));
  }

  private void verifyFileDoesNotExist(Path p) throws IOException {
    FileSystem fs = FileSystem.getLocal(getConf());
    assertFalse("File found: " + p + " and we did not expect it", fs.exists(p));
  }

  /**
   * Closes the writer (if it was constructed), then makes a best-effort
   * attempt to close the underlying stream.
   *
   * <p>The second close guarantees the output file handle is released even
   * when the writer could not be created. If the writer already closed the
   * stream, that second close may throw; the exception is swallowed.</p>
   *
   * @param w the writer, or {@code null} if its construction failed.
   * @param os the stream the writer was (to be) layered on.
   * @throws IOException if closing the writer fails.
   */
  private void closeWriterAndStream(SplittableBufferedWriter w,
      SplittingOutputStream os) throws IOException {
    try {
      if (w != null) {
        w.close();
      }
    } finally {
      try {
        os.close();
      } catch (IOException ioe) {
        // Ignored; may be thrown because w is already closed.
      }
    }
  }

  /** A split threshold of 0 must leave all text in a single file. */
  @Test
  public void testNonSplittingTextFile() throws IOException {
    SplittingOutputStream os = new SplittingOutputStream(getConf(),
        getWritePath(), "nonsplit-", 0, null);
    SplittableBufferedWriter w = null;
    try {
      w = new SplittableBufferedWriter(os, true);
      w.allowSplit();
      w.write("This is a string!");
      w.newLine();
      w.write("This is another string!");
      w.allowSplit();
    } finally {
      closeWriterAndStream(w, os);
    }

    // Ensure we made exactly one file.
    verifyFileExists(new Path(getWritePath(), "nonsplit-00000"));
    verifyFileDoesNotExist(new Path(getWritePath(), "nonsplit-00001"));

    // Now ensure all the data got there.
    String [] expectedLines = {
      "This is a string!",
      "This is another string!",
    };
    verifyTextFile("nonsplit-00000", expectedLines);
  }

  /** Same as the text case, but gzip-compressed output. */
  @Test
  public void testNonSplittingGzipFile() throws IOException {
    SplittingOutputStream os = new SplittingOutputStream(getConf(),
        getWritePath(), "nonsplit-", 0, new GzipCodec());
    SplittableBufferedWriter w = null;
    try {
      w = new SplittableBufferedWriter(os, true);
      w.allowSplit();
      w.write("This is a string!");
      w.newLine();
      w.write("This is another string!");
      w.allowSplit();
    } finally {
      closeWriterAndStream(w, os);
    }

    // Ensure we made exactly one file.
    verifyFileExists(new Path(getWritePath(), "nonsplit-00000.gz"));
    verifyFileDoesNotExist(new Path(getWritePath(), "nonsplit-00001.gz"));

    // Now ensure all the data got there.
    String [] expectedLines = {
      "This is a string!",
      "This is another string!",
    };
    verifyGzipFile("nonsplit-00000.gz", expectedLines);
  }

  /**
   * With a 10-byte threshold, the first line is expected to exceed it, so
   * the second line should land in a second file.
   */
  @Test
  public void testSplittingTextFile() throws IOException {
    SplittingOutputStream os = new SplittingOutputStream(getConf(),
        getWritePath(), "split-", 10, null);
    SplittableBufferedWriter w = null;
    try {
      w = new SplittableBufferedWriter(os, true);
      w.allowSplit();
      w.write("This is a string!");
      w.newLine();
      w.write("This is another string!");
    } finally {
      closeWriterAndStream(w, os);
    }

    // Ensure we made exactly two files.
    verifyFileExists(new Path(getWritePath(), "split-00000"));
    verifyFileExists(new Path(getWritePath(), "split-00001"));
    verifyFileDoesNotExist(new Path(getWritePath(), "split-00002"));

    // Now ensure all the data got there.
    String [] expectedLines0 = {
      "This is a string!",
    };
    verifyTextFile("split-00000", expectedLines0);

    String [] expectedLines1 = {
      "This is another string!",
    };
    verifyTextFile("split-00001", expectedLines1);
  }

  /** Same as the text splitting case, with gzip output and a 3-byte threshold. */
  @Test
  public void testSplittingGzipFile() throws IOException {
    SplittingOutputStream os = new SplittingOutputStream(getConf(),
        getWritePath(), "splitz-", 3, new GzipCodec());
    SplittableBufferedWriter w = null;
    try {
      w = new SplittableBufferedWriter(os, true);
      w.write("This is a string!");
      w.newLine();
      w.write("This is another string!");
    } finally {
      closeWriterAndStream(w, os);
    }

    // Ensure we made exactly two files.
    verifyFileExists(new Path(getWritePath(), "splitz-00000.gz"));
    verifyFileExists(new Path(getWritePath(), "splitz-00001.gz"));
    verifyFileDoesNotExist(new Path(getWritePath(), "splitz-00002.gz"));

    // Now ensure all the data got there.
    String [] expectedLines0 = {
      "This is a string!",
    };
    verifyGzipFile("splitz-00000.gz", expectedLines0);

    String [] expectedLines1 = {
      "This is another string!",
    };
    verifyGzipFile("splitz-00001.gz", expectedLines1);
  }
}