blob: a93784e08b2c6a72f538fa7e4c99e20961bd2ede [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.io;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import org.apache.hadoop.util.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Shell;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
/**
* Test the named fifo utility.
*/
public class TestNamedFifo {
public static final Log LOG = LogFactory.getLog(
TestNamedFifo.class.getName());
public static final Path TEMP_BASE_DIR;
static {
String tmpDir = System.getProperty("test.build.data", "/tmp/");
if (!tmpDir.endsWith(File.separator)) {
tmpDir = tmpDir + File.separator;
}
TEMP_BASE_DIR = new Path(new Path(tmpDir), "namedfifo");
}
private Configuration conf;
private FileSystem fs;
@Before
public void setUp() throws Exception {
conf = new Configuration();
conf.set("fs.default.name", "file:///");
fs = FileSystem.getLocal(conf);
fs.mkdirs(TEMP_BASE_DIR);
}
static final String MSG = "THIS IS THE MESSAGE\n";
static final String MSG2 = "Here is a follow-up.\n";
private static class ReaderThread extends Thread {
private File file;
private IOException exception;
public ReaderThread(File f) {
this.file = f;
}
/** return any exception during the run method. */
public IOException getException() {
return this.exception;
}
public void run() {
BufferedReader r = null;
try {
r = new BufferedReader(new InputStreamReader(
new FileInputStream(file)));
// Assert that after a flush, we get back what we wrote.
String line = r.readLine();
if (!MSG.trim().equals(line)) {
throw new IOException("Expected " + MSG.trim() + " but got "
+ line);
}
// Assert that after closing the writer, we get back what
// we wrote again.
line = r.readLine();
if (null == line) {
throw new IOException("line2 was null");
} else if (!MSG2.trim().equals(line)) {
throw new IOException("Expected " + MSG2.trim() + " but got "
+ line);
}
} catch (IOException ioe) {
this.exception = ioe;
} finally {
if (null != r) {
try {
r.close();
} catch (IOException ioe) {
LOG.warn("Error closing reader: " + ioe);
}
}
}
}
}
private static class WriterThread extends Thread {
private File file;
private IOException exception;
public WriterThread(File f) {
this.file = f;
}
/** return any exception during the run method. */
public IOException getException() {
return this.exception;
}
public void run() {
BufferedWriter w = null;
try {
w = new BufferedWriter(new OutputStreamWriter(
new FileOutputStream(file)));
w.write(MSG);
w.flush();
w.write(MSG2);
} catch (IOException ioe) {
this.exception = ioe;
} finally {
if (null != w) {
try {
w.close();
} catch (IOException ioe) {
LOG.warn("Error closing writer: " + ioe);
}
}
}
}
}
@Test
public void testNamedFifo() throws Exception {
if (Shell.WINDOWS) {
// NamedFifo uses Linux specific commands like mknod
// and mkfifo, so skip the test on Windows OS
LOG.warn("Named FIFO is not supported on Windows. Skipping test");
return;
}
File root = new File(TEMP_BASE_DIR.toString());
File fifo = new File(root, "foo-fifo");
NamedFifo nf = new NamedFifo(fifo);
nf.create();
File returned = nf.getFile();
// These should be the same object.
assertEquals(fifo, returned);
ReaderThread rt = new ReaderThread(returned);
WriterThread wt = new WriterThread(returned);
rt.start();
wt.start();
rt.join();
wt.join();
IOException rex = rt.getException();
IOException wex = wt.getException();
if (null != rex) {
LOG.error("reader exception: " + StringUtils.stringifyException(rex));
}
if (null != wex) {
LOG.error("writer exception: " + StringUtils.stringifyException(wex));
}
assertNull(rex);
assertNull(wex);
}
}